2016-06-29 3 views
6

저는 셀러 리에 새로 온 사람이고이 태스크 큐를 프로젝트에 통합하려고 시도하지만 셀러 리가 실패한 태스크를 처리하는 방법을 아직 알지 못합니다 모든 사람들을 amqp 데드 레터 큐에 보관하고 싶습니다.셀러리 : 데드 - 레터 큐에 실패한 태스크를 어떻게 라우팅 할 수 있습니까?

문서 here에 따르면 acks_late가 활성화 된 작업에서 Reject (거부)를 선택하면 메시지를 확인하는 것과 동일한 효과가 생성 된 다음 데드 - 레터 대기열에 대한 몇 가지 단어가있는 것으로 보입니다.

그래서 나는 나의 다시마 객체

CELERY_DLX_EXCHANGE = Exchange(CELERY_DLX_EXCHANGE_NAME, type='direct') 
CELERY_DLX_QUEUE = Queue(CELERY_DLX_QUEUE_NAME, exchange=DLX_EXCHANGE, 
          routing_key='celery-dlq') 

DEAD_LETTER_CELERY_OPTIONS = {'x-dead-letter-exchange': CELERY_DLX_EXCHANGE_NAME, 
          'x-dead-letter-routing-key': 'celery-dlq'} 

CELERY_EXCHANGE = Exchange(CELERY_EXCHANGE_NAME, 
           arguments=DEAD_LETTER_CELERY_OPTIONS, 
           type='direct') 

CELERY_QUEUE = Queue(CELERY_QUEUE_NAME, 
         exchange=CELERY_EXCHANGE, 
         routing_key='celery-q') 

내가 실행하고있어 작업처럼되어 찾고있는

celery_app.conf.update(CELERY_ACCEPT_CONTENT=['application/json'], 
         CELERY_TASK_SERIALIZER='json', 
         CELERY_QUEUES=[CELERY_QUEUE, 
             CELERY_DLX_QUEUE], 
         CELERY_DEFAULT_QUEUE=CELERY_QUEUE_NAME, 
         CELERY_DEFAULT_EXCHANGE=CELERY_EXCHANGE 
         ) 

내 셀러리 설정에 사용자 정의 기본 큐를 추가 :

class HookTask(Task): 
    acks_late = True 

def run(self, ctx, data): 
    logger.info('{0} starting {1.name}[{1.request.id}]'.format(self.__class__.__name__.upper(), self)) 
    self.hook_process(ctx, data) 


def on_failure(self, exc, task_id, args, kwargs, einfo): 
    logger.error('task_id %s failed, message: %s', task_id, exc.message) 

def hook_process(self, t_ctx, body): 
    # Build context 
    ctx = TaskContext(self.request, t_ctx) 
    logger.info('Task_id: %s, handling request %s', ctx.task_id, ctx.req_id) 
    raise Reject('no_reason', requeue=False) 

Reject 예외를 제기 할 때 결과가없는 약간의 테스트를 수행했습니다.

이제 Task.on_failure를 재정 의하여 데드 - 레터 큐에 실패한 작업 경로를 강제로 적용하는 것이 좋은지 궁금합니다. 나는 이것이 효과가있을 것이라고 생각하지만이 해결책은 너무 깨끗하지 않다. 왜냐하면 나는 붉은 셀러리가이 모든 것을 혼자해야만하기 때문이다.

도움 주셔서 감사합니다.

+1

아무도 대답하지 못한 슬픈. 이것에 대한 해결책을 찾았습니까? @onizukaek –

답변

1

나는 을 CELERY_EXCHANGE에 넣으면 안된다고 생각합니다. CELERY_QUEUE에 queue_arguments=DEAD_LETTER_CELERY_OPTIONS을 추가해야합니다.

다음 예는 내가 한 그것은 잘 작동 것입니다 :

큐의 생성 후
from celery import Celery 
from kombu import Exchange, Queue 
from celery.exceptions import Reject 

app = Celery(
    'tasks', 
    broker='amqp://[email protected]:5672//', 
    backend='redis://localhost:6379/0') 

dead_letter_queue_option = { 
    'x-dead-letter-exchange': 'dlx', 
    'x-dead-letter-routing-key': 'dead_letter' 
} 

default_exchange = Exchange('default', type='direct') 
dlx_exchange = Exchange('dlx', type='direct') 

default_queue = Queue(
    'default', 
    default_exchange, 
    routing_key='default', 
    queue_arguments=dead_letter_queue_option) 
dead_letter_queue = Queue(
    'dead_letter', dlx_exchange, routing_key='dead_letter') 

app.conf.task_queues = (default_queue, dead_letter_queue) 

app.conf.task_default_queue = 'default' 
app.conf.task_default_exchange = 'default' 
app.conf.task_default_routing_key = 'default' 


@app.task 
def add(x, y): 
    return x + y 


@app.task(acks_late=True) 
def div(x, y): 
    try: 
     z = x/y 
     return z 
    except ZeroDivisionError as exc: 
     raise Reject(exc, requeue=False) 

, 당신은 '기능'열 것을 볼 수, 그것은 DLX (데드 - 레터 교환)를 보여줍니다 및 DLK (데드 레터 라우팅 키) 레이블이 있습니다.

enter image description here

참고 : 이미 RabbitMQ에서 그들을 만든 경우 당신은 이전 대기열을 삭제해야합니다. 셀러리가 기존 대기열을 삭제하지 않고 새 대기열을 다시 작성하지 않기 때문입니다.

