2012-04-06 1 views
3

Celery에서 쿼리에서 가져온 각 항목에 대해 하나의 하위 작업을 실행하는 주 작업을 실행하고 있습니다. 하위 작업은 병렬로 실행되어야합니다. UI에는 전체적으로 수행되는 하위 작업의 수를 보여주는 진행률 막대가 있습니다. 진행 상황 표시 줄에 정보를 제공하기 위해 주 작업 상태를 업데이트하고 있습니다. 내 문제는 더 이상 자신의 상태를 업데이트 할 수 없도록 모든 하위 작업을 브로커에 전달한 직후 주 작업이 종료되었다는 것입니다. 주 작업은 모든 하위 작업이 완료 될 때까지 기다릴 수 있기를 바랍니다. 가능한가? 다른 솔루션? 여기에 내 의사 코드 (실제 코드는 전역을 사용하지 마십시오 ;-)).샐러리에서는 모든 하위 작업이 완료 될 때까지 주 작업 상태를 업데이트하는 방법을 알고 있습니까?

total = 0 
done = 0 

@task(ignore_result=True) 
def copy_media(path): 
    global total, done 
    copy_media.update_state(state=STARTED, meta={'total': total, 'done': done}) 
    documents = Document.objects.all() 
    total = documents.count() 
    copy_media.update_state(state=STARTED, meta={'total': total, 'done': done}) 
    for document in documents: 
     process_doc.delay(document, path, copy_media) 

@task(ignore_result=True) 
def process_doc(document, path, copy_media): 
    global total, done 
    # Do some stuff 
    done += 1 
    copy_media.update_state(state=STARTED, meta={'total': total, 'done': done}) 

답변

0

나는 TaskSet을 사용하는 방법을 찾았습니다. 그러나 하위 작업의 결과를 무시할 수 없기 때문에 완전히 만족하지는 않습니다. 나는 process_doc 작업 results.ready()에 대한 결과를 무시하면 항상 여기 등 False, results.completed_count() 항상 0을 반환 반환 코드는 다음과 같습니다

@task(ignore_result=True) 
def copy_media(path): 
    copy_media.update_state(state=STARTED, meta={'total': total, 'done': done}) 
    documents = Document.objects.all() 
    total = documents.count() 
    copy_media.update_state(state=STARTED, meta={'total': total, 'done': done}) 
    job = TaskSet(tasks=[process_doc.subtask((document, path)) 
         for document in documents]) 
    results = job.apply_async() 
    doc_name = '' 
    while not results.ready(): 
     done = results.completed_count() 
     if done: 
      last = done - 1 
      for idx in xrange(last, -1, -1): 
       if results[idx].ready(): 
        doc_name = results[idx].result 
        break 
     copy_media.update_state(state=STARTED, meta={'total': total, 'done': done, 'doc-name': doc_name}) 
     time.sleep(0.25) 

@task() 
def process_doc(document, path): 
    # Do some stuff 
    return document 
+1

"다른 작업 결과를 기다리는 작업을 수행하는 것은 실제로 비효율적이며 작업자 풀이 모두 소모되면 교착 상태가 발생할 수 있습니다. 대신 콜백을 사용하여 디자인을 비동기로 만듭니다. " http://celery.readthedocs.org/en/latest/userguide/tasks.html#task-synchronous-subtasks – antoinet

+0

내 주요 작업'copy_media'는 다른 작업의 결과를 기다리지 않습니다. 계속해서 상태를 업데이트하여 얼마나 많은 하위 작업이 완료되었는지 보여줍니다. 하위 작업은 병렬로 실행되므로 콜백 옵션이 없습니다. 그것의 꼭대기에 나는'copy_media'가 한 번에 하나씩 만 실행될 수 있기 때문에 교착 상태를 가질 수 없기 때문에 1 명의 작업자를 차단하고 있습니다. – Etienne

+0

다른 작업의 결과를 효과적으로 기다리고 있습니다. results.ready()를 호출 중이며 다른 테스트 작업도 있습니다. 작업자가 고갈되면 교착 상태가 발생하여 하위 작업이 실행되지 않으며 주 작업이 완료되지 않습니다. – rsalmei

0

당신은 작업을 완료의 저장 번호에 memcached를 백업 캐싱을 사용할 수 있습니다. django 캐시 API에 심지어 cache.inrc의 원자 단위 증분을 사용하면 count의 동시 업데이트로 문제가 해결되지 않습니다.

또한 모든 하위 작업이 완료 될 때까지 실행중인 기본 작업을 유지하는 것은 기본적으로 셀러리 작업자 중 하나를 오랫동안 차단하므로 나쁜 생각입니다. 셀러리가 하나의 작업자 프로세스로 실행되는 경우 이는 결코 끝나지 않는 잠금이됩니다.

+0

Django 캐시를 사용하여 카운트를 저장하라는 제안에 대해 셀러리에 이미있는 것을 다시 구현해야한다는 것이 이상하다는 것을 알았습니다. 국가 시스템. 그리고 내 요구는 단지 계산을 유지하는 것보다 더 복잡합니다. 내 대답에서 볼 수 있듯이, 나는 또한 문서 이름 (그리고 실제 프로젝트에서 더 많은 것들)을 전달하고있다. 내 주요 임무가 샐러리 노동자를 막고 있다는 사실 때문에 문제는 알 수 있지만 내 경우에는 문제가되지 않습니다. 나는이 주요 작업을위한 전용 셀러리 데몬을 가지고 있으며 많은 작업자와 하위 작업을 수행하며 주요 작업이 동시에 실행되는 것을 막습니다. – Etienne

0

실행중인 셀러리의 버전을 알 수는 없지만 Group 하위 타스크 (3.0의 새로운 기능)를 살펴볼 수 있습니다.

관련 문제