셀러리를 처음 사용합니다. 셀러 리와 함께 분산 작업을 수행하려고합니다.셀러리 준비가 완료되면 대기열에 작업을 추가합니다.
taskset = TaskSet(
tasks.generate_sorts.subtask(args = (params, meta_data_dict))
for meta_data_dict in chunk_generator)
print "Dispatching tasks"
taskset_result = taskset.apply_async()
print "Waiting for results"
results = taskset_result.join_native()
print "Results:"
#Process the results.
지금 chunk_generator은 기본적으로 발전기입니다 :
@celery.task
def generate_sorts(params,meta_data_dict):
'''
This will generate some sorts .
'''
pass
그리고 나는 약간의 분산 처리의 일환으로 다음을 수행 오전 :
내가 task.py
파일 내 작업에서 하나의 작업을 말할 수 있습니다 패턴은 데이터베이스로 이동하여 일부 메타 데이터를 가져옵니다. 내 문제는 지금 이러한 작업이 결국 작업 대기열로 전송되기 전에 누적됩니다. 제 생성기는 작업이 실제로 대기열에 추가되기 전에 모든 메타 데이터를 가져 오기 위해 약 30 분 정도 걸립니다. 나는 그것이 수행하려는 의도 인 것임을 알고 있습니다 (TaskSet
). TaskSet
에 대한 대안을 찾고 있는데, 즉 아래에 해당하는 것을 분산 방식으로 수행 할 수 있습니까?
pool.imap_unordered(generate_sorts, chunk_generator)
위의 내용은 생성기가 결과를 얻 자마자 generate_sots를 수행합니다. 다른 말로하면 발전기가 첫 번째 작업을 생성하자마자 발전기에서 추가 할 수있는 TaskSet
에 대한 대체물이 있습니다. 대신 발전기가 모든 작업을 수행하기 전에 기다렸다가 일부 작업을 수행하기 시작할 수 있습니다.
이 확실히 나에게 도움이 될 것입니다. 매우 감사 – Senthil