2011-12-11 5 views

답변

180
  • Pipe()에는 두 개의 끝점 만있을 수 있습니다.

  • Queue()에는 여러 생산자와 소비자가있을 수 있습니다. 당신이 Queue()를 사용하여 통신하는 두 개 이상의 지점을해야하는 경우

언제 그들

사용할 수 있습니다.

절대 성능을 필요로하는 경우 은 Queue()Pipe() 위에 구축되어 있기 때문에 훨씬 빠릅니다.

성능 벤치마킹

은의 당신이 두 개의 프로세스를 생성하고 가능한 한 빨리 그들 사이에 메시지를 보낼 가정 해 보자. 이것은 Pipe()Queue()을 사용하는 유사한 테스트 사이의 드래그 레이스의 타이밍 결과입니다. 이것은 Ubuntu 11.10 및 Python 2.7.2를 실행하는 ThinkpadT61에 있습니다.

참고로, 나는 JoinableQueue()의 보너스로 결과를 던졌습니다. JoinableQueue()queue.task_done()이 호출 될 때 작업을 설명합니다 (특정 작업에 대해 알지도 못하고 큐의 미완료 작업 만 계산 함). 따라서 queue.join()은 작업이 완료되었음을 나타냅니다. 요약 Pipe()에서

이 답변의 하단에 각각에 대한 코드 ...

[email protected]:~$ python multi_pipe.py 
Sending 10000 numbers to Pipe() took 0.0369849205017 seconds 
Sending 100000 numbers to Pipe() took 0.328398942947 seconds 
Sending 1000000 numbers to Pipe() took 3.17266988754 seconds 
[email protected]:~$ python multi_queue.py 
Sending 10000 numbers to Queue() took 0.105256080627 seconds 
Sending 100000 numbers to Queue() took 0.980564117432 seconds 
Sending 1000000 numbers to Queue() took 10.1611330509 seconds 
[email protected]:~$ python multi_joinablequeue.py 
Sending 10000 numbers to JoinableQueue() took 0.172781944275 seconds 
Sending 100000 numbers to JoinableQueue() took 1.5714070797 seconds 
Sending 1000000 numbers to JoinableQueue() took 15.8527247906 seconds 
[email protected]:~$ 

Queue()에 비해 약 3 배 빠르다. 실제로 혜택을 얻지 않는 한 JoinableQueue()에 대해 생각조차하지 마십시오.

보너스 자료 2

은 멀티하면 일부 바로 가기를 모르면 하드 디버깅 만든다 정보 흐름의 미묘한 변화를 소개합니다. 예를 들어 많은 조건에서 사전을 통해 색인을 생성 할 때 잘 작동하는 스크립트가있을 수 있지만 특정 입력에 대해서는 가끔 실패합니다.

일반적으로 전체 파이썬 프로세스가 충돌 할 때 우리는 실패에 대한 단서를 얻습니다. 그러나 다중 처리 기능이 작동하지 않으면 콘솔에 원치 않는 크래시 추적이 인쇄되지 않습니다. 알 수없는 다중 처리 충돌을 추적하는 것은 프로세스를 손상시킨 것에 대한 단서가 없으면 어렵습니다.

내가 멀티 프로세싱 충돌 informaiton를 추적 발견 한 가장 간단한 방법은 try/except에서 전체 멀티 기능을 감아 사용하는 것입니다 traceback.print_exc() : 당신이 충돌을 찾을 때

import traceback 
def reader(args): 
    try: 
     # Insert stuff to be multiprocessed here 
     return args[0]['that'] 
    except: 
     print "FATAL: reader({0}) exited while multiprocessing".format(args) 
     traceback.print_exc() 

지금, 당신은 같은 것을 볼

:

FATAL: reader([{'crash', 'this'}]) exited while multiprocessing 
Traceback (most recent call last): 
    File "foo.py", line 19, in __init__ 
    self.run(task_q, result_q) 
    File "foo.py", line 46, in run 
    raise ValueError 
ValueError 

소스 코드 :


""" 
multi_pipe.py 
""" 
from multiprocessing import Process, Pipe 
import time 

def reader(pipe): 
    output_p, input_p = pipe 
    input_p.close() # We are only reading 
    while True: 
     try: 
      msg = output_p.recv() # Read from the output pipe and do nothing 
     except EOFError: 
      break 

def writer(count, input_p): 
    for ii in xrange(0, count): 
     input_p.send(ii)    # Write 'count' numbers into the input pipe 

if __name__=='__main__': 
    for count in [10**4, 10**5, 10**6]: 
     output_p, input_p = Pipe() 
     reader_p = Process(target=reader, args=((output_p, input_p),)) 
     reader_p.start()  # Launch the reader process 

     output_p.close()  # We no longer need this part of the Pipe() 
     _start = time.time() 
     writer(count, input_p) # Send a lot of stuff to reader() 
     input_p.close()  # Ask the reader to stop when it reads EOF 
     reader_p.join() 
     print "Sending %s numbers to Pipe() took %s seconds" % (count, 
      (time.time() - _start)) 

""" 
multi_queue.py 
""" 
from multiprocessing import Process, Queue 
import time 

def reader(queue): 
    while True: 
     msg = queue.get()   # Read from the queue and do nothing 
     if (msg == 'DONE'): 
      break 

def writer(count, queue): 
    for ii in xrange(0, count): 
     queue.put(ii)    # Write 'count' numbers into the queue 
    queue.put('DONE') 

if __name__=='__main__': 
    for count in [10**4, 10**5, 10**6]: 
     queue = Queue() # reader() reads from queue 
          # writer() writes to queue 
     reader_p = Process(target=reader, args=((queue),)) 
     reader_p.daemon = True 
     reader_p.start()  # Launch the reader process 

     _start = time.time() 
     writer(count, queue) # Send a lot of stuff to reader() 
     reader_p.join()   # Wait for the reader to finish 
     print "Sending %s numbers to Queue() took %s seconds" % (count, 
      (time.time() - _start)) 

""" 
multi_joinablequeue.py 
""" 
from multiprocessing import Process, JoinableQueue 
import time 

def reader(queue): 
    while True: 
     msg = queue.get()   # Read from the queue and do nothing 
     queue.task_done() 

def writer(count, queue): 
    for ii in xrange(0, count): 
     queue.put(ii)    # Write 'count' numbers into the queue 

if __name__=='__main__': 
    for count in [10**4, 10**5, 10**6]: 
     queue = JoinableQueue() # reader() reads from queue 
            # writer() writes to queue 
     reader_p = Process(target=reader, args=((queue),)) 
     reader_p.daemon = True 
     reader_p.start()  # Launch the reader process 

     _start = time.time() 
     writer(count, queue) # Send a lot of stuff to reader() 
     queue.join()   # Wait for the reader to finish 
     print "Sending %s numbers to JoinableQueue() took %s seconds" % (count, 
      (time.time() - _start)) 
+2

@ 조나단 "요약 파이프에서()는 약 3 배 빠른보다 대기열()" –

+0

그러나 파이프() 안전하게 다수의 생산자와 함께 사용할 수 없습니다/소비자. –

+11

우수! 좋은 대답과 좋은 벤치 마크를 제공했습니다! 나는 단지 두 개의 작은 변명만을 가지고있다 : (1) "엄청나게 빠른 명령"은 과장된 표현이다. 그 차이는 x3이며, 이는 1 차수의 1/3 정도입니다. 그냥 말해. ;-); 그리고 (2) N 개의 작업자를 실행하는 것이보다 공정한 비교 일 것이며, 각 작업자는 점 대 점 파이프를 통해 주 스레드와 통신하며, 실행중인 N 작업자 모두가 단일 지점 대 다중 점 대기열에서 당겨집니다. – JJC