I start Zookeeper manually, then the Kafka server, and finally the Kafka-REST server, each with its properties file. When I then deploy my Spring Boot application on Tomcat, the application fails to start, and the Tomcat log trace shows the error org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.KafkaException: Failed to construct kafka consumer.

Error log

25-Dec-2017 15:00:32.508 SEVERE [localhost-startStop-1] org.apache.catalina.core.ContainerBase.addChildInternal ContainerBase.addChild: start: 
org.apache.catalina.LifecycleException: Failed to start component [StandardEngine[Catalina].StandardHost[localhost].StandardContext[/spring-kafka-webhook-service-0.0.1-SNAPSHOT]] 
     at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:167) 
     at org.apache.catalina.core.ContainerBase.addChildInternal(ContainerBase.java:752) 
     at org.apache.catalina.core.ContainerBase.addChild(ContainerBase.java:728) 
     at org.apache.catalina.core.StandardHost.addChild(StandardHost.java:734) 
     at org.apache.catalina.startup.HostConfig.deployWAR(HostConfig.java:986) 
     at org.apache.catalina.startup.HostConfig$DeployWar.run(HostConfig.java:1857) 
     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
     at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
     at java.lang.Thread.run(Thread.java:748) 
Caused by: org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.KafkaException: Failed to construct kafka consumer 
     at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178) 
     at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:50) 
     at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:348) 
     at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:151) 
     at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:114) 
     at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:880) 
     at org.springframework.boot.context.embedded.EmbeddedWebApplicationContext.finishRefresh(EmbeddedWebApplicationContext.java:144) 
     at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:546) 
     at org.springframework.boot.context.embedded.EmbeddedWebApplicationContext.refresh(EmbeddedWebApplicationContext.java:122) 
     at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:693) 
     at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:360) 
     at org.springframework.boot.SpringApplication.run(SpringApplication.java:303) 
     at org.springframework.boot.web.support.SpringBootServletInitializer.run(SpringBootServletInitializer.java:154) 
     at org.springframework.boot.web.support.SpringBootServletInitializer.createRootApplicationContext(SpringBootServletInitializer.java:134) 
     at org.springframework.boot.web.support.SpringBootServletInitializer.onStartup(SpringBootServletInitializer.java:87) 
     at org.springframework.web.SpringServletContainerInitializer.onStartup(SpringServletContainerInitializer.java:169) 
     at org.apache.catalina.core.StandardContext.startInternal(StandardContext.java:5196) 
     at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:150) 
     ... 10 more 
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer 
     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702) 
     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557) 
     at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:73) 
     at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:69) 
     at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:305) 
     at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:230) 
     at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:180) 
     at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:202) 
     at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:126) 
     at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:202) 
     at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:287) 
     at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:236) 
     at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:175) 
     ... 27 more 
Caused by: java.lang.NoClassDefFoundError: org/apache/kafka/common/ClusterResourceListener 
     at java.lang.ClassLoader.defineClass1(Native Method) 
     at java.lang.ClassLoader.defineClass(ClassLoader.java:763) 
     at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) 
     at org.apache.catalina.loader.WebappClassLoaderBase.findClassInternal(WebappClassLoaderBase.java:2283) 
     at org.apache.catalina.loader.WebappClassLoaderBase.findClass(WebappClassLoaderBase.java:811) 
     at org.apache.catalina.loader.WebappClassLoaderBase.loadClass(WebappClassLoaderBase.java:1260) 
     at org.apache.catalina.loader.WebappClassLoaderBase.loadClass(WebappClassLoaderBase.java:1119) 
     at java.lang.Class.forName0(Native Method) 
     at java.lang.Class.forName(Class.java:348) 
     at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:332) 
     at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:225) 
     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:643) 
     ... 39 more 
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.ClusterResourceListener 
     at org.apache.catalina.loader.WebappClassLoaderBase.loadClass(WebappClassLoaderBase.java:1291) 
     at org.apache.catalina.loader.WebappClassLoaderBase.loadClass(WebappClassLoaderBase.java:1119) 
     ... 51 more 

Receiver class

Receiver configuration class

@Configuration 
@EnableKafka 
public class InventoryReceiverConfig { 

    @Autowired 
    private KafkaConfig kafkaConfig; 

    @Bean 
    public static ConsumerFactory<String, InventoryEvent> consumerFactory() { 
     return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), 
       new JsonDeserializer<>(InventoryEvent.class)); 
    } 

    @Bean 
    public static ConcurrentKafkaListenerContainerFactory<String, InventoryEvent> kafkaListenerContainerFactory() { 
     ConcurrentKafkaListenerContainerFactory<String, InventoryEvent> containerFactory = new ConcurrentKafkaListenerContainerFactory<>(); 
     containerFactory.setConsumerFactory(consumerFactory()); 
     containerFactory.setConcurrency(3); 
     containerFactory.getContainerProperties().setPollTimeout(3000); 
     return containerFactory; 
    } 

    @Bean 
    public static Map<String, Object> consumerConfigs() { 
     Map<String, Object> consumerProps = new HashMap<>(); 
     consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 
     consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG,"inventory_consumers"); 
     consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class); 
     consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class); 
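     // Editor's note: the Confluent monitoring interceptor below is instantiated
     // reflectively by the consumer (see the stack trace above); loading it needs
     // both the monitoring-interceptors jar and a kafka-clients version that
     // provides org.apache.kafka.common.ClusterResourceListener on the classpath.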
     consumerProps.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"); 
     return consumerProps; 
    } 

    @Bean 
    public InventoryEventReceiver receiver() { 
     return new InventoryEventReceiver(); 
    } 

} 

The properties files for my cluster (server.properties, consumer.properties, and kafka-rest.properties) are as follows:

server.properties

# The id of the broker. This must be set to a unique integer for each broker. 
broker.id=0 

# Switch to enable topic deletion or not, default value is false 
delete.topic.enable=true 

############################# Socket Server Settings ############################# 

# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured. 
# FORMAT: 
#  listeners = listener_name://host_name:port 
# EXAMPLE: 
#  listeners = PLAINTEXT://your.host.name:9092 
listeners=PLAINTEXT://:9092 

# Hostname and port the broker will advertise to producers and consumers. If not set, 
# it uses the value for "listeners" if configured. Otherwise, it will use the value 
# returned from java.net.InetAddress.getCanonicalHostName(). 
advertised.listeners=PLAINTEXT://localhost:9092 
# The number of threads that the server uses for receiving requests from the network and sending responses to the network 
num.network.threads=3 

# The number of threads that the server uses for processing requests, which may include disk I/O 
num.io.threads=8 

# The send buffer (SO_SNDBUF) used by the socket server 
socket.send.buffer.bytes=102400 

# The receive buffer (SO_RCVBUF) used by the socket server 
socket.receive.buffer.bytes=102400 

# The maximum size of a request that the socket server will accept (protection against OOM) 
socket.request.max.bytes=104857600 
# A comma separated list of directories under which to store log files 
log.dirs=/tmp/kafka-logs 

# The default number of log partitions per topic. More partitions allow greater 
# parallelism for consumption, but this will also result in more files across 
# the brokers. 
num.partitions=1 

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown. 
# This value is recommended to be increased for installations with data dirs located in RAID array. 
num.recovery.threads.per.data.dir=1 

############################# Internal Topic Settings ############################# 
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state" 
# For anything other than development testing, a value greater than 1 (such as 3) is recommended to ensure availability. 
offsets.topic.replication.factor=1 
transaction.state.log.replication.factor=1 
transaction.state.log.min.isr=1 
# The minimum age of a log file to be eligible for deletion due to age 
log.retention.hours=168 
# The maximum size of a log segment file. When this size is reached a new log segment will be created. 
log.segment.bytes=1073741824 

# The interval at which log segments are checked to see if they can be deleted according 
# to the retention policies 
log.retention.check.interval.ms=300000 
# Zookeeper connection string (see zookeeper docs for details). 
# This is a comma separated host:port pairs, each corresponding to a zk 
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". 
# You can also append an optional chroot string to the urls to specify the 
# root directory for all kafka znodes. 
zookeeper.connect=localhost:2181 

# Timeout in ms for connecting to zookeeper 
zookeeper.connection.timeout.ms=6000 
##################### Confluent Proactive Support ###################### 
# If set to true, and confluent-support-metrics package is installed 
# then the feature to collect and report support metrics 
# ("Metrics") is enabled. If set to false, the feature is disabled. 
# 
confluent.support.metrics.enable=true 
############################# Group Coordinator Settings ############################# 

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance. 
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms. 
# The default value for this is 3 seconds. 
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing. 
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup. 
group.initial.rebalance.delay.ms=0 


# The customer ID under which support metrics will be collected and 
# reported. 
# 
# When the customer ID is set to "anonymous" (the default), then only a 
# reduced set of metrics is being collected and reported. 
# 
# Confluent customers 
# ------------------- 
# If you are a Confluent customer, then you should replace the default 
# value with your actual Confluent customer ID. Doing so will ensure 
# that additional support metrics will be collected and reported. 
# 
confluent.support.customer.id=anonymous 

consumer.properties

# Zookeeper connection string 
# comma separated host:port pairs, each corresponding to a zk 
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" 
zookeeper.connect=127.0.0.1:2181 

# timeout in ms for connecting to zookeeper 
zookeeper.connection.timeout.ms=6000 

#consumer group id 
group.id=test-consumer-group,inventory_consumers 

#consumer timeout 
#consumer.timeout.ms=5000 

kafka-rest.properties

id=kafka-rest-test-server 
schema.registry.url=http://localhost:8081 
zookeeper.connect=localhost:2181 
# 
# Configure interceptor classes for sending consumer and producer metrics to Confluent Control Center 
# Make sure that monitoring-interceptors-<version>.jar is on the Java class path 
consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor 
producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor 

pom.xml


My receiver and sender classes are not annotated with @Component or @Service. Does that make any difference?

@Configuration 
public class InventorySenderConfig 

@Configuration 
@EnableKafka 
public class InventoryReceiverConfig 

@Component 
public class KafkaConfig 

@Configuration 
public class ProducingChannelConfig 

@Configuration 
public class ConsumingChannelConfig 

@RestController 
public class KafkaWebhookController 

@Service("webhookService") 
public class KafkaServiceImpl 

@EnableIntegration 
@SpringBootApplication 
@ComponentScan("com.psl.kafka") 
public class SpringKafkaWebhookServiceApplication extends SpringBootServletInitializer 

These are my class annotations. Do they look fine, or do I need to change something?

After updating the Kafka version to 0.10.1.1, I get a new build error:

2017-12-26 13:11:44.490 INFO 13444 --- [   main] o.a.kafka.common.utils.AppInfoParser  : Kafka version : 0.10.1.1 
2017-12-26 13:11:44.490 INFO 13444 --- [   main] o.a.kafka.common.utils.AppInfoParser  : Kafka commitId : f10ef2720b03b247 
2017-12-26 13:12:44.499 ERROR 13444 --- [   main] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='inventory-events' and payload='Hello Spring Integration Kafka 0!' to topic inventory: 

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms. 

2017-12-26 13:12:44.501 WARN 13444 --- [   main] o.a.k.c.p.i.ProducerInterceptors   : Error executing interceptor onAcknowledgement callback 

java.lang.IllegalStateException: clusterResource is not defined 
    at io.confluent.shaded.com.google.common.base.Preconditions.checkState(Preconditions.java:174) ~[monitoring-interceptors-3.1.1.jar:na] 
    at io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor.onAcknowledgement(MonitoringProducerInterceptor.java:59) ~[monitoring-interceptors-3.1.1.jar:na] 
    at org.apache.kafka.clients.producer.internals.ProducerInterceptors.onSendError(ProducerInterceptors.java:116) ~[kafka-clients-0.10.1.1.jar:na] 
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:489) [kafka-clients-0.10.1.1.jar:na] 
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:436) [kafka-clients-0.10.1.1.jar:na] 

Do I need to define the interceptor classes that I added in the ProducingChannelConfig and ConsumingChannelConfig configurations, as well as in the InventoryReceiverConfig class?


What is the version of your Kafka jars? – user7294900


pom.xml has been added – somnathchakrabarti

Answers


Your Kafka version is too old. The problematic class, ClusterResourceListener, is present only from version 0.10.1.0 onwards, while your version appears to be 0.10.0.1 or lower.

Upgrading the Kafka jars should fix the problem.

onUpdate(ClusterResource) is invoked once after each metadata response. The cluster id may be null if the Kafka broker version is below 0.10.1.0.
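
For reference, a minimal sketch of how that upgrade is typically forced with Maven: declaring kafka-clients directly makes it win over the 0.10.0.1 version pulled in transitively by spring-kafka. The coordinates are standard, but the versions below are assumptions taken from this thread, since the original pom.xml was not recovered:

<!-- Sketch only: a direct declaration overrides the transitive kafka-clients.
     Versions are assumptions based on the ones mentioned in this thread. -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.1.7.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.1.1</version>
</dependency>

Maven treats the direct declaration as "nearer" than the transitive one, so this is the version that ends up on the classpath.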


I am using spring-kafka and spring-kafka-test 1.1.7.RELEASE, as mentioned in my comment to @Gary Russell, but the dependency automatically included under "Referenced Libraries" is 0.10.0.1. Forcing the change in the pom.xml gives errors because of the version mismatch, so I cannot change it. – somnathchakrabarti


The default Kafka jars pulled in as dependencies are kafka_2.11-0.10.0.1.jar and kafka_2.11-0.10.0.1-test.jar – somnathchakrabarti


Even after commenting out kafka-clients-0.10.0.1 in my pom.xml, the default dependencies on the classpath are still kafka-clients-0.10.0.1.jar and kafka-clients-0.10.0.1-test.jar – somnathchakrabarti


Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.ClusterResourceListener

You don't have the kafka-clients jar on your classpath. What are you using for dependency management? Maven and Gradle should put this jar on the classpath automatically.


Using Maven. pom.xml has been added. – somnathchakrabarti


You have conflicting versions on the classpath. That class, ClusterResourceListener, was added in the 0.10.2.x clients. Perhaps you have a newer version in one of the Tomcat directories? –
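
If the duplicate really does come from Maven rather than from one of the Tomcat lib directories, a common cleanup, sketched here with versions assumed from this thread, is to exclude the transitive client and declare a single version explicitly; running mvn dependency:tree first shows which version is actually being resolved:

<!-- Sketch only: force a single kafka-clients version by excluding the
     transitive one. Versions are assumptions based on this thread. -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.1.7.RELEASE</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.1.1</version>
</dependency>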


Please delete (the many) comments like these and edit the question instead. This has nothing to do with your configuration; it is a classloader problem. Use the -verbose JVM arg to see which jars the classes are being loaded from. Also, why are you using such an old version? The current version of the spring-kafka 1.1.x line is 1.1.7. –
