2017-03-27 2 views
0

Redis를 브로커로 사용하여 셀러리 작업자와 실험하고 있습니다. 이것은 셀러리 노동자에 대한 내 테스트 코드Redis 명령으로 셀러리 작업 테스트

:

from celery import Celery 
app = Celery('tasks', broker='redis://xxxxx.net:6379/0') 

@app.task 
def nextexec(payload): 
    print(payload) 

redis-cli을 사용하여, 나는 (셀러리에 의해 자동으로 생성)을 celery 큐에 값

RPUSH celery somekey 'somevalue' 
를 삽입하려면 다음 명령을 실행하고

하지만 쿼리를 실행할 때 작업자가 체계적으로 중단되면 Unrecoverable error: JSONDecodeError이 표시됩니다. 디코드 할 JSON 문자열 대신 None을받는 것으로 보입니다.

어떤 Redis 쿼리를 실행해야합니까? 또는 (아직 간단한) 작업자 스크립트에 어떤 변경 사항이 필요합니까?

답변

2

셀러리는 클라이언트와 작업자간에 데이터를 전송하기 위해 serializers을 사용합니다. 모든 메시지는 직렬화되어야하며이를 인코딩하는 데 사용 된 직렬화 방법을 설명하는 content_type 헤더가 있습니다.

다음은 json을 사용하여 일련 화되는 샘플 메시지입니다.

{'body': 'W1sxXSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbCwgImNob3JkIjogbnVsbH1d', 
'content-encoding': 'utf-8', 
'content-type': 'application/json', 
'headers': {'argsrepr': '(1,)', 
    'eta': None, 
    'expires': None, 
    'group': None, 
    'id': '5ce9a8d8-41d7-47a4-9074-beedabd88dcc', 
    'kwargsrepr': '{}', 
    'lang': 'py', 
    'origin': '[email protected]', 
    'parent_id': None, 
    'retries': 0, 
    'root_id': '5ce9a8d8-41d7-47a4-9074-beedabd88dcc', 
    'task': 't.wait', 
    'timelimit': [None, None]}, 
'properties': {'body_encoding': 'base64', 
    'correlation_id': '5ce9a8d8-41d7-47a4-9074-beedabd88dcc', 
    'delivery_info': {'exchange': '', 'routing_key': 'celery'}, 
    'delivery_mode': 2, 
    'delivery_tag': '0177eb65-344e-4b1c-ab5f-8e2f5d75b8d3', 
    'priority': 0, 
    'reply_to': 'a5c611b8-18b3-3bbb-b598-c3757f06c4fd'}} 

셀러리 작업자는 지정된 형식으로 메시지를 받아야합니다. 중개인 (redis)에게 어떤 가치를 부여하고 셀러리가 그것을 실행할 것을 기대할 수는 없습니다.

작업을 큐에 저장하려면 python을 사용하십시오.

from mymodule import nextexec 
payload ='some payload' 
nextexec.delay(payload) 
+0

답변 주셔서 감사합니다.이 정보는 내가 찾고있는 정보 였고 셀러리 문서 및 자습서를 읽는 것이 명확하지 않았습니다. – Overdrivr