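Spring Kafka deserialization

I am trying to create a Kafka consumer that listens to a particular topic and processes the consumed messages as JSON. I tried the approach given in the Spring documentation, but I cannot get the messages as JSON.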

This is my code for the receiver configuration:

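import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;

@Configuration
@EnableKafka
public class ReceiverConfig {

    @Value("${kafka.bootstrap.servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // list of host:port pairs used for establishing the initial connections
        // to the Kafka cluster
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // consumer groups allow a pool of processes to divide the work of
        // consuming and processing records
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "Waitlist");

        return props;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    // default container factory: delivers the record value as a plain String
    @Bean
    public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }

    @Bean
    public Receiver receiver() {
        return new Receiver();
    }

    // container factory that parses each record value as JSON
    @Bean
    public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setMessageConverter(new StringJsonMessageConverter());
        return factory;
    }
}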

The consumer:

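import java.util.concurrent.CountDownLatch;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.Message;

public class Receiver {

    private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);

    private final CountDownLatch latch = new CountDownLatch(1);

    // uses the JSON container factory, so every record value must be valid JSON
    @KafkaListener(topics = "Reservation",
            containerFactory = "kafkaJsonListenerContainerFactory")
    public void receiveMessage(Message<?> message) {
        LOGGER.info("received message='{}'", message);
        latch.countDown();
    }

    public CountDownLatch getLatch() {
        return latch;
    }

}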

I get the following error when I publish to the topic on the remote server:

 2017-02-09 13:42:49.122 ERROR o.s.k.listener.LoggingErrorHandler Error while processing: ConsumerRecord(topic = Reservation, partition = 0, offset = 3394, CreateTime = 1486626082480, checksum = 1777660938, serialized key size = -1, serialized value size = 2, key = null, value = hi)
     org.springframework.kafka.support.converter.ConversionException: Failed to convert from JSON; nested exception is com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hi': was expecting ('true', 'false' or 'null') 
     at [Source: hi; line: 1, column: 5] 
      at org.springframework.kafka.support.converter.StringJsonMessageConverter.extractAndConvertValue(StringJsonMessageConverter.java:81) 
      at org.springframework.kafka.support.converter.MessagingMessageConverter.toMessage(MessagingMessageConverter.java:82) 
      at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:157) 
      at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:68) 
      at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:47) 
      at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:764) 
      at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:708) 
      at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$2500(KafkaMessageListenerContainer.java:230) 
      at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run(KafkaMessageListenerContainer.java:975) 
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
      at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
      at java.lang.Thread.run(Thread.java:745) 
     Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hi': was expecting ('true', 'false' or 'null') 
     at [Source: hi; line: 1, column: 5] 
      at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1702) 
      at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:558) 
      at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:2835) 
      at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:1903) 
      at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:749) 
      at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3834) 
      at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3783) 
      at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2880) 
      at org.springframework.kafka.support.converter.StringJsonMessageConverter.extractAndConvertValue(StringJsonMessageConverter.java:78) 
      ... 11 common frames omitted 

However, if I remove the containerFactory from the listener, I can receive the messages, but as plain strings rather than JSON:

2017-02-09 15:04:58.408 INFO c.c.c.kafka.consumer.Receiver received message='{'
2017-02-09 15:04:58.408 INFO c.c.c.kafka.consumer.Receiver received message=' "_eventType":"Reservation",'
2017-02-09 15:04:58.417 INFO c.c.c.kafka.consumer.Receiver received message=' "_timestamp":"2017-01-23T09:19:35Z",'
2017-02-09 15:04:58.417 INFO c.c.c.kafka.consumer.Receiver received message=' "_operation":"create",'
2017-02-09 15:04:58.417 INFO c.c.c.kafka.consumer.Receiver received message=' "type":"excursion",'
2017-02-09 15:04:58.417 INFO c.c.c.kafka.consumer.Receiver received message=' "reservationId":"46d353ac_9575_492a_9291_98d15bf4cc82",'
2017-02-09 15:04:58.417 INFO c.c.c.kafka.consumer.Receiver received message=' "eventReservationLinkId":"9b0bafb4_406e_43ae_94f2_36a913ce23d2",'
2017-02-09 15:04:58.417 INFO c.c.c.kafka.consumer.Receiver received message=' "master":true,'
2017-02-09 15:04:58.417 INFO c.c.c.kafka.consumer.Receiver received message=' "partySize":2,'
2017-02-09 15:04:58.417 INFO c.c.c.kafka.consumer.Receiver received message=' "startTime":"2017-01-27T08:30:00Z",'
2017-02-09 15:04:58.417 INFO c.c.c.kafka.consumer.Receiver received message=' "endTime":"2017-01-27T10:00:00Z",'
2017-02-09 15:04:58.417 INFO c.c.c.kafka.consumer.Receiver received message=' "timeslotId":"c2304a34_b9ba_4f3c_8e45_3e3c7677d6c2",'
2017-02-09 15:04:58.418 INFO c.c.c.kafka.consumer.Receiver received message=' "variantSku":"ocean_polar_1606_FLL-640B",'
2017-02-09 15:04:58.418 INFO c.c.c.kafka.consumer.Receiver received message=' "guestId":"378741",'
2017-02-09 15:04:58.418 INFO c.c.c.kafka.consumer.Receiver received message=' "createdBy":"149673",'
2017-02-09 15:04:58.418 INFO c.c.c.kafka.consumer.Receiver received message=' "purchaser":"143679",'
2017-02-09 15:04:58.418 INFO c.c.c.kafka.consumer.Receiver received message=' "eventId":"ocean_polar_1606_FLL-640",'
2017-02-09 15:04:58.418 INFO c.c.c.kafka.consumer.Receiver received message=' "scheduledEventId":"02c95434_3a99_452e_a2a8_51712683926c",'
2017-02-09 15:04:58.418 INFO c.c.c.kafka.consumer.Receiver received message=' "resourceId":"",'
2017-02-09 15:04:58.418 INFO c.c.c.kafka.consumer.Receiver received message=' "surpriseFlag":false,'
2017-02-09 15:04:58.418 INFO c.c.c.kafka.consumer.Receiver received message=' "venueId":"FLL001",'
2017-02-09 15:04:58.418 INFO c.c.c.kafka.consumer.Receiver received message=' "status":"CONFIRMED",'
2017-02-09 15:04:58.418 INFO c.c.c.kafka.consumer.Receiver received message=' "primaryId":"378741",'
2017-02-09 15:04:58.418 INFO c.c.c.kafka.consumer.Receiver received message=' "partyId":"9b0bafb4_406e_43ae_94f2_36a913ce23d2"'
2017-02-09 15:04:58.418 INFO c.c.c.kafka.consumer.Receiver received message='}'

Answer

Each message you processed is only a fragment of the JSON document:

received message='{'
received message=' "_eventType":"Reservation",'
received message=' "_timestamp":"2017-01-23T09:19:35Z",'
...

For the JSON conversion to work, the whole document needs to be encapsulated in a single message.
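
As an illustration only: the logs above would be consistent with a line-oriented producer publishing a pretty-printed file, one record per line, although the question does not say how the messages were produced. Under that assumption, a minimal sketch of a fix on the producing side is to collapse the document to a single line before sending it. The class and method names below are hypothetical, and the helper assumes its input is valid JSON (where line breaks can only occur between tokens, never inside a string value).

```java
import java.util.stream.Collectors;

// Hypothetical helper, not part of Spring Kafka or the Kafka client.
final class SingleRecordJson {

    private SingleRecordJson() {
    }

    // Collapses a pretty-printed JSON document into one line so that a
    // line-oriented producer publishes it as a single record.
    // Assumes valid JSON: raw line breaks never appear inside string values,
    // so whitespace next to a line break is always insignificant.
    static String toSingleLine(String prettyJson) {
        return prettyJson.lines()
                .map(String::strip)
                .filter(line -> !line.isEmpty())
                .collect(Collectors.joining());
    }

    public static void main(String[] args) {
        String pretty = "{\n \"_eventType\":\"Reservation\",\n \"partySize\":2\n}";
        // The whole document is now one string, suitable for one Kafka record.
        System.out.println(toSingleLine(pretty));
    }
}
```

If the producer builds the payload from an object instead of a file, serializing it with Jackson's ObjectMapper.writeValueAsString already yields a compact single-line document by default, so no such helper is needed.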

So this is the responsibility of the 'producer' side. Thanks @Gary! – Kuber