2012-11-10 5 views
3

요청 처리를 위해 대부분 하위 프로세스를 사용하는 병 응용 프로그램이 있습니다. 단일 응답을 반환하는 경로의 경우 아래와 같은 작업을 수행합니다.Python 병, 다중 처리 및 gevent를 사용하는 스트리밍 연결

@route('/index') 
def index(): 
    worker = getWorker() 
    return worker.doStuff() 

내 경로 중 하나가 데이터 스트림이어야합니다. 작업자가 응답 스트림을 반환하게하는 현명한 방법을 알 수 없습니다. 아래의 예는 내가하고 싶은 일과 비슷합니다.

@route('/stream') 
def stream(): 
    yield 'START' 
    sleep(3) 
    yield 'MIDDLE' 
    sleep(5) 
    yield 'END' 

다음과 같이하고 싶습니다. 내가 생성기를 산출/반환 할 수 없기 때문에이 방법은 불가능합니다.

@route('/stream') 
def stream(): 
    worker = getWorker() 
    yield worker.doStuff() 
class worker: 
    # Remember, this is run in a subprocess in real life. 
    def doStuff(): 
     yield 'START' 
     sleep(3) 
     yield 'MIDDLE' 
     sleep(5) 
     yield 'END' 

큰 프로젝트 용이며 일을하는 방식에 유연성이별로 없습니다. 나는 때로는 가장 쉬운 대답은 "당신의 디자인이 잘못되었다는 것을 안다." 그러나이 경우에는 필자의 통제 범위를 벗어나는 몇 가지 제약 사항이 있습니다 (경로는 데이터 스트림이어야하며 작업은 하위 프로세스에 의해 수행되어야합니다).

편집 또한 doStuff() 블록을 사용할 수 없습니다. 내가 반환하고 작업자 프로세스가있는 gevent 큐와 같은 것을 만들 수 있기를 원합니다. 문제는 이제 gevent.queue와 Process를 함께 사용할 수있는 것처럼 보이지 않는다는 것입니다.

@route('/stream') 
def index(): 
    body = gevent.queue.Queue() 
    worker = multiprocessing.Process(target=do_stuff, args=body) 
    worker.start() 
    return body() 

def do_stuff(body): 
    while True: 
     gevent.sleep(5) 
     body.put("data") 

답변

0

많은 연구와 실험 끝에 gevent 대기열을 이런 방식으로 파이썬 다중 처리와 함께 사용할 수 없다는 결론을 얻었습니다. 이런 식으로 일하는 대신에 redis와 같은 것을 사용하여 그린 레트가 의사 소통을하도록 허용 할 수 있습니다.

@route('/stream') 
def index(): 
    worker = multiprocessing.Process(target=do_stuff) 
    worker.start() 
    yield redis_server.lpop() 

def do_stuff(body): 
    while True: 
     gevent.sleep(5) 
     redis_server.lpush("data") 
1

마지막 예제에서 worker.doStuff()은 반복 가능한 생성기를 반환합니다. 반환 할 수 있습니다 (yield ~ return). 병은 바이트 또는 유니 코드 문자열을 반환하는 한 iterables를 반환 값으로 사용합니다.

+0

이것은 단순한 예입니다. 실제 응용 프로그램은 많은 양의 데이터를 스트리밍합니다. iterable을 반환하면 스트리밍 대신 모든 데이터를 먼저 채워야하지 않습니까? 어쩌면 다중 처리를 반환 할 수 있습니다. doStuff()에서 처리하고 작업자가 계속 채우게하십시오. –

관련 문제