2016-12-07 2 views
4

모든 항목에 대한 파티션을 다시 시작할 때 디버그 메시지를 계속 가져 오는 중입니다. 아래처럼. 이 메시지는 내 서버의 모든 밀리 초를 계속해서 인쇄합니다.Akka.Kafka - 경고 메시지 - 파티션 다시 시작

08:44:34.850 [default-akka.kafka.default-dispatcher-10] DEBUG o.a.k.clients.consumer.KafkaConsumer - Resuming partition test222-7 
08:44:34.850 [default-akka.kafka.default-dispatcher-10] DEBUG o.a.k.clients.consumer.KafkaConsumer - Resuming partition test222-6 
08:44:34.850 [default-akka.kafka.default-dispatcher-10] DEBUG o.a.k.clients.consumer.KafkaConsumer - Resuming partition test222-9 
08:44:34.850 [default-akka.kafka.default-dispatcher-10] DEBUG o.a.k.clients.consumer.KafkaConsumer - Resuming partition test222-8 

여기이 은

val zookeeperHost = "localhost" 
val zookeeperPort = "9092" 
// Kafka queue settings 
val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer) 
     .withBootstrapServers(zookeeperHost + ":" + zookeeperPort) 
     .withGroupId((groupName)) 
     .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest") 

// Streaming the Messages from Kafka queue 
Consumer.committableSource(consumerSettings, Subscriptions.topics(topicName)) 
    .map(msg => { 
     consumed(msg.record.value) 
    }) 
    .runWith(Sink.ignore) 

는 DEBUG 메시지를 중지 올바르게 파티션을 도와주세요 코드를입니다.

+0

KafkaConsumer.resume 메서드를 계속 호출 했습니까? – amethystic

+0

KafkaConsumer.resume 메서드를 호출하지 않습니다. 필자는 메인 클래스에서'Consumer.committableSource'를 한 번 호출합니다. –

+0

Akka-Kafka의 파티션 구성을 수행해야합니까? 미리 감사드립니다 !! –

답변

1

reactive-kafka code 인출을 시작하기 전에 모든 파티션을 다시 시작 보인다 : 파티션이 이전에 일시 정지되지 않은 경우

consumer.assignment().asScala.foreach { tp => 
    if (partitionsToFetch.contains(tp)) consumer.resume(java.util.Collections.singleton(tp)) 
    else consumer.pause(java.util.Collections.singleton(tp)) 
} 
def tryPoll{...} 
checkNoResult(tryPoll(0)) 

KafkaConsumer.resume 방법은 no-op입니다.

+0

내 코드에서 파티션을 일시 중지하지 않았습니다. 그러나 계속해서 파티션을 다시 시작합니다. 이것을 막을 수있는 방법이 있습니까? 어떻게해야합니까? 어떤 제안을하시기 바랍니다. 미리 감사드립니다 !! –

+0

@ArunKannan 해결 했습니까? 나는 같은 문제를 가지고있다 – igx