2014-04-11 4 views
2

승인되지 않은 메시지에 대한 정보를 얻는 방법을 알아 내려고합니다. 저장 위치는 어디입니까? 셀러리를 가지고 놀면서 검사가 끝나면 메시지가 처리되고 상태를 따를 수 있습니다. 결과 백엔드가 있다고 가정하면 결과를 볼 수 있습니다. 그러나 그것이 블랙홀에 있다고 인정 될 때까지 지연을 적용하는 시점부터.Celery/RabbitMQ - No Acks - Unacknowledged 메시지 확인

  1. noAcks가 저장된 위치는 어디입니까?
  2. noAcks 목록의 "깊은"상태를 어떻게 확인할 수 있습니까? 즉, 얼마나 많은 사람들이 거기에 있고 목록에 내 작업이 어디에 있는지.

정확히 여기서 문제가되는 것은 아니지만 제가 여기서 일하고있는 것이 있습니다.

from celery.app import app_or_default 

app = app_or_default() 
inspect = app.control.inspect() 

# Now if I want "RECEIVED" jobs.. 
data = inspect.reserved() 

# or "ACTIVE" jobs.. 
data = inspect.active() 

# or "REVOKED" jobs.. 
data = inspect.revoked() 

# or scheduled jobs.. (Assuming these are time based??) 
data = inspect.scheduled() 

# FILL ME IN FOR UNACK JOBS!! 
# data = inspect.?? 

# This will never work for tasks that aren't in one of the above buckets.. 
pprint.pprint(inspect.query_task([tasks])) 

정말 감사드립니다.

답변

2

를 조회 할 수 있습니다 설치하는 가지고있다 'acknowleged': False

from celery.app import app_or_default 

app = app_or_default() 
inspect = app.control.inspect() 

# those that have been sent to a worker and are thus reserved 
# from being sent to another worker, but may or may not be acknowledged as received by that worker 
data = inspect.reserved() 

{'celery.tasks': [{'acknowledged': False, 
       'args': '[]', 
       'delivery_info': {'exchange': 'tasks', 
           'priority': None, 
           'routing_key': 'celery'}, 
       'hostname': 'celery.tasks', 
       'id': '527961d4-639f-4002-9dc6-7488dd8c8ad8', 
       'kwargs': '{}', 
       'name': 'globalapp.tasks.task_loop_tick', 
       'time_start': None, 
       'worker_pid': None}, 
       {'acknowledged': False, 
       'args': '[]', 
       'delivery_info': {'exchange': 'tasks', 
           'priority': None, 
           'routing_key': 'celery'}, 
       'hostname': 'celery.tasks', 
       'id': '09d5b726-269e-48d0-8b0e-86472d795906', 
       'kwargs': '{}', 
       'name': 'globalapp.tasks.task_loop_tick', 
       'time_start': None, 
       'worker_pid': None}, 
       {'acknowledged': False, 
       'args': '[]', 
       'delivery_info': {'exchange': 'tasks', 
           'priority': None, 
           'routing_key': 'celery'}, 
       'hostname': 'celery.tasks', 
       'id': 'de6d399e-1b37-455c-af63-a68078a9cf7c', 
       'kwargs': '{}', 
       'name': 'globalapp.tasks.task_loop_tick', 
       'time_start': None, 
       'worker_pid': None}], 
'fastlane.tasks': [], 
'images.tasks': [], 
'mailer.tasks': []} 
+0

니스 -이 문서는 어디에 기록되어 있습니까? – rh0dium

+0

방금 ​​귀하의 코드에서 시작하여 해킹되지 않은 코드가 예약되어 있음을 확인했습니다. 어제 나는 셀러리 3.0.11을 설치하면 당구 3.x를 설치하게되는데, 실제로 셀러리 3.0에서는 작동하지 않는다는 것을 발견했다. 그래서 과제는 인정하지 않고 시작 시간도 없었습니다. 당구 <3 점. 이야기의 도덕 : 항상 당신의 의존성을 고정 시키거나 동결시킵니다. – felix

+0

이것은 아주 좋습니다. 고마워. 왜 내가 이보다 더 일찍 이것을 보거나 생각하지 못했는지 모르겠다. Celery 3.1x (현재)에서 작동하는지 확인할 수 있습니까? – rh0dium

2

샐러리를 리뷰 한 후 순수 셀러리를 사용하는 것이 불가능하다는 결론에 도달했습니다. 그러나 전체 프로세스를 느슨하게 추적 할 수 있습니다. 여기에 미확인 카운트를 조회하는 데 사용한 코드가 있습니다. 이것의 대부분은 셀러리의 유틸리티를 사용하여 수행 할 수 있습니다.

난 아직도 기본이 확인되지 않은 ID로 작업하지만 ..를 쿼리 드릴 수 없습니다

당신이있는 경우 RabbitMQ management plug-in 당신은 그들이 inspect.reserved()에서 그 작업을 수있는 API

data = {} 
    base_url = "http://localhost:55672" 
    url = base_url + "/api/queues/{}/".format(vhost) 
    req = requests.get(url, auth=(settings.RABBITMQ_USER, settings.RABBITMQ_PASSWORD)) 
    if req.status_code != 200: 
     log.error(req.text) 
    else: 
     request_data = req.json() 
     for queue in request_data: 
      # TODO if we know what queue the task is then we can nail this. 
      if queue.get('name') == "celery": 
       data['state'] = "Unknown" 
       if queue.get('messages'): 
        data['messages'] = queue.get('messages') 
        data['messages_ready'] = queue.get('messages_ready') 
        data['messages_unacknowledged'] = queue.get('messages_unacknowledged') 
       break 
    return data