2017-03-16 1 views
0

샐러리 작업자 쪽에서 jupyter 커널을 사용하고 싶습니다. 셀러리 작업자마다 Jupyter Kernel이 하나 있습니다.작업 내에서 셀러리 작업자 인스턴스에 액세스

기본 값 Worker 셀러리를 무시하고 작업자의 초기화에서 jupyter 커널을 시작하고 stop 메소드를 사용하여 jupyter 커널을 종료합니다.

현재 직면하고있는 문제는 작업이 실행되는 동안 해당 커널 인스턴스에 작업 내에서 어떻게 액세스 할 수 있습니까?

celery 응용 프로그램에 대한 Worker 클래스 정의를 재정의하는 더 좋은 방법은 app.Worker = CustomWorker입니까?

다음은 사용자 정의 작업자가있는 셀러리 구성입니다. 여기

from __future__ import absolute_import, unicode_literals 
from celery import Celery 
from jupyter_client import MultiKernelManager 

app = Celery('proj', 
    broker='redis://', 
    backend='redis://', 
    include=['tasks']) 

app.conf.update(
    result_expires=3600 
) 

class CustomWorker(app.Worker): 
    def __init__(self, *args, **kwargs): 
     self.km = MultiKernelManager() 
     self.kernel_id = self.km.start_kernel() 
     print("Custom initializing") 
     self.kernel_client = km.get_kernel(kernel_id).client() 
     super(CustomWorker, self).__init__(*args, **kwargs) 

    def on_close(self): 
     self.km.shutdown_kernel(self.kernel_id) 
     super(CustomWorker, self).on_close() 

app.Worker = CustomWorker 

if __name__ == '__main__': 
    app.start() 

는 요청 객체가 내 문제를 해결에 그 근로자의 인스턴스 정보를 추가 tasks.py

from __future__ import absolute_import, unicode_literals 
from celery import app 

from celery import Task 
from tornado import gen 
from jupyter_client import MultiKernelManager 
from zmq.eventloop import ioloop 
from zmq.eventloop.zmqstream import ZMQStream 
ioloop.install() 

reply_futures = {} 

# This is my celery task where I pass the arbitary python code to execute on 
# some celery worker(actually to the corresponding kernel) 
@app.task 
def pythontask(code): 
    # I don't know how to get the kernel_client for current celery worker !!? 
    kernel_client = self.get_current_worker().kernel_client 
    mid = kernel_client.execute(code) 

    # defining the callback which will be executed when message arrives on 
    # zmq stream 
    def reply_callback(session, stream, msg_list): 
     idents, msg_parts = session.feed_identities(msg_list) 
     reply = session.deserialize(msg_parts) 
     parent_id = reply['parent_header'].get('msg_id') 
     reply_future = reply_futures.get(parent_id) 
     if reply_future: 
      reply_future.set_result(reply) 

    @gen.coroutine 
    def execute(kernel_client, code): 
     msg_id = kernel_client.execute(code) 
     f = reply_futures[msg_id] = Future() 
     yield f 
     raise gen.Return(msg_id) 

    # initializing the zmq streams and attaching the callback to receive message 
    # from the kernel 
    shell_stream = ZMQStream(kernel_client.shell_channel.socket) 
    iopub_stream = ZMQStream(kernel_client.iopub_channel.socket) 
    shell_stream.on_recv_stream(partial(reply_callback, kernel_client.session)) 
    iopub_stream.on_recv_stream(partial(reply_callback, kernel_client.session)) 

    # create a IOLoop 
    loop = ioloop.IOLoop.current() 
    # listen on the streams 
    msg_id = loop.run_sync(lambda: execute(kernel_client,code)) 
    print(reply_msgs[msg_id]) 
    reply_msgs[msg_id] = [] 

    # Disable callback and automatic receiving. 
    shell_stream.on_recv_stream(None) 
    iopub_stream.on_recv_stream(None) 

답변

0

의 골격이다. 그렇게하기 위해 나는 worker 클래스의 _process_task 메소드를 오버라이드했다.

def _process_task(self, req): 
    try: 
    req.kwargs['kernel_client'] = self.kernel_client 
    print("printing from _process_task {}".format(req.kwargs)) 
    req.execute_using_pool(self.pool) 
    except TaskRevokedError: 
    try: 
     self._quick_release() # Issue 877 
    except AttributeError: 
     pass 
    except Exception as exc: 
    logger.critical('Internal error: %r\n%s',exc, traceback.format_exc(), exc_info=True) 

여기에 내가 솔로 모드에서 노동자를 시작할 때이 일에만 작동 그렇지 않으면하지가 일부 산세 오류를 던져 kernel_client

@app.task(bind=True) 
def pythontask(self,code, kernel_client=None): 

    mid = kernel_client.execute(code) 

    print("{}".format(kernel_client)) 
    print("{}".format(mid)) 

에 액세스 어디에 내 작업입니다. 어쨌든 솔로 근로자를 사용하는 것이 나의 요구 사항이므로이 솔루션이 저에게 효과적입니다

관련 문제