2016-08-24 2 views
0

파일 확장자 또는 디렉토리 이름에 따라 작업을 호출하는 파일 트리 워킹 스크립트를 구현하고 있습니다.같은 이름의 셀러리 작업

내 도보 코드는 다음과 같습니다

from celery import Celery 
import os 


app = Celery('tasks', broker='amqp://[email protected]//') 
app.conf.update(
    CELERY_DEFAULT_EXCHANGE = 'path_walker', 
    CELERY_DEFAULT_EXCHANGE_TYPE = 'topic', 
) 

for root, dirs, files in os.walk("/"): 
    for filename in files: 
     name, ext = os.path.splitext(filename) 
     if ext: 
      app.send_task("process", args=(os.path.join(root, filename),), routing_key="file" + ext) 
    for dirname in dirs: 
     app.send_task("process", args=(dirname,), routing_key="directory." + dirname) 

당신은 내가 다른 routing_key들과 같은 작업 (process) 만 호출하고 있습니다 것을 볼 수 있습니다. 내 노동자에

는 내가 가진 :

from celery import Celery 
from kombu import Queue, Exchange 
import uuid 

app = Celery(broker='amqp://[email protected]//') 

file_queue = Queue(str(uuid.uuid4()), routing_key="file.py") 
dir_queue = Queue(str(uuid.uuid4()), routing_key="directory.tmp") 

app.conf.update(
    CELERY_DEFAULT_EXCHANGE="path_walker", 
    CELERY_DEFAULT_EXCHANGE_TYPE="topic", 
    CELERY_QUEUES=(
     dir_queue, 
     file_queue, 
    ), 
) 


@app.task(name="process", ignore_result=True, queue=dir_queue) 
def process_dir(dir_name): 
    print("Found a tmp dir: {}".format(dir_name)) 


@app.task(name="process", ignore_result=True, queue=file_queue) 
def process_file(file_name): 
    print("Found a python file: {}".format(file_name)) 

위의 코드는 다른 라우팅 키를 사용하여 두 개의 큐를 작성합니다. 두 작업은 개별 대기열에 바인딩되지만 트리 워커를 실행하면 두 번째 작업 (process_file 함수) 만 호출됩니다.

동일한 작업자가 다른 대기열에서 동일한 이름으로 작업을 수행 할 수 있습니까? 또는이 접근법을 고수하기 원할 경우 작업자 당 하나의 작업 만 필요로합니까? 그것은 하나의 셀러리 응용 프로그램에 같은 이름을 가진 두 가지 작업을 할 수는 없습니다

:

답변