2017-01-16 2 views
1

나는 동시에 실행하는 함수의 수를 제한하는 데 사용할 수있는 pool_map 함수가 있습니다.asyncio 풀을 취소하는 방법은 무엇입니까?

아이디어가 coroutine function 가능한 매개 변수 목록에 매핑 된 단일 매개 변수를 받아들이는하지만, 모든 함수는 세마포어 인수로 호출을 래핑하는 것입니다 만 한정 한 번에 실행 양편 :

from typing import Callable, Awaitable, Iterable, Iterator 
from asyncio import Semaphore 

A = TypeVar('A') 
V = TypeVar('V') 

async def pool_map(
    func: Callable[[A], Awaitable[V]], 
    arg_it: Iterable[A], 
    size: int=10 
) -> Generator[Awaitable[V], None, None]: 
    """ 
    Maps an async function to iterables 
    ensuring that only some are executed at once. 
    """ 
    semaphore = Semaphore(size) 

    async def sub(arg): 
     async with semaphore: 
      return await func(arg) 

    return map(sub, arg_it) 

위의 코드를 수정하여 테스트하지는 않았지만이 변형이 잘 작동합니다. 예 : 다음과 같이 사용할 수 있습니다 :

from asyncio import get_event_loop, coroutine, as_completed 
from contextlib import closing 

URLS = [...] 

async def run_all(awaitables): 
    for a in as_completed(awaitables): 
     result = await a 
     print('got result', result) 

async def download(url): ... 


if __name__ != '__main__': 
    pool = pool_map(download, URLS) 

    with closing(get_event_loop()) as loop: 
     loop.run_until_complete(run_all(pool)) 

미래를 기다리는 동안 예외가 발생하면 문제가 발생합니다. 예약 된 작업이나 아직 실행중인 작업을 모두 취소하는 방법을 볼 수 없으며 세마포를 획득하기를 아직 기다리고있는 작업도 아닙니다.

잘 모르는 라이브러리 나 멋진 빌딩 블록이 있습니까? 아니면 모든 부분을 직접 만들어야합니까? (즉, 그 웨이터에 액세스 할 수있는 Semaphore, 그 실행중인 작업 큐에 대한 액세스를 제공하는 as_finished, ...) cancel이 작업이 이미 완료되어있는 경우에는 조합이 없다는 사실에 기반을

답변

1

사용 ensure_future 대신 코 루틴의 Task를 얻을 수 있습니다 :

import asyncio 
from contextlib import closing 


def pool_map(func, args, size=10): 
    """ 
    Maps an async function to iterables 
    ensuring that only some are executed at once. 
    """ 
    semaphore = asyncio.Semaphore(size) 

    async def sub(arg): 
     async with semaphore: 
      return await func(arg) 

    tasks = [asyncio.ensure_future(sub(x)) for x in args] 

    return tasks 


async def f(n): 
    print(">>> start", n) 

    if n == 7: 
     raise Exception("boom!") 

    await asyncio.sleep(n/10) 

    print("<<< end", n) 
    return n 


async def run_all(tasks): 
    exc = None 
    for a in asyncio.as_completed(tasks): 
     try: 
      result = await a 
      print('=== result', result) 
     except asyncio.CancelledError as e: 
      print("!!! cancel", e) 
     except Exception as e: 
      print("Exception in task, cancelling!") 
      for t in tasks: 
       t.cancel() 
      exc = e 
    if exc: 
     raise exc 


pool = pool_map(f, range(1, 20), 3) 

with closing(asyncio.get_event_loop()) as loop: 
    loop.run_until_complete(run_all(pool)) 
1

여기에 순진 솔루션입니다 :

async def run_all(awaitables): 
    futures = [asyncio.ensure_future(a) for a in awaitables] 
    try: 
     for fut in as_completed(futures): 
      result = await fut 
      print('got result', result) 
    except: 
     for future in futures: 
      future.cancel() 
     await asyncio.wait(futures) 
관련 문제