2016-07-03 2 views
1

coroutines를 사용하여 장기 실행 파이프 라인을 구현했습니다. 이것은 일반적으로 로그 스트림을 가져오고, 약간의 강화 (스레드)를 수행하고이를 데이터 저장소에 기록합니다. 좋은 작품을 다음과 같이 호출여러 IO IO 동시 루틴을 병렬로 실행하는 방법

import time 
import random 
from concurrent import futures 

def coroutine(func): 
    def start(*args, **kwargs): 
     cr = func(*args, **kwargs) 
     next(cr) 
     return cr 
    return start 

@coroutine 
def foo(): 
    pool = futures.ThreadPoolExecutor(max_workers=10) 
    while True: 
     i = (yield) 
     fut = pool.submit(enrich, i) 
     fut.add_done_callback(result_handler) 
     time.sleep(random.random()*10) 

def enrich(i): 
    enriched = 'foo' + str(i) 
    time.sleep(random.random()) 
    return enriched 

def source(name, target): 
    while True: 
     time.sleep(random.random()) 
     i = random.randint(0,10) 
     target.send(name + str(i)) 

하나의 파이프 라인 :

다음은 파이프 라인을 시뮬레이션 할 수있는 작은 예입니다.

source('task one ', foo()) 

이제 백그라운드 스레드의 여러 로그에 대해 여러 파이프 라인을 실행하고 싶습니다. 한 가지 시도는 ThreadPoolExecutor를 다시 사용하여 여러 파이프 라인을 구동하는 것입니다.

def run(): 
    pool = futures.ThreadPoolExecutor(max_workers=10) 
    tasks = [source('task one ', foo()), 
      source('task two ', foo())] 
    for task in tasks: 
     fut = pool.submit(task) 
     fut.add_done_callback(result_handler) 

그러나 파이프 라인은 첫 번째 작업 이후에 차단되고 결코 두 번째 작업을 수행하지 못합니다. 백그라운드 스레드에서 그러한 장기 실행 파이프 라인 (아마도 영원히)을 실행하는 올바른 방법은 무엇입니까?

+1

당신은 pool.submit (작업)''에 소스 ('작업 한'foo는()''의 결과를 전달합니다. 그것은'pool.submit (소스 '작업 하나 여야합니다 ','task 2 ', foo())]'그리고 나서,'pool.submit ('*)', foo())' –

+0

' task)' –

+0

우수! 여기서 제시 한 시뮬레이트 된 문제를 해결합니다. 실제 파이프 라인이 상당히 복잡하므로 체크해야 할 콜백 예외가 여전히 있습니다. 빠른 응답 주셔서 감사합니다. – maiaini

답변

2

source 기능이 절대로 끝나지 않으므로 tasks = [source('task one ', foo()), source('task two ', foo())] 목록이 만들어지지 않습니다. 그래서 첫 번째 작업이 실행되고 파이프 블록이 차단됩니다.
해결책은 source과 그 인수를 pool.submit으로 전달하는 것입니다. run` 기능`에서

tasks = [(source, 'task one', foo()), (source, 'task two', foo())] 
for task in tasks: 
    fut = pool.submit(*task) 
    fut.add_done_callback(result_handler)