2016-07-31 2 views
0

나는 futures 모듈이 설치된 python2.7로 작업하고 있습니다.토네이도로 멀티 스레딩을 구현하는 방법은 무엇입니까?

ThreadPoolExecutor를 사용하여 토네이도에서 멀티 스레딩을 구현하려고합니다.

다음은 구현 한 코드입니다.

from __future__ import absolute_import 
from base_handler import BaseHandler 
from tornado import gen 
from pyrestful import mediatypes 
from pyrestful.rest import get, post, put, delete 
from bson.objectid import ObjectId 
from spark_map import Map 
from concurrent import futures 
import tornado 
class MapService(BaseHandler): 

    MapDB = dict() 
    executor = futures.ProcessPoolExecutor(max_workers=3) 

    @tornado.web.asynchronous 
    @gen.coroutine 
    @post(_path='/map', _type=[str, str]) 
    def postMap(self, inp, out): 
     db = self.settings['db'] 
     function = lambda (x,y): (x,y[0]*2) 
     future = yield db.MapInfo.insert({'input': inp, 'output': out, 'input_function': str(function)}) 
     response = {"inserted ID": str(future)} 
     self.write(response) 

     m = Map(inp, out, function, appName=str(future)) 
     futuree = self.executor.submit(m.operation()) 
     self.MapDB[str(future)] = {'map_object': m, 'running_process_future_object': futuree} 
     self.finish() 

    @tornado.web.asynchronous 
    @gen.coroutine 
    @delete(_path='/map/{_id}', _types=[str]) 
    def deleteMap(self, _id): 
     db = self.settings['db'] 
     future = yield db.MapInfo.find_one({'_id': ObjectId(_id)}) 
     if future is None: 
      raise AttributeError('No entry exists in the database with the provided ID') 
     chk = yield db.MapInfo.remove(future) 
     response = { "Succes": "OK" } 
     self.write(response) 

     self.MapDB[_id]['map_object'].stop() 
     del self.MapDB[_id] 
     self.finish() 

위 코드에서, 나는 inp와 out의 post 요청을 사용하여 두 개의 입력을 받는다. 그런 다음 그들과 몇 가지 작업을 수행합니다. 이 작업은 프로세스를 중지하고 제거하기 위해 삭제 요청을받을 때까지 지속되어야합니다.

내가 직면 한 문제는 여러 요청 때문입니다. 다른 요청이 첫 번째 요청을 완료 할 때까지 기다리는 동안 첫 번째 요청 만 실행하여 주 Ioloop을 차단합니다.

그래서 각 프로세스를 별도의 스레드로 실행하고 싶습니다. 어떻게 구현해야합니까?

+0

m.operation()은 무엇을합니까? –

+0

m.operation()은 구현 한 사용자 정의 클래스 Map의 함수입니다. 기본적으로 일부 데이터에 대해 일부 계산을 수행합니다. 이 작업은 해당 작업에 대해 삭제 요청이 수신 될 때까지 실행되어야합니다. – vidhan

+0

m.operation()에서 블로킹하고있는 것처럼 들리며 비동기 적으로 다시 작성해야합니다. 그러나 우리는 operation()을위한 몇 가지 코드를 확인해야 할 것입니다. 또는 operation() 호출 전후에 "print"를 넣을 수도 있습니다. –

답변

1

m.operation()이 (가) 차단되어 나타나므로 스레드에서 실행해야합니다. m.operation()를 호출하는 동안 당신이 블록을 메인 스레드를하고있는 방법, 그리고 후 스레드 을 생성합니다 :

:

당신이 원하는
self.executor.submit(m.operation()) 

는 대신, 그것을 실행하는 스레드 기능을 전달하는

self.executor.submit(m.operation) 

아니요.

관련 문제