2017-12-16 2 views
0

안녕하세요 Airflow입니다. 나는 아래와 같이 txt 파일의 목록을 저장하는 간단한 코드를 작성 : 나는 웹 서버 인터페이스를 사용할 때공기 흐름에서 출력을 참조하십시오.

from airflow.models import DAG 
from airflow.operators.python_operator import PythonOperator 
import datetime 

DAG = DAG(
    dag_id='example_dag', 
    start_date=datetime.datetime.now(), 
    schedule_interval='@once' 
) 

def push_function(**kwargs): 
    ls = ['a', 'b', 'c'] 
    return ls 

push_task = PythonOperator(
    task_id='push_task', 
    python_callable=push_function, 
    provide_context=True, 
    dag=DAG) 

def pull_function(**kwargs): 
    ti = kwargs['ti'] 
    ls = ti.xcom_pull(task_ids='push_task') 
    with open('test.txt','w') as out: 
     out.write(ls) 
    out.close() 

pull_task = PythonOperator(
    task_id='pull_task', 
    python_callable=pull_function, 
    provide_context=True, 
    dag=DAG) 

push_task >> pull_task 

내가 내 DAG를 참조하십시오. 또한 airflow list_dagsCLI에 썼을 때 나는 처신을 본다.

또한 python code.py를 사용하여 내 코드를 컴파일하고, 결과는 오류없이 다음과 같았다

: 할 수 없습니다,

[2017-12-16 14:21:30,609] {__init__.py:57} INFO - Using executor SequentialExecutor 
[2017-12-16 14:21:30,709] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/Grammar.txt 
[2017-12-16 14:21:30,741] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt 

나는 모두 UI와 airflow trigger_dag Mydag

그러나 명령을 사용하여 DAG를 실행하려고 실행 후 내 txt 결과 파일을보십시오. 로그 파일에도 오류가 없습니다.

어떻게하면 txt 파일을 찾을 수 있습니까?

답변

0

절대 파일 경로를 사용하여 다시 시도하거나 os.getcwd() 메서드로 현재 작업 디렉토리를 로깅하여 파일을 찾을 수 있습니다.

관련 문제