
Kafka consumer does not consume data

I am new to Kafka. I am testing Kafka with two instances of ZooKeeper and two broker instances. I created a test topic, "topicA". Here is the description of my topic:

Topic:topicA PartitionCount:1  ReplicationFactor:1  Configs: 
     Topic: topicA Partition: 0 Leader: 2  Replicas: 2  Isr: 2 

The topic has one partition, on Kafka broker 2, and there is only one replica, on that same broker. I use the Kafka producer (org.apache.kafka.kafka-clients, 0.9.0.1) to send messages to the broker.

Producer configuration:

props.put("bootstrap.servers", "***:12900"); // this is kafka broker url 
props.put("block.on.buffer.full", "true"); 
props.put("request.required.acks", "1"); 
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); 
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); 
props.put("partition.assignment.strategy", "range"); 

I send 10,000 messages from the producer:

kafkaProducer.send(new ProducerRecord<String, String>(
        topic,"partitionName", 
        String.format("{\"type\":\"test\", \"t\":%.3f, \"k\":%d}", System.nanoTime() * 1e-9, i))); 

System.out.println("Sent Message - " + i + " Successfully"); 

However, the messages never reach the consumer:

while (true) {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.offset() + "---->" + record.value());
    }
}
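(For completeness, a minimal sketch of the wiring assumed around this loop; the broker address is a placeholder and the full property list follows below. poll() returns nothing unless the consumer has first subscribed to, or been assigned, the topic.)

Properties props = new Properties();
props.put("bootstrap.servers", "broker-host:12900"); // placeholder
props.put("group.id", "test");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);
kafkaConsumer.subscribe(Arrays.asList("topicA")); // without this, poll() yields no records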

My consumer properties:

# this is my kafka broker
bootstrap.servers = *:12900
group.id = test 
key.deserializer = org.apache.kafka.common.serialization.StringDeserializer 
value.deserializer = org.apache.kafka.common.serialization.StringDeserializer 
enable.auto.commit=true 
# fast session timeout makes it more fun to play with failover 
session.timeout.ms=10000 

# These buffer sizes seem to be needed to avoid consumer switching to 
# a mode where it processes one bufferful every 5 seconds with multiple 
# timeouts along the way. No idea why this happens. 
fetch.min.bytes=50000 
receive.buffer.bytes=262144 
max.partition.fetch.bytes=2097152 

Error on broker 1 (the BufferUnderflowException below is repeated many times):

[Controller-1-to-broker-1-send-thread], Controller 1 epoch 6 fails to send request Name:LeaderAndIsrRequest;Version:0;Controller:1;ControllerEpoch:6;CorrelationId:10;ClientId:id_1-host_null-port_12900;Leaders:id:1,host:*,port:12900,id:2,host:*,port:12900;PartitionState:(__consumer_offsets,32) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,16) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,49) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,44) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,28) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,17) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,23) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,7) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,4) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,29) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,35) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,3) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,24) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,41) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,0) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,38) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,13) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,8) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,5) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,39) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,36) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,40) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,45) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,15) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,33) -> 
(LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,37) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,21) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,6) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,11) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,20) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,47) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,2) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,27) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,34) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,9) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,22) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,42) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,14) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,25) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,10) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,48) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,31) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,18) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,19) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,12) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,46) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,43) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,1) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,26) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,30) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2) to broker id:1,host:*,port:12900. Reconnecting to broker. 
java.io.IOException: Broken pipe 
kafka-request-handler-0]: [ReplicaFetcherManager on broker 1] Removed fetcher for partitions [__consumer_offsets,32],[__consumer_offsets,16],[__consumer_offsets,44],[__consumer_offsets,28],[__consumer_offsets,4],[__consumer_offsets,24],[__consumer_offsets,0],[__consumer_offsets,38],[__consumer_offsets,8],[__consumer_offsets,36],[__consumer_offsets,40],[__consumer_offsets,6],[__consumer_offsets,20],[__consumer_offsets,2],[__consumer_offsets,34],[__consumer_offsets,22],[__consumer_offsets,42],[__consumer_offsets,14],[__consumer_offsets,10],[__consumer_offsets,48],[__consumer_offsets,18],[__consumer_offsets,12],[__consumer_offsets,46],[__consumer_offsets,26],[__consumer_offsets,30] 
2016-07-31 06:48:11,045 [INFO ][kafka-request-handler-0]: Completed load of log __consumer_offsets-0 with log end offset 0 
2016-07-31 06:48:11,054 [INFO ][kafka-request-handler-0]: Created log for partition [__consumer_offsets,0] in log/kafka_1 with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 104857600, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, min.insync.replicas -> 1, cleanup.policy -> compact, unclean.leader.election.enable -> true, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 2628000000, segment.jitter.ms -> 0}. 
2016-07-31 06:48:11,058 [WARN ][kafka-request-handler-0]: Partition [__consumer_offsets,0] on broker 1: No checkpointed highwatermark is found for partition [__consumer_offsets,0] 
2016-07-31 06:48:11,069 [INFO ][kafka-scheduler-4]: Loading offsets from [__consumer_offsets,0] 
2016-07-31 06:48:11,072 [INFO ][kafka-scheduler-4]: Finished loading offsets from [__consumer_offsets,0] in 3 milliseconds. 
2016-07-31 06:59:31,945 [ERROR][kafka-network-thread-12900-2]: Closing socket for /host because of error 
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'topics': java.nio.BufferUnderflowException 
    at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:66) 
    at org.apache.kafka.common.requests.JoinGroupRequest.parse(JoinGroupRequest.java:85) 
    at kafka.api.JoinGroupRequestAndHeader$.readFrom(JoinGroupRequestAndHeader.scala:29) 
    at kafka.api.RequestKeys$$anonfun$12.apply(RequestKeys.scala:50) 
    at kafka.api.RequestKeys$$anonfun$12.apply(RequestKeys.scala:50) 
    at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:50)
    at kafka.network.Processor.read(SocketServer.scala:450) 
    at kafka.network.Processor.run(SocketServer.scala:340) 

Broker 2 logs:

2016-07-31 06:48:10,972 [INFO ][kafka-request-handler-0]: [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [__consumer_offsets,49],[__consumer_offsets,17],[__consumer_offsets,23],[__consumer_offsets,7],[__consumer_offsets,29],[__consumer_offsets,35],[__consumer_offsets,3],[__consumer_offsets,41],[__consumer_offsets,13],[__consumer_offsets,5],[__consumer_offsets,39],[__consumer_offsets,45],[__consumer_offsets,15],[__consumer_offsets,33],[__consumer_offsets,37],[__consumer_offsets,21],[__consumer_offsets,11],[__consumer_offsets,47],[__consumer_offsets,27],[__consumer_offsets,9],[__consumer_offsets,25],[__consumer_offsets,31],[__consumer_offsets,19],[__consumer_offsets,43],[__consumer_offsets,1] 
2016-07-31 06:48:10,990 [INFO ][kafka-request-handler-0]: Completed load of log __consumer_offsets-29 with log end offset 0 
2016-07-31 06:48:10,994 [INFO ][kafka-request-handler-0]: Created log for partition [__consumer_offsets,29] in log/kafka_2 with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 104857600, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, min.insync.replicas -> 1, cleanup.policy -> compact, unclean.leader.election.enable -> true, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 2628000000, segment.jitter.ms -> 0}. 
2016-07-31 06:48:10,996 [WARN ][kafka-request-handler-0]: Partition [__consumer_offsets,29] on broker 2: No checkpointed highwatermark is found for partition [__consumer_offsets,29] 
2016-07-31 06:48:10,998 [INFO ][kafka-scheduler-5]: Loading offsets from [__consumer_offsets,29] 
2016-07-31 06:48:11,011 [INFO ][kafka-scheduler-5]: Finished loading offsets from [__consumer_offsets,29] in 13 milliseconds. 
2016-07-31 06:48:11,023 [INFO ][kafka-request-handler-0]: Completed load of log __consumer_offsets-45 with log end offset 0 
2016-07-31 06:48:11,025 [INFO ][kafka-request-handler-0]: Created log for partition [__consumer_offsets,45] in log/kafka_2 with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 104857600, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, min.insync.replicas -> 1, cleanup.policy -> compact, unclean.leader.election.enable -> true, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 2628000000, segment.jitter.ms -> 0}. 
2016-07-31 06:48:11,913 [INFO ][kafka-request-handler-0]: [ReplicaFetcherManager on broker 2] Added fetcher for partitions List([ 

My questions:

1) There are no errors on the brokers, so why can't my consumer receive any messages?
2) Are my producer and consumer configurations okay? Should the consumer/producer connect to ZooKeeper instead of connecting to the brokers directly?
3) What does the epoch mean in the controller?
4) What does a warning like "No checkpointed highwatermark is found for partition [__consumer_offsets,0]" mean?


Btw: ZooKeeper needs an odd number of servers to work properly!

Answer


Apparently, you are running a Kafka broker version that is older than your client version. Double-check your Kafka broker and client versions.

The error says that the broker could not parse a JoinGroupRequest from your consumer: the consumer is sending a version of JoinGroupRequest that the broker does not understand.

In general, the Kafka broker version should be greater than or equal to the client version you use; that is what prevents this kind of error.

The solution is to either upgrade your Kafka brokers or downgrade the clients you use.
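(A hedged sketch for checking the client side: AppInfoParser ships with kafka-clients and, assuming its getVersion() helper is available in your client version, prints the library version actually on the classpath. The broker version can be read from the jar names under the broker's libs/ directory.)

import org.apache.kafka.common.utils.AppInfoParser;

public class ClientVersionCheck {
    public static void main(String[] args) {
        // Prints the version of the kafka-clients jar on the classpath.
        System.out.println("kafka-clients version: " + AppInfoParser.getVersion());
    }
}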

Your configs look mostly fine, except for this one:

props.put("partition.assignment.strategy", "range"); 

It is a consumer-side setting, so it is most likely ignored by the producer and can safely be removed; it is useless there.
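(If range assignment was the intent, the setting belongs in the consumer configuration, and in the 0.9 Java consumer it takes an assignor class name rather than the bare string "range". A sketch; consumerProps is a hypothetical Properties object:)

// In the new (0.9) Java consumer, partition.assignment.strategy takes
// assignor class names; RangeAssignor is the default anyway.
consumerProps.put("partition.assignment.strategy",
        "org.apache.kafka.clients.consumer.RangeAssignor");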

The epoch is something like a version, or generation ID, of the cluster state. It enables proper state synchronization between the regular brokers and the controller.
