저는 셀러 리에 새로 온 사람이고이 태스크 큐를 프로젝트에 통합하려고 시도하지만 셀러 리가 실패한 태스크를 처리하는 방법을 아직 알지 못합니다 모든 사람들을 amqp 데드 레터 큐에 보관하고 싶습니다.셀러리 : 데드 - 레터 큐에 실패한 태스크를 어떻게 라우팅 할 수 있습니까?
문서 here에 따르면 acks_late가 활성화 된 작업에서 Reject (거부)를 선택하면 메시지를 확인하는 것과 동일한 효과가 생성 된 다음 데드 - 레터 대기열에 대한 몇 가지 단어가있는 것으로 보입니다.
그래서 나는 나의 다시마 객체CELERY_DLX_EXCHANGE = Exchange(CELERY_DLX_EXCHANGE_NAME, type='direct')
CELERY_DLX_QUEUE = Queue(CELERY_DLX_QUEUE_NAME, exchange=DLX_EXCHANGE,
routing_key='celery-dlq')
DEAD_LETTER_CELERY_OPTIONS = {'x-dead-letter-exchange': CELERY_DLX_EXCHANGE_NAME,
'x-dead-letter-routing-key': 'celery-dlq'}
CELERY_EXCHANGE = Exchange(CELERY_EXCHANGE_NAME,
arguments=DEAD_LETTER_CELERY_OPTIONS,
type='direct')
CELERY_QUEUE = Queue(CELERY_QUEUE_NAME,
exchange=CELERY_EXCHANGE,
routing_key='celery-q')
내가 실행하고있어 작업처럼되어 찾고있는
celery_app.conf.update(CELERY_ACCEPT_CONTENT=['application/json'],
CELERY_TASK_SERIALIZER='json',
CELERY_QUEUES=[CELERY_QUEUE,
CELERY_DLX_QUEUE],
CELERY_DEFAULT_QUEUE=CELERY_QUEUE_NAME,
CELERY_DEFAULT_EXCHANGE=CELERY_EXCHANGE
)
내 셀러리 설정에 사용자 정의 기본 큐를 추가 :
class HookTask(Task):
acks_late = True
def run(self, ctx, data):
logger.info('{0} starting {1.name}[{1.request.id}]'.format(self.__class__.__name__.upper(), self))
self.hook_process(ctx, data)
def on_failure(self, exc, task_id, args, kwargs, einfo):
logger.error('task_id %s failed, message: %s', task_id, exc.message)
def hook_process(self, t_ctx, body):
# Build context
ctx = TaskContext(self.request, t_ctx)
logger.info('Task_id: %s, handling request %s', ctx.task_id, ctx.req_id)
raise Reject('no_reason', requeue=False)
Reject 예외를 제기 할 때 결과가없는 약간의 테스트를 수행했습니다.
이제 Task.on_failure를 재정 의하여 데드 - 레터 큐에 실패한 작업 경로를 강제로 적용하는 것이 좋은지 궁금합니다. 나는 이것이 효과가있을 것이라고 생각하지만이 해결책은 너무 깨끗하지 않다. 왜냐하면 나는 붉은 셀러리가이 모든 것을 혼자해야만하기 때문이다.
도움 주셔서 감사합니다.
아무도 대답하지 못한 슬픈. 이것에 대한 해결책을 찾았습니까? @onizukaek –