2014-09-26 5 views
1

동시에 여러 작업을 수행 할 수 있지만 일단 모든 것이 준비되면 최종 작업을 실행하고 싶습니다. 나는 다음과 같은 코드를 사용하고 있습니다 :모든 작업이 완료되기 전에 셀러리 코드가 실행됩니다.

chunk_tasks = [] 
for index, chunk in enumerate(chunks): 
    chunk_tasks.append(import_chunk.s(meta.pk)) 

g = group(chunk_tasks) 
chord(g)(import_completed.s(meta.pk, max_lines=max_lines)) 

을이 모든 작업이 완료되기 전에 import_completed이 실행되는 것 같습니다 그러나. 또한 import_chunk 작업은 다음과 같습니다.

@task(bind=True, ignore_result=IGNORE_RESULTS) 
def import_chunk(self, meta_pk): 
    try: 
     # do some stuff 
    except Exception, e: 
     if self.max_retries == self.request.retries: 
      logger.exception('Unexpected error in import_chunk') 
     raise self.retry(countdown=60, max_retries=3) 

그래서 내가 뭘 잘못하고있는 것입니까?

답변

0

코드는 그룹의 모든 작업이 완료된 후에 만 ​​실행되는 작업입니다. 따라서 은 동기화를 위해 헤더에 작업 상태이 필요합니다.

ignore_resulttask으로 설정하면 작업자 이 작업 상태을 저장하지 않고이 작업의 값을 반환하지 않습니다.

작업을 재 시도하거나 워크 플로에 따라 예외를 던지거나 오작동 할 수 있습니다.

그래서, chord(add.s(i, i) for i in range(10))(tsum.s()).get() 완벽하게 유효하고 CASE 1에 대한 결과를 제공하지만 CASE 2에 대한 몇 가지 문제

CASE 1 제공 : CASE 2

@app.task 
def add(x, y): 
    return x + y 

@app.task 
def tsum(numbers): 
    return sum(numbers) 

을 :

@app.task(ignore_result=True) 
def add(x, y): 
    return x + y 

@app.task(ignore_result=True) 
def tsum(numbers): 
    return sum(numbers) 

그래서 ignore_result을 변경하거나 wo를 변경해야합니다. 당신의 업무의 흐름. 문서에서

:

당신은 가능한 한 코드를 사용하지 않아야합니다. 그럼에도 불구하고 화음은 많은 병렬 알고리즘에서 동기화가 필요한 단계이기 때문에 도구 상자에있는 강력한 기본 요소입니다.

+0

예, 이미 ignore_result로 문제가 발생하여 False로 설정되었습니다. –

관련 문제