GIL이 그림의 일부가 아니기 때문에 다중 처리 모듈을 제외하고는 파이썬에서 진정한 동시성이 없습니다.
원하는 작업은 이벤트 대기열을 확인하고 적절하게 디스패치하는 이벤트 루프가 필요합니다. Pububsub 가능성이 귀하의 인생을 더 쉽게 만들 수 있지만 당신이 원하는 pubsub의 저자로서 나는 과분할지도 모릅니다 : :) 다중 프로세스의 완벽한 통합이 mp 모듈에 의해 어떻게 제공되는지 감안할 때 실제로 사용하지 않는 이유가 있습니까? 병행 성이 정말로 필요한 경우?
모든 스레드에서 하나 이상의 스레드로 이벤트를 이동하려는 사실은 스레드가 게시 할 수있는 공유 게시 대기열 (어떤 이벤트 유형 및 이벤트 데이터를 나타내는 데이터)을 사용할 수 있음을 나타냅니다. 또한 각 스레드에 대해 메시지 대기열을 갖게됩니다. 공유 게시 대기열에 게시하는 스레드, 기본 프로세스 이벤트 루프는 게시 대기열을 확인하고 이벤트를 개별 스레드 메시지 대기열에 적절하게 복사합니다. 각 스레드는 큐를 정기적으로 확인하고 프로세스 된 이벤트를 제거해야합니다. 각 스레드는 특정 이벤트에 대한 기본 프로세스를 구독 할 수 있습니다.
가 여기에 서로 메시지를주고 3 개 보조 스레드의 예입니다
from multiprocessing import Process, Queue, Lock
from Queue import Empty as QueueEmpty
from random import randint
def log(lock, threadId, msg):
lock.acquire()
print 'Thread', threadId, ':', msg
lock.release()
def auxThread(id, lock, sendQueue, recvQueue, genType):
## Read from the queue
log(lock, id, 'starting')
while True:
# send a message (once in a while!)
if randint(1,10) > 7:
event = dict(type = genType, fromId = id, val = randint(1, 10))
log(lock, id, 'putting message type "%(type)s" = %(val)s' % event)
sendQueue.put(event)
# block until we get a message:
maxWait = 1 # second
try:
msg = recvQueue.get(False, maxWait)
log(lock, id, 'got message type "%(type)s" = %(val)s from thread %(fromId)s' % msg)
if (msg['val'] == 'DONE'):
break
except QueueEmpty:
pass
log(lock, id, 'done')
def createThread(id, lock, postOffice, genType):
messagesForAux = Queue()
args = (id, lock, postOffice, messagesForAux, genType)
auxProc = Process(target=auxThread, args=args)
auxProc.daemon = True
return dict(q=messagesForAux, p=auxProc, id=id)
def mainThread():
postOffice = Queue() # where all threads post their messages
lock = Lock() # so print can be synchronized
# setup threads:
msgThreads = [
createThread(1, lock, postOffice, 'heartbeat'),
createThread(2, lock, postOffice, 'new_socket'),
createThread(3, lock, postOffice, 'keypress'),
]
# identify which threads listen for which messages
dispatch = dict(
heartbeat = (2,),
keypress = (1,),
new_socket = (3,),
)
# start all threads
for th in msgThreads:
th['p'].start()
# process messages
count = 0
while True:
try:
maxWait = 1 # second
msg = postOffice.get(False, maxWait)
for threadId in dispatch[msg['type']]:
thObj = msgThreads[threadId - 1]
thObj['q'].put(msg)
count += 1
if count > 20:
break
except QueueEmpty:
pass
log(lock, 0, "Main thread sending exit signal to aux threads")
for th in msgThreads:
th['q'].put(dict(type='command', val='DONE', fromId=0))
for th in msgThreads:
th['p'].join()
log(lock, th['id'], 'joined main')
log(lock, 0, "DONE")
if __name__ == '__main__':
mainThread()
당신은 완전히 맞아 pypubsub 기능이 설명이 주 유사성하지만 pypubsub의 작은 부분 만 사용하는 것 즉, 노력의 복잡성 대부분이 두 가지 유형의 대기열이라고 생각합니다. pypubsub는이 문제에 대해 많은 도움이됩니다. mp 모듈을 사용하여 큐 시스템을 작동 시키면 (필자의 예제에 따라), 필자 자신의 이벤트가 아닌 pypubsub를 가져 와서 메시지를 게시/대기시킬 수 있습니다.
"전체 프로세스 (즉, 모든 스레드)가 일시 중단됩니다."- GIL 또는 "정지"를 유발하는 다른 것을 언급하고 있습니까? – shx2