2017-12-17 1 views
0

gevent.pool (고정 크기)은 여러 작업 생성자간에 공유됩니다. 모든 작업 생성자는 여유 슬롯이있는 경우 풀에 새 녹색let을 적용 할 수 있습니다. 작업이 풀에 추가 된 후 작업 생성자는 추가 된 모든 작업이 완료 될 때까지 대기해야합니다.Gevent : 완료된 greenlets 집합을 기다리는 방법

gevent.queue.JoinableQueue를 사용하여 모든 작업이 완료 될 때까지 기다렸습니다. 그것은 내가 기다리는 끝에 매우 성가신 예외를 받는다는 것을 제외하면 작동합니다.

어떻게 피하면 아래 코드를 수정할 수 있습니까? 내가 뭔가 잘못했을 수 있습니까?

from gevent import monkey, sleep; monkey.patch_all() 
from gevent.queue import JoinableQueue 
from gevent.pool import Pool 

pool = Pool(3) 


def worker(n): 
    print 'Worker {} started'.format(n) 
    sleep(1) 
    print 'Worker {} finished'.format(n) 
    return n 


def main(): 
    results = [] 

    queue = JoinableQueue() 
    for job_no in range(5): 
     pool.wait_available() 
     greenlet = pool.apply_async(worker, kwds=dict(n=job_no), callback=lambda ret: results.append(ret)) 
     queue.put(greenlet) 
     sleep(.05) 
    print 'All workers added' 

    queue.join() 
    print 'All workers finished', results 


if __name__ == '__main__': 
    main() 

출력 :

Worker 0 started 
Worker 1 started 
Worker 2 started 
Worker 0 finished 
Worker 3 started 
Worker 1 finished 
Worker 4 started 
All workers added 
Worker 2 finished 
Worker 3 finished 
Worker 4 finished 
Traceback (most recent call last): 
    File "main.py", line 32, in <module> 
    main() 
    File "main.py", line 27, in main 
    queue.join() 
    File "C:\Python\2.7.10\x64\lib\site-packages\gevent\queue.py", line 492, in join 
    return self._cond.wait(timeout=timeout) 
    File "C:\Python\2.7.10\x64\lib\site-packages\gevent\event.py", line 219, in wait 
    return self._wait(timeout) 
    File "C:\Python\2.7.10\x64\lib\site-packages\gevent\event.py", line 129, in _wait 
    gotit = self._wait_core(timeout) 
    File "C:\Python\2.7.10\x64\lib\site-packages\gevent\event.py", line 106, in _wait_core 
    result = self.hub.switch() 
    File "C:\Python\2.7.10\x64\lib\site-packages\gevent\hub.py", line 630, in switch 
    return RawGreenlet.switch(self) 
gevent.hub.LoopExit: ('This operation would block forever', <Hub at 0x26c1c28 select default pending=0 ref=0>) 

답변

1

당신은 대기열에서 작업을 소비 할 greenlet이 없기 때문에 오류 '영원히 차단하는 것이이 작업은'모든 greenlets까지 queue.join() 단지 블록이 완료 얻을, 다음 예외 높인.

JoinableQueue 여기 필요하지 않습니다, 완료 모든 greenlets 기다릴 gevent.joinall()를 사용

import gevent 

def main(): 
    results = [] 
    gs = [] 
    for job_no in range(5): 
     greenlet = .. 
     gs.append(greenlet) 
    gevent.joinall(gs) 
    print 'All workers finished', results