최신 버전의 아파치 에어 플로우를 사용합니다. LocalExecutor로 시작한이 모드에서 CeleryExecutor가 웹 UI 상태를 사용하기 위해 필요한 일부 상호 작용을 제외하고는 모든 것이 잘 작동했다. Redis로 Celery 실행 프로그램을 설치 및 구성하고 Redis를 브로커 URL 및 결과 백엔드로 구성했습니다.Apache Airflow Celery Redis DecodeError
작업이 예약 될 때까지하는 것이 다음과 같은 오류주는 시점에서, 처음에 작업 표시:
File "/bin/airflow", line 28, in <module>
args.func(args)
File "/usr/lib/python2.7/site-packages/airflow/bin/cli.py", line 882, in scheduler
job.run()
File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 201, in run
self._execute()
File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1311, in _execute
self._execute_helper(processor_manager)
File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1444, in _execute_helper
self.executor.heartbeat()
File "/usr/lib/python2.7/site-packages/airflow/executors/base_executor.py", line 132, in heartbeat
self.sync()
File "/usr/lib/python2.7/site-packages/airflow/executors/celery_executor.py", line 91, in sync
state = async.state
File "/usr/lib/python2.7/site-packages/celery/result.py", line 436, in state
return self._get_task_meta()['status']
File "/usr/lib/python2.7/site-packages/celery/result.py", line 375, in _get_task_meta
return self._maybe_set_cache(self.backend.get_task_meta(self.id))
File "/usr/lib/python2.7/site-packages/celery/backends/base.py", line 352, in get_task_meta
meta = self._get_task_meta_for(task_id)
File "/usr/lib/python2.7/site-packages/celery/backends/base.py", line 668, in _get_task_meta_for
return self.decode_result(meta)
File "/usr/lib/python2.7/site-packages/celery/backends/base.py", line 271, in decode_result
return self.meta_from_decoded(self.decode(payload))
File "/usr/lib/python2.7/site-packages/celery/backends/base.py", line 278, in decode
accept=self.accept)
File "/usr/lib/python2.7/site-packages/kombu/serialization.py", line 263, in loads
return decode(data)
File "/usr/lib64/python2.7/contextlib.py", line 35, in __exit__
self.gen.throw(type, value, traceback)
File "/usr/lib/python2.7/site-packages/kombu/serialization.py", line 54, in _reraise_errors
reraise(wrapper, wrapper(exc), sys.exc_info()[2])
File "/usr/lib/python2.7/site-packages/kombu/serialization.py", line 50, in _reraise_errors
yield
File "/usr/lib/python2.7/site-packages/kombu/serialization.py", line 263, in loads
return decode(data)
File "/usr/lib/python2.7/site-packages/kombu/serialization.py", line 59, in pickle_loads
return load(BytesIO(s))
kombu.exceptions.DecodeError: invalid load key, '{'.
는 피클 직렬화 오류가있는 것 같다,하지만 난을 추적하는 방법을 잘 모르겠어요 원인. 어떤 제안?
이 문제는 하위 태그 기능을 사용하는 워크 플로에 일관되게 영향을 미치며 그와 관련된 문제 일 수 있습니다.
참고 : 나는 또한 rabbitMQ를 사용하여 테스트했는데 거기에는 다른 문제가있었습니다. 클라이언트가 "피어에 의한 연결 재설정"을 표시하고 충돌합니다. RabbitMQ 로그는 "클라이언트가 예기치 않게 TCP 연결을 닫았습니다"를 보여줍니다.