1

을 (매직 V1 레코드 헤더를 지원하지 않습니다) 카프카 1.0.0로 전송하지 벌금.봄 카프카 프로듀서 내가 로컬 카프카를 설정이 고정 표시기-작성 설정을 사용하고

o.a.kafka.common.utils.AppInfoParser  : Kafka version : 1.0.0 
o.a.kafka.common.utils.AppInfoParser  : Kafka commitId : aaa7af6d4a11b29d 

내가이

kafkaTemplate.send("test-topic", UUID.randomUUID().toString(), "test"); 
같은 메시지를 보내려고 : 그것은 카프카의 올바른 버전을 인쇄 봄 응용 프로그램을 시작할 때

는 지금은 spring-kafka:2.1.0.RELEASE

를 통해 카프카에 연결을 시도

클라이언트 쪽에서 보내기가 실패합니다.

서버 콘솔에서

(3210) 나는 메시지 매직 (V1)가 기록 헤더

Error when handling request {replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=2147483647,topics=[{topic=test-topic,partitions=[{partition=0,fetch_offset=39,max_bytes=1048576}]}]} (kafka.server.KafkaApis) 
java.lang.IllegalArgumentException: Magic v1 does not support record headers 

인터넷 검색이 버전 충돌을 암시를 지원하지 않습니다 얻을 수 있지만, 버전 (org.apache.kafka:kafka-clients:1.0.0이 클래스 경로에) 맞는 것 같다.

실마리가 있습니까? 감사!

편집 : 문제의 원인을 좁혔습니다. 일반 문자열 보내기가 작동하지만 JsonSerializer를 통해 Json을 보내면 주어진 문제가 발생합니다. 여기 내 생산 설정의 내용은 다음과 같습니다

@Value("\${kafka.bootstrap-servers}") 
lateinit var bootstrapServers: String 

@Bean 
fun producerConfigs(): Map<String, Any> = 
     HashMap<String, Any>().apply { 
      // list of host:port pairs used for establishing the initial connections to the Kakfa cluster 
      put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers) 
      put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java) 
      put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer::class.java) 
     } 

@Bean 
fun producerFactory(): ProducerFactory<String, MyClass> = 
     DefaultKafkaProducerFactory(producerConfigs()) 

@Bean 
fun kafkaTemplate(): KafkaTemplate<String, MyClass> = 
     KafkaTemplate(producerFactory()) 
+1

그건 의미가 없습니다. (서버 측에서 그 메시지를받는 것). 클라이언트 버전이 오래된 경우 모든 헤더가 전송되지 않으므로 모든 사항이 잘 수행되어야합니다 (예 : 0.10 브로커 1.0.0 클라이언트를 사용하여 테스트했으며 헤더를 보내지 않는 한 작동 함). 1.0.0 클라이언트에서는 템플릿이 헤더를 보내지 않으면 "빈"'RecordHeaders'가 (클라이언트에 의해) 보내집니다. –

+1

이미지 이름이 버전을 지정하지 않은 것으로 보입니다. 따라서 이전 캐시 된 도커 이미지와 최신 클라이언트를 사용할 수 있습니다. 0.10 브로커에 헤더를 보내는 1.0 클라이언트가이 오류가 발생합니다. 도커 이미지 버전을 확인하고 도커가 최신 1.0 브로커 이미지를 가져옵니다. –

답변

1

해결. 문제는 중개인, 일부 도커 캐시 또는 Spring 앱이 아닙니다.

문제는 내가 디버깅을 위해 병렬로 사용했던 콘솔 사용자입니다.--bootstrap-server 옵션 A "새로운"소비자가 (특히 JsonSerializer와 카프카 1.0을 사용하는 경우)를 사용한다 Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].

: 이것은 "오래된"소비자가 kafka-console-consumer.sh --topic=topic --zookeeper=...

시작했을 때 그것은 실제로 경고를 출력 시작했다. 참고 : 여기에서 오래된 소비자를 사용하면 실제로 제작자에게 영향을 줄 수 있습니다.

1

난 그냥 아무 문제없이 그 고정 표시기 이미지에 대한 테스트를 실행 ...

$docker ps 

CONTAINER ID  IMAGE     COMMAND     CREATED    STATUS    PORTS            NAMES 
f093b3f2475c  kafkadocker_kafka  "start-kafka.sh"   33 minutes ago  Up 2 minutes  0.0.0.0:32768->9092/tcp        kafkadocker_kafka_1 
319365849e48  wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 33 minutes ago  Up 2 minutes  22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp kafkadocker_zookeeper_1 

.

@SpringBootApplication 
public class So47953901Application { 

    public static void main(String[] args) { 
     SpringApplication.run(So47953901Application.class, args); 
    } 

    @Bean 
    public ApplicationRunner runner(KafkaTemplate<Object, Object> template) { 
     return args -> template.send("foo", "bar", "baz"); 
    } 

    @KafkaListener(id = "foo", topics = "foo") 
    public void listen(String in) { 
     System.out.println(in); 
    } 

} 

.

spring.kafka.bootstrap-servers=192.168.177.135:32768 
spring.kafka.consumer.auto-offset-reset=earliest 
spring.kafka.consumer.enable-auto-commit=false 

.

아직도 나를 위해 작동

2017-12-23 13:27:27.990 INFO 21305 --- [  foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [foo-0] 
baz 

편집 ...

spring.kafka.bootstrap-servers=192.168.177.135:32768 
spring.kafka.consumer.auto-offset-reset=earliest 
spring.kafka.consumer.enable-auto-commit=false 
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer 
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer 

.

2017-12-23 15:27:59.997 INFO 44079 --- [   main] o.a.k.clients.producer.ProducerConfig : ProducerConfig values: 
    acks = 1 
    ... 
    value.serializer = class org.springframework.kafka.support.serializer.JsonSerializer 

... 

2017-12-23 15:28:00.071 INFO 44079 --- [  foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [foo-0] 
baz 
+0

확인해 주셔서 감사합니다! 나는 그 문제의 근원을 좁혔다. 내가 평범한 현을 보내면 그것은 효과가있다! 하지만 JsonSerializer를 사용하면 문제가 발생합니다. 업데이트를 참조하십시오. – DerM

+1

여전히 나를 위해 일합니다 - 내 편집을 참조하십시오. 'JsonSerializer'는 기본적으로 헤더를 추가하기 때문에 도커 이미지의 브로커가 <0.11 인 것처럼 보입니다. 제작자 속성 인'JsonSerializer.ADD_TYPE_INFO_HEADERS'를'false'로 설정하여 확인할 수 있습니다. 이 속성은 헤더에 유형 정보를 추가하지 못하게합니다. –

+0

감사! 당신 말이 맞아요, 그것은 주어진 속성과 함께 작동합니다. 나는 단지 그것을 얻지 않는다. 부두를 발사 할 때 작성하면 확실히 인쇄됩니다.'kafka_1 | [2017-12-23 20 : 51 : 53,289] 정보 카프카 버전 : 1.0.0'. 또한 다음과 같이 모든 이미지를 삭제하고 다시 만들려고했습니다. '도킹 - 작성 및 도킹 - 작성 도킹 --no-cache && 도킹 - 작성 " – DerM

관련 문제