+0

예제가 제대로 작동하지 않습니다. 'dead_letter_queue'를'task_queues'에서 제거해야합니다. 그렇지 않으면 샐러리 작업자가이 대기열에 연결하고 처리하지 않고 메시지를 소비합니다. 이러한 대기열을 만들려면 다른 방법이 필요하지만 –

+0

@ K.P. 나는 똑같은 문제를 만났다. 당신은 절대적으로 맞습니다. 귀하의 솔루션에 감사드립니다. 나는 그것을 테스트했고 훌륭하게 작동합니다! :-) –

1

나는 비슷한 경우를 겪고 있으며 같은 문제에 직면했습니다. 또한 하드 코딩 된 값이 아니라 구성에 기반한 솔루션을 원했습니다. Hengfeng Li의 제안 된 솔루션은 매우 도움이되었고 메커니즘과 개념을 이해하는 데 도움이되었습니다. 그러나 데드 레터 대기열을 선언하는 데 문제가있었습니다. 특히 task_default_queues에 DLQ를 주입 한 경우 Celery가 대기열을 사용 중이며 항상 비어 있습니다. 그래서 DL (X/Q)를 선언하는 수동 방법이 필요했습니다.

Celery의 Bootsteps은 코드 실행 단계를 잘 제어하기 때문에 사용했습니다. 나의 초기 실험은 앱을 생성 한 후 정확하게 생성하는 것이었지만 프로세스를 포크 한 후에 연결이 끊어 지거나 추한 예외가 생겼다. Pool 단계 후에 정확히 실행되는 부트 스텝을 사용하면 각 작업자가 분기되고 연결 풀이 준비된 후에 각 작업자가 시작할 수 있음을 보장 할 수 있습니다.

마지막으로 셀카의 Reject을 다시 올려서 잡히지 않은 예외를 태스크 거부로 변환하는 데코레이터를 만들었습니다.재시도 같은 작업 처리 방법에 대한 작업이 이미 결정된 경우에 특히주의해야합니다.

전체 작동 예제입니다. div.delay(1, 0) 작업을 실행하고 작동 방식을 확인하십시오.

from celery import Celery 
from celery.exceptions import Reject, TaskPredicate 
from functools import wraps 
from kombu import Exchange, Queue 

from celery import bootsteps 


class Config(object): 

    APP_NAME = 'test' 

    task_default_queue = '%s_celery' % APP_NAME 
    task_default_exchange = "%s_celery" % APP_NAME 
    task_default_exchange_type = 'direct' 
    task_default_routing_key = task_default_queue 
    task_create_missing_queues = False 
    task_acks_late = True 

    # Configuration for DLQ support 
    dead_letter_exchange = '%s_dlx' % APP_NAME 
    dead_letter_exchange_type = 'direct' 
    dead_letter_queue = '%s_dlq' % APP_NAME 
    dead_letter_routing_key = dead_letter_queue 


class DeclareDLXnDLQ(bootsteps.StartStopStep): 
    """ 
    Celery Bootstep to declare the DL exchange and queues before the worker starts 
     processing tasks 
    """ 
    requires = {'celery.worker.components:Pool'} 

    def start(self, worker): 
     app = worker.app 

     # Declare DLX and DLQ 
     dlx = Exchange(
      app.conf.dead_letter_exchange, 
      type=app.conf.dead_letter_exchange_type) 

     dead_letter_queue = Queue(
      app.conf.dead_letter_queue, 
      dlx, 
      routing_key=app.conf.dead_letter_routing_key) 

     with worker.app.pool.acquire() as conn: 
      dead_letter_queue.bind(conn).declare() 


app = Celery('tasks', broker='pyamqp://[email protected]//') 
app.config_from_object(Config) 


# Declare default queues 
# We bypass the default mechanism tha creates queues in order to declare special queue arguments for DLX support 
default_exchange = Exchange(
    app.conf.task_default_exchange, 
    type=app.conf.task_default_exchange_type) 
default_queue = Queue(
     app.conf.task_default_queue, 
     default_exchange, 
     routing_key=app.conf.task_default_routing_key, 
     queue_arguments={ 
      'x-dead-letter-exchange': app.conf.dead_letter_exchange, 
      'x-dead-letter-routing-key': app.conf.dead_letter_routing_key 
     }) 

# Inject the default queue in celery application 
app.conf.task_queues = (default_queue,) 

# Inject extra bootstep that declares DLX and DLQ 
app.steps['worker'].add(DeclareDLXnDLQ) 


def onfailure_reject(requeue=False): 
    """ 
    When a task has failed it will raise a Reject exception so 
    that the message will be requeued or marked for insertation in Dead Letter Exchange 
    """ 

    def _decorator(f): 
     @wraps(f) 
     def _wrapper(*args, **kwargs): 

      try: 
       return f(*args, **kwargs) 
      except TaskPredicate: 
       raise # Do not handle TaskPredicate like Retry or Reject 
      except Exception as e: 
       print("Rejecting") 
       raise Reject(str(e), requeue=requeue) 
     return _wrapper 

    return _decorator 


@app.task() 
@onfailure_reject() 
def div(x, y): 
    return x/y 

편집 : 내가 셀러리 4.1.0에서 일부 호환성 문제가 발견 셀러리 (소문자)의 새로운 구성 스키마를 사용하는 코드를 업데이트했습니다.

관련 문제