2011-11-28 4 views
21

카프 카에서 읽고 쓰는 메시지에 Avro를 사용하려고합니다. 누구든지 Avro 바이너리 인코더를 사용하여 메시지 대기열에 넣을 데이터를 인코딩/디코딩하는 예가 있습니까?Avro 바이너리 인코더를 사용하여 카프카 메시지를 인코딩/디코딩하는 방법은 무엇입니까?

카프카 부품보다 Avro 부품이 필요합니다. 아니면 다른 해결책을 찾아야할까요? 기본적으로, 나는 공간에 관해서 JSON에보다 효율적인 솔루션을 찾으려고 노력하고있다. Avro는 JSON보다 더 작기 때문에 언급되었습니다.

답변

11

나는 마침내 Kafka 메일 링리스트에 물어 보니 기억에 남았습니다. 다음과 같은 대답이있었습니다. 완벽하게 작동했습니다.

예, 메시지를 바이트 배열로 보낼 수 있습니다. 당신이 메시지 클래스의 생성자 보면, 당신은 볼 것이다 -

을 데프이 (바이트 : 배열 [바이트]) 이제

, 생산자 전송을보고() API - 데프

V를 Message 유형으로, K를 키로 지정할 수 있습니다. (producerData : ProducerData [K, V] *) 키를 사용하여 파티셔닝을하지 않으려면 메시지 유형으로 설정하십시오.

감사합니다, 네하, 당신은 또한 단순히 데이터를 압축 고려할 수 대신에 브로의

2

; gzip (양호한 압축, 높은 CPU) 또는 LZF 또는 Snappy (훨씬 빠르며 조금 느린 압축).

또는 대안은 (this extension 포함) 잭슨 자바 지원도 Smile binary JSON있다 : JSON과 같은

ObjectMapper mapper = new ObjectMapper(new SmileFactory()); 
byte[] serialized = mapper.writeValueAsBytes(pojo); 
// or back 
SomeType pojo = mapper.readValue(serialized, SomeType.class); 

기본적으로 동일한 코드 제외 : 컴팩트 한 바이너리 형식 및 브로보다 사용하기 훨씬 쉽습니다 다른 포맷 공장을 통과하기위한 것. 데이터 크기 관점에서 스마일 (Smile) 또는 아브로 (Avro)가 더 컴팩트한지 여부는 유스 케이스의 세부 사항에 달려 있습니다. 둘 다 JSON보다 더 작습니다.

이 코드는 JSON과 Smile 모두에서 동일한 코드로 POJO 만 사용하면 효과적입니다. Avro와 비교하여 코드 생성이 필요하거나 GenericRecord을 압축하고 풀기위한 많은 수동 코드가 필요합니다.

7

당신이 아 브로 메시지 (카프카 부분은 이미 대답한다)에서 바이트 배열을 얻고 싶은 경우에, 이진 인코더 사용

GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema); 
    ByteArrayOutputStream os = new ByteArrayOutputStream(); 
    try { 
     Encoder e = EncoderFactory.get().binaryEncoder(os, null); 
     writer.write(record, e); 
     e.flush(); 
     byte[] byteData = os.toByteArray(); 
    } finally { 
     os.close(); 
    } 
+0

이 byteData를 KafkaBroker로 보내고 콘솔 소비자로부터 읽을 수 있습니까? 생산자 키 시리얼 라이저는 무엇이되어야합니까? – user2441441

+0

응답에서 언급했듯이 카프카 파트는 다른 응답에 문서화되어 있습니다 - http://stackoverflow.com/a/8348264/5266 및 http://stackoverflow.com/a/32341917/5266 –

12

이 기본적인 예입니다. 여러 파티션/주제로 시도하지 않았습니다.

// 샘플 생산 코드

import org.apache.avro.Schema; 
import org.apache.avro.generic.GenericData; 
import org.apache.avro.generic.GenericRecord; 
import org.apache.avro.io.*; 
import org.apache.avro.specific.SpecificDatumReader; 
import org.apache.avro.specific.SpecificDatumWriter; 
import org.apache.commons.codec.DecoderException; 
import org.apache.commons.codec.binary.Hex; 
import kafka.javaapi.producer.Producer; 
import kafka.producer.KeyedMessage; 
import kafka.producer.ProducerConfig; 
import java.io.ByteArrayOutputStream; 
import java.io.File; 
import java.io.IOException; 
import java.nio.charset.Charset; 
import java.util.Properties; 


