airflow xcom 數據傳遞


 t1 寫入參數, t2 獲取參數

#coding=utf-8
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import airflow.utils


# 定義默認參數
default_args = {
    'owner': 'airflow',  # 擁有者名稱
        'start_date': airflow.utils.dates.days_ago(1),#  第一次開始執行的時間,為格林威治時間,為了方便測試,一般設置為當前時間減去執行周期
    'email': ['lshan523@163.com'],  # 接收通知的email列表
    'email_on_failure': True,  # 是否在任務執行失敗時接收郵件
    'email_on_retry': True,  # 是否在任務重試時接收郵件
    'retries': 3,  # 失敗重試次數
    'retry_delay': timedelta(seconds=5),  # 失敗重試間隔
    'provide_context': True,
}

# 定義DAG
dag = DAG(
    dag_id='hello_world_args',  # dag_id
    default_args=default_args,  # 指定默認參數
    schedule_interval="@once",
    # schedule_interval="00, *, *, *, *"  # 執行周期,依次是分,時,天,月,年,此處表示每個整點執行
#     schedule_interval=timedelta(minutes=1)  # 執行周期,表示每分鍾執行一次
)


# 定義要執行的Python函數1
def hello_world_args_1(**context):
    current_time = str(datetime.today())
    with open('/tmp/hello_world_args_1.txt', 'a') as f:
        f.write('%s\n' % current_time)
    assert 1 == 1  # 可以在函數中使用assert斷言來判斷執行是否正常,也可以直接拋出異常
    context['task_instance'].xcom_push(key='sea1', value="seaseseseaaa")
    return "t2"

# 定義要執行的Python函數2
def hello_world_args_2(**context):
    sea = context['task_instance'].xcom_pull(key="sea1",task_ids='hello_world_args_1')#參數id,task id
    current_time = str(datetime.today())
    with open('/tmp/hello_world_args_2.txt', 'a') as f:
        f.write('%s\n' % sea)
        f.write('%s\n' % current_time)

# 定義要執行的task 1
t1 = PythonOperator(
    task_id='hello_world_args_1',  # task_id
    python_callable=hello_world_args_1,  # 指定要執行的函數
    dag=dag,  # 指定歸屬的dag
    retries=2,  # 重寫失敗重試次數,如果不寫,則默認使用dag類中指定的default_args中的設置
    provide_context=True,
)

# 定義要執行的task 2
t2 = PythonOperator(
    task_id='hello_world_args_2',  # task_id
    python_callable=hello_world_args_2,  # 指定要執行的函數
    dag=dag,  # 指定歸屬的dag
    provide_context=True,
)


t1>>t2

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM