2009-07-14 10 views

답변

14

아직 공식적인 방법으로 처리하지 못했습니다. 또는 적어도,이 기반으로하지 :

당신은이 게시물이 무엇을하고 있는지 같은 것을 시도해 볼 수도 있습니다 - 기본 파이프 파일 핸들에 접근 :

그런 다음 select를 사용하십시오.

+0

+1 와우, 좋은 발견! 내 Google-fu가 약한 것처럼 보입니다 ... – cdleary

+1

불행히도 두 번째 URL이 더 이상 작동하지 않습니다. –

1

대기열 가입자에게 상태 변경이 통보되는 Observer 패턴을 사용할 수 있습니다.

이 경우 작업자 스레드를 각 큐의 수신기로 지정할 수 있으며 준비 신호를 수신 할 때마다 새 항목에서 작업 할 수 있습니다. 그렇지 않으면 절전 모드가됩니다.

+0

글쎄,'get'은 파괴적입니다. 그래서 GoF가 묘사 할 때 실제로 큐 자체에서 관찰을 할 수는 없습니다. dequeueing 쓰레드는 "관찰 된"것이어야한다 - 나는 두 개의 추가적인 쓰레드보다 적은 오버 헤드를 원했다. – cdleary

+0

또한 'select'와 같이 호출 프로세스에 대한 단일 액세스 지점을 원한다면 두 스레드 위에 스레드 안전 큐가 필요합니다. – cdleary

2

대기중인 항목을 하나의 대기열로 전달하는 스레드를 사용하는 것과 같아서 플랫폼 독립적 인 방식으로 다중 처리를 사용할 때 실용적인 선택입니다.

스레드를 피하려면 하위 수준 파이프/FD를 처리해야합니다. 하위 수준 파이프/FD는 플랫폼에 따라 다르며 상위 API와 일관되게 쉽게 처리 할 수 ​​없습니다.

또는 적절한 고급 인터페이스라고 생각되는 콜백을 설정할 수있는 대기열이 필요합니다. 나는.

 
    singlequeue = Queue() 
    incoming_queue1.setcallback(singlequeue.put) 
    incoming_queue2.setcallback(singlequeue.put) 
    ... 
    singlequeue.get() 

아마도 다중 처리 패키지가이 API를 확장 할 수 있지만 아직 존재하지 않을 수 있습니다. 이 개념은 "대기열"대신 "채널"이라는 용어를 사용하는 py.execnet에서 잘 작동합니다. 여기를 참조하십시오. http://tinyurl.com/nmtr4w

+0

아주 멋진 인터페이스입니다! Jesse가 @ars의 참조 된 버그 보고서에 언급했듯이 stdlib 인터페이스를 꽉 채우는 것이 이점이 분명 함에도 불구하고 – cdleary

+0

이 참이지만 현재 Queue 공개 API는 공통적 인 것으로 생각되는 사용 사례를 처리하지 않습니다. – hpk42

+0

"common"이라면 bugs.python.org에 버그 보고서 + 패치 (pete 사랑에 대한 테스트 포함)를 제출하고 2.7/3.x로 평가할 수 있습니다. – jnoller

21

실제로 select.select에서 다중 처리 .Queue 객체를 사용할 수 있습니다. 즉

que = multiprocessing.Queue() 
(input,[],[]) = select.select([que._reader],[],[]) 

은 읽을 준비가 된 경우에만 que를 선택합니다.

그것에 대한 문서가 없습니다. 멀티 프로세싱의 소스 코드를 읽었습니다. 큐 라이브러리 (보통 리눅스에서는 /usr/lib/python2.6/multiprocessing/queue.py와 같습니다)를 찾으십시오.

Queue.Queue이 작업을 수행하는 현명한 방법을 찾지 못했습니다.

+1

은 Windows에서 작동하지 않는 것 같습니다. – fluke

+3

이것은 유닉스에서 잘 작동하지만, Windows에서는'select.select' 구현이 파일 기술자가 아닌 소켓만을 다룰 수 있기 때문에 실패합니다. –

+0

'Queue.Queue'와'multiprocessing.Queue'의 가장 큰 차이점은 무엇이며'multiprocessing.Queue'는 멀티 프로세싱뿐만 아니라 멀티 스레딩에 사용될 수 있습니까? – CMCDragonkai

1

멀티 프로세싱 대기열의 선택이 창에서 얼마나 잘 작동하는지 잘 모름. Windows에서 select가 소켓 핸들링이 아닌 파일 핸들 핸들링을 듣기 때문에 문제가있을 수 있습니다.

내 대답은 차단 방식으로 각 대기열을 수신하고 그 결과를 주 스레드가 수신하는 단일 대기열에 넣으므로 각 개별 대기열을 본질적으로 단일 대기열로 다중화하는 것입니다.이 작업을 수행하는

