2017-02-21 1 views
2

내가 셀러리 작업의 가장 간단한 예 (case.py)이 있습니다제한 작업 번호 셀러리 노동자에 대기

import time 
import celery 


app = celery.Celery('case', broker='redis://localhost') 

@app.task 
def do_sth(): 
    time.sleep(5) 

을 내가 몇 가지 작업 인스턴스를 만들 :

>>> import case 
>>> tasks_list = [case.do_sth.delay() for i in range(4)] 

그럼 싶습니다 --concurrency=1 매개 변수가있는 동적 인 작업자 수가 있습니다. 필자의 경우 모든 작업자가 하나의 병렬 작업을 갖고 서버에 대기중인 작업이있을 때 작업자를 추가하는 것이 중요합니다. 일부 작업에 작업자가없고 새 작업자가 추가되면 새 작업자는 대기중인 작업 (보류중인 작업이지만 진행중인 작업이 아님)을 처리해야합니다.

[tasks] 
    . case.do_sth 

[2017-02-21 10:03:18,454: INFO/MainProcess] Connected to redis://localhost:6379// 
[2017-02-21 10:03:18,459: INFO/MainProcess] mingle: searching for neighbors 
[2017-02-21 10:03:19,471: INFO/MainProcess] mingle: all alone 
[2017-02-21 10:03:19,480: INFO/MainProcess] [email protected] ready. 
[2017-02-21 10:03:19,658: INFO/MainProcess] Received task: case.do_sth[0c7b0f8c-d1f8-4cd8-a100-21ef6654e04c] 
[2017-02-21 10:03:19,660: INFO/MainProcess] Received task: case.do_sth[f97ad614-017b-4a6c-90df-89dbed63e39b] 
[2017-02-21 10:03:19,662: INFO/MainProcess] Received task: case.do_sth[b0166022-196f-451b-bcb6-78cdf0558803] 
[2017-02-21 10:03:19,664: INFO/MainProcess] Received task: case.do_sth[b097e191-5bc4-44d9-bdcd-8aa74501e95d] 
[2017-02-21 10:03:24,667: INFO/PoolWorker-1] Task case.do_sth[0c7b0f8c-d1f8-4cd8-a100-21ef6654e04c] succeeded in 5.006301835s: None 
[2017-02-21 10:03:29,675: INFO/PoolWorker-1] Task case.do_sth[f97ad614-017b-4a6c-90df-89dbed63e39b] succeeded in 5.005384011s: None 
[2017-02-21 10:03:34,683: INFO/PoolWorker-1] Task case.do_sth[b0166022-196f-451b-bcb6-78cdf0558803] succeeded in 5.005373027s: None 
[2017-02-21 10:03:39,690: INFO/PoolWorker-1] Task case.do_sth[b097e191-5bc4-44d9-bdcd-8aa74501e95d] succeeded in 5.00531687s: None 

한편 나는 (동일한 명령을 사용하여) 다른 근로자를 시작하지만 아무것도하지 않았다 :

[tasks] 
    . case.do_sth 

[2017-02-21 10:03:20,321: INFO/MainProcess] Connected to redis://localhost:6379// 
[2017-02-21 10:03:20,326: INFO/MainProcess] mingle: searching for neighbors 
[2017-02-21 10:03:21,339: INFO/MainProcess] mingle: all alone 
[2017-02-21 10:03:21,352: INFO/MainProcess] [email protected] ready. 

당신이 날짜를 확인하는 경우를 나는 $ celery -A case worker --loglevel=info --concurrency=1를 호출 할 때하지만 난 (셀러리 로고 제외) 다음과 같은 로그가 두 번째 작업자가 첫 번째 작업 후 5 초 이내에 트리거되었음을 알 수 있습니다.

단일 작업자에서 대기중인 작업을 정확하게 하나의 작업으로 제한하는 방법 (일부 작업자 또는 셀러리 옵션)이 있습니까?

답변

1

찾고있는 것이 prefetch limit이라고 생각합니다. 주어진 시간에 작업자가 예약 할 수있는 작업 수를 효과적으로 줄입니다.

프리 페치 제한은 근로자가

자체에 대한 예약 할 수있는 작업 (메시지)의 수에 대한 제한입니다