0

나는 클라인과 디퍼링을 탐험 중이다. 다음 예제에서는 하위 프로세스를 사용하여 숫자를 증가시키고 Future를 통해이를 반환하려고합니다. 나는 미래의 전화를받을 수 있습니다.연기 된 클라인 앱

지연된 개체는 절대로 cb() 함수를 호출하지 않으며 끝점에 대한 요청은 반환되지 않습니다. 문제를 파악하도록 도와주세요. 다음

이 다음 내 server.py 코드

from klein import Klein 
from twisted.internet.defer import inlineCallbacks, returnValue 
import Process4 

if __name__ == '__main__': 
    app = Klein() 

    @app.route('/visit') 
    @inlineCallbacks 
    def get_num_visit(request):   
     try: 
      resp = yield Process4.get_visitor_num() 
      req.setResponseCode(200) 
      returnValue('Visited = {}'.format(resp)) 
     except Exception as e: 
      req.setResponseCode(500) 
      returnValue('error {}'.format(e)) 

    print('starting server') 
    app.run('0.0.0.0', 5005) 

것은 프로세스 P1을 시작하기 전에 콜백을 추가

from multiprocessing import Process 
from concurrent.futures import Future 
from time import sleep 
from twisted.internet.defer import Deferred 

def foo(x): 
    result = x+1 
    sleep(3) 
    return result 


class MyProcess(Process): 

    def __init__(self, target, args): 
     super().__init__() 
     self.target = target 
     self.args = args 
     self.f = Future() 
     self.visit = 0 

    def run(self): 
     r = foo(self.visit) 
     self.f.set_result(result=r) 

def cb(result): 
    print('visitor number {}'.format(result)) 
    return result 

def eb(err): 
    print('error occurred {}'.format(err)) 
    return err 


def future_to_deferred(future): 
    d = Deferred() 

    def callback(f): 
     e = f.exception() 
     if e: 
      d.errback(e) 
     else: 
      d.callback(f.result()) 

    future.add_done_callback(callback) 
    return d 

def get_visitor_num(): 
    p1 = MyProcess(target=foo, args=None) 
    d = future_to_deferred(p1.f) 
    p1.start() 
    d.addCallback(cb) 
    d.addErrback(eb) 
    sleep(1) 
    return d 

편집 한

이 문제를 해결 Process4.py 코드 cb() 함수를 호출합니다. 그러나 여전히 엔드 포인트에 대한 http 요청은 리턴되지 않습니다.

+0

트위스트와 다음 stdlib 멀티 모듈에게 있습니다. 대신 Ampoule을 고려하십시오. https://stackoverflow.com/questions/5715217/mix-python-twisted-with-multiprocessing 및 https://stackoverflow.com/questions/1470850/twisted-network-client-with-multiprocessing-workers 및 기타 유사한 질문을 참조하십시오. 에. –

+0

결과가 메인 스레드에서 설정되도록'reactor.callFromThread'를 호출해야합니다. [내가 잠시 줬던이 답변] (https://stackoverflow.com/questions/45930518/how-to-make-twisted-defer-get-function-result/45969032#45969032)을 살펴보고 말이된다. 비슷한 것을 적용 할 수 있어야합니다. –

+0

답장을 보내 주셔서 감사합니다. 제 대답을 아래에서보십시오. @ notorious.no, Jean-Paul Calderone –

답변

0

그것은집니다 밖으로 그() 방법은 어떤 스레드가없는 자식 프로세스에있는 콜백() 방법을 트리거 안목으로 미래의 결과를 self.f.set_result (결과 = R) 설정 결과가 반환되기를 기다리는 중!

MainProcess에서 트리거 된 callback() 함수를 얻으려면 MainProcess에서 작업자 스레드를 사용하여 다중 프로세스 대기열을 사용하여 자식 프로세스의 결과를 얻은 다음 미래의 결과를 설정해야했습니다.

@ notorious.no 답장을 보내 주셔서 감사합니다. 한가지 주목해야 할 것은 reactor.callFromThread가 작업 스레드에서 결과를 MainThread로 수정 한 코드에서 전환하지만 d.callback (f.result())은 정상적으로 작동하지만 작업자 스레드에서 결과를 반환한다는 것입니다.

다음은 가난한 맞는 수정 작업 코드

server.py

from klein import Klein 
from twisted.internet.defer import inlineCallbacks, returnValue 


import Process4 

if __name__ == '__main__': 
    app = Klein() 
    visit_count = 0 

    @app.route('/visit') 
    @inlineCallbacks 
    def get_num_visit(req): 
     global visit_count 
     try: 
      resp = yield Process4.get_visitor_num(visit_count) 
      req.setResponseCode(200) 
      visit_count = resp 
      returnValue('Visited = {}'.format(resp)) 
     except Exception as e: 
      req.setResponseCode(500) 
      returnValue('error {}'.format(e)) 

    print('starting server') 
    app.run('0.0.0.0', 5005) 

Process4.py

from multiprocessing import Process, Queue 
from concurrent.futures import Future 
from time import sleep 
from twisted.internet.defer import Deferred 
import threading 
from twisted.internet import reactor 


def foo(x, q): 
    result = x+1 
    sleep(3) 
    print('setting result, {}'.format(result)) 
    q.put(result) 


class MyProcess(Process): 

    def __init__(self, target, args): 
     super().__init__() 
     self.target = target 
     self.args = args 
     self.visit = 0 

    def run(self): 
     self.target(*self.args) 


def future_to_deferred(future): 
    d = Deferred() 

    def callback(f): 
     e = f.exception() 
     print('inside callback {}'.format(threading.current_thread().name)) 
     if e: 
      print('calling errback') 
      d.errback(e) 
      # reactor.callFromThread(d.errback, e) 
     else: 
      print('calling callback with result {}'.format(f.result())) 
      # d.callback(f.result()) 
      reactor.callFromThread(d.callback, f.result()) 
    future.add_done_callback(callback) 
    return d 


def wait(q,f): 
    r = q.get(block=True) 
    f.set_result(r) 


def get_visitor_num(x): 

    def cb(result): 
     print('inside cb visitor number {} {}'.format(result, threading.current_thread().name)) 
     return result 

    def eb(err): 
     print('inside eb error occurred {}'.format(err)) 
     return err 

    f = Future() 
    q = Queue() 
    p1 = MyProcess(target=foo, args=(x,q,)) 

    wait_thread = threading.Thread(target=wait, args=(q,f,)) 
    wait_thread.start() 

    defr = future_to_deferred(f) 
    defr.addCallback(cb) 
    defr.addErrback(eb) 
    p1.start() 
    print('returning deferred') 
    return defr