2014-09-30 3 views
5

나는셀러리 작업자 : 모든 대기열에서 소비하는 방법?

  • 은하지 CELERY_QUEUES
  • 정의 CELERY_DEFAULT_QUEUE = 'default' 이 표에서와 같이 즉시 경로를 생성
  • 사용자 지정 라우터 클래스 (https://github.com/celery/celery/issues/150)을 (직접 형) 정의 CELERY_CREATE_MISSING_QUEUES = True
  • 을 설정해야 .

CELERY_CREATE_MISSING_QUEUES 때문에 사용자 지정 라우터가 반환 한 경로의 새 큐가 생성된다고 가정합니다. 지금은 실행 작업자 노드

, 나는 -Q 인수를 전달하지 않고이 문서에 부합하는 것 같다 '기본'큐에서만 소비 -

기본 그것으로 CELERY_QUEUES 설정에 정의 된 모든 대기열에서 소비됩니다 (지정되지 않은 경우 대기열은 기본적으로 셀체라는 ).

동적으로 작성된 것을 포함하여 모든 대기열에서 작업자 노드를 사용할 수있는 방법이 있습니까?

덕분에,

답변

3

작업자는 이러한 큐의 이름을 얻을 당신이 그들을 만들 때 어쩌면 그들을 저장하거나 rabbitmqctl list_queues에서 어쩌면 그들을 얻을 수있는 방법이 필요하므로, 이러한 자동 또는 동적으로 생성 된 큐에 대해 이야기 할 필요가 당신 경우 RabbitMQ를 브로커로 사용하고 예를 들어 신호 처리기를 추가하여 이러한 동적 대기열을 소비 할 작업자에 추가합니다.

#command all workers to consume from the 'dynamic_queue' queue 
app.control.add_consumer('dynamic_queue', reply=True) 

# command specific workers 
app.control.add_consumer('dynamic_queue', reply=True, destination=[[email protected]]) 

참조 : 당신은 항상 새로운 동적 큐가 생성 한 경우

from celery.signals import celeryd_after_setup 

@celeryd_after_setup.connect 
def add_dynamic_queue(sender, instance, **kwargs): 
    # get the dynamic queue, maybe stored somewhere 
    queue = 'dynamic_queue' 
    instance.app.amqp.queues.select_add(queue) 

, 당신은 또한 사용하여 런타임에 이러한 큐에서 소모 시작 노동자를 명령 할 수있다 : celeryd_after_setup 신호를 사용하여 예를 들어

Adding Consumers.

이 정보가 도움이되기를 바랍니다. 자세한 정보를 얻을 때 질문을 수정하겠습니다.

+0

감사합니다. 재미있었습니다. 예, 저는 RabbitMQ를 사용하며 항상 동적 큐를 가질 계획이었습니다. 일종의 와일드 카드 패턴을 사용하여 모든 대기열에서 작업자가 작업하도록 지시 할 수 있기를 바랍니다. 나는 당신의 제안을 지금 시도 할 것입니다. 질문 - 메시지 브로커가 유일한 공통점 인 작업자가 다른 서버에서 실행될 수 있다는 점을 감안할 때 app.control.add_consumer (...)는 정보를 근로자에게 어떻게 전달합니까? – ksrini

+0

@ksrini 예 와일드 카드 패턴을 사용할 수 있는지 찾기 위해 소스 코드를 검색하고 있었지만 소비 할 대기열을 설정하는 메소드는 쉼표로 구분 된 대기열 목록이 필요합니다. 그리고 제어 명령은 브로커 (RabbitMQ)를 사용하여 작업자에게 'celery.app.control' 모듈을 확인합니다. – Pierre

+0

add_consumer 제어 명령을 사용하여 제안 된 접근 방식이 효과적이었습니다! 감사!add_consumer가 reply = True 일 때 add_consumer가 약간의 시간 (약 1-2 초)이 걸릴 것으로 보이고 내가 결정한 라우터에서 작업을 수행 할 계획이기 때문에 worker가 이미 해당 대기열에서 소비하고 있지 않은 경우에만 add_consumer를 호출하고 싶습니다. 새로운 큐는 경로의 일부로. reply = False로 설정하면 더 빠릅니다. worker가 이미 대기열에서 소비하고있을지라도 add_consumer 호출에 문제가 있습니까? – ksrini

관련 문제