2017-05-11 1 views
0

나는 내가 RabbitMQ 브로커와 두 개의 셀러리 소비자 (main1.pymain2.py) 같은 브로커에 연결을 모두 설치 서버가 있습니다. 두 번째 사용자 (main2.py에서셀러리 작업

app = Celery('tasks', broker=..., backend=...) 
app.conf.task_routes = (
    [ 
     ('tasks.beat', {'queue': 'print-queue'}), 
    ], 
) 
app.conf.beat_schedule = { 
    'beat-every-10-seconds': { 
     'task': 'tasks.beat', 
     'schedule': 10.0 
    }, 
} 

@app.task(name='tasks.beat', bind=True) 
def beat(self): 
    for i in range(10): 
     app.send_task("tasks.print", args=[i], queue="print-queue") 

    return None 

: 제 소비자에

( main1.py)는, I는 특정 큐에 여러 번에 다른 작업을 보내는 셀러리 비트 구현

,691 :

app = Celery('tasks', broker=..., backend=...) 
app.conf.task_routes = (
    [ 
     ('tasks.print', {'queue': 'print-queue'}), 
    ], 
) 

@app.task(name='tasks.print', bind=True) 
def print(self, name): 
    return name 

나는 두 셀러리 노동자를 시작할 때 :) 나는 작업이 이상했다 구현

consumer1: celery worker -A main1 -Q print-queue --beat 
consumer2: celery worker -A main2 -Q print-queue 

나는 이러한 오류를 얻을 : 두 번째 소비자

에 최초의 소비자

[ERROR/MainProcess] Received unregistered task of type 'tasks.beat' 

[ERROR/MainProcess] Received unregistered task of type 'tasks.print' 

은 가능한 모두 연결 다른 셀러리 응용 프로그램에서 작업을 분할하는 것입니다 같은 브로커?

미리 감사드립니다.

답변

1

여기에 무슨 일이 일어나고 있습니다. 두 노동자 A 또한 셀러리 비트를 (예를 들어 하나가 B입니다)을 실행하는 일 중 하나 B 있습니다.

  1. 셀리 비트가 task.beat을 큐에 제출합니다. 이 모든 작업의 ​​이름을 포함하여 일부 메타 데이터와 함께 토끼에 메시지를 대기열에 넣는 것입니다. 두 노동자의
  2. 하나의 메시지를 읽습니다. A와 B 모두 동일한 대기열을 듣고있어 읽을 수 있습니다.

    a. A가 메시지를 읽으면 A가 해당 작업을 정의하지 않기 때문에 tasks.beat이라는 작업을 찾으려고합니다.

    b. B는 메시지를 읽는 경우 성공적으로 작업이 tasks.beat라고 찾으려고하고 코드를 실행합니다 (그것은 그 작업을 가지고 있기 때문에). tasks.beattasks.print의 메타 데이터를 포함하는 새 메시지를 토끼에 넣습니다. 단지 하나와 B가 tasks.print를 정의하지만, 하나가 메시지를받을 수 있기 때문에

  3. 같은 문제가 다시 발생합니다.

실제로 셀러리는 오류 메시지를 던지기 위해 일부 검사를 수행하고있을 수 있지만 이것이 근원적 인 문제라는 것을 확실히 알고 있습니다.

즉, 대기열의 모든 작업자 (비트 포함)는 동일한 코드를 실행해야합니다.

+0

감사 @mbattifarano, 난 완전히 당신이 말을 이해했다. 나는 ** tasks.beat **와 ** tasks.print **를위한 두 개의 다른 대기열을 사용하여 문제를 해결했다. – alauri