0

Avro 시리얼 라이저와 스키마 레지스트리를 사용하여 카프카에 객체를 보내려고합니다.
여기에 단순화 된 코드 :Avro 시리얼 라이저와 스키마 레지스트리를 사용하여 카프카에게 메시지를 보내는 방법

Properties props = new Properties(); 
    ... 
    props.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); 
    props.put(VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()); 
    props.put(SCHEMA_REGISTRY_URL_CONFIG, "http://" + schemaRegistryHostname + ":8081"); 

    Producer<String, User> producer = new KafkaProducer(properties); 

    User user = new User("name", "address", 123); 
    ProducerRecord record = new ProducerRecord<>(topic, key, user); 
    producer.send(record); 

내가 스키마 레지스트리에서와 객체 (사용자) "무대 뒤에서"읽기된다고 가정 직렬화,하지만 난 오류가 아래 얻을.
내가 무엇이 누락 되었습니까?
스키마를 명시 적으로 읽고 GenericRecord를 보내야합니까?

org.apache.kafka.common.errors.SerializationException : 오류 직렬화에 의한 브로 메시지
: java.lang.IllegalArgumentException가 : 지원되지 않는 브로 유형입니다. 지원되는 유형은 io.confluent.kafka.serializers.AbstractKafkaAvroSerDe.getSchema (AbstractKafkaAvroSerDe.java:123) ~ [kafka-avro-serializer]에서 null, Boolean, Integer, Long, Float, Double, String, byte [] 및 IndexedRecord
입니다. -3.3.0.jar!/:??
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl (AbstractKafkaAvroSerializer.java:73) ~ [kafka-avro-serializer-3.3.0.jar!/:?]
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize (KafkaAvroSerializer.java:53) ~ [kafka-avro-serializer-3.3.0.jar!/:?]
at org.apache.kafka.clients. producer.KafkaProducer.send (KafkaProducer.java:424) ~ [kafka-clients-0.9.0.1.jar!/:?]

답변

1

코드가 올바른 것 같습니다. 누락 될 수있는 유일한 것은 AVRO 객체가 AVRO 플러그인으로 제대로 생성되지 않았기 때문에 클래스가 SpecificRecords을 구현해야하며 IndexedRecord을 구현해야한다는 것입니다.

+0

오른쪽. 내가 avsc 파일을 적절하게 만들고 gradle-avro-plugin을 사용하여 자바 파일을 생성하면 모든 것이 작동하는 것 같습니다. 나는 스키마가 Avro에 의해 자동으로 생성되었다고 생각하고, 리플렉션을 사용하여 잘못했다. – msayag

관련 문제