2017-10-10 2 views
0

kafka-python api를 사용하여 여러 개의 메시지를 주제로 보냅니다. 메시지의 일부는 성공적으로 항목을 전송받을 있지만 프로그램이 다음 오류 메시지와 함께 종료하기 전에의 모든가 전송됩니다KeyError : kafka.producer.record_accumulator.RecordBatch

KeyError: <kafka.producer.record_accumulator.RecordBatch object at 0x143d290> 
Batch is already closed -- ignoring batch.done() 
Error processing errback 
Traceback (most recent call last): 
    File "/usr/lib/python2.6/site-packages/kafka/future.py", line 79, in _call_backs 
    f(value) 
    File "/usr/lib/python2.6/site-packages/kafka/producer/sender.py", line 185, in _failed_produce 
    self._complete_batch(batch, error, -1, None) 
    File "/usr/lib/python2.6/site-packages/kafka/producer/sender.py", line 243, in _complete_batch 
    self._accumulator.deallocate(batch) 
    File "/usr/lib/python2.6/site-packages/kafka/producer/record_accumulator.py", line 507, in deallocate 
    self._incomplete.remove(batch) 
    File "/usr/lib/python2.6/site-packages/kafka/producer/record_accumulator.py", line 587, in remove 
    return self._incomplete.remove(batch) 

는 모든 메시지의 다른 번호가 실제로 내 주제에 수신 실행합니다. 문제는 프로그램이 끝나기 전에 kafka producer.send 호출이 전송을 완료하지 못하는 것처럼 보입니다. 카프카 문서화의 producer.send에 따르면

아마 근본 원인 인 비동기 방법입니다 - 모든 비동기 스레드 완료되지 프로세스가 살해되기 전에 전송 :

The send() method is asynchronous. When called it adds the record to a buffer of pending record sends and immediately returns. This allows the producer to batch together individual records for efficiency.

이에 대한 순진한 솔루션은 여러 가지가있을 수 있습니다 (예 : batch.size을 낮은 수로 설정) 성능 병목 현상을 일으킬 수 있습니다.

성능을 손상시키지 않고이 문제를 해결하려면 어떻게해야합니까? ?

답변

0

종료하기 전에 producer.flush()으로 전화하십시오.

+0

내가 처음 시도한 것이 었습니다. 설명에서 언급했듯이, 제작자는 메시지를 이미 보내기 위해 비동기 호출을 전달한 것으로 보이지만 배치 크기가 충분히 작지 않기 때문에 아직 완료되지 않았습니다. – r2d2oid

+0

이 질문에 대한 답을 제공하지 않습니다. 충분한 [평판] (https://stackoverflow.com/help/whats-reputation)이 있으면 [모든 게시물에 주석 달기] (https://stackoverflow.com/help/privileges/comment) 할 수 있습니다. 대신, [질문자의 설명이 필요없는 답변을 제공하십시오] (https://meta.stackexchange.com/questions/214173/why-do-i-need-50-reputation-to-comment-what-can- i-do- 대신). - [리뷰에서] (리뷰/저품절 포스트/17956884) – demonplus

+1

@ r2d2 당신은 문제가 아주 간단합니다. 프로듀서가 모든 메시지를 보내기 전에 프로그램이 종료되었습니다. 배치 크기로 재생하여 문제를 해결하려는 것은 잘못된 것입니다. 버퍼에 뭔가가 있으면 보내기가 완료 될 때까지 기다려야합니다. [플러시] (https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html)는 everythin이 전송 될 때까지 programm를 차단합니다. 마지막 레코드를 보낸 후에'producer.flush()'를 호출 한 후이 버그가 발생하면 나는 틀렸고 문제 (kafka-python의 버그?)를 이해하지 못합니다. – Loki