Airflow 入門教程&示例


導入模塊

# 導入所需的模塊
# DAG用來實例化DAG對象,注意僅僅只是定義了一個對象,而不是進行真正的數據處理流程
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

設置默認參數

在我們創建任務的時候我們可以使用這些默認參數

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

實例化一個DAG

我們需要一個 DAG 對象來嵌入我們的任務,下面的代碼中,我們首先定義一個字符串,作為DAG的唯一標識,然后傳入默認的參數字典(上面定義的),然后定義調度的間隔為1天

dag = DAG(
    'tutorial', default_args=default_args, schedule_interval=timedelta(days=1))

任務

實例化 operator 時會生成任務。一個從 operator 實例化的對象也稱為構造器(constructor),第一個參數 task_id 作為任務的唯一標識

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

注意我們如何將各個Operator特定的參數(bash_command) 以及繼承自BaseOperator的所有Operator的通用參數(retries) 傳遞給Operator的constructor。這比將每個參數傳遞給每個constructor要簡單。當然我們也注意到,t2繼承的通用參數retries被我們重載,賦值成3了。
任務的前提規則如下:

  1. 明確傳遞參數
  2. 值在default_args字典中存在
  3. operator的默認值(如果存在)
    一個任務必須包含或者繼承參數 task_id 與 owner ,否則Airflow 將會拋出異常

Templating with Jinja

Airflow利用Jinja Templating的強大功能,為管道作者提供一組內置參數和宏。Airflow還為管道作者提供了定義自己的參數,宏和模板的鈎子(Hooks)。
本教程幾乎沒有涉及在Airflow中使用模板進行操作,本節的目的是讓您了解此功能的存在,讓您熟悉雙花括號和最常見的模板變量:{{ds}}(今天的“日期戳”)。

templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7) }}"
        echo "{{ params.my_param }}"
    {% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

請注意,templated_command包含{%%}塊中的代碼邏輯,引用參數如{{ds}},調用{{macros.ds_add(ds,7)}}中的函數,並在{{params.my_param}}中引用用戶定義的參數。

BaseOperator中的params hook允許您將參數和/或對象的字典傳遞給模板。
請花點時間了解參數my_param如何通過模板。

文件也可以傳遞給bash_command參數,例如bash_command ='templated_command.sh',其中文件位置相對於包含管道文件的目錄(在本例中為tutorial.py)。
這可能有許多種原因,例如分離腳本的邏輯和管道代碼,允許在用不同語言編寫的文件中執行正確的代碼突出顯示,以及構造管道的通用性及靈活性。
也可以將template_searchpath定義為指向DAG構造函數調用中的任何文件夾位置。

使用相同的DAG構造函數調用,可以定義user_defined_macros,它允許您指定自己的變量。
例如,將dict(foo ='bar')傳遞給此參數允許您在模板中使用{{foo}}
此外,指定user_defined_filters允許您注冊自己的過濾器。
例如,將dict(hello = lambda name:'Hello%s'%name)傳遞給此參將允許您在自己的模板中使用{{'world'|{{ 'world' | hello }}
有關自定義過濾器的更多信息,請查看Jinja文檔

關於可在模板中引用的變量和宏的更多信息,請務必閱讀宏參考

設置依賴關系

我們有互不依賴的三個任務 t1,t2,t3。接下來有一些定義它們之間依賴關系的方法

t1.set_downstream(t2)

# 這個表示t2將依賴於t1
# 等價於
t2.set_upstream(t1)

# 位移運算符也可以完成 t2依賴於t1 的設置
t1 >> t2

# 位移運算符完成 t1依賴於t2 的設置
t2 << t1

# 使用位移運算符更加簡潔地設置多個連鎖依賴關系
t1 >> t2 >> t3

# 任務列表也可以被設置成依賴,以下幾種表達方式是等效的
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1

請注意,在執行腳本時,Airflow會在DAG中找到循環或多次引用依賴項時引發異常。

簡要重述以上內容

我們已經有了十分基礎的 DAG 了,你的代碼應當看起來和下面給出的差不多

"""
Code that goes along with the Airflow tutorial located at:
https://github.com/apache/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG(
    'tutorial', default_args=default_args, schedule_interval=timedelta(days=1))

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t1)

測試

運行腳本

是時候進行一些測試了,我們先確保管道解析成功。
首先確認上述的代碼已經存入tutorial.py,文件的位置位於你的 airflow.cfg指定的 dags 文件夾內,你的DAGs文件夾默認在 ~/airflow/dags
在命令行執行:

python ~/airflow/dags/tutorial.py

如果這個腳本不拋出異常,則意味着你沒有犯任何可怕的錯誤,而且你的Airflow環境還不錯。

命令行元數據驗證

讓我們運行一些命令來進一步驗證上一個腳本

# 打印激活的DAGs 的列表
airflow list_dags

# 打印dag_id 為 "tutorial"  的任務的列表
airflow list_tasks tutorial

#  打印 tutorial DAG中任務的層級關系
airflow list_tasks tutorial --tree

測試

讓我們通過在特定日期運行實際任務實例來進行測試。
在此上下文中指定的日期是execution_date,它模擬特定日期+時間調度運行任務或dag:

# command layout: command subcommand dag_id task_id date

# testing print_date
airflow test tutorial print_date 2015-06-01

# testing sleep
airflow test tutorial sleep 2015-06-01

現在還記得我們之前用模板做過的事嗎?
通過運行此命令,了解如何呈現和執行此模板:

# testing templated
airflow test tutorial templated 2015-06-01

這應該會顯示詳細的事件日志並最終運行bash命令並打印結果
請注意,airflow test命令在本地運行任務實例,將其日志輸出到stdout(在屏幕上),不依賴於依賴項,並且不向數據庫傳達狀態(運行,成功,失敗,...)。
它只允許測試單個任務實例。

backfill

一切看起來都運行良好,讓我們運行backfill。
backfill將遵守您的依賴關系,將日志發送到文件並與數據庫通信以記錄狀態。如果您有網絡服務器,您還可以跟蹤進度。如果您有興趣在backfill過程中直觀地跟蹤進度,
airflow webserver將啟動Web服務器。
請注意,如果使用depends_on_past = True,則單個任務實例將取決於前面任務實例的成功。但是如果指定了這個任務的start_date,此依賴關系將被忽略。

backfill: 在指定的日期范圍內運行DAG的子部分。
如果使用reset_dag_run選項,則backfill將首先提示用戶airflow是否應清除backfill日期范圍內的所有先前dag_runtask_instances
如果使用rerun_failed_tasks,則backfill將自動重新運行backfill日期范圍內的先前失敗的任務實例。
``cmd
airflow backfill [-h] [-t TASK_REGEX] [-s START_DATE] [-e END_DATE] [-m] [-l]
[-x] [-i] [-I] [-sd SUBDIR] [--pool POOL]
[--delay_on_limit DELAY_ON_LIMIT] [-dr] [-v] [-c CONF]
[--reset_dagruns] [--rerun_failed_tasks] [-B]
dag_id


此上下文中的日期范圍是start_date和可選的end_date,它們用於使用此dag中的任務實例填充運行計划。
```cmd
# 可選,在后台開啟一個web服務器
# airflow webserver --debug &

# 在一個時間范圍內開始你的 backfill
airflow backfill tutorial -s 2015-06-01 -e 2015-06-07


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM