2017-09-19 2 views
2

최신 버전의 아파치 에어 플로우를 사용합니다. LocalExecutor로 시작한이 모드에서 CeleryExecutor가 웹 UI 상태를 사용하기 위해 필요한 일부 상호 작용을 제외하고는 모든 것이 잘 작동했다. Redis로 Celery 실행 프로그램을 설치 및 구성하고 Redis를 브로커 URL 및 결과 백엔드로 구성했습니다.Apache Airflow Celery Redis DecodeError

작업이 예약 될 때까지하는 것이 다음과 같은 오류주는 시점에서, 처음에 작업 표시

:

File "/bin/airflow", line 28, in <module> 
    args.func(args) 
    File "/usr/lib/python2.7/site-packages/airflow/bin/cli.py", line 882, in scheduler 
    job.run() 
    File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 201, in run 
    self._execute() 
    File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1311, in _execute 
    self._execute_helper(processor_manager) 
    File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1444, in _execute_helper 
    self.executor.heartbeat() 
    File "/usr/lib/python2.7/site-packages/airflow/executors/base_executor.py", line 132, in heartbeat 
    self.sync() 
    File "/usr/lib/python2.7/site-packages/airflow/executors/celery_executor.py", line 91, in sync 
    state = async.state 
    File "/usr/lib/python2.7/site-packages/celery/result.py", line 436, in state 
    return self._get_task_meta()['status'] 
    File "/usr/lib/python2.7/site-packages/celery/result.py", line 375, in _get_task_meta 
    return self._maybe_set_cache(self.backend.get_task_meta(self.id)) 
    File "/usr/lib/python2.7/site-packages/celery/backends/base.py", line 352, in get_task_meta 
    meta = self._get_task_meta_for(task_id) 
    File "/usr/lib/python2.7/site-packages/celery/backends/base.py", line 668, in _get_task_meta_for 
    return self.decode_result(meta) 
    File "/usr/lib/python2.7/site-packages/celery/backends/base.py", line 271, in decode_result 
    return self.meta_from_decoded(self.decode(payload)) 
    File "/usr/lib/python2.7/site-packages/celery/backends/base.py", line 278, in decode 
    accept=self.accept) 
    File "/usr/lib/python2.7/site-packages/kombu/serialization.py", line 263, in loads 
    return decode(data) 
    File "/usr/lib64/python2.7/contextlib.py", line 35, in __exit__ 
    self.gen.throw(type, value, traceback) 
    File "/usr/lib/python2.7/site-packages/kombu/serialization.py", line 54, in _reraise_errors 
    reraise(wrapper, wrapper(exc), sys.exc_info()[2]) 
    File "/usr/lib/python2.7/site-packages/kombu/serialization.py", line 50, in _reraise_errors 
    yield 
    File "/usr/lib/python2.7/site-packages/kombu/serialization.py", line 263, in loads 
    return decode(data) 
    File "/usr/lib/python2.7/site-packages/kombu/serialization.py", line 59, in pickle_loads 
    return load(BytesIO(s)) 
kombu.exceptions.DecodeError: invalid load key, '{'. 

는 피클 직렬화 오류가있는 것 같다,하지만 난을 추적하는 방법을 잘 모르겠어요 원인. 어떤 제안?

이 문제는 하위 태그 기능을 사용하는 워크 플로에 일관되게 영향을 미치며 그와 관련된 문제 일 수 있습니다.

참고 : 나는 또한 rabbitMQ를 사용하여 테스트했는데 거기에는 다른 문제가있었습니다. 클라이언트가 "피어에 의한 연결 재설정"을 표시하고 충돌합니다. RabbitMQ 로그는 "클라이언트가 예기치 않게 TCP 연결을 닫았습니다"를 보여줍니다.

답변

0

나는 우리의 스케줄러 로그에 동일한 역 추적을보고 한 후이 우연히 :

File "/usr/lib/python2.7/site-packages/kombu/serialization.py", line 59, in pickle_loads 
    return load(BytesIO(s)) 
kombu.exceptions.DecodeError: invalid load key, '{'. 

셀러리는 '{'의심스러운 듯 시작 무언가를 unpickle하려고했다, 그래서 나는의 tcpdump에 걸렸다는 사실 트래픽과 웹 UI를 통해 작업을 촉발했다.

05:03:49.145849 IP <scheduler-ip-addr>.ec2.internal.45597 > <redis-ip-addr>.ec2.internal.6379: Flags [P.], seq 658:731, ack 46, win 211, options [nop,nop,TS val 654768546 ecr 4219564282], length 73: RESP "GET" "celery-task-meta-b0d3a29e-ac08-4e77-871e-b4d553502cc2" 
05:03:49.146086 IP <redis-ip-addr>.ec2.internal.6379 > <scheduler-ip-addr>.ec2.internal.45597: Flags [P.], seq 46:177, ack 731, win 210, options [nop,nop,TS val 4219564282 ecr 654768546], length 131: RESP "{"status": "SUCCESS", "traceback": null, "result": null, "task_id": "b0d3a29e-ac08-4e77-871e-b4d553502cc2", "children": []}" 

명확하게 JSON은 왜 셀러리를 unpickle 위해 노력하고있다 레디 스의 응답의 페이로드 : 결과 캡처이 교환 거의 정확히 같은 순간 위의 역 추적이 스케줄러 로그에 나타난 것을 포함? 우리는 Airflow 1.7에서 1.8로 이전하는 과정에 있으며, 롤아웃 중에는 v1.7을 실행하는 Airflow 작업자와 v1.8을 실행하는 Airflow 작업자가 있습니다. 작업자는 분리 된 작업 부하가있는 대기열에서 벗어나려고했지만 DAG 중 하나의 버그로 인해 Airflow 1.8에서 예약 한 TaskInstance를 Airflow 1.7을 통해 시작된 샐러리 작업자가 실행했습니다.

AIRFLOW-1038은 샐러리 작업 상태에 대한 serializer를 JSON (기본값)에서 pickle로 변경했기 때문에이 변경 전에 코드 버전을 실행중인 작업자는 JSON에서 결과를 serialize하고이를 포함하는 코드 버전을 실행하는 스케줄러 change는 unpickle로 결과를 비 직렬화하려고 시도하는데, 위의 오류가 발생합니다.

0

airflow.cfg에서 구성한 celery_result_backend의 종류를 확인하십시오. 그렇지 않은 경우 데이터베이스 백엔드 (mysql 등)로 전환 해보십시오.

ampq 백엔드 (셀러 3.1에서만 사용 가능), redis 및 rpc 백엔드에서 가끔 문제가 있음을 알 수 있습니다.

관련 문제