샐러리 작업자 쪽에서 jupyter 커널을 사용하고 싶습니다. 셀러리 작업자마다 Jupyter Kernel이 하나 있습니다.작업 내에서 셀러리 작업자 인스턴스에 액세스
기본 값 Worker
셀러리를 무시하고 작업자의 초기화에서 jupyter 커널을 시작하고 stop 메소드를 사용하여 jupyter 커널을 종료합니다.
현재 직면하고있는 문제는 작업이 실행되는 동안 해당 커널 인스턴스에 작업 내에서 어떻게 액세스 할 수 있습니까?
celery
응용 프로그램에 대한 Worker
클래스 정의를 재정의하는 더 좋은 방법은 app.Worker = CustomWorker
입니까?
다음은 사용자 정의 작업자가있는 셀러리 구성입니다. 여기
from __future__ import absolute_import, unicode_literals
from celery import Celery
from jupyter_client import MultiKernelManager
app = Celery('proj',
broker='redis://',
backend='redis://',
include=['tasks'])
app.conf.update(
result_expires=3600
)
class CustomWorker(app.Worker):
def __init__(self, *args, **kwargs):
self.km = MultiKernelManager()
self.kernel_id = self.km.start_kernel()
print("Custom initializing")
self.kernel_client = km.get_kernel(kernel_id).client()
super(CustomWorker, self).__init__(*args, **kwargs)
def on_close(self):
self.km.shutdown_kernel(self.kernel_id)
super(CustomWorker, self).on_close()
app.Worker = CustomWorker
if __name__ == '__main__':
app.start()
는 요청 객체가 내 문제를 해결에 그 근로자의 인스턴스 정보를 추가
tasks.py
from __future__ import absolute_import, unicode_literals
from celery import app
from celery import Task
from tornado import gen
from jupyter_client import MultiKernelManager
from zmq.eventloop import ioloop
from zmq.eventloop.zmqstream import ZMQStream
ioloop.install()
reply_futures = {}
# This is my celery task where I pass the arbitary python code to execute on
# some celery worker(actually to the corresponding kernel)
@app.task
def pythontask(code):
# I don't know how to get the kernel_client for current celery worker !!?
kernel_client = self.get_current_worker().kernel_client
mid = kernel_client.execute(code)
# defining the callback which will be executed when message arrives on
# zmq stream
def reply_callback(session, stream, msg_list):
idents, msg_parts = session.feed_identities(msg_list)
reply = session.deserialize(msg_parts)
parent_id = reply['parent_header'].get('msg_id')
reply_future = reply_futures.get(parent_id)
if reply_future:
reply_future.set_result(reply)
@gen.coroutine
def execute(kernel_client, code):
msg_id = kernel_client.execute(code)
f = reply_futures[msg_id] = Future()
yield f
raise gen.Return(msg_id)
# initializing the zmq streams and attaching the callback to receive message
# from the kernel
shell_stream = ZMQStream(kernel_client.shell_channel.socket)
iopub_stream = ZMQStream(kernel_client.iopub_channel.socket)
shell_stream.on_recv_stream(partial(reply_callback, kernel_client.session))
iopub_stream.on_recv_stream(partial(reply_callback, kernel_client.session))
# create a IOLoop
loop = ioloop.IOLoop.current()
# listen on the streams
msg_id = loop.run_sync(lambda: execute(kernel_client,code))
print(reply_msgs[msg_id])
reply_msgs[msg_id] = []
# Disable callback and automatic receiving.
shell_stream.on_recv_stream(None)
iopub_stream.on_recv_stream(None)