대기열과 파이프의 근본적인 차이점은 무엇입니까 Python's multiprocessing package에 있습니까?파이썬 다중 처리 - 파이프 대 큐
어떤 시나리오에서 하나를 선택해야합니까? Pipe()
을 사용하는 것이 유리합니까? Queue()
을 사용하는 것이 유리합니까?
대기열과 파이프의 근본적인 차이점은 무엇입니까 Python's multiprocessing package에 있습니까?파이썬 다중 처리 - 파이프 대 큐
어떤 시나리오에서 하나를 선택해야합니까? Pipe()
을 사용하는 것이 유리합니까? Queue()
을 사용하는 것이 유리합니까?
Pipe()
에는 두 개의 끝점 만있을 수 있습니다.
Queue()
에는 여러 생산자와 소비자가있을 수 있습니다. 당신이 Queue()
를 사용하여 통신하는 두 개 이상의 지점을해야하는 경우
언제 그들
사용할 수 있습니다.
절대 성능을 필요로하는 경우 은 Queue()
이 Pipe()
위에 구축되어 있기 때문에 훨씬 빠릅니다.
성능 벤치마킹
은의 당신이 두 개의 프로세스를 생성하고 가능한 한 빨리 그들 사이에 메시지를 보낼 가정 해 보자. 이것은 Pipe()
과 Queue()
을 사용하는 유사한 테스트 사이의 드래그 레이스의 타이밍 결과입니다. 이것은 Ubuntu 11.10 및 Python 2.7.2를 실행하는 ThinkpadT61에 있습니다.
참고로, 나는 JoinableQueue()
의 보너스로 결과를 던졌습니다. JoinableQueue()
은 queue.task_done()
이 호출 될 때 작업을 설명합니다 (특정 작업에 대해 알지도 못하고 큐의 미완료 작업 만 계산 함). 따라서 queue.join()
은 작업이 완료되었음을 나타냅니다. 요약 Pipe()
에서
이 답변의 하단에 각각에 대한 코드 ...
[email protected]:~$ python multi_pipe.py
Sending 10000 numbers to Pipe() took 0.0369849205017 seconds
Sending 100000 numbers to Pipe() took 0.328398942947 seconds
Sending 1000000 numbers to Pipe() took 3.17266988754 seconds
[email protected]:~$ python multi_queue.py
Sending 10000 numbers to Queue() took 0.105256080627 seconds
Sending 100000 numbers to Queue() took 0.980564117432 seconds
Sending 1000000 numbers to Queue() took 10.1611330509 seconds
[email protected]:~$ python multi_joinablequeue.py
Sending 10000 numbers to JoinableQueue() took 0.172781944275 seconds
Sending 100000 numbers to JoinableQueue() took 1.5714070797 seconds
Sending 1000000 numbers to JoinableQueue() took 15.8527247906 seconds
[email protected]:~$
는 Queue()
에 비해 약 3 배 빠르다. 실제로 혜택을 얻지 않는 한 JoinableQueue()
에 대해 생각조차하지 마십시오.
보너스 자료 2
은 멀티하면 일부 바로 가기를 모르면 하드 디버깅 만든다 정보 흐름의 미묘한 변화를 소개합니다. 예를 들어 많은 조건에서 사전을 통해 색인을 생성 할 때 잘 작동하는 스크립트가있을 수 있지만 특정 입력에 대해서는 가끔 실패합니다.
일반적으로 전체 파이썬 프로세스가 충돌 할 때 우리는 실패에 대한 단서를 얻습니다. 그러나 다중 처리 기능이 작동하지 않으면 콘솔에 원치 않는 크래시 추적이 인쇄되지 않습니다. 알 수없는 다중 처리 충돌을 추적하는 것은 프로세스를 손상시킨 것에 대한 단서가 없으면 어렵습니다.
내가 멀티 프로세싱 충돌 informaiton를 추적 발견 한 가장 간단한 방법은 try
/except
에서 전체 멀티 기능을 감아 사용하는 것입니다 traceback.print_exc()
: 당신이 충돌을 찾을 때
import traceback
def reader(args):
try:
# Insert stuff to be multiprocessed here
return args[0]['that']
except:
print "FATAL: reader({0}) exited while multiprocessing".format(args)
traceback.print_exc()
지금, 당신은 같은 것을 볼
:FATAL: reader([{'crash', 'this'}]) exited while multiprocessing
Traceback (most recent call last):
File "foo.py", line 19, in __init__
self.run(task_q, result_q)
File "foo.py", line 46, in run
raise ValueError
ValueError
소스 코드 :
"""
multi_pipe.py
"""
from multiprocessing import Process, Pipe
import time
def reader(pipe):
output_p, input_p = pipe
input_p.close() # We are only reading
while True:
try:
msg = output_p.recv() # Read from the output pipe and do nothing
except EOFError:
break
def writer(count, input_p):
for ii in xrange(0, count):
input_p.send(ii) # Write 'count' numbers into the input pipe
if __name__=='__main__':
for count in [10**4, 10**5, 10**6]:
output_p, input_p = Pipe()
reader_p = Process(target=reader, args=((output_p, input_p),))
reader_p.start() # Launch the reader process
output_p.close() # We no longer need this part of the Pipe()
_start = time.time()
writer(count, input_p) # Send a lot of stuff to reader()
input_p.close() # Ask the reader to stop when it reads EOF
reader_p.join()
print "Sending %s numbers to Pipe() took %s seconds" % (count,
(time.time() - _start))
"""
multi_queue.py
"""
from multiprocessing import Process, Queue
import time
def reader(queue):
while True:
msg = queue.get() # Read from the queue and do nothing
if (msg == 'DONE'):
break
def writer(count, queue):
for ii in xrange(0, count):
queue.put(ii) # Write 'count' numbers into the queue
queue.put('DONE')
if __name__=='__main__':
for count in [10**4, 10**5, 10**6]:
queue = Queue() # reader() reads from queue
# writer() writes to queue
reader_p = Process(target=reader, args=((queue),))
reader_p.daemon = True
reader_p.start() # Launch the reader process
_start = time.time()
writer(count, queue) # Send a lot of stuff to reader()
reader_p.join() # Wait for the reader to finish
print "Sending %s numbers to Queue() took %s seconds" % (count,
(time.time() - _start))
"""
multi_joinablequeue.py
"""
from multiprocessing import Process, JoinableQueue
import time
def reader(queue):
while True:
msg = queue.get() # Read from the queue and do nothing
queue.task_done()
def writer(count, queue):
for ii in xrange(0, count):
queue.put(ii) # Write 'count' numbers into the queue
if __name__=='__main__':
for count in [10**4, 10**5, 10**6]:
queue = JoinableQueue() # reader() reads from queue
# writer() writes to queue
reader_p = Process(target=reader, args=((queue),))
reader_p.daemon = True
reader_p.start() # Launch the reader process
_start = time.time()
writer(count, queue) # Send a lot of stuff to reader()
queue.join() # Wait for the reader to finish
print "Sending %s numbers to JoinableQueue() took %s seconds" % (count,
(time.time() - _start))
@ 조나단 "요약 파이프에서()는 약 3 배 빠른보다 대기열()" –
그러나 파이프() 안전하게 다수의 생산자와 함께 사용할 수 없습니다/소비자. –
우수! 좋은 대답과 좋은 벤치 마크를 제공했습니다! 나는 단지 두 개의 작은 변명만을 가지고있다 : (1) "엄청나게 빠른 명령"은 과장된 표현이다. 그 차이는 x3이며, 이는 1 차수의 1/3 정도입니다. 그냥 말해. ;-); 그리고 (2) N 개의 작업자를 실행하는 것이보다 공정한 비교 일 것이며, 각 작업자는 점 대 점 파이프를 통해 주 스레드와 통신하며, 실행중인 N 작업자 모두가 단일 지점 대 다중 점 대기열에서 당겨집니다. – JJC