public class ProducerTest { 

    void producer(Schema schema) throws IOException { 

     Properties props = new Properties(); 
     props.put("metadata.broker.list", "0:9092"); 
     props.put("serializer.class", "kafka.serializer.DefaultEncoder"); 
     props.put("request.required.acks", "1"); 
     ProducerConfig config = new ProducerConfig(props); 
     Producer<String, byte[]> producer = new Producer<String, byte[]>(config); 
     GenericRecord payload1 = new GenericData.Record(schema); 
     //Step2 : Put data in that genericrecord object 
     payload1.put("desc", "'testdata'"); 
     //payload1.put("name", "अasa"); 
     payload1.put("name", "dbevent1"); 
     payload1.put("id", 111); 
     System.out.println("Original Message : "+ payload1); 
     //Step3 : Serialize the object to a bytearray 
     DatumWriter<GenericRecord>writer = new SpecificDatumWriter<GenericRecord>(schema); 
     ByteArrayOutputStream out = new ByteArrayOutputStream(); 
     BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null); 
     writer.write(payload1, encoder); 
     encoder.flush(); 
     out.close(); 

     byte[] serializedBytes = out.toByteArray(); 
     System.out.println("Sending message in bytes : " + serializedBytes); 
     //String serializedHex = Hex.encodeHexString(serializedBytes); 
     //System.out.println("Serialized Hex String : " + serializedHex); 
     KeyedMessage<String, byte[]> message = new KeyedMessage<String, byte[]>("page_views", serializedBytes); 
     producer.send(message); 
     producer.close(); 

    } 


    public static void main(String[] args) throws IOException, DecoderException { 
     ProducerTest test = new ProducerTest(); 
     Schema schema = new Schema.Parser().parse(new File("src/test_schema.avsc")); 
     test.producer(schema); 
    } 
} 

// 샘플 소비자 코드

제 1 부 : 소비자 그룹 코드 : 여러 파티션/주제에 대해 여러 소비자 이상을 가질 수있다.

import kafka.consumer.ConsumerConfig; 
import kafka.consumer.KafkaStream; 
import kafka.javaapi.consumer.ConsumerConnector; 

import java.util.HashMap; 
import java.util.List; 
import java.util.Map; 
import java.util.Properties; 
import java.util.concurrent.Executor; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.TimeUnit; 

/** 
* Created by on 9/1/15. 
*/ 
public class ConsumerGroupExample { 
    private final ConsumerConnector consumer; 
    private final String topic; 
    private ExecutorService executor; 

    public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic){ 
     consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
       createConsumerConfig(a_zookeeper, a_groupId)); 
     this.topic = a_topic; 
    } 

    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId){ 
     Properties props = new Properties(); 
     props.put("zookeeper.connect", a_zookeeper); 
     props.put("group.id", a_groupId); 
     props.put("zookeeper.session.timeout.ms", "400"); 
     props.put("zookeeper.sync.time.ms", "200"); 
     props.put("auto.commit.interval.ms", "1000"); 

     return new ConsumerConfig(props); 
    } 

    public void shutdown(){ 
     if (consumer!=null) consumer.shutdown(); 
     if (executor!=null) executor.shutdown(); 
     System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly"); 
     try{ 
      if(!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)){ 

      } 
     }catch(InterruptedException e){ 
      System.out.println("Interrupted"); 
     } 

    } 


    public void run(int a_numThreads){ 
     //Make a map of topic as key and no. of threads for that topic 
     Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 
     topicCountMap.put(topic, new Integer(a_numThreads)); 
     //Create message streams for each topic 
     Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); 
     List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); 

     //initialize thread pool 
     executor = Executors.newFixedThreadPool(a_numThreads); 
     //start consuming from thread 
     int threadNumber = 0; 
     for (final KafkaStream stream : streams) { 
      executor.submit(new ConsumerTest(stream, threadNumber)); 
      threadNumber++; 
     } 
    } 
    public static void main(String[] args) { 
     String zooKeeper = args[0]; 
     String groupId = args[1]; 
     String topic = args[2]; 
     int threads = Integer.parseInt(args[3]); 

     ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic); 
     example.run(threads); 

     try { 
      Thread.sleep(10000); 
     } catch (InterruptedException ie) { 

     } 
     example.shutdown(); 
    } 


} 

2 부 : 실제로 메시지를 소비하는 개인 소비자.

import kafka.consumer.ConsumerIterator; 
import kafka.consumer.KafkaStream; 
import kafka.message.MessageAndMetadata; 
import org.apache.avro.Schema; 
import org.apache.avro.generic.GenericRecord; 
import org.apache.avro.generic.IndexedRecord; 
import org.apache.avro.io.DatumReader; 
import org.apache.avro.io.Decoder; 
import org.apache.avro.io.DecoderFactory; 
import org.apache.avro.specific.SpecificDatumReader; 
import org.apache.commons.codec.binary.Hex; 

import java.io.File; 
import java.io.IOException; 

public class ConsumerTest implements Runnable{ 

    private KafkaStream m_stream; 
    private int m_threadNumber; 

    public ConsumerTest(KafkaStream a_stream, int a_threadNumber) { 
     m_threadNumber = a_threadNumber; 
     m_stream = a_stream; 
    } 

    public void run(){ 
     ConsumerIterator<byte[], byte[]>it = m_stream.iterator(); 
     while(it.hasNext()) 
     { 
      try { 
       //System.out.println("Encoded Message received : " + message_received); 
       //byte[] input = Hex.decodeHex(it.next().message().toString().toCharArray()); 
       //System.out.println("Deserializied Byte array : " + input); 
       byte[] received_message = it.next().message(); 
       System.out.println(received_message); 
       Schema schema = null; 
       schema = new Schema.Parser().parse(new File("src/test_schema.avsc")); 
       DatumReader<GenericRecord> reader = new SpecificDatumReader<GenericRecord>(schema); 
       Decoder decoder = DecoderFactory.get().binaryDecoder(received_message, null); 
       GenericRecord payload2 = null; 
       payload2 = reader.read(null, decoder); 
       System.out.println("Message received : " + payload2); 
      }catch (Exception e) { 
       e.printStackTrace(); 
       System.out.println(e); 
      } 
     } 

    } 


} 

테스트 아브 스키마 :주의해야 할

{ 
    "namespace": "xyz.test", 
    "type": "record", 
    "name": "payload", 
    "fields":[ 
     { 
      "name": "name", "type": "string" 
     }, 
     { 
      "name": "id", "type": ["int", "null"] 
     }, 
     { 
      "name": "desc", "type": ["string", "null"] 
     } 
    ] 
} 

중요 사항은 다음과 같습니다

  1. 알면 박스에서이 코드를 실행하기 위해 표준 카프카와 브로 항아리가 필요합니다.

  2. 매우 중요합니다. props.put ("serializer.class", "kafka.serializer.DefaultEncoder"); Don t use stringEncoder as that won 바이트 배열을 메시지로 보내는 경우 작동하지 않습니다.

  3. 바이트 []를 16 진수 문자열로 변환하여 컨슈머로 보내면 16 진수 문자열을 바이트 []로 변환 한 다음 원본 메시지로 변환 할 수 있습니다.

  4. 여기에 언급 된 바와 같이 사육사와 브로커를 실행하고 : "0120-917-0380", "page_views"또는 원하는 항목을 만듭니다.

  5. ProducerTest.java를 실행 한 다음 ConsumerGroupExample.java를 실행하고 생성되고 소비되는 avro 데이터를 확인하십시오.

+0

도움을 주셔서 감사합니다! ! 나는 이것을 시도했지만 소비자 코드에서 it.hasNext() 함수는 false를 반환하므로 컨트롤이 while 루프를 입력하지 않습니다. 내가 뭘 잘못하고 있는지 알 수 있습니까? –

3

업데이트 답변.

카프카는 (포맷 SBT) 메이븐과 브로 시리얼 라이저/디시리얼라이저를 가지고 좌표

"io.confluent" % "kafka-avro-serializer" % "3.0.0" 

당신은 KafkaProducer 생성자에 KafkaAvroSerializer의 인스턴스를 전달합니다.

그런 다음 Avro GenericRecord 인스턴스를 생성하고 KafkaProducer와 함께 보낼 수있는 Kafka ProducerRecord 인스턴스 내부의 값으로 사용할 수 있습니다.

카프카 소비자 측에서 KafkaAvroDeserializer 및 KafkaConsumer를 사용합니다.

+0

짧지 만 완전한 예를 제공 할 수 있습니까? –

+1

이것은 Confluent 자신의 Maven 저장소가 추가 된 경우에만 작동합니다. 왜냐하면 그들은 artifacts를 maven central에 게시하지 않기 때문입니다 : http://packages.confluent.io/maven –

관련 문제