내 코드는 다음과 같습니다이 도움이

import multiq 
import queue 

q1 = queue.Queue() 
q2 = queue.Queue() 

q3 = multiq.multi_queue([q1,q2]) 

q1.put(1) 
q2.put(2) 
q1.put(3) 
q1.put(4) 

res=0 
while not res==4: 
    while not q3.empty(): 
     res = q3.get()[1] 
     print ("returning result =",res) 

희망 :

""" 
Allow multiple queues to be waited upon. 

queue,value = multiq.select(list_of_queues) 
""" 
import queue 
import threading 

class queue_reader(threading.Thread): 
    def __init__(self,inq,sharedq): 
     threading.Thread.__init__(self) 
     self.inq = inq 
     self.sharedq = sharedq 
    def run(self): 
     while True: 
      data = self.inq.get() 
      print ("thread reads data=",data) 
      result = (self.inq,data) 
      self.sharedq.put(result) 

class multi_queue(queue.Queue): 
    def __init__(self,list_of_queues): 
     queue.Queue.__init__(self) 
     for q in list_of_queues: 
      qr = queue_reader(q,self) 
      qr.start() 

def select(list_of_queues): 
    outq = queue.Queue() 
    for q in list_of_queues: 
     qr = queue_reader(q,outq) 
     qr.start() 
    return outq.get() 

다음 테스트 루틴을 사용하는 방법을 보여줍니다. 위의 코드의

토니 월러스

1

새로운 버전 ...

멀티 프로세싱 큐 윈도우에서 작동하는 방법을 잘 선택에 확실하지. Windows에서 select가 소켓 핸들링이 아닌 파일 핸들 핸들링을 듣기 때문에 문제가있을 수 있습니다.

내 대답은 차단 방식으로 각 대기열을 수신하고 그 결과를 주 스레드가 수신하는 단일 대기열에 넣으므로 각 개별 대기열을 본질적으로 단일 대기열로 다중화하는 것입니다. 이 작업을 수행하는

내 코드는 다음과 같습니다

import multiq 
import queue 

q1 = queue.Queue() 
q2 = queue.Queue() 

q3 = multiq.multi_queue([q1,q2]) 

q1.put(1) 
q2.put(2) 
q1.put(3) 
q1.put(4) 
q1.put(multiq.EndOfQueueMarker) 
q2.put(multiq.EndOfQueueMarker) 
res=0 
have_data = True 
while have_data: 
    res = q3.get()[1] 
    print ("returning result =",res) 
    have_data = not(res==multiq.EndOfQueueMarker) 
-2

는하지 마 :

""" 
Allow multiple queues to be waited upon. 

An EndOfQueueMarker marks a queue as 
    "all data sent on this queue". 
When this marker has been accessed on 
all input threads, this marker is returned 
by the multi_queue. 

""" 
import queue 
import threading 

class EndOfQueueMarker: 
    def __str___(self): 
     return "End of data marker" 
    pass 

class queue_reader(threading.Thread): 
    def __init__(self,inq,sharedq): 
     threading.Thread.__init__(self) 
     self.inq = inq 
     self.sharedq = sharedq 
    def run(self): 
     q_run = True 
     while q_run: 
      data = self.inq.get() 
      result = (self.inq,data) 
      self.sharedq.put(result) 
      if data is EndOfQueueMarker: 
       q_run = False 

class multi_queue(queue.Queue): 
    def __init__(self,list_of_queues): 
     queue.Queue.__init__(self) 
     self.qList = list_of_queues 
     self.qrList = [] 
     for q in list_of_queues: 
      qr = queue_reader(q,self) 
      qr.start() 
      self.qrList.append(qr) 
    def get(self,blocking=True,timeout=None): 
     res = [] 
     while len(res)==0: 
      if len(self.qList)==0: 
       res = (self,EndOfQueueMarker) 
      else: 
       res = queue.Queue.get(self,blocking,timeout) 
       if res[1] is EndOfQueueMarker: 
        self.qList.remove(res[0]) 
        res = [] 
     return res 

    def join(self): 
     for qr in self.qrList: 
      qr.join() 

def select(list_of_queues): 
    outq = queue.Queue() 
    for q in list_of_queues: 
     qr = queue_reader(q,outq) 
     qr.start() 
    return outq.get() 

추시 코드가 어떻게 작동하는지 보여주기 위해 내 테스트 루틴입니다.

메시지에 헤더를 넣고 공통 큐로 보냅니다. 이렇게하면 코드가 단순 해지고 전반적으로 더 깨끗해질 것입니다.

관련 문제