2017-09-25 2 views
0

나는 Kafka 소비자 구현의 통합 테스트를하고 있습니다. wurstmeister/kafka 도커 이미지와 Apache Kafka 소비자를 사용합니다. 윙윙 거리는 시나리오는 주제에 "예기치 않은"메시지를 보내는 경우입니다. kafkaConsumer.poll(POLLING_TIMEOUT)은 RUN 모드에서 무한 루프에있는 것처럼 보입니다. 내가 디버깅을 할 때, 일시 중지하고 다시 실행할 때 작동합니다.Apache kafka 소비자 투표가 예기치 않은 메시지에 대해 무한 루프

예상되는 메시지를 보낼 때이 문제가 발생하지 않습니다 (역 직렬화시 예외가 발생하지 않음). 여기

는 카프카에 대한 내 docker-compose 구성입니다 :

kafka: 
    image: wurstmeister/kafka 
    links: 
    - zookeeper 
    ports: 
    - "9092:9092" 
    environment: 
    KAFKA_ADVERTISED_HOST_NAME: localhost 
    KAFKA_ADVERTISED_PORT: 9092 
    KAFKA_CREATE_TOPICS: "ProductLocation:1:1,ProductInformation:1:1,InventoryAvailableToSell:1:1" 
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 
    volumes: 
    - /var/run/docker.sock:/var/run/docker.sock 

자바 일반 소비자 :

@Override 
public Collection<T> consume() { 
    String eventToBePublishedName = ERROR_WHILE_RESETTING_OFFSET; 
    boolean success = false; 

    try { 
     kafkaConsumer.resume(kafkaAssignments); 
     if (isPollingTypeFull) { 
      // dummy poll because its needed before resetting offset. 
      // https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition 
      kafkaConsumer.poll(POLLING_TIMEOUT); 
      resetOffset(); 
     } else if (!offsetGotResetFirstTime) { 
      resetOffset(); 
      offsetGotResetFirstTime = true; 
     } 

     eventToBePublishedName = ERROR_WHILE_POLLING; 

     ConsumerRecords<Object, T> records; 

     List<T> output = new ArrayList<>(); 

     do { 
      records = kafkaConsumer.poll(POLLING_TIMEOUT); 
      records.forEach(cr -> { 
       T val = cr.value(); 
       if (val != null) { 
        output.add(cr.value()); 
       } 
      }); 
     } while (records.count() > 0); 

     eventToBePublishedName = CONSUMING; 
     success = true; 
     kafkaConsumer.pause(kafkaAssignments); 
     return output; 
    } finally { 
     applicationEventPublisher.publishEvent(
       new OperationResultApplicationEvent(
         this, OperationType.ConsumingOfMessages, eventToBePublishedName, success)); 
    } 
} 

직렬화 복원 : 내 통합 테스트에서

public T deserialize(String topic, byte[] data) { 
    try { 
     JsonNode jsonNode = mapper.readTree(data); 
     JavaType javaType = mapper.getTypeFactory().constructType(getValueClass()); 
     JsonNode value = jsonNode.get("value"); 
     return mapper.readValue(value.toString(), javaType); 
    } catch (IllegalArgumentException | IOException | SerializationException e) { 
     LOGGER.error("Can't deserialize data [" + Arrays.toString(data) 
       + "] from topic [" + topic + "]", e); 
     return null; 
    } 
} 

, 내가 주제를 작성 각 테스트마다 타임 스탬프가 찍힌 주제 이름을 보냅니다. 이것은 새로운 주제를 만들고 테스트에 상태를 없게 만듭니다. 당신이 직면하는 경우 사용을 시작하기 전에 사용 후 resume 그들, 또는 pause 사용 후 바로 close 소비자를

Properties properties = new Properties(); 
    properties.put("bootstrap.servers", kafkaConfiguration.getServer()); 
    properties.put("group.id", kafkaConfiguration.getGroupId()); 
    properties.put("key.deserializer", kafkaConfiguration.getKeyDeserializer().getName()); 
    properties.put("value.deserializer", kafkaConfiguration.getValueDeserializer().getName()); 

답변

0

:

이 내가 카프카 소비자를 구성하는 방법입니다.

1

"포이즌 필"메시지를 건너 뛰려면 예외를 캐치하고 커밋 된 오프셋을 +1로 증가시킵니다.

+0

어떤 예외가 있습니까? 나는 여론 조사가 무한 루프에 있다고 말했다. 그리고 그 주제에 대한 이전 소비자가 폐쇄되지 않았기 때문입니다. 둘 다 쓰레드 안전하지 않은 Kafka 소비자를 사용합니다. 난 그냥 null을 반환하고 코드 샘플에 표시된 것처럼 소비자 레코드의 결과를 필터링 할 수 있습니다. 오프셋은 자동으로 커밋됩니다. –

+0

이전 소비자 앱을 수정하는 방법에 대해 이야기하고 있습니다. 나는 예상치 못한 메시지를 역 직렬화 할 수 없기 때문에 이전 소비자가 사망했다고 말했지만 이 경우 해당 앱을 수정하여 SerDes 예외 (또는 다른 예외)를 잡아두고 1로 닫고 끝내거나 오프셋 한만큼 오프셋을 계속 진행해야합니다. –

+0

소비자는 사망하지 않으므로 예외가 발생하지 않으며 폴링 중에 매달립니다. 이것은 Apache kafka 소비자 스레드 안전과 관련이 있습니다. 새로운 주제를 만들기 전에 (특정 주제와 그룹 ID에 대해) 특정 주제와 그룹 ID에 대해 카프카 소비자를 닫아야합니다. 그것이 문제였습니다. –

관련 문제