2016-11-02 3 views
1

카프카와 특히 혼란스러운 문제가 있습니다. 특히 메시지의 키를 얻으려고합니다.카프카 메시지 키 - 바이트 []와 문자열 동시에

Map<String, Integer> topicCount = new HashMap<>(); 
    topicCount.put(myConsumer.getTopic(), 1); 

    Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = myConsumer.getConsumer().createMessageStreams(topicCount); 
    List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(myConsumer.getTopic()); 
    System.out.println("Listening to topic " + myConsumer.getTopic()); 
    for (final KafkaStream stream : streams) { 
     ConsumerIterator<String, byte[]> it = stream.iterator(); 
     while (it.hasNext()) { 

      System.out.println("Message received from topic"); 

      MessageAndMetadata<String, byte[]> o = it.next(); 

      Object messageKey = o.key(); 
      System.out.println("messageKey is type: " + messageKey.getClass().getName()); 
      System.out.println("messageKey is type: " + messageKey.getClass().getCanonicalName()); 
      System.out.println("o keyDecoder: " + o.keyDecoder()); 

      System.out.println("Key from message: " + o.key()); //This throws exception - [B cannot be cast to java.lang.String 
      //System.out.println("Key as String: " + new String(o.key(), StandardCharsets.UTF_8)); //uncomment this compile Exception - no suitable constructor found for String(java.lang.String,java.nio.charset.Charset) 

      byte[] bytesIn = o.message();  //getting the bytes is fine 

      System.out.println("MessageAndMetadata: " + o); 

      ///other code cut 
     } 
    } 

예외 :

Listening to topic MyKafkaTopic 
Message received from topic 
messageKey is type: [B 
messageKey is type: byte[] 
o decoder: [email protected] 
[WARNING] 
java.lang.reflect.InvocationTargetException 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.ClassCastException: [B cannot be cast to java.lang.String 
    at com.foo.bar.KafkaCFS.process(KafkaCFS.java:153) 
    at com.foo.bar.KafkaCFS.run(KafkaCFS.java:63) 
    at com.foo.bar.App.main(App.java:90) 
    ... 6 more  

메이븐 :

의 핵심은 문자열과 바이트는 []

다음 코드는 아래의 예외를 생산 모두 생각하는 것 같다

<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka_2.10</artifactId> 
    <version>0.9.0.1</version> 
</dependency> 
나는 System.out에의 행의 주석을 해제하면

나는 심지어 컴파일 할 수 없습니다 :

컴파일러가 키를 생각하는 것을 어떻게
[ERROR] COMPILATION ERROR : 
[INFO] ------------------------------------------------------------- 
[ERROR] /C:/Dev/main/java/com/foo/bar/KafkaCFS.java:[152,56] no suitable constructor found for String(java.lang.String,java.nio.charset.Charset) 
    constructor java.lang.String.String(byte[],int) is not applicable 
    (argument mismatch; java.lang.String cannot be converted to byte[]) 

(나는 그것이 될 것으로 예상 것입니다)하지만 런타임이 바이트의 문자열입니다 정렬?

Key를 String으로 가져 오려면 어떻게해야합니까?

감사합니다,

KA.

답변

2

일치하지 않았습니다. 스트림을 KafkaStream<byte[], byte[]>으로 선언하면 ConsumerIterator<String, byte[]> it = stream.iterator();이 제네릭과 일치하도록 ConsumerIterator<byte[], byte[]> it = stream.iterator();이되어야합니다. 그런 다음 o.key()을 가져와 new String(o.key());을 통해 문자열을 만들 수 있습니다.

1

KafkaStream 일반 매개 변수 유형은 (byte [], byte [])로 설정하는 것이 좋습니다. 다음과 같이 코드를 변경하십시오.

ConsumerIterator<byte[], byte[]> it = stream.iterator(); 
while (it.hasNext()) { 
    String key = new String(it.next().key()); 
    ... 
} 
관련 문제