2013-07-15 4 views
1

django-celery-3.0.17, celery-3.0.21 및 django-1.5.1을 사용하여 체인 실행을 모니터링하려고합니다. 해결책을 찾았지만 나에게 이상한 것처럼 보입니다. 가능한 경우보다 쉬운 해결책을 찾고 있습니다.셀러리 체인 모니터링 : 쉬운 방법

views.py

def runCod(request): 
    runFunTask = runFunctions.delay(shpId, codId, stepValues, bboxTuple); 
    getRunFunStatus.delay(runFunTask) 
    return render_to_response('tss/append_runCod.html', 
          {'runFunTask': runFunTask}, 
           context_instance=RequestContext(request)) 

def getProgressCod(request): 
    task = AsyncResult(taskId) 
    currStep = task.result['step'] 
    totSteps = task.result['total'] 

    if task.status == 'SUCCESS': 
     task.revoke() # Manually terminate the runFunctions task 

    response = dumps({'status':task.status,'currStep':currStep,'totSteps':totSteps}) 
    return HttpResponse(response, mimetype='application/json') 

tasks.py

@task() 
def runFunctions(shpId, codId, stepValues, bboxTuple): 
    # ... Code to define which functions to launch ... 

    stepsToLaunch = [fun1, fun2, fun3, fun4, fun5] 
    chainId = chain(stepsToLaunch).apply_async() 
    chainAsyncObjects = [node for node in reversed(list(nodes(chainId)))] 

    current_task.update_state(state="PROGRESS", meta={'step':1, 'total':numSteps}) 

    for t in range(10800): # The same max duration of a celery task 
     for i, step in enumerate(chainAsyncObjects): 
      currStep = i+1 
      if step.state == 'PENDING': 
       current_task.update_state(state="PROGRESS", meta={'step':currStep, 'total':numSteps}) 
       break 
      if step.state == 'SUCCESS': 
       if currStep == numSteps: 
        current_task.update_state(state="SUCCESS", meta={'step':currStep, 'total':numSteps}) 
        # This task will be terminated (revoked) by getProgressCod() 
      if step.state == 'FAILURE': 
       return 
    sleep(1) 

cods.js 이것은 무엇이다

function getProgressCod(taskId){ 
    var aoiStatus, allStatus, currStep, totSteps; 
    var interval = 2000; // Perform ajax call every tot milliseconds 

    var refreshId = setInterval(function(){ 
     $.ajax({ 
      type: 'get', 
      url: 'getProgressCod/', 
      data:{'taskId': taskId,}, 
      success: function(response){}, 
     }); 
    }, interval); 
} 

: 여기에 내 코드입니다 일어나는 :

  1. runCod() 비동기 작업을 시작합니다 runFunctions();
  2. runFunctions() 만들고 내가 하나의 사슬 하위 상태를보고 매 초마다 자신의 '진행'상태를 업데이트 runFunctions() 최종 루프와 하위의 사슬
  3. 를 시작합니다. (참조 12)
  4. 는 사용자가 getProgressCod()에 의해 통보, 무슨 일이 일어나고 있는지 알 수있는 자바 스크립트 함수, getProcessCod() 파이썬 함수에 아약스 요청이
  5. getProcessCod() 파이썬 기능이 runFunctions() 상태에서 보이는 2 초마다 만들고, 때입니다 'SUCCESS'를 실행하면 runFunctions() 실행이 취소 (종료)됩니다. getProcessCod() 얻을 것이기 때문에 나는 runFunctions()을 반환하는 경우 체인의 모든 하위 작업이 최종 루프의 내부에서 수행되는 경우 때문에

나는 다른 방법을 찾지 못했습니다, 나는 사용자에게 사용자의 'SUCCESS'상태를 알릴 수 없습니다 내가 getProgressCod() 평 기능을 삭제하고 runCod() 안에 if 문을 삽입 해결했다 한 task.status

답변

1

수행하는 None 객체입니다. 이런 식으로 runFunctions()을 사용하여 runCod()을 모니터 할 수 있으며 성공적으로 종료되면 결과를 얻으려면 5 초 정도 기다린 다음 반환 작업을 닫습니다.

views.py

def runCod(request): 
    taskId = request.GET['taskId'] 
    if taskId != '': # If the task is already running 
     task = AsyncResult(taskId) 
     currStep = task.result['step'] 
     totSteps = task.result['total'] 
     response = dumps({'status':task.status, 
          'currStep':currStep, 
          'totSteps':totSteps}) 
     return HttpResponse(response, mimetype='application/json') 
    else: # If the task must be started 
    runFunTask = runFunctions.delay(shpId, codId, stepValues, bboxTuple); 
    getRunFunStatus.delay(runFunTask) 
    return render_to_response('tss/append_runCod.html', 
          {'runFunTask': runFunTask}, 
           context_instance=RequestContext(request)) 

tasks.py

@task() 
def runFunctions(shpId, codId, stepValues, bboxTuple): 
    # ... Code to define which functions to launch ... 
    stepsToLaunch = [fun1, fun2, fun3, fun4, fun5] 
    chainId = chain(stepsToLaunch).apply_async() 
    chainAsyncObjects = [node for node in reversed(list(nodes(chainId)))] 

    current_task.update_state(state="PROGRESS", meta={'step':1, 'total':numSteps}) 

    for t in range(10800): # The same max duration of a celery task 
     for i, step in enumerate(chainAsyncObjects): 
      currStep = i+1 
      if step.state == 'PENDING': 
       current_task.update_state(state="PROGRESS", meta={'step':currStep, 'total':numSteps}) 
       break 
      if step.state == 'SUCCESS': 
       if currStep == numSteps: 
        current_task.update_state(state="SUCCESS", meta={'step':currStep, 'total':numSteps}) 
        sleep(5) # Wait before stop this task, in order for javascript to get the result! 
        return 

      if step.state == 'FAILURE': 
       return 
     sleep(1) 
: 내 유일하게 남아있는 의심이 대기 접근 방식이 올바른인지 아닌지 ... 가 여기 내 수정 된 코드라는 것이다

cods.js

function getProgressCod(taskId){ 
    var interval = 2000; // Perform ajax call every tot milliseconds 

    var refreshId = setInterval(function(){ 
     $.ajax({ 
      type: 'get', 
      url: 'runCod/', 
      data:{'taskId': taskId,}, 
      success: function(response){}, 
     }); 
    }, interval); 
} 
+1

'chainsAsyncObjects = [반전 된 노드의 노드 (list (nodes (chainId))]'줄에'nodes()'에 대한 코드가 있습니까? – Jon