2 가지 (다중 처리) Queues 중 하나에서 사용할 수있는 것이있을 때까지 (회전하지 않고) 가장 좋은 방법은 무엇입니까? 두 시스템 모두 동일한 시스템에 있습니까?다중 파이썬 다중 처리 대기열에서 "선택"하시겠습니까?
답변
아직 공식적인 방법으로 처리하지 못했습니다. 또는 적어도,이 기반으로하지 :
당신은이 게시물이 무엇을하고 있는지 같은 것을 시도해 볼 수도 있습니다 - 기본 파이프 파일 핸들에 접근 :
을그런 다음 select를 사용하십시오.
대기열 가입자에게 상태 변경이 통보되는 Observer 패턴을 사용할 수 있습니다.
이 경우 작업자 스레드를 각 큐의 수신기로 지정할 수 있으며 준비 신호를 수신 할 때마다 새 항목에서 작업 할 수 있습니다. 그렇지 않으면 절전 모드가됩니다.
대기중인 항목을 하나의 대기열로 전달하는 스레드를 사용하는 것과 같아서 플랫폼 독립적 인 방식으로 다중 처리를 사용할 때 실용적인 선택입니다.
스레드를 피하려면 하위 수준 파이프/FD를 처리해야합니다. 하위 수준 파이프/FD는 플랫폼에 따라 다르며 상위 API와 일관되게 쉽게 처리 할 수 없습니다.
또는 적절한 고급 인터페이스라고 생각되는 콜백을 설정할 수있는 대기열이 필요합니다. 나는.
singlequeue = Queue() incoming_queue1.setcallback(singlequeue.put) incoming_queue2.setcallback(singlequeue.put) ... singlequeue.get()
아마도 다중 처리 패키지가이 API를 확장 할 수 있지만 아직 존재하지 않을 수 있습니다. 이 개념은 "대기열"대신 "채널"이라는 용어를 사용하는 py.execnet에서 잘 작동합니다. 여기를 참조하십시오. http://tinyurl.com/nmtr4w
실제로 select.select에서 다중 처리 .Queue 객체를 사용할 수 있습니다. 즉
que = multiprocessing.Queue()
(input,[],[]) = select.select([que._reader],[],[])
은 읽을 준비가 된 경우에만 que를 선택합니다.
그것에 대한 문서가 없습니다. 멀티 프로세싱의 소스 코드를 읽었습니다. 큐 라이브러리 (보통 리눅스에서는 /usr/lib/python2.6/multiprocessing/queue.py와 같습니다)를 찾으십시오.
Queue.Queue이 작업을 수행하는 현명한 방법을 찾지 못했습니다.
은 Windows에서 작동하지 않는 것 같습니다. – fluke
이것은 유닉스에서 잘 작동하지만, Windows에서는'select.select' 구현이 파일 기술자가 아닌 소켓만을 다룰 수 있기 때문에 실패합니다. –
'Queue.Queue'와'multiprocessing.Queue'의 가장 큰 차이점은 무엇이며'multiprocessing.Queue'는 멀티 프로세싱뿐만 아니라 멀티 스레딩에 사용될 수 있습니까? – CMCDragonkai
멀티 프로세싱 대기열의 선택이 창에서 얼마나 잘 작동하는지 잘 모름. Windows에서 select가 소켓 핸들링이 아닌 파일 핸들 핸들링을 듣기 때문에 문제가있을 수 있습니다.
내 대답은 차단 방식으로 각 대기열을 수신하고 그 결과를 주 스레드가 수신하는 단일 대기열에 넣으므로 각 개별 대기열을 본질적으로 단일 대기열로 다중화하는 것입니다.이 작업을 수행하는
내 코드는 다음과 같습니다이 도움이
import multiq
import queue
q1 = queue.Queue()
q2 = queue.Queue()
q3 = multiq.multi_queue([q1,q2])
q1.put(1)
q2.put(2)
q1.put(3)
q1.put(4)
res=0
while not res==4:
while not q3.empty():
res = q3.get()[1]
print ("returning result =",res)
희망 :
"""
Allow multiple queues to be waited upon.
queue,value = multiq.select(list_of_queues)
"""
import queue
import threading
class queue_reader(threading.Thread):
def __init__(self,inq,sharedq):
threading.Thread.__init__(self)
self.inq = inq
self.sharedq = sharedq
def run(self):
while True:
data = self.inq.get()
print ("thread reads data=",data)
result = (self.inq,data)
self.sharedq.put(result)
class multi_queue(queue.Queue):
def __init__(self,list_of_queues):
queue.Queue.__init__(self)
for q in list_of_queues:
qr = queue_reader(q,self)
qr.start()
def select(list_of_queues):
outq = queue.Queue()
for q in list_of_queues:
qr = queue_reader(q,outq)
qr.start()
return outq.get()
다음 테스트 루틴을 사용하는 방법을 보여줍니다. 위의 코드의
토니 월러스
새로운 버전 ...
멀티 프로세싱 큐 윈도우에서 작동하는 방법을 잘 선택에 확실하지. Windows에서 select가 소켓 핸들링이 아닌 파일 핸들 핸들링을 듣기 때문에 문제가있을 수 있습니다.
내 대답은 차단 방식으로 각 대기열을 수신하고 그 결과를 주 스레드가 수신하는 단일 대기열에 넣으므로 각 개별 대기열을 본질적으로 단일 대기열로 다중화하는 것입니다. 이 작업을 수행하는
내 코드는 다음과 같습니다
import multiq
import queue
q1 = queue.Queue()
q2 = queue.Queue()
q3 = multiq.multi_queue([q1,q2])
q1.put(1)
q2.put(2)
q1.put(3)
q1.put(4)
q1.put(multiq.EndOfQueueMarker)
q2.put(multiq.EndOfQueueMarker)
res=0
have_data = True
while have_data:
res = q3.get()[1]
print ("returning result =",res)
have_data = not(res==multiq.EndOfQueueMarker)
는하지 마 :
"""
Allow multiple queues to be waited upon.
An EndOfQueueMarker marks a queue as
"all data sent on this queue".
When this marker has been accessed on
all input threads, this marker is returned
by the multi_queue.
"""
import queue
import threading
class EndOfQueueMarker:
def __str___(self):
return "End of data marker"
pass
class queue_reader(threading.Thread):
def __init__(self,inq,sharedq):
threading.Thread.__init__(self)
self.inq = inq
self.sharedq = sharedq
def run(self):
q_run = True
while q_run:
data = self.inq.get()
result = (self.inq,data)
self.sharedq.put(result)
if data is EndOfQueueMarker:
q_run = False
class multi_queue(queue.Queue):
def __init__(self,list_of_queues):
queue.Queue.__init__(self)
self.qList = list_of_queues
self.qrList = []
for q in list_of_queues:
qr = queue_reader(q,self)
qr.start()
self.qrList.append(qr)
def get(self,blocking=True,timeout=None):
res = []
while len(res)==0:
if len(self.qList)==0:
res = (self,EndOfQueueMarker)
else:
res = queue.Queue.get(self,blocking,timeout)
if res[1] is EndOfQueueMarker:
self.qList.remove(res[0])
res = []
return res
def join(self):
for qr in self.qrList:
qr.join()
def select(list_of_queues):
outq = queue.Queue()
for q in list_of_queues:
qr = queue_reader(q,outq)
qr.start()
return outq.get()
추시 코드가 어떻게 작동하는지 보여주기 위해 내 테스트 루틴입니다.
메시지에 헤더를 넣고 공통 큐로 보냅니다. 이렇게하면 코드가 단순 해지고 전반적으로 더 깨끗해질 것입니다.
- 1. 파이썬 다중 처리 피닝
- 2. 트위스트 리액터가있는 파이썬 다중 처리
- 3. 파이썬 다중 처리 - 배열 공유?
- 4. 파이썬 : urllib2 다중 오프너 처리
- 5. 파이썬 다중 처리 풀 프로파일 링
- 6. 파이썬 다중 처리 및 소켓이 닫히지 않음
- 7. 파이썬 다중 처리 : 프로세스의 진행 보고서
- 8. 파이썬 다중 처리 : 사용자 정의 프로세스 풀
- 9. 다중 처리 하위 프로세스
- 10. SqlAlchemy 및 다중 처리
- 11. 처리 System.IO.IOException 다중 연결
- 12. 다중 자바 애플릿 처리
- 13. Python 다중 처리 질문
- 14. 다중 스레드 처리 문제
- 15. 자바에서 다중 스레드 처리
- 16. 파이썬에서 다중 처리
- 17. 트위스트로 다중 처리
- 18. 다중 처리 Pool.imap이 손상 되었습니까?
- 19. 파이썬 큐 및 다중 처리 큐 : 어떻게 동작합니까?
- 20. PHP와 파이썬 같은 다른 언어 사이의 다중 처리
- 21. 왜 파이썬 다중 처리 모듈로 인해 CPU가 완전히 소모됩니까?
- 22. Windows에서 일괄 파일/변환 작업을위한 파이썬 다중 처리
- 23. 파이썬 다중 처리 모듈로 생성 된 하위 프로세스가 인쇄되지 않습니다.
- 24. 파이썬 다중 처리 : 작업자 함수에서 while 루프로 출력되지 않습니다.
- 25. 파이썬 다중 처리 atexit 오류 "atexit._run_exitfuncs의 오류"
- 26. 파이썬 다중 처리 관리자 및 복합 패턴 공유
- 27. 파이썬 다중 처리 : 사용 된 코어 수를 제한합니다.
- 28. pyodbc로 파이썬 다중 처리 및 데이터베이스 액세스가 "안전하지 않습니다"?
- 29. 파이썬 다중 처리 - 프로세스 완료시 메모리를 해제하는 방법?
- 30. 다중 계층 응용 프로그램 처리
+1 와우, 좋은 발견! 내 Google-fu가 약한 것처럼 보입니다 ... – cdleary
불행히도 두 번째 URL이 더 이상 작동하지 않습니다. –