각 처리기에 고유 한 대기열이있는 토큰을 사용하여 비동기 서버를 만들려고합니다. 종점이 호출되면 작업이 대기열에 배치됩니다. 대기열에서 작업을 비동기 적으로 '사용'하는 소비자 함수가 있습니다. 그러나 소비자의 행동은 내가 self.consumer()
또는 AsyncHandler.consumer()
이라고 부르는 지에 따라 달라집니다. 내 초기 추측은 인스턴스 수준 잠금 때문에 발생하지만 그 증거를 찾을 수 없다는 것입니다. 4 게시물 요청을 연속적으로 발생시킵니다. 다음은 출력이있는 2 개의 미리보기입니다.토네이도 RequestHandler에서 __init__에서 호출 된 비동기 소비자가 정적으로 호출 된 것과 다르게 동작하는 이유는 무엇입니까?
import tornado.web
from tornado import gen
from time import sleep, time
from tornado.queues import Queue
from concurrent.futures import ThreadPoolExecutor
from tornado.ioloop import IOLoop
class AsyncHandler(tornado.web.RequestHandler):
JOB_QUEUE = Queue()
EXECUTOR = ThreadPoolExecutor()
def post(self):
job = lambda: sleep(3) or print("{}:handler called".format(int(time())))
self.JOB_QUEUE.put(job)
self.set_status(200)
self.finish()
@staticmethod
@gen.coroutine
def consumer():
while True:
job = yield AsyncHandler.JOB_QUEUE.get()
print("qsize : {}".format(AsyncHandler.JOB_QUEUE.qsize()))
print(AsyncHandler.JOB_QUEUE)
output = yield AsyncHandler.EXECUTOR.submit(job)
AsyncHandler.JOB_QUEUE.task_done()
if __name__ == "__main__":
AsyncHandler.consumer()
APP = tornado.web.Application([(r"/test", AsyncHandler)])
APP.listen(9000)
IOLoop.current().start()
이 예상되는 출력을 제공한다 :
qsize : 0
<Queue maxsize=0 tasks=1>
1508618429:handler called
qsize : 2
<Queue maxsize=0 queue=deque([<function...<lambda> at 0x7fbf8f741400>, <function... <lambda> at 0x7fbf8f760ea0>]) tasks=3>
1508618432:handler called
qsize : 1
<Queue maxsize=0 queue=deque([<function AsyncHandler.post.<locals>.<lambda> at 0x7fbf8f760ea0>]) tasks=2>
1508618435:handler called
qsize : 0
<Queue maxsize=0 tasks=1>
1508618438:handler called
output = yield AsyncHandler.EXECUTOR.submit(job)
출력을 반환하는 3 초 소요되므로 출력 3 초 지연 도착. 또한 우리는 대기열 구축을 볼 수 있습니다. 코드의 흥미로운 부분을 이제
__init__
내부에 소비자를 호출하고
qsize : 0
<Queue maxsize=0 tasks=1>
qsize : 0
<Queue maxsize=0 tasks=2>
qsize : 0
<Queue maxsize=0 tasks=3>
qsize : 0
<Queue maxsize=0 tasks=4>
1508619138:handler called
1508619138:handler called
1508619139:handler called
1508619139:handler called
참고 :
import tornado.web
from tornado import gen
from time import sleep, time
from tornado.queues import Queue
from concurrent.futures import ThreadPoolExecutor
from tornado.ioloop import IOLoop
class AsyncHandler(tornado.web.RequestHandler):
JOB_QUEUE = Queue()
EXECUTOR = ThreadPoolExecutor()
def __init__(self, application, request, **kwargs):
super().__init__(application, request, **kwargs)
self.consumer()
def post(self):
job = lambda: sleep(3) or print("{}:handler called".format(int(time())))
self.JOB_QUEUE.put(job)
self.set_status(200)
self.finish()
@staticmethod
@gen.coroutine
def consumer():
while True:
job = yield AsyncHandler.JOB_QUEUE.get()
print("qsize : {}".format(AsyncHandler.JOB_QUEUE.qsize()))
print(AsyncHandler.JOB_QUEUE)
output = yield AsyncHandler.EXECUTOR.submit(job)
AsyncHandler.JOB_QUEUE.task_done()
if __name__ == "__main__":
APP = tornado.web.Application([(r"/test", AsyncHandler)])
APP.listen(9000)
IOLoop.current().start()
출력은 이상하게 (즐겁게)처럼 보인다. 우리는 작업이 병렬로 (대기열 구축없이) 구축 및 실행 됨으로써 거의 동시에 완료됨을 알 수 있습니다. 마치 output = yield AsyncHandler.EXECUTOR.submit(job)
이 (가) 앞으로 차단하지 않는 것입니다. 많은 실험 후에도이 동작을 설명 할 수 없습니다. 나는 정말로 도움이된다.
이 표시됩니다. 감사! 소비자 호출 인스턴스를 인쇄하여 확인했습니다. 따라서 기본적으로 while True 루프는 실제로 단일 소비자 호출이어야합니다. 제 이해는 모든 요청 처리기가 주 스레드에서 실행되므로 여러 소비자가 문제가되지 않아야한다는 것입니다. 이 접근법에서 볼 수있는 특정 단점은 무엇입니까? – ArikKartman