0
안녕하세요 Airflow
입니다. 나는 아래와 같이 txt
파일의 목록을 저장하는 간단한 코드를 작성 : 나는 웹 서버 인터페이스를 사용할 때공기 흐름에서 출력을 참조하십시오.
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
import datetime
DAG = DAG(
dag_id='example_dag',
start_date=datetime.datetime.now(),
schedule_interval='@once'
)
def push_function(**kwargs):
ls = ['a', 'b', 'c']
return ls
push_task = PythonOperator(
task_id='push_task',
python_callable=push_function,
provide_context=True,
dag=DAG)
def pull_function(**kwargs):
ti = kwargs['ti']
ls = ti.xcom_pull(task_ids='push_task')
with open('test.txt','w') as out:
out.write(ls)
out.close()
pull_task = PythonOperator(
task_id='pull_task',
python_callable=pull_function,
provide_context=True,
dag=DAG)
push_task >> pull_task
내가 내 DAG를 참조하십시오. 또한 airflow list_dags
을 CLI
에 썼을 때 나는 처신을 본다.
python code.py
를 사용하여 내 코드를 컴파일하고, 결과는 오류없이 다음과 같았다
: 할 수 없습니다,
[2017-12-16 14:21:30,609] {__init__.py:57} INFO - Using executor SequentialExecutor
[2017-12-16 14:21:30,709] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/Grammar.txt
[2017-12-16 14:21:30,741] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
나는 모두 UI와 airflow trigger_dag Mydag
그러나 명령을 사용하여 DAG를 실행하려고 실행 후 내 txt 결과 파일을보십시오. 로그 파일에도 오류가 없습니다.
어떻게하면 txt 파일을 찾을 수 있습니까?