0

KafkaConsumer을 사용하면 별도의 스레드에서 메시지를 사용할 수 있습니다. 그러나파이썬 카프카 다중 프로세스 대 스레드

, 내가 multiprocessing.Process 대신 threading.Thread를 사용하여 오류가 발생합니다 :

questionOSError: [Errno 9] Bad file descriptor

documentation 병렬이 가능에 멀티 프로세싱을 사용하여 메시지를 사용하는 것이 좋습니다. 누군가가 실제 사례를 공유해 주시겠습니까?

편집

여기에 몇 가지 예제 코드입니다. 죄송합니다. 원래 코드가 너무 복잡하므로 여기에 샘플을 작성하여 어떤 일이 일어나고 있는지 알려 주시기 바랍니다. multiprocessing.Process 대신 threading.Thread을 사용하면이 코드가 올바르게 작동합니다.

from multiprocessing import Process 

class KafkaWrapper(): 
    def __init__(self): 
     self.consumer = KafkaConsumer(bootstrap_servers='my.server.com') 

    def consume(self, topic): 
     self.consumer.subscribe(topic) 
     for message in self.consumer: 
      print(message.value) 

class ServiceInterface(): 
    def __init__(self): 
     self.kafka_wrapper = KafkaWrapper() 

    def start(self, topic): 
     self.kafka_wrapper.consume(topic) 

class ServiceA(ServiceInterface): 
    pass 

class ServiceB(ServiceInterface): 
    pass 


def main(): 

    serviceA = ServiceA() 
    serviceB = ServiceB() 

    jobs=[] 
    # The code works fine if I used threading.Thread here instead of Process 
    jobs.append(Process(target=serviceA.start, args=("my-topic",))) 
    jobs.append(Process(target=serviceB.start, args=("my-topic",))) 

    for job in jobs: 
     job.start() 

    for job in jobs: 
     job.join() 

if __name__ == "__main__": 
    main() 

그리고 여기가 내가 보는 오류이다 (다시 말하지만, 내 실제 코드는 위의 샘플과 다른, 나는 threading.Thread을 사용하는 경우 그것을 잘 작동하지만 난 multiprocessing.Process를 사용하지 않은 경우) :

File "/usr/local/Cellar/python3/3.6.2/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/process.py", line 249, in _bootstrap 
    self.run() 
    File "/usr/local/Cellar/python3/3.6.2/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/process.py", line 93, in run 
    self._target(*self._args, **self._kwargs) 
    File "service_interface.py", line 58, in start 
    self._kafka_wrapper.start_consuming(self.service_object_id) 
    File "kafka_wrapper.py", line 141, in start_consuming 
    for message in self._consumer: 
    File "venv/lib/python3.6/site-packages/kafka/consumer/group.py", line 1082, in __next__ 
    return next(self._iterator) 
    File "venv/lib/python3.6/site-packages/kafka/consumer/group.py", line 1022, in _message_generator 
    self._client.poll(timeout_ms=poll_ms, sleep=True) 
    File "venv/lib/python3.6/site-packages/kafka/client_async.py", line 556, in poll 
    responses.extend(self._poll(timeout, sleep=sleep)) 
    File "venv/lib/python3.6/site-packages/kafka/client_async.py", line 573, in _poll 
    ready = self._selector.select(timeout) 
    File "/usr/local/Cellar/python3/3.6.2/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/process.py", line 249, in _bootstrap 
    self.run() 
    File "/usr/local/Cellar/python3/3.6.2/Frameworks/Python.framework/Versions/3.6/lib/python3.6/selectors.py", line 577, in select 
    kev_list = self._kqueue.control(None, max_ev, timeout) 
    File "/usr/local/Cellar/python3/3.6.2/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/process.py", line 93, in run 
    self._target(*self._args, **self._kwargs) 
    File "service_interface.py", line 58, in start 
    self._kafka_wrapper.start_consuming(self.service_object_id) 
    File "kafka_wrapper.py", line 141, in start_consuming 
    for message in self._consumer: 
    File "venv/lib/python3.6/site-packages/kafka/consumer/group.py", line 1082, in __next__ 
    return next(self._iterator) 
    File "venv/lib/python3.6/site-packages/kafka/consumer/group.py", line 1022, in _message_generator 
    self._client.poll(timeout_ms=poll_ms, sleep=True) 
    File "venv/lib/python3.6/site-packages/kafka/client_async.py", line 556, in poll 
    responses.extend(self._poll(timeout, sleep=sleep)) 
OSError: [Errno 9] Bad file descriptor 
    File "venv/lib/python3.6/site-packages/kafka/client_async.py", line 573, in _poll 
    ready = self._selector.select(timeout) 
    File "/usr/local/Cellar/python3/3.6.2/Frameworks/Python.framework/Versions/3.6/lib/python3.6/selectors.py", line 577, in select 
    kev_list = self._kqueue.control(None, max_ev, timeout) 
OSError: [Errno 9] Bad file descriptor 
+0

예, 병렬로 메시지를 처리하면 카프카 스트림이 무엇을 의미합니다. 오류의 추적에 추가하여 코드를 게시 할 수 있습니까? – Kyle

+0

질문에 샘플 코드를 추가했습니다. – Deven

답변

2

카프카 소비자는 다중 프로세스 또는 멀티 스레딩이 될 수 있습니다 (Kafka 초기 버전에 필요한 Kafka 소비자 그룹을 올바르게 사용하는 클라이언트 라이브러리가 있는지 확인하십시오). 선택은 귀하에게 달려 있습니다.

그러나 우리는 프로세스를 사용하려면 카프카 클라이언트 라이브러리는 하나 개 이상의 프로세스에 의해 공유는 안 자체를 보장하는 데 사용되는 기본 TCP 연결 (카프카의 서버에 연결)하는 것이 안전 포크, 뭔가를 할 필요가 . 이것이 연결 오류가 발생한 이유입니다.

임시 해결책으로 생성하기 전에 KafkaConsumer을 생성하면 안됩니다. 대신 작업을 각 프로세스로 이동하십시오.

또 다른 방법은 단일 스레드/프로세스 페치 메시지를 사용하고 여분의 프로세스 풀을 사용하여 실제 작업을 수행하는 것입니다.

+0

고마워요! 나는 당신이 제안한 변화, 즉'카프카 소비자 (KafkaConsumer) '의 창조를 각 과정으로 옮겼으며 지금은 잘 작동하고 있습니다. – Deven

관련 문제