모든 항목에 대한 파티션을 다시 시작할 때 디버그 메시지를 계속 가져 오는 중입니다. 아래처럼. 이 메시지는 내 서버의 모든 밀리 초를 계속해서 인쇄합니다.Akka.Kafka - 경고 메시지 - 파티션 다시 시작
08:44:34.850 [default-akka.kafka.default-dispatcher-10] DEBUG o.a.k.clients.consumer.KafkaConsumer - Resuming partition test222-7
08:44:34.850 [default-akka.kafka.default-dispatcher-10] DEBUG o.a.k.clients.consumer.KafkaConsumer - Resuming partition test222-6
08:44:34.850 [default-akka.kafka.default-dispatcher-10] DEBUG o.a.k.clients.consumer.KafkaConsumer - Resuming partition test222-9
08:44:34.850 [default-akka.kafka.default-dispatcher-10] DEBUG o.a.k.clients.consumer.KafkaConsumer - Resuming partition test222-8
여기이 은
val zookeeperHost = "localhost"
val zookeeperPort = "9092"
// Kafka queue settings
val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
.withBootstrapServers(zookeeperHost + ":" + zookeeperPort)
.withGroupId((groupName))
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
// Streaming the Messages from Kafka queue
Consumer.committableSource(consumerSettings, Subscriptions.topics(topicName))
.map(msg => {
consumed(msg.record.value)
})
.runWith(Sink.ignore)
는 DEBUG 메시지를 중지 올바르게 파티션을 도와주세요 코드를입니다.
KafkaConsumer.resume 메서드를 계속 호출 했습니까? – amethystic
KafkaConsumer.resume 메서드를 호출하지 않습니다. 필자는 메인 클래스에서'Consumer.committableSource'를 한 번 호출합니다. –
Akka-Kafka의 파티션 구성을 수행해야합니까? 미리 감사드립니다 !! –