2014-10-08 3 views
0

셀러리를 처음 사용합니다. 셀러 리와 함께 분산 작업을 수행하려고합니다.셀러리 준비가 완료되면 대기열에 작업을 추가합니다.

taskset = TaskSet(
    tasks.generate_sorts.subtask(args = (params, meta_data_dict)) 
    for meta_data_dict in chunk_generator) 

print "Dispatching tasks" 
taskset_result = taskset.apply_async() 

print "Waiting for results" 
results = taskset_result.join_native() 
print "Results:" 
#Process the results. 

지금 chunk_generator은 기본적으로 발전기입니다 :

@celery.task 
def generate_sorts(params,meta_data_dict): 
    ''' 
    This will generate some sorts . 
    ''' 
    pass 

그리고 나는 약간의 분산 처리의 일환으로 다음을 수행 오전 :

내가 task.py 파일 내 작업에서 하나의 작업을 말할 수 있습니다 패턴은 데이터베이스로 이동하여 일부 메타 데이터를 가져옵니다. 내 문제는 지금 이러한 작업이 결국 작업 대기열로 전송되기 전에 누적됩니다. 제 생성기는 작업이 실제로 대기열에 추가되기 전에 모든 메타 데이터를 가져 오기 위해 약 30 분 정도 걸립니다. 나는 그것이 수행하려는 의도 인 것임을 알고 있습니다 (TaskSet). TaskSet에 대한 대안을 찾고 있는데, 즉 아래에 해당하는 것을 분산 방식으로 수행 할 수 있습니까?

pool.imap_unordered(generate_sorts, chunk_generator) 

위의 내용은 생성기가 결과를 얻 자마자 generate_sots를 수행합니다. 다른 말로하면 발전기가 첫 번째 작업을 생성하자마자 발전기에서 추가 할 수있는 TaskSet에 대한 대체물이 있습니다. 대신 발전기가 모든 작업을 수행하기 전에 기다렸다가 일부 작업을 수행하기 시작할 수 있습니다.

답변

1

당신은 ResultSetAsyncResult의 결과 인스턴스를 즉시 시작하고 추가 시도해야합니다 :

from celery.result import ResultSet 

result_set = ResultSet() 
for meta_data_dict in chunk_generator: 
    # Add the task to the queue immediately 
    result = task.generate_sorts.delay(params, meta_data_dict) 
    result_set.add(result) 

print "Waiting for results" 
results = result_set.join_native() 
print "Results:" 
# Process the results 
+0

이 확실히 나에게 도움이 될 것입니다. 매우 감사 – Senthil

관련 문제