2017-03-06 3 views
0

나는 kafka에서 avro 메시지를 읽으려면 spring-cloud-stream-schema을 사용합니다. 나는 MessagesChannels에 입력 채널을 구성 :봄 구름 kafka 및 avro 직렬화 문제

@Input("topicName1") 
SubscribableChannel fromInput1(); 

내가 그와 같은 구성 파일이 :이 오류가 발생했습니다

@Configuration 
@EnableBinding(MessagesChannels.class) 
@EnableSchemaRegistryClient 
public class MessageConfiguration { 
    @Bean 
    public MessageConverter topic1MessageConverter() throws IOException { 
    return new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes")); 
    } 
} 

그리고 내 소비자가

실제로 전송
fromInput1().subscribe(this::onMessage); 

void onMessage(Message message) { 
} 

메시지라고

:

nested exception is java.lang.ClassCastException: 
org.apache.avro.generic.GenericData$Record cannot be cast to [B 

실제로 가공되지 않은 바이트는 정확하게 org.apache.avro.generic.GenericData$Record으로 파싱됩니다. 하지만 봄에는 Message 클래스가 필요합니다. GenericData$RecordMessage으로 전송하는 방법 또는 GenericData$Record을 avro-tools 클래스에서 직접 생성하는 방법

자세한 내용 :

2017-03-06 11:23:10.695 ERROR 19690 --- [afka-listener-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: ConsumerRecord(topic = topic1, partition = 0, offset = 7979, CreateTime = 1488784987569, checksum = 623709057, serialized key size = -1, serialized value size = 36, key = null, value = {"foor": "bar"}) 

org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cl[email protected]4bf9d802]; nested exception is java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to [B 
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:139) 
    at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:70) 
    at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:64) 

답변

1

나는 당신으로 application/*+avro를 사용하는 수신 메시지 채널의 contentType을 설정할 필요가 있다고 생각 지정된 here

관련 문제