2017-10-21 3 views
1

각 처리기에 고유 한 대기열이있는 토큰을 사용하여 비동기 서버를 만들려고합니다. 종점이 호출되면 작업이 대기열에 배치됩니다. 대기열에서 작업을 비동기 적으로 '사용'하는 소비자 함수가 있습니다. 그러나 소비자의 행동은 내가 self.consumer() 또는 AsyncHandler.consumer()이라고 부르는 지에 따라 달라집니다. 내 초기 추측은 인스턴스 수준 잠금 때문에 발생하지만 그 증거를 찾을 수 없다는 것입니다. 4 게시물 요청을 연속적으로 발생시킵니다. 다음은 출력이있는 2 개의 미리보기입니다.토네이도 RequestHandler에서 __init__에서 호출 된 비동기 소비자가 정적으로 호출 된 것과 다르게 동작하는 이유는 무엇입니까?

import tornado.web 
from tornado import gen 
from time import sleep, time 
from tornado.queues import Queue 
from concurrent.futures import ThreadPoolExecutor 
from tornado.ioloop import IOLoop 

class AsyncHandler(tornado.web.RequestHandler): 

    JOB_QUEUE = Queue() 
    EXECUTOR = ThreadPoolExecutor() 

    def post(self): 
     job = lambda: sleep(3) or print("{}:handler called".format(int(time()))) 
     self.JOB_QUEUE.put(job) 
     self.set_status(200) 
     self.finish() 

    @staticmethod 
    @gen.coroutine 
    def consumer(): 
     while True: 
      job = yield AsyncHandler.JOB_QUEUE.get() 
      print("qsize : {}".format(AsyncHandler.JOB_QUEUE.qsize())) 
      print(AsyncHandler.JOB_QUEUE) 
      output = yield AsyncHandler.EXECUTOR.submit(job) 
      AsyncHandler.JOB_QUEUE.task_done() 


if __name__ == "__main__": 
    AsyncHandler.consumer() 
    APP = tornado.web.Application([(r"/test", AsyncHandler)]) 
    APP.listen(9000) 
    IOLoop.current().start() 

이 예상되는 출력을 제공한다 :

qsize : 0 
<Queue maxsize=0 tasks=1> 
1508618429:handler called 
qsize : 2 
<Queue maxsize=0 queue=deque([<function...<lambda> at 0x7fbf8f741400>, <function... <lambda> at 0x7fbf8f760ea0>]) tasks=3> 
1508618432:handler called 
qsize : 1 
<Queue maxsize=0 queue=deque([<function AsyncHandler.post.<locals>.<lambda> at 0x7fbf8f760ea0>]) tasks=2> 
1508618435:handler called 
qsize : 0 
<Queue maxsize=0 tasks=1> 
1508618438:handler called 

output = yield AsyncHandler.EXECUTOR.submit(job) 출력을 반환하는 3 초 소요되므로 출력 3 초 지연 도착. 또한 우리는 대기열 구축을 볼 수 있습니다. 코드의 흥미로운 부분을 이제

는 지금 우리가 __init__ 내부에 소비자를 호출하고

qsize : 0 
<Queue maxsize=0 tasks=1> 
qsize : 0 
<Queue maxsize=0 tasks=2> 
qsize : 0 
<Queue maxsize=0 tasks=3> 
qsize : 0 
<Queue maxsize=0 tasks=4> 
1508619138:handler called 
1508619138:handler called 
1508619139:handler called 
1508619139:handler called 

참고 :

import tornado.web 
from tornado import gen 
from time import sleep, time 
from tornado.queues import Queue 
from concurrent.futures import ThreadPoolExecutor 
from tornado.ioloop import IOLoop 

class AsyncHandler(tornado.web.RequestHandler): 
    JOB_QUEUE = Queue() 
    EXECUTOR = ThreadPoolExecutor() 

    def __init__(self, application, request, **kwargs): 
     super().__init__(application, request, **kwargs) 
     self.consumer() 

    def post(self): 
     job = lambda: sleep(3) or print("{}:handler called".format(int(time()))) 
     self.JOB_QUEUE.put(job) 
     self.set_status(200) 
     self.finish() 

    @staticmethod 
    @gen.coroutine 
    def consumer(): 
     while True: 
      job = yield AsyncHandler.JOB_QUEUE.get() 
      print("qsize : {}".format(AsyncHandler.JOB_QUEUE.qsize())) 
      print(AsyncHandler.JOB_QUEUE) 
      output = yield AsyncHandler.EXECUTOR.submit(job) 
      AsyncHandler.JOB_QUEUE.task_done() 


if __name__ == "__main__": 
    APP = tornado.web.Application([(r"/test", AsyncHandler)]) 
    APP.listen(9000) 
    IOLoop.current().start() 

출력은 이상하게 (즐겁게)처럼 보인다. 우리는 작업이 병렬로 (대기열 구축없이) 구축 및 실행 됨으로써 거의 동시에 완료됨을 알 수 있습니다. 마치 output = yield AsyncHandler.EXECUTOR.submit(job)이 (가) 앞으로 차단하지 않는 것입니다. 많은 실험 후에도이 동작을 설명 할 수 없습니다. 나는 정말로 도움이된다.

답변

0

첫 번째 응용 프로그램은 한 번 실행되기 때문에 하나만 실행됩니다 (consumer). 각 요청은 소비자를 "차단"(한 번에 하나씩 만 반복)하므로 이전 다음에 처리됩니다.

후자의 앱은 각 요청마다 새로운 consumer 루프를 시작합니다 (RequestHandler는 req 당 생성되기 때문에). 그래서 첫 번째 요청은 다음 블록을 "차단"하지 않습니다. 왜냐하면 새로운 while Truegetsubmit ...

+0

이 표시됩니다. 감사! 소비자 호출 인스턴스를 인쇄하여 확인했습니다. 따라서 기본적으로 while True 루프는 실제로 단일 소비자 호출이어야합니다. 제 이해는 모든 요청 처리기가 주 스레드에서 실행되므로 여러 소비자가 문제가되지 않아야한다는 것입니다. 이 접근법에서 볼 수있는 특정 단점은 무엇입니까? – ArikKartman

관련 문제