2012-03-06 2 views
10

나는 django와 rabbitmq에서 샐러리를 사용하여 메시지 큐를 만들었습니다. 나는 또한 다른 기계에서 기인 한 노동자가있다.셀러리 - 작업 수행 기능 호출

def processtask(request, name): 
    args = ["ls", "-l"] 
    MyTask.delay(args) 
    return HttpResponse("Task set to execute.") 

내 작업은 다음과 같이 구성되어 있습니다 : 장고보기에서이 같은 프로세스를 시작하고

class MyTask(Task): 
    def run(self, args): 
    p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) 
    (out, err) = p.communicate() 
    return out 

내 질문은 이제 브로커 (내 장고 프로젝트) 지금을받을 수있는 방법입니다 작업자가 자신의 컴퓨터에서 실행 한 "ls -l"명령의 출력. 가장 좋은 방법은 작업자가 실행 된 명령의 출력을 보낼 준비가되었을 때마다 브로커의 함수를 호출하는 것입니다.

작업자의 출력을 비동기 적으로 받고 출력으로 웹 페이지를 업데이트하고 싶습니다.하지만 다른 시간입니다. 당분간 저는 근로자로부터 결과물을 받고 싶습니다.

업데이트

지금은 작업이 수행되는 웹 응용 프로그램 알리는 작업의 끝에서 트리거되는 HTTP GET 요청 추가 한 - 나는 또한 HTTP GET에 TASK_ID을 보내고있다을 . 는 HTTP GET 방법은 AsyncResult를 생성하고 결과를 얻을 수있는 장고보기를 호출하지만 문제는 result.get()를 호출 할 때 나는 다음과 같은 오류 얻을 :

/usr/lib64/python2.6/site-packages/django_celery-2.5.1-py2.6.egg/djcelery/managers.py:178: TxIsolationWarning: Polling results with transaction isolation level repeatable-read within the same transaction may give outdated results. Be sure to commit the transaction for each poll iteration. 
    "Polling results with transaction isolation level" 

어떤 아이디어 왜? 나는 AMQP와 함께 rabbitmq를 사용하고 있기 때문에 데이터베이스를 사용하지 않습니다.

업데이트.

작고 큰 반환 값을위한 최상의 옵션처럼 보이는 세 번째 옵션을 사용하고 싶습니다. 내 전체 작업은 다음과 같습니다 : 작업의 실행() 함수는 이미 값을 반환 이후

class MyTask(Task): 
    def __call__(self, *args, **kwargs): 
    return self.run(*args, **kwargs) 

    def after_return(self, status, retval, task_id, args, kwargs, einfo): 
    if self.webhost is not None: 
     conn = httplib.HTTPConnection(self.webhost, self.webport) 
     conn.request("HEAD", "/vuln/task/output/"+task_id) 

    def run(self, args, webhost=None, webport=None): 
    self.webhost = webhost 
    self.webport = webport 
    r = "This is a basic result string used for code clarity" 
    return r 

그래서 나는 또한 나의 작업에 잠금을 해제해야 after_return 기능을 무시했습니다. HEAD 요청에서 기본적으로 task_id에 AsyncResult를 호출하는 django 함수를 호출합니다.이 함수는 작업 결과를 제공해야합니다. 필자는 테스트 목적으로 만 임의의 테스트 결과를 사용했다.

위의 코드가 작동하지 않는 이유를 알고 싶습니다. on_success를 사용할 수는 있지만 차이가 있다고 생각하지 않습니다. 그렇지 않습니까?

+0

데이터베이스의 명령 출력을 저장할 수 있습니까? – jpic

+0

안녕하세요, 아니요. 직원들이 브로커의 데이터베이스에 액세스 할 수 없으며 액세스 권한을 부여하지 않기 때문입니다. 결과를 돌려 보내서 브로커에서 처리해야합니다. – eleanor

+1

HTTP API를 만들어 결과를 다시 보낼 수도 있습니다. 장고에서 꽤 쉽게 할 수있는 몇 가지 방법이 있습니다. – jpic

답변

14

당신은 다음과 같은 찾을 수 here 보면 :

장고 - 셀러리는 기본적으로 통신 버스로 사용되는 모든 작업/결과, 토끼 MQ 추적하기 위해 MySQL을 사용합니다.

실제로 일어나고있는 일은 작업이 아직 실행 중일 때 작업자의 ASyncResult을 가져 오는 것입니다 (작업이 서버에 대한 HTTP 요청을 호출했으며 아직 반환하지 않았으므로 db 잠금 세션은 작업자는 여전히 활성 상태이고 결과 행은 여전히 ​​잠겨 있습니다. Django는 작업 결과 (상태와 실행 함수의 실제 반환 값)를 읽으려고하면 행이 잠겨있는 것을 확인하고 경고를 표시합니다.

이 문제를 해결하는 방법에 대한 갈 수있는 몇 가지 방법이 있습니다 :

  1. 결과를 얻을 그리고 그것은 당신의 처리 작업에 체인 또 다른 셀러리 작업을 설정가. 그런 식으로 원래 작업이 끝나고 DB에 잠금을 해제하고 새 작업을 획득하고 장고에서 결과를 읽고 필요한 작업을 수행합니다. 이것에 대한 셀러리 문서를보세요.

  2. 전혀 신경 쓰지 말고 DB를 통해 가져 오기를 시도하는 대신 페이로드로 첨부 된 전체 처리 결과로 간단하게 장고에 POST를 수행하십시오.

  3. 작업 클래스에서 on_success를 덮어 쓰고 Django에 대한 알림 요청을 POST하면 db 테이블에서 잠금이 해제되어야합니다.

실행 방법 (회수 가능)의 반환시 모든 처리 결과를 저장할 필요가 있다는 점에 유의하십시오. 결과가 얼마나 클지는 언급하지 않았으므로 위의 시나리오 2를 실제로 수행하는 것이 합리적 일 수 있습니다 (이것이 내가하는 일입니다). 또는 # 3과 함께 갈 것입니다. 또한 작업에서 on_failure 메서드를 처리하는 것을 잊지 마십시오.

+0

의견을 보내 주셔서 감사합니다. 추가 질문을하기 위해 내 대답을 업데이트했습니다. 대답을 수락하기 전에 대답해야합니다. 이것은 정말 좋은 것입니다. – eleanor