t1 寫入參數, t2 獲取參數
#coding=utf-8 from datetime import datetime, timedelta from airflow import DAG from airflow.operators.python_operator import PythonOperator import airflow.utils # 定義默認參數 default_args = { 'owner': 'airflow', # 擁有者名稱 'start_date': airflow.utils.dates.days_ago(1),# 第一次開始執行的時間,為格林威治時間,為了方便測試,一般設置為當前時間減去執行周期 'email': ['lshan523@163.com'], # 接收通知的email列表 'email_on_failure': True, # 是否在任務執行失敗時接收郵件 'email_on_retry': True, # 是否在任務重試時接收郵件 'retries': 3, # 失敗重試次數 'retry_delay': timedelta(seconds=5), # 失敗重試間隔 'provide_context': True, } # 定義DAG dag = DAG( dag_id='hello_world_args', # dag_id default_args=default_args, # 指定默認參數 schedule_interval="@once", # schedule_interval="00, *, *, *, *" # 執行周期,依次是分,時,天,月,年,此處表示每個整點執行 # schedule_interval=timedelta(minutes=1) # 執行周期,表示每分鍾執行一次 ) # 定義要執行的Python函數1 def hello_world_args_1(**context): current_time = str(datetime.today()) with open('/tmp/hello_world_args_1.txt', 'a') as f: f.write('%s\n' % current_time) assert 1 == 1 # 可以在函數中使用assert斷言來判斷執行是否正常,也可以直接拋出異常 context['task_instance'].xcom_push(key='sea1', value="seaseseseaaa") return "t2" # 定義要執行的Python函數2 def hello_world_args_2(**context): sea = context['task_instance'].xcom_pull(key="sea1",task_ids='hello_world_args_1')#參數id,task id current_time = str(datetime.today()) with open('/tmp/hello_world_args_2.txt', 'a') as f: f.write('%s\n' % sea) f.write('%s\n' % current_time) # 定義要執行的task 1 t1 = PythonOperator( task_id='hello_world_args_1', # task_id python_callable=hello_world_args_1, # 指定要執行的函數 dag=dag, # 指定歸屬的dag retries=2, # 重寫失敗重試次數,如果不寫,則默認使用dag類中指定的default_args中的設置 provide_context=True, ) # 定義要執行的task 2 t2 = PythonOperator( task_id='hello_world_args_2', # task_id python_callable=hello_world_args_2, # 指定要執行的函數 dag=dag, # 指定歸屬的dag provide_context=True, ) t1>>t2