나는 동시에 실행하는 함수의 수를 제한하는 데 사용할 수있는 pool_map
함수가 있습니다.asyncio 풀을 취소하는 방법은 무엇입니까?
아이디어가 coroutine function 가능한 매개 변수 목록에 매핑 된 단일 매개 변수를 받아들이는하지만, 모든 함수는 세마포어 인수로 호출을 래핑하는 것입니다 만 한정 한 번에 실행 양편 :
from typing import Callable, Awaitable, Iterable, Iterator
from asyncio import Semaphore
A = TypeVar('A')
V = TypeVar('V')
async def pool_map(
func: Callable[[A], Awaitable[V]],
arg_it: Iterable[A],
size: int=10
) -> Generator[Awaitable[V], None, None]:
"""
Maps an async function to iterables
ensuring that only some are executed at once.
"""
semaphore = Semaphore(size)
async def sub(arg):
async with semaphore:
return await func(arg)
return map(sub, arg_it)
위의 코드를 수정하여 테스트하지는 않았지만이 변형이 잘 작동합니다. 예 : 다음과 같이 사용할 수 있습니다 :
from asyncio import get_event_loop, coroutine, as_completed
from contextlib import closing
URLS = [...]
async def run_all(awaitables):
for a in as_completed(awaitables):
result = await a
print('got result', result)
async def download(url): ...
if __name__ != '__main__':
pool = pool_map(download, URLS)
with closing(get_event_loop()) as loop:
loop.run_until_complete(run_all(pool))
미래를 기다리는 동안 예외가 발생하면 문제가 발생합니다. 예약 된 작업이나 아직 실행중인 작업을 모두 취소하는 방법을 볼 수 없으며 세마포를 획득하기를 아직 기다리고있는 작업도 아닙니다.
잘 모르는 라이브러리 나 멋진 빌딩 블록이 있습니까? 아니면 모든 부분을 직접 만들어야합니까? (즉, 그 웨이터에 액세스 할 수있는 Semaphore
, 그 실행중인 작업 큐에 대한 액세스를 제공하는 as_finished
, ...) cancel
이 작업이 이미 완료되어있는 경우에는 조합이 없다는 사실에 기반을