2016-11-11 4 views
2

avro 형식의 Kafka 메시지를 사용하려고하지만 이동 중에 avro에서 json으로 메시지를 디코딩 할 수 없습니다.이동 중에 Kafka Avro 메시지를 사용합니다.

저는 Confluent 플랫폼 (3.0.1)을 사용하고 있습니다. 예를 들어 나는 다음과 같은 메시지를 생성합니다.

kafka-avro-console-producer --broker-list localhost:9092 --topic test --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}' 
{"f1":"message1"} 
{"f1":"message2"} 

이제는 이동중인 메시지와 함께 메시지를 사용합니다. 일반 텍스트 메시지가 올바르게 작동합니다. Avro 메시지는 해독되어야합니다.

그러나 디코딩 후 나는 값없이 JSON (두 libs와) 얻을 github.com/elodina/go-avro github.com/linkedin/goavro, : 나는 다른 libs가 발견

{"f1":""} 

goavro :

avroSchema := ` 
{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]} 
` 
codec, err := goavro.NewCodec(avroSchema) 
if err != nil { 
    log.Fatal(err) 
} 
bb := bytes.NewBuffer(msg.Value) 
decoded, err := codec.Decode(bb) 
log.Println(fmt.Sprintf("%s", decoded)) 

이동 - 브로 :

schema := avro.MustParseSchema(avroSchema) 
reader := avro.NewGenericDatumReader() 
reader.SetSchema(schema) 
decoder := avro.NewBinaryDecoder(msg.Value) 
decodedRecord := avro.NewGenericRecord(schema) 
log.Println(decodedRecord.String()) 

MSG = sarama.ConsumerMessage

답변

1

그냥 내가 메시지 바이트 배열의 첫 번째 다섯 개 요소를 제거했다가 (진 브로 메시지를 비교하여) 발견 -

message = msg.Value[5:] 

어쩌면 누군가가 설명 할 수있는 지금은 모든 작품을 : 왜

3

첫 번째 바이트는 데이터 유형을 식별하는 마법입니다. 다음 4 바이트는 스키마 레지스트리를 사용하는 경우에만 매우 유용한 avro 스키마 ID입니다.

관련 문제