asyncio 프레임 워크를 기반으로 앱을 쓰고 있습니다. 이 앱은 속도 제한 (최대 2 통화/초)이있는 API와 상호 작용합니다. 그래서 나는 API와 상호 작용하는 메소드를 rate limiter로 사용하기 위해 셀러리에 옮겼습니다. 그러나 오버 헤드처럼 보입니다.비동기 및 비율 제한
coroutins의 실행을 보장하는 새로운 asyncio 이벤트 루프 (또는 다른 것)를 만들 수있는 방법은 초당 n입니까?
asyncio 프레임 워크를 기반으로 앱을 쓰고 있습니다. 이 앱은 속도 제한 (최대 2 통화/초)이있는 API와 상호 작용합니다. 그래서 나는 API와 상호 작용하는 메소드를 rate limiter로 사용하기 위해 셀러리에 옮겼습니다. 그러나 오버 헤드처럼 보입니다.비동기 및 비율 제한
coroutins의 실행을 보장하는 새로운 asyncio 이벤트 루프 (또는 다른 것)를 만들 수있는 방법은 초당 n입니까?
나는이 같은 사이클을 쓸 수 있습니다 생각 :
while True:
t0 = loop.time()
await make_io_call()
dt = loop.time() - t0
if dt < 0.5:
await asyncio.sleep(0.5 - dt, loop=loop)
허용 된 대답은 정확합니다. 그러나 일반적으로 가능한 한 2QPS에 가까워지기를 원합니다. 이 방법은 병렬 처리를 제공하지 않으므로 make_io_call()이 1 초보다 오래 걸리면 문제가 될 수 있습니다. 보다 나은 해결책은 make_io_call에 세마포어를 전달하여 실행이 시작될 수 있는지 여부를 알 수 있도록하는 것입니다.
여기 구현은 다음과 같습니다. RateLimitingSemaphore
은 속도 제한이 요구 사항 아래로 떨어지면 해당 컨텍스트를 릴리스합니다.
import asyncio
from collections import deque
from datetime import datetime
class RateLimitingSemaphore:
def __init__(self, qps_limit, loop=None):
self.loop = loop or asyncio.get_event_loop()
self.qps_limit = qps_limit
# The number of calls that are queued up, waiting for their turn.
self.queued_calls = 0
# The times of the last N executions, where N=qps_limit - this should allow us to calculate the QPS within the
# last ~ second. Note that this also allows us to schedule the first N executions immediately.
self.call_times = deque()
async def __aenter__(self):
self.queued_calls += 1
while True:
cur_rate = 0
if len(self.call_times) == self.qps_limit:
cur_rate = len(self.call_times)/(self.loop.time() - self.call_times[0])
if cur_rate < self.qps_limit:
break
interval = 1./self.qps_limit
elapsed_time = self.loop.time() - self.call_times[-1]
await asyncio.sleep(self.queued_calls * interval - elapsed_time)
self.queued_calls -= 1
if len(self.call_times) == self.qps_limit:
self.call_times.popleft()
self.call_times.append(self.loop.time())
async def __aexit__(self, exc_type, exc, tb):
pass
async def test(qps):
executions = 0
async def io_operation(semaphore):
async with semaphore:
nonlocal executions
executions += 1
semaphore = RateLimitingSemaphore(qps)
start = datetime.now()
await asyncio.wait([io_operation(semaphore) for i in range(5*qps)])
dt = (datetime.now() - start).total_seconds()
print('Desired QPS:', qps, 'Achieved QPS:', executions/dt)
if __name__ == "__main__":
asyncio.get_event_loop().run_until_complete(test(100))
asyncio.get_event_loop().close()
Desired QPS: 100 Achieved QPS: 99.82723898022084
감사를 인쇄 할 것이다! 나는 대답을 기다리는 동안 이런 식으로 장식자를 만들었다. 이것은 이것이 하나의 적절한 접근 방법 인 것처럼 보입니다. 사실입니까? –
"단일 적절한 접근법"이란 무엇을 의미합니까? 제게는 문제를 해결하는 가장 간단하고 가장 명백한 방법이지만 복잡한 솔루션을 12 개까지 초대 할 수는 있습니다. –
정확히 내가 듣고 싶습니다 :) 감사합니다. –