2016-08-12 5 views
2

신청서를 Gunicornnginx으로 실행합니다. 내 응용 프로그램의 초기화 모듈에서 나는 web.run_app(app)을 사용하여 응용 프로그램을 실행하지 않고 단지 Gunicorn에 의해 가져 오기 될 인스턴스를 만들어 각 작업자에서 실행합니다. Gunicorn가 만듭니다. 그래서 Gunicorn은 몇 개의 작업자 프로세스와 이벤트 루프를 생성 한 다음 해당 루프에서 응용 프로그램의 요청 처리기 runs을 만듭니다.aiohttp 신청 프로세스에서 ZeroMQ 듣기

aiohttp 응용 프로그램은 내가 Gunicorn에 의해 시작 응용 프로그램 프로세스의 어떤에서 발생한 이벤트에 통보 할 연결 WebSockets (모바일 응용 프로그램 클라이언트)의 컬렉션이 있습니다. 그리고 모두WebSockets에 연락하고 모든 신청 절차에 연락하고 싶습니다. 따라서 어떤 종류의 업스트림 프록시를 만들려면 ZeroMQ을 사용하고 각 응용 프로그램 프로세스에서 zmq.SUB 소켓을 사용하여 구독하고 싶습니다.

는 ... 그래서 기본적으로 나는 각 응용 프로그램 작업자 이런 일을 달성하고자 :

context = zmq.Context() 
socket = context.socket(zmq.SUB) 
socket.connect('tcp://localhost:5555') 

while True: 
    event = socket.recv() 
    for ws in app['websockets']: 
     ws.send_bytes(event) 
    # break before app shutdown. How? 

어떻게 aiohttp 응용 프로그램 내 ZeroMQ 프록시 WebSockets에 메시지를 전달을들을 수 있습니다?

이 코드를 이벤트 루프 내에서 백그라운드로 실행하고 응용 프로그램의 수명주기 내에서 aiohttp 이내에 올바르게 실행하고 종료하는 방법은 무엇입니까?


UPDATE

은 이미 문제를 설명하고 해결 방안을 제시 aiohttp의 GitHub의 저장소에 issue을 만들었습니다. 나는 여기서 설명 된 문제에 대해 의견을 높게 평가할 것이다.

답변

1

좋아, 질문이 issue에 대한 논의는 우리가 Application.on_startup() 방법을 사용하여 on_startup 응용 프로그램 신호를 등록 할 수있는 능력을 가지고 있습니다 1.0 I, 즉 버전, aiohttp에 공헌 한 새로운 기능하게되었다.

Documentation.
Working example on the master branch.

#!/usr/bin/env python3 
"""Example of aiohttp.web.Application.on_startup signal handler""" 
import asyncio 

import aioredis 
from aiohttp.web import Application, WebSocketResponse, run_app 

async def websocket_handler(request): 
    ws = WebSocketResponse() 
    await ws.prepare(request) 
    request.app['websockets'].append(ws) 
    try: 
     async for msg in ws: 
      print(msg) 
      await asyncio.sleep(1) 
    finally: 
     request.app['websockets'].remove(ws) 
    return ws 


async def on_shutdown(app): 
    for ws in app['websockets']: 
     await ws.close(code=999, message='Server shutdown') 


async def listen_to_redis(app): 
    try: 
     sub = await aioredis.create_redis(('localhost', 6379), loop=app.loop) 
     ch, *_ = await sub.subscribe('news') 
     async for msg in ch.iter(encoding='utf-8'): 
      # Forward message to all connected websockets: 
      for ws in app['websockets']: 
       ws.send_str('{}: {}'.format(ch.name, msg)) 
      print("message in {}: {}".format(ch.name, msg)) 
    except asyncio.CancelledError: 
     pass 
    finally: 
     print('Cancel Redis listener: close connection...') 
     await sub.unsubscribe(ch.name) 
     await sub.quit() 
     print('Redis connection closed.') 


async def start_background_tasks(app): 
    app['redis_listener'] = app.loop.create_task(listen_to_redis(app)) 


async def cleanup_background_tasks(app): 
    print('cleanup background tasks...') 
    app['redis_listener'].cancel() 
    await app['redis_listener'] 


async def init(loop): 
    app = Application(loop=loop) 
    app['websockets'] = [] 
    app.router.add_get('/news', websocket_handler) 
    app.on_startup.append(start_background_tasks) 
    app.on_cleanup.append(cleanup_background_tasks) 
    app.on_shutdown.append(on_shutdown) 
    return app 

loop = asyncio.get_event_loop() 
app = loop.run_until_complete(init(loop)) 
run_app(app)