2013-08-25 3 views
0

셀타와 Django를 사용하여 periodic_task를 만들고 X 초마다 실행하려고합니다. 작업에 몇 가지 하위 작업이 생성되어야합니다. 하나의 하위 작업 집합이 각 주 작업에 대해 생성됩니다.Django의 Celery periodic_task가 한 번 실행됩니다 (!)

이 내가 가진 것입니다 ..

@periodic_task(run_every=datetime.timedelta(seconds=2)) 
def initialize_new_jobs(): 
    for obj in Queue.objects.filter(status__in=['I', 'Q']): 
     obj = Queue.objects.get(id=obj.id) 
     if obj.status not in ['I', 'Q']: 
      continue 
     obj.status = 'A' 
     obj.save() 
     create_other_task.delay(obj.id) 

이 좀 작동하지만 잘못 느낀다. 루프의 시작 부분에있는 obj를 새로 고침하여 다른 실행중인 periodic_task가 동일한 Queue 객체에서 create_other_task를 발행하는지 확인하십시오.

이런 종류의 작업을 수행하는 더 좋은 방법이 있습니까? 기본적으로 create_other_task는 가능한 한 자주 수행하지만 상태 I 또는 Q가있는 큐 객체 당 하나만 사용합니다.

이것은 내 문제의 단축 버전이므로 작성시 create_other_task를 실행할 수 있다는 사실을 무시하십시오. 대신 당신은 트랜잭션을 사용할 수 주기적 작업 :

답변

1

을 실행하는 큐 개체 : 값을 읽으려고 할 때 다른 사용자가 차단됩니다 있도록

@periodic_task(run_every=datetime.timedelta(seconds=2)) 
@transaction.commit_on_success 
def initialize_new_jobs(): 
    for obj in Queue.objects.select_for_update().filter(status__in=['I', 'Q']): 
     obj.status = 'A' 
     obj.save() 
     create_other_task.delay(obj.id) 

select_for_update() 행에 배타적 잠금을 넣습니다. 잠금은 트랜잭션이 커밋되거나 롤백 된 후에 해제됩니다. Reference.

이렇게하면 objI 또는 Q의 상태이고 obj.save()이 올바르게 작동하는지 확인할 수 있습니다.

+0

감사합니다. 그것은 광고 된 것과 똑같이 작동했습니다! :) – xeor

관련 문제