2013-10-16 6 views
2

체인에 대한 참조를 데이터베이스에 보관합니다.파이썬 셀러리 : 이전 체인에 작업을 추가하는 방법

from tasks import t1, t2, t3 
from celery import chain 
res = chain(t1.s(123)|t2.s()|t3.s())() 
res.get() 

이 특정 체인에 어떻게 다른 작업을 추가 할 수 있습니까?

res.append(t2.s()) 

내 목표는 체인 내 코드에서 지정한 것과 같은 순서로 실행되어 있는지 확인하는 것입니다. 그리고 내 체인에서 작업이 실패하면 다음 작업이 실행되지 않습니다.

알다시피 나는 지정된 대기열에서 매우 큰 작업을 사용하고 있습니다.

답변

4

모든 정보는 메시지에 포함되어 있습니다.

메시지가 전 세계에 전송 될 수도 있고, 중간 프로세서가 메시지를 소비 할 수도 있습니다. 이러한 이유로 메시지를 보낸 후에는 메시지를 수정할 수 없습니다.

http://docs.celeryproject.org/en/latest/userguide/tasks.html#state

내 목표는 체인 내 코드에서 지정한 것과 같은 순서로 실행되어 있는지 확인하는 것입니다 참조하십시오. 그리고> 내 체인에서 작업이 실패하면 다음 작업이 실행되지 않습니다.

메시지의 일부로 주문이 전송되고 아무 작업도 실패하면 을 계속할 수 없습니다.

이제 런타임에 작업을 추가 할 수있게하려면 정보를 데이터베이스에 저장하고 작업 자체에서이를 확인하고 새 작업을 호출 할 수 있습니다. 그래도이 작업을 수행 할 때 몇 가지 문제가 있습니다.

1) 성공한 경우 다음 작업이 다음 작업을 호출하는 등 체인의 첫 번째 작업이 다음 작업을 호출합니다.

2)이 프로세스에 작업을 추가하면 첫 번째 작업이 이미 실행 된 경우 어떻게됩니까? 또는 두 번째 또는 세 번째?

이렇게 추측 할 수 있듯이 작동하려면 약간의 동기화가 필요합니다.

그런 다음
from celery import subtask 
from celery.result import from_serializable 

@app.task(bind=True) 
def after_task(self, result, callback, errback=None): 
    result = from_serializable(result) 
    if not result.ready(): 
     raise self.retry(countdown=1) 
    if task.successful(): 
     subtask(callback).delay(result.get()) 
    else: 
     if errback: 
      subtask(errback)() 


def add_to_chain(result, callback, errback=None): 
    callback = callback.clone()  # do not modify caller 
    new_result = callback.freeze() # sets id for callback, returns AsyncResult 
    new_result.parent = result 
    after_task.delay(result.serializable(), callback, errback) 
    return new_result 

이처럼 사용할 수 있습니다 :

from tasks import t1, t2, t3 

res = (t1.s(123) | t2.s() | t3.s())() 
res = add_to_chain(t2.s()) 

나는 쉬운 솔루션은 다음 콜백을 적용 을 완료하는 데 하나의 작업을 대기 작업을 생성하는 것 같아요 메모 :

bind=True은 다음 3.1 버전에서 새로 제공되며 이전 버전은 으로, 설명서를 사용하고 current_task.retry (get from celery import current_task)을 사용하십시오.

Signature.freeze 또한 3 가지입니다.,

from celery import uuid 

def freeze(sig, _id=None): 
    opts = sig.options 
    try: 
     tid = opts['task_id'] 
    except KeyError: 
     tid = opts['task_id'] = _id or uuid() 
    return sig.AsyncResult(tid) 
+0

당신이 @asksol 감사합니다 내가 당신의 대답을 읽 제대로 이해하려고 노력 : 1, 당신이 사용할 수있는 이전 버전의 동일한 작업을 수행합니다 –

관련 문제