2013-03-05 5 views
2

셀러와 분산 작업 실행 시스템을 구축하려고합니다.여러 작업자가 하나의 시스템에 여러 작업 수행

내가 단일 시스템 (로컬 호스트), 가산 작업 add과 뺄셈 작업 sub에 대한 다른에 대한 어느 하나에, 다음 sevral 추가 작업을 킥오프 add.delay()을 사용하여에이 명 근로자를 시작, 빼기에 오류가있어 노동자의 터미널 :이 테스트에서

[2013-03-05 15:51:18,898: ERROR/MainProcess] Received unregistered task of type 'add_tasks.add'.

, 나는이 또한 작업을 시작했다 : 하나는 또한 노동자에 의해 체포되어 위의 오류를 발생시킨 뺄셈 노동자에 걸려 다른 동안. 두 번째 추가 작업이 빼기 작업자에 의해 잡히지 않도록 구성을 어떻게 변경할 수 있습니까? 감사.

add_tasks.py:

celery = Celery('add_tasks', backend='amqp', broker='amqp://[email protected]//') 

@celery.task 
def add(x, y): 
    sleep(20) 
    return x + y 

sub_tasks.py:

celery = Celery('sub_tasks', backend='amqp', broker='amqp://[email protected]//') 

@celery.task 
def sub(x, y): 
    sleep(10) 
    return x - y 

내가 로컬 호스트 머신의 두 단자에 celery -A add_tasks worker --loglevel=info -n worker1celery -A sub_tasks worker --loglevel=info -n worker2에 의해 노동자를 출시 :

여기에 코드입니다.

답변

4

마지막으로 나는 ROUTER 기능이 내 문제를 해결할 수 있음을 알았습니다. 나는 내 해결책을 여기에두고 같은 문제가있는 사람들에게 유용 할 것이라고 희망한다.

작업자를 시작할 때 -Q queue 옵션을 사용하여 작업을 수락하는 작업자를 queue으로 제한 할 수 있습니다. 내 상황에서는 celery -A add_tasks worker --loglevel=info -n worker1 -Q addition을 사용했습니다.

한편 새 작업을 시작할 때는 대기열 인수를 명시해야합니다 (예 : add.apply_async(queue='addition',priority=0,args=[1,4])sub.apply_async(queue='subtraction',priority=0,args=[1,4])). 그러면 빼기 작업자가 추가 작업을 수락하지 않습니다.

관련 문제