2014-09-30 5 views
1

카프카 항목에서 메시지를 가져 오는 시스템이 있는데 일부 외부 리소스를 사용할 수 없어 메시지를 처리 ​​할 수없는 경우 소비자를 종료하고 해당 메시지를 해당 토픽으로 반송 한 다음 기다립니다 소비자를 다시 시작하기에 앞서. 유일한 문제는 종료가 작동하지 않는다는 것입니다. 여기에 내 로그에서 볼 수있는 작업은 다음과 같습니다Kafka ConsumerConnector를 종료하는 방법

2014년 9월 30일 08 : 24 : 10,918 - com.example.kafka.KafkaConsumer [정보] - [응용 프로그램 akka.actor.workflow 문맥-8] 종료 주제에 대한 카프카 소비자의 새로운 문제보고 2014-09-30 08 : 24 : 10,927 - customers.kafka.ProblemReportObserver [info] - [application-akka.actor.workflow-context-8] 소비자 종료 2014-09 -30 08 : 24 : 11,946 - clients.kafka.ProblemReportObserver [warn] - [application-akka.actor.workflow-context-8] 큐에 7410-1412090624000을 다시 보냄 2014-09-30 08 : 24 : 12,021 - clients.kafka.ProblemReportObserver [디버그] - [kafka-akka.actor.kafka-consumer-worker-context-9] 파티션 0의 메시지 : key = 7410-1412090624000, msg = 7410-1412090624000

가 여기에 직장에서 몇 층을, 그러나 중요한 코드는 다음과 같습니다

KafkaConsumer.scala에서 :

(processor ? ProblemReportRequest(problemReportKey)).map { 
    case e: ConnectivityInterruption => 
    val backoff = 10.seconds 
    logger.warn(s"Can't connect to essential services, pausing for $backoff", e) 
    stop() 
    // XXX: Shutdown isn't instantaneous, so returning has to happen after a delay. 
    // Unfortunately, there's still a race condition here, plus there's a chance the 
    // system will be shut down before the message has been returned. 
    system.scheduler.scheduleOnce(100 millis) { returnMessage(message) } 
    system.scheduler.scheduleOnce(backoff) { start() } 
    false 
    case e: Exception => returnMessage(message, e) 
    case _ => true 
}.recover { case e => returnMessage(message, e) } 

그리고 정지 :

protected def consumer: ConsumerConnector = Consumer.create(config.asKafkaConfig) 
def shutdown() = { 
    logger.info(s"Shutting down kafka consumer for topic ${config.topic}") 
    consumer.shutdown() 
} 

메시지를 관찰 루틴에서 방법 :

def stop() = { 
    if (consumerRunning.get()) { 
    consumer.shutdown() 
    consumerRunning.compareAndSet(true, false) 
    logger.info("Consumer shutdown") 
    } else { 
    logger.info("Consumer is already shutdown") 
    } 
    !consumerRunning.get() 
} 

버그입니까? 아니면 잘못 했습니까?

답변

1

consumerdef입니다. 새 카프카 인스턴스를 생성하고 consumer.shutdown()과 같이 호출하면 새 인스턴스를 종료합니다. 대신 consumerval으로 만드십시오.