파일 확장자 또는 디렉토리 이름에 따라 작업을 호출하는 파일 트리 워킹 스크립트를 구현하고 있습니다.같은 이름의 셀러리 작업
내 도보 코드는 다음과 같습니다
from celery import Celery
import os
app = Celery('tasks', broker='amqp://[email protected]//')
app.conf.update(
CELERY_DEFAULT_EXCHANGE = 'path_walker',
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic',
)
for root, dirs, files in os.walk("/"):
for filename in files:
name, ext = os.path.splitext(filename)
if ext:
app.send_task("process", args=(os.path.join(root, filename),), routing_key="file" + ext)
for dirname in dirs:
app.send_task("process", args=(dirname,), routing_key="directory." + dirname)
당신은 내가 다른 routing_key
들과 같은 작업 (process
) 만 호출하고 있습니다 것을 볼 수 있습니다. 내 노동자에
는 내가 가진 :
는from celery import Celery
from kombu import Queue, Exchange
import uuid
app = Celery(broker='amqp://[email protected]//')
file_queue = Queue(str(uuid.uuid4()), routing_key="file.py")
dir_queue = Queue(str(uuid.uuid4()), routing_key="directory.tmp")
app.conf.update(
CELERY_DEFAULT_EXCHANGE="path_walker",
CELERY_DEFAULT_EXCHANGE_TYPE="topic",
CELERY_QUEUES=(
dir_queue,
file_queue,
),
)
@app.task(name="process", ignore_result=True, queue=dir_queue)
def process_dir(dir_name):
print("Found a tmp dir: {}".format(dir_name))
@app.task(name="process", ignore_result=True, queue=file_queue)
def process_file(file_name):
print("Found a python file: {}".format(file_name))
위의 코드는 다른 라우팅 키를 사용하여 두 개의 큐를 작성합니다. 두 작업은 개별 대기열에 바인딩되지만 트리 워커를 실행하면 두 번째 작업 (process_file
함수) 만 호출됩니다.
동일한 작업자가 다른 대기열에서 동일한 이름으로 작업을 수행 할 수 있습니까? 또는이 접근법을 고수하기 원할 경우 작업자 당 하나의 작업 만 필요로합니까? 그것은 하나의 셀러리 응용 프로그램에 같은 이름을 가진 두 가지 작업을 할 수는 없습니다
: