2017-02-06 1 views
1

을 작업을 실행하는 방법은 다음을 수행해야 down 나는 그것을 시작하려고 노력한다. 그것은 벌써 일어난다. 나는 나의 Spark 일을 달리기 위해 앞으로 나아 간다.는 이전 아파치에 실패 할 경우 공기 흐름

하지만 Spark 클러스터가 작동 중인지 확인하기위한 작업을 생성하려고합니다 (간단한 Spark 작업을 실행하려고 시도했을 수 있음). 실패하면 "Start Spark cluster"작업을 시작합니다.

공기 흐름을 사용하고 있지만 이전 작업이 실패한 경우 작업을 트리거하는 방법을 찾지 못했습니다. 그 외에도 이전 작업을 확인해야 성공할 수 있으므로 Spark 작업으로 분기하여 "Start Spark Cluster"작업을 건너 뜁니다.

훌륭한 샘플을 제공 할 수 있다면. trigger_rule과 지사 연산자를 사용해 보았지만 지금까지 아무 것도 얻지 못했습니다. 웹에 대한 코드 예제가 너무 적기 때문일 수 있습니다.

감사합니다.

+0

all_failed하는 트리거 규칙을 설정하거나 정확하게 당신이 원하는 일을해야 one_failed. 하류에 2 개의 작업을 만들고 성공에 1을 설정하고 예를 들어 1을 실패로 설정하십시오. Btw. 클러스터를 시작한다고하면 클라우드 공급자를 사용한다고 가정합니다. 클러스터가 작동하는지 확인하는 더 좋은 방법이 있어야합니다. 클라우드 공급 업체 명령 줄 도구를 사용합니다. GCP를 사용하는 경우 데이터 캐시 클러스터를 시작하고 작업을 제출하는 기류 운영자조차 있습니다. –

+0

감사합니다 Gindele, 나는 너의 접근 방식을 시도해 보겠다. 클러스터와 관련하여 자체 서버에서 실행되는 독립 실행 형 스파크 클러스터가 있으며 클라우드는 없습니다. 어쨌든, 당신은 어떤 제안이 있다면 그들은 매우 환영합니다. –

답변

0

PythonBranhOperator를 사용할 수 있습니다. 다음은 참조 용 코드 샘플입니다.

VALID_TEST_OUTPUT = 'valid-output' 
INVALID_TEST_OUTPUT = 'invalid-test-output' 


test_job=PythonOperator(
    ## your task 
) 

def select_branch_after_test(**kwargs): 
    ''' 
    :return: returns the task_id to proceed with. 
    ''' 
    if com.getoutput('hadoop fs -ls /user/praveen/.test_out/ | grep file1'): 
     follow_previous_job = VALID_TEST_OUTPUT 
    else: 
     follow_previous_job = INVALID_TEST_OUTPUT 
    return follow_previous_job 


# branching based on successful test output 
branching = BranchPythonOperator(task_id=BRANCH_TEST_OUTPUT, 
           python_callable=select_branch_after_test, 
           dag=dag) 
branching.set_upstream(test_job) 

valid_test_output = PythonOperator(
    task_id=VALID_TEST_OUTPUT, 
    python_callable=lambda **kwargs: logging.info("test genereated valid output, proceeding with other steps"), 
    provide_context=True, 
    dag=dag) 
valid_test_output.set_upstream(branching) 

end_task = PythonOperator(
    task_id=END, 
    python_callable=pipeline_run_time, 
    provide_context=True, 
    trigger_rule='one_success', ## Important 
    dag=dag) 

invalid_test_output = PythonOperator(
    task_id=INVALID_TEST_OUTPUT, 
    python_callable=lambda **kwargs: logging.info("test did not create valid output, completeing branching dag to other path"), 
    provide_context=True, 
    dag=dag) 
invalid_test_output.set_upstream(branching) 

valid_test_output.set_downstream(end_task) 
invalid_test_output.set_downstream(end_task) 

`

관련 문제