카프카 항목에서 메시지를 가져 오는 시스템이 있는데 일부 외부 리소스를 사용할 수 없어 메시지를 처리 할 수없는 경우 소비자를 종료하고 해당 메시지를 해당 토픽으로 반송 한 다음 기다립니다 소비자를 다시 시작하기에 앞서. 유일한 문제는 종료가 작동하지 않는다는 것입니다. 여기에 내 로그에서 볼 수있는 작업은 다음과 같습니다Kafka ConsumerConnector를 종료하는 방법
2014년 9월 30일 08 : 24 : 10,918 - com.example.kafka.KafkaConsumer [정보] - [응용 프로그램 akka.actor.workflow 문맥-8] 종료 주제에 대한 카프카 소비자의 새로운 문제보고 2014-09-30 08 : 24 : 10,927 - customers.kafka.ProblemReportObserver [info] - [application-akka.actor.workflow-context-8] 소비자 종료 2014-09 -30 08 : 24 : 11,946 - clients.kafka.ProblemReportObserver [warn] - [application-akka.actor.workflow-context-8] 큐에 7410-1412090624000을 다시 보냄 2014-09-30 08 : 24 : 12,021 - clients.kafka.ProblemReportObserver [디버그] - [kafka-akka.actor.kafka-consumer-worker-context-9] 파티션 0의 메시지 : key = 7410-1412090624000, msg = 7410-1412090624000
가 여기에 직장에서 몇 층을, 그러나 중요한 코드는 다음과 같습니다
KafkaConsumer.scala
에서 :
(processor ? ProblemReportRequest(problemReportKey)).map {
case e: ConnectivityInterruption =>
val backoff = 10.seconds
logger.warn(s"Can't connect to essential services, pausing for $backoff", e)
stop()
// XXX: Shutdown isn't instantaneous, so returning has to happen after a delay.
// Unfortunately, there's still a race condition here, plus there's a chance the
// system will be shut down before the message has been returned.
system.scheduler.scheduleOnce(100 millis) { returnMessage(message) }
system.scheduler.scheduleOnce(backoff) { start() }
false
case e: Exception => returnMessage(message, e)
case _ => true
}.recover { case e => returnMessage(message, e) }
그리고 정지 :
protected def consumer: ConsumerConnector = Consumer.create(config.asKafkaConfig)
def shutdown() = {
logger.info(s"Shutting down kafka consumer for topic ${config.topic}")
consumer.shutdown()
}
메시지를 관찰 루틴에서 방법 :
def stop() = {
if (consumerRunning.get()) {
consumer.shutdown()
consumerRunning.compareAndSet(true, false)
logger.info("Consumer shutdown")
} else {
logger.info("Consumer is already shutdown")
}
!consumerRunning.get()
}
버그입니까? 아니면 잘못 했습니까?