2017-02-23 1 views
0

this reference guide 다음 Kafka 클라이언트로 Spring 부트 응용 프로그램을 시작하려고하는데 아래 오류가 나타납니다.
해결 방법을 알려주십시오.KafkaException : 클래스 JsonDeserializer를 인스턴스화 할 수 없습니다.

@Bean 
public Map<String, Object> consumerConfig() { 
    final HashMap<String, Object> result = new HashMap<>(); 
    result.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("spring.kafka.bootstrap.servers")); 
    result.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); 
    result.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); 
    return result; 
} 

@Bean 
public ConsumerFactory<Long, String> consumerFactory() { 
    return new DefaultKafkaConsumerFactory<>(consumerConfig()); 
} 

@Bean 
ConcurrentKafkaListenerContainerFactory<Long, String> kafkaListenerContainerFactory() { 
    ConcurrentKafkaListenerContainerFactory<Long, String> containerFactory = new ConcurrentKafkaListenerContainerFactory<>(); 
    containerFactory.setConsumerFactory(consumerFactory()); 
    containerFactory.setConcurrency(3); 
    containerFactory.getContainerProperties().setPollTimeout(3000); 
    return containerFactory; 
} 

-

org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.KafkaException: Failed to construct kafka consumer 
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:176) ~[spring-context-4.3.6.RELEASE.jar:4.3.6.RELEASE] 
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:51) ~[spring-context-4.3.6.RELEASE.jar:4.3.6.RELEASE] 
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:346) ~[spring-context-4.3.6.RELEASE.jar:4.3.6.RELEASE] 
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:149) ~[spring-context-4.3.6.RELEASE.jar:4.3.6.RELEASE] 
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:112) ~[spring-context-4.3.6.RELEASE.jar:4.3.6.RELEASE] 
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:879) ~[spring-context-4.3.6.RELEASE.jar:4.3.6.RELEASE] 
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:545) ~[spring-context-4.3.6.RELEASE.jar:4.3.6.RELEASE] 
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:737) [spring-boot-1.5.1.RELEASE.jar:1.5.1.RELEASE] 
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:370) [spring-boot-1.5.1.RELEASE.jar:1.5.1.RELEASE] 
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:314) [spring-boot-1.5.1.RELEASE.jar:1.5.1.RELEASE] 
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1162) [spring-boot-1.5.1.RELEASE.jar:1.5.1.RELEASE] 
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1151) [spring-boot-1.5.1.RELEASE.jar:1.5.1.RELEASE] 
    at com.ubs.wma.bmss.BmssConsumerApp.main(BmssConsumerApp.java:12) [classes/:na] 
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer 
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:703) ~[kafka-clients-0.10.1.1.jar:na] 
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:553) ~[kafka-clients-0.10.1.1.jar:na] 
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:73) ~[spring-kafka-1.1.2.RELEASE.jar:na] 
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:69) ~[spring-kafka-1.1.2.RELEASE.jar:na] 
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:305) ~[spring-kafka-1.1.2.RELEASE.jar:na] 
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:230) ~[spring-kafka-1.1.2.RELEASE.jar:na] 
    at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:180) ~[spring-kafka-1.1.2.RELEASE.jar:na] 
    at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:202) ~[spring-kafka-1.1.2.RELEASE.jar:na] 
    at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:124) ~[spring-kafka-1.1.2.RELEASE.jar:na] 
    at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:202) ~[spring-kafka-1.1.2.RELEASE.jar:na] 
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:287) ~[spring-kafka-1.1.2.RELEASE.jar:na] 
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:236) ~[spring-kafka-1.1.2.RELEASE.jar:na] 
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:173) ~[spring-context-4.3.6.RELEASE.jar:4.3.6.RELEASE] 
    ... 12 common frames omitted 
Caused by: org.apache.kafka.common.KafkaException: Could not instantiate class org.springframework.kafka.support.serializer.JsonDeserializer 
    at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:316) ~[kafka-clients-0.10.1.1.jar:na] 
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:203) ~[kafka-clients-0.10.1.1.jar:na] 
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:632) ~[kafka-clients-0.10.1.1.jar:na] 
    ... 24 common frames omitted 
Caused by: java.lang.IllegalAccessException: Class org.apache.kafka.common.utils.Utils can not access a member of class org.springframework.kafka.support.serializer.JsonDeserializer with modifiers "protected" 
    at sun.reflect.Reflection.ensureMemberAccess(Reflection.java:102) ~[na:1.8.0_121] 
    at java.lang.Class.newInstance(Class.java:436) ~[na:1.8.0_121] 
    at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:314) ~[kafka-clients-0.10.1.1.jar:na] 
    ... 26 common frames omitted 
+0

컨텍스트, 구성 및 코드없이 스택 추적을 덤프하지 않고 도움을 받으십시오. –

답변

1

그 문서에 따르면 우리가 가진 :

더 복잡하거나 특별한 경우에 대한

KafkaConsumer, 따라서 KafkaProducer(De)Serializer 인스턴스를 받아들이는 오버로드 된 생성자를 제공합니다 키 및/또는 값 각각에 대해.

이 API를 충족하기 위해 DefaultKafkaProducerFactoryDefaultKafkaConsumerFactory(De)Serializer 사용자 지정을 Producer/Consumer에 삽입 할 수있는 속성을 제공합니다.

그리고 더 아파치 카프카의 JavaDocs는 :

/** 
* A producer is instantiated by providing a set of key-value pairs as configuration, a key and a value {@link Serializer}. 
* Valid configuration strings are documented <a href="http://kafka.apache.org/documentation.html#producerconfigs">here</a>. 
* Values can be either strings or Objects of the appropriate type (for example a numeric configuration would accept 
* either the string "42" or the integer 42). 
* @param configs The producer configs 
* @param keySerializer The serializer for key that implements {@link Serializer}. The configure() method won't be 
*      called in the producer when the serializer is passed in directly. 
* @param valueSerializer The serializer for value that implements {@link Serializer}. The configure() method won't 
*       be called in the producer when the serializer is passed in directly. 
*/ 
public KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) { 

그럼, 당신이 필요로하는 것은 다음과 같이이다 :

@Bean 
public ConsumerFactory<Long, String> consumerFactory() { 
    return new DefaultKafkaConsumerFactory<>(consumerConfig(), null, new JsonDeserializer(Foo.class)); 
} 

JsonDeserializer는 반사에 의해 인스턴스화 할 수없는 문제는 그것이 필요하기 때문에 deserialize 할 특정 유형.

관련 문제