KafkaConsumer
을 사용하면 별도의 스레드에서 메시지를 사용할 수 있습니다. 그러나파이썬 카프카 다중 프로세스 대 스레드
, 내가 multiprocessing.Process
대신 threading.Thread
를 사용하여 오류가 발생합니다 :
이 questionOSError: [Errno 9] Bad file descriptor
및 documentation 병렬이 가능에 멀티 프로세싱을 사용하여 메시지를 사용하는 것이 좋습니다. 누군가가 실제 사례를 공유해 주시겠습니까?
편집
여기에 몇 가지 예제 코드입니다. 죄송합니다. 원래 코드가 너무 복잡하므로 여기에 샘플을 작성하여 어떤 일이 일어나고 있는지 알려 주시기 바랍니다. multiprocessing.Process
대신 threading.Thread
을 사용하면이 코드가 올바르게 작동합니다.
from multiprocessing import Process
class KafkaWrapper():
def __init__(self):
self.consumer = KafkaConsumer(bootstrap_servers='my.server.com')
def consume(self, topic):
self.consumer.subscribe(topic)
for message in self.consumer:
print(message.value)
class ServiceInterface():
def __init__(self):
self.kafka_wrapper = KafkaWrapper()
def start(self, topic):
self.kafka_wrapper.consume(topic)
class ServiceA(ServiceInterface):
pass
class ServiceB(ServiceInterface):
pass
def main():
serviceA = ServiceA()
serviceB = ServiceB()
jobs=[]
# The code works fine if I used threading.Thread here instead of Process
jobs.append(Process(target=serviceA.start, args=("my-topic",)))
jobs.append(Process(target=serviceB.start, args=("my-topic",)))
for job in jobs:
job.start()
for job in jobs:
job.join()
if __name__ == "__main__":
main()
그리고 여기가 내가 보는 오류이다 (다시 말하지만, 내 실제 코드는 위의 샘플과 다른, 나는 threading.Thread
을 사용하는 경우 그것을 잘 작동하지만 난 multiprocessing.Process
를 사용하지 않은 경우) :
File "/usr/local/Cellar/python3/3.6.2/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/process.py", line 249, in _bootstrap
self.run()
File "/usr/local/Cellar/python3/3.6.2/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "service_interface.py", line 58, in start
self._kafka_wrapper.start_consuming(self.service_object_id)
File "kafka_wrapper.py", line 141, in start_consuming
for message in self._consumer:
File "venv/lib/python3.6/site-packages/kafka/consumer/group.py", line 1082, in __next__
return next(self._iterator)
File "venv/lib/python3.6/site-packages/kafka/consumer/group.py", line 1022, in _message_generator
self._client.poll(timeout_ms=poll_ms, sleep=True)
File "venv/lib/python3.6/site-packages/kafka/client_async.py", line 556, in poll
responses.extend(self._poll(timeout, sleep=sleep))
File "venv/lib/python3.6/site-packages/kafka/client_async.py", line 573, in _poll
ready = self._selector.select(timeout)
File "/usr/local/Cellar/python3/3.6.2/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/process.py", line 249, in _bootstrap
self.run()
File "/usr/local/Cellar/python3/3.6.2/Frameworks/Python.framework/Versions/3.6/lib/python3.6/selectors.py", line 577, in select
kev_list = self._kqueue.control(None, max_ev, timeout)
File "/usr/local/Cellar/python3/3.6.2/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "service_interface.py", line 58, in start
self._kafka_wrapper.start_consuming(self.service_object_id)
File "kafka_wrapper.py", line 141, in start_consuming
for message in self._consumer:
File "venv/lib/python3.6/site-packages/kafka/consumer/group.py", line 1082, in __next__
return next(self._iterator)
File "venv/lib/python3.6/site-packages/kafka/consumer/group.py", line 1022, in _message_generator
self._client.poll(timeout_ms=poll_ms, sleep=True)
File "venv/lib/python3.6/site-packages/kafka/client_async.py", line 556, in poll
responses.extend(self._poll(timeout, sleep=sleep))
OSError: [Errno 9] Bad file descriptor
File "venv/lib/python3.6/site-packages/kafka/client_async.py", line 573, in _poll
ready = self._selector.select(timeout)
File "/usr/local/Cellar/python3/3.6.2/Frameworks/Python.framework/Versions/3.6/lib/python3.6/selectors.py", line 577, in select
kev_list = self._kqueue.control(None, max_ev, timeout)
OSError: [Errno 9] Bad file descriptor
예, 병렬로 메시지를 처리하면 카프카 스트림이 무엇을 의미합니다. 오류의 추적에 추가하여 코드를 게시 할 수 있습니까? – Kyle
질문에 샘플 코드를 추가했습니다. – Deven