PythonBranhOperator를 사용할 수 있습니다. 다음은 참조 용 코드 샘플입니다.
VALID_TEST_OUTPUT = 'valid-output'
INVALID_TEST_OUTPUT = 'invalid-test-output'
test_job=PythonOperator(
## your task
)
def select_branch_after_test(**kwargs):
'''
:return: returns the task_id to proceed with.
'''
if com.getoutput('hadoop fs -ls /user/praveen/.test_out/ | grep file1'):
follow_previous_job = VALID_TEST_OUTPUT
else:
follow_previous_job = INVALID_TEST_OUTPUT
return follow_previous_job
# branching based on successful test output
branching = BranchPythonOperator(task_id=BRANCH_TEST_OUTPUT,
python_callable=select_branch_after_test,
dag=dag)
branching.set_upstream(test_job)
valid_test_output = PythonOperator(
task_id=VALID_TEST_OUTPUT,
python_callable=lambda **kwargs: logging.info("test genereated valid output, proceeding with other steps"),
provide_context=True,
dag=dag)
valid_test_output.set_upstream(branching)
end_task = PythonOperator(
task_id=END,
python_callable=pipeline_run_time,
provide_context=True,
trigger_rule='one_success', ## Important
dag=dag)
invalid_test_output = PythonOperator(
task_id=INVALID_TEST_OUTPUT,
python_callable=lambda **kwargs: logging.info("test did not create valid output, completeing branching dag to other path"),
provide_context=True,
dag=dag)
invalid_test_output.set_upstream(branching)
valid_test_output.set_downstream(end_task)
invalid_test_output.set_downstream(end_task)
`
all_failed하는 트리거 규칙을 설정하거나 정확하게 당신이 원하는 일을해야 one_failed. 하류에 2 개의 작업을 만들고 성공에 1을 설정하고 예를 들어 1을 실패로 설정하십시오. Btw. 클러스터를 시작한다고하면 클라우드 공급자를 사용한다고 가정합니다. 클러스터가 작동하는지 확인하는 더 좋은 방법이 있어야합니다. 클라우드 공급 업체 명령 줄 도구를 사용합니다. GCP를 사용하는 경우 데이터 캐시 클러스터를 시작하고 작업을 제출하는 기류 운영자조차 있습니다. –
감사합니다 Gindele, 나는 너의 접근 방식을 시도해 보겠다. 클러스터와 관련하여 자체 서버에서 실행되는 독립 실행 형 스파크 클러스터가 있으며 클라우드는 없습니다. 어쨌든, 당신은 어떤 제안이 있다면 그들은 매우 환영합니다. –