2017-12-29 3 views
1

"실제로"(실제로 샐러리 그룹이 아닌) 작업 그룹의 상태와 함께 클라이언트에게보기에서 메시지를 보냈습니다. 문제는 : 이것은 실제로 모든 작업이 실제로 수행되는지 여부를 무시합니다. 콜백 (task.apply_async(link=))을 추가하려고 시도했지만 도움이되지 않았습니다.django-channels/celery : 작업 목록의 진행 상황을 추적하는 방법은 무엇입니까?

하는 작업 자체는 정말 시간이 많이 걸릴하지 않지만, 작업이 실제로 수행 된 때 정말 카운터를 증가 할 수 있도록하고 싶습니다 :

if 'selected' in request.GET: 
     selected_as_list = request.GET.getlist('selected') 
     print(selected_as_list) 
     searches = list(set([s.strip() for s in selected_as_list if s.strip()])) 
     task_group = [refresh_func.s(str(user_profile.id), search, dont_auto_add=True) for search in searches] 

     for i,task in enumerate(task_group): 
      task.apply_async() 
      Group(str(request.user.id)).send({"text": json.dumps({"tasks_completed": i+1, 
                    "task_id": "fb_import", 
                    "completed": True if i == len(task_group) -1 else False, 
"total": len(task_group)})}) 

그래서 내가 코드를 이동 보기 밖으로, 실제로 수행 할 작업을 호출하는 동일한 블록으로 이동합니다. 지금은 많은 매개 변수를 전달한다는 의미 였지만 초기 문제는 해결되었습니다. 그러나 또 다른 하나를 제시합니다 : 인덱스가 "1"인 작업은 인덱스가 "3"인 작업 후에 끝날 수 있습니다. 이것은 분명히 카운터를 잘못 업데이트합니다.

이 문제를 해결하기 위해 수행 할 수있는 작업은 무엇입니까?

답변

1

주기적으로 생성 된 작업의 상태를 확인하는 백그라운드 스레드를 생성하는 방법 (작업 ID를 알고 있으면 해당 상태를 얻을 수 있습니다)?

이 아마 때문에이 스레드 (안 셀러리 작업)에 장고 서버에서 실행되어야합니다

어디 django-channel이 활성화 : 당신이 작업에 Group(...).send를 호출하면, 아마 때문에 특별히 그것을 (액세스 할 수 없습니다 대개 샐러리 작업자가 별도의 프로세스/시스템에서 실행)

.GET보기를 구현할 때 태스크를 생성한다고 가정 해 보겠습니다. 어쩌면 작업 ID를 수집하여 (스폰 된 위치에서) 스레드에서 상태를 주기적으로 확인할 수 있습니다 (따라서 .GET 응답을 차단하지 않음). 예에서

def verify_task_ids(channel_group_id, task_ids): 
    previous_finished_task_ids = set() 
    finished_task_ids = set() 
    logger.info("Verifying %s task_ids", len(task_ids)) 
    while len(finished_task_ids) < len(task_ids): 
     finished_task_ids = set() 
     for task_id in task_ids: 
      if AsyncResult(task_id).ready(): 
       finished_task_ids.add(task_id) 
     if finished_task_ids != previous_finished_task_ids: 
      logger.info("%s new finished tasks", 
         len(finished_task_ids) - len(previous_finished_task_ids)) 
     previous_finished_task_ids = finished_task_ids 

의 : 스레드에 대한 verify_task_ids 대상 함수가 될 수있는이 같은

class Test(generic.TemplateView): 
    template_name = 'stack_092.html' 

    def get(self, request, *args, **kwargs): 
     logger.info("Yep") 
     task_group = [foo_task.s(i) for i in range(5)] 
     logger.info("Task signatures created: %s", task_group) 

     task_ids = [task.apply_async().task_id for task in task_group] 
     logger.info("Tasks launched") 
     th = threading.Thread(target=verify_task_ids, args=('request.user.id', task_ids)) 
     th.start() 
     logger.info("Thread started") 
     return super(Test, self).get(request, *args, **kwargs) 

을 그리고 뭔가 :

의 당신이 당신의 작업을 산란 뷰는 다음과 같습니다 가정 해 봅시다 channel_group_id 인수는 순수한 하드 코딩 된 문자열 "request.user.id"입니다. 귀하의 경우 그룹 ID이므로 서버에 로그인 한 실제 사용자의 request.user.id으로 대체해야합니다.

그리고 새로운 작업이 완료되면, 난 단지 로그 메시지 보여줄 것을 볼 수 있습니다 : 당신은 아마

if finished_task_ids != previous_finished_task_ids: 
    Group(
     str(channel_group_id) 
    ).send(
     { 
      "text": json.dumps({ 
       "tasks_completed": len(finished_task_ids), 
       "task_id": "fb_import", 
       "completed": len(finished_task_ids) == len(task_ids), 
      }) 
     } 
    ) 

내가 돈을 호출해야합니다 어디 대신 logger.info 기능의 여기

if finished_task_ids != previous_finished_task_ids: 
     logger.info("%s new finished tasks", 
        len(finished_task_ids) - len(previous_finished_task_ids)) 

입니다 (erm ... 아무것도, 오히려 ... 장고 채널에 대해) 많은 것을 알지 못한다. 그래서이 솔루션이 효과가 있을지 모르지만, 시도해 볼만한 가치가 있을지 모르겠다.

+0

고맙습니다. 나는 폴링을 피하려고 노력하고있다. 어쩌면 나는 당신의 대답의 일부를 채택 할 수 있고 어딘가에 작업이 실행 중일 때 어딘가에 for 루프를 점검 할 수있다. – zerohedge

+1

나는 상상했다. ** (적어도 저에게있어) 가장 중요한 문제는 셀러리 작업자가 웹 서버와 다른 서버에서 실행되는 경향이 있다는 것입니다. 그렇지 않은 경우 셀러 리 작업자는 별도의 프로세스로 실행되므로 프로세스는 다소 어려웠습니다.그래도 행운을 빈다. 혼자서 해결하면, 자신이 한 질문에 답을 추가 할 수 있을까? 나에게도 관심이있는 것입니다. – BorrajaX

+0

나는 그럭저럭 오늘 일찍 실제로 그것을 할 수 있었다. 그것은 매우 clunky하고 간신히 읽을 수있는 순간, 그리고 또한 조금 스팸 것 같습니다,하지만 그것은 "비동기"입니다. 내가 일단 리팩토링되면 대답을 게시 하겠지만 여기에 일반적인 아이디어가있다. 함수 시그니처를 수집하고, 도우미 함수에 목록으로 보내면 for 루프에서'task.freeze()'를 실행하고 각 태스크 ID를 추가한다. 그런 다음 모든 ID의 상태가 "SUCCESS"를 반환 할 때까지이 목록을 반복하는 비동기 작업을 시작합니다 (이것은 채널을 통해 메시지를 보냅니다). 그런 다음 task.apply_async()를 비슷한 for 루프에서 수행하십시오. – zerohedge

관련 문제