導入模塊
# 導入所需的模塊
# DAG用來實例化DAG對象,注意僅僅只是定義了一個對象,而不是進行真正的數據處理流程
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
設置默認參數
在我們創建任務的時候我們可以使用這些默認參數
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
實例化一個DAG
我們需要一個 DAG 對象來嵌入我們的任務,下面的代碼中,我們首先定義一個字符串,作為DAG的唯一標識,然后傳入默認的參數字典(上面定義的),然后定義調度的間隔為1天
dag = DAG(
'tutorial', default_args=default_args, schedule_interval=timedelta(days=1))
任務
實例化 operator 時會生成任務。一個從 operator 實例化的對象也稱為構造器(constructor),第一個參數 task_id
作為任務的唯一標識
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)
注意我們如何將各個Operator特定的參數(bash_command) 以及繼承自BaseOperator的所有Operator的通用參數(retries) 傳遞給Operator的constructor。這比將每個參數傳遞給每個constructor要簡單。當然我們也注意到,t2繼承的通用參數retries被我們重載,賦值成3了。
任務的前提規則如下:
- 明確傳遞參數
- 值在default_args字典中存在
- operator的默認值(如果存在)
一個任務必須包含或者繼承參數 task_id 與 owner ,否則Airflow 將會拋出異常
Templating with Jinja
Airflow利用Jinja Templating的強大功能,為管道作者提供一組內置參數和宏。Airflow還為管道作者提供了定義自己的參數,宏和模板的鈎子(Hooks)。
本教程幾乎沒有涉及在Airflow中使用模板進行操作,本節的目的是讓您了解此功能的存在,讓您熟悉雙花括號和最常見的模板變量:{{ds}}(今天的“日期戳”)。
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7) }}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
task_id='templated',
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag)
請注意,templated_command包含{%%}
塊中的代碼邏輯,引用參數如{{ds}}
,調用{{macros.ds_add(ds,7)}
}中的函數,並在{{params.my_param}}
中引用用戶定義的參數。
BaseOperator中的params hook允許您將參數和/或對象的字典傳遞給模板。
請花點時間了解參數my_param如何通過模板。
文件也可以傳遞給bash_command
參數,例如bash_command ='templated_command.sh'
,其中文件位置相對於包含管道文件的目錄(在本例中為tutorial.py)。
這可能有許多種原因,例如分離腳本的邏輯和管道代碼,允許在用不同語言編寫的文件中執行正確的代碼突出顯示,以及構造管道的通用性及靈活性。
也可以將template_searchpath
定義為指向DAG構造函數調用中的任何文件夾位置。
使用相同的DAG構造函數調用,可以定義user_defined_macros
,它允許您指定自己的變量。
例如,將dict(foo ='bar')
傳遞給此參數允許您在模板中使用{{foo}}
。
此外,指定user_defined_filters
允許您注冊自己的過濾器。
例如,將dict(hello = lambda name:'Hello%s'%name)
傳遞給此參將允許您在自己的模板中使用{{'world'|{{ 'world' | hello }}
有關自定義過濾器的更多信息,請查看Jinja文檔
關於可在模板中引用的變量和宏的更多信息,請務必閱讀宏參考
設置依賴關系
我們有互不依賴的三個任務 t1,t2,t3。接下來有一些定義它們之間依賴關系的方法
t1.set_downstream(t2)
# 這個表示t2將依賴於t1
# 等價於
t2.set_upstream(t1)
# 位移運算符也可以完成 t2依賴於t1 的設置
t1 >> t2
# 位移運算符完成 t1依賴於t2 的設置
t2 << t1
# 使用位移運算符更加簡潔地設置多個連鎖依賴關系
t1 >> t2 >> t3
# 任務列表也可以被設置成依賴,以下幾種表達方式是等效的
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1
請注意,在執行腳本時,Airflow會在DAG中找到循環或多次引用依賴項時引發異常。
簡要重述以上內容
我們已經有了十分基礎的 DAG 了,你的代碼應當看起來和下面給出的差不多
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/apache/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG(
'tutorial', default_args=default_args, schedule_interval=timedelta(days=1))
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
task_id='templated',
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag)
t2.set_upstream(t1)
t3.set_upstream(t1)
測試
運行腳本
是時候進行一些測試了,我們先確保管道解析成功。
首先確認上述的代碼已經存入tutorial.py
,文件的位置位於你的 airflow.cfg
指定的 dags
文件夾內,你的DAGs文件夾默認在 ~/airflow/dags
在命令行執行:
python ~/airflow/dags/tutorial.py
如果這個腳本不拋出異常,則意味着你沒有犯任何可怕的錯誤,而且你的Airflow環境還不錯。
命令行元數據驗證
讓我們運行一些命令來進一步驗證上一個腳本
# 打印激活的DAGs 的列表
airflow list_dags
# 打印dag_id 為 "tutorial" 的任務的列表
airflow list_tasks tutorial
# 打印 tutorial DAG中任務的層級關系
airflow list_tasks tutorial --tree
測試
讓我們通過在特定日期運行實際任務實例來進行測試。
在此上下文中指定的日期是execution_date
,它模擬特定日期+時間調度運行任務或dag:
# command layout: command subcommand dag_id task_id date
# testing print_date
airflow test tutorial print_date 2015-06-01
# testing sleep
airflow test tutorial sleep 2015-06-01
現在還記得我們之前用模板做過的事嗎?
通過運行此命令,了解如何呈現和執行此模板:
# testing templated
airflow test tutorial templated 2015-06-01
這應該會顯示詳細的事件日志並最終運行bash命令並打印結果
請注意,airflow test
命令在本地運行任務實例,將其日志輸出到stdout(在屏幕上),不依賴於依賴項,並且不向數據庫傳達狀態(運行,成功,失敗,...)。
它只允許測試單個任務實例。
backfill
一切看起來都運行良好,讓我們運行backfill。
backfill將遵守您的依賴關系,將日志發送到文件並與數據庫通信以記錄狀態。如果您有網絡服務器,您還可以跟蹤進度。如果您有興趣在backfill過程中直觀地跟蹤進度,
airflow webserver
將啟動Web服務器。
請注意,如果使用depends_on_past = True
,則單個任務實例將取決於前面任務實例的成功。但是如果指定了這個任務的start_date,此依賴關系將被忽略。
backfill: 在指定的日期范圍內運行DAG的子部分。
如果使用reset_dag_run
選項,則backfill
將首先提示用戶airflow
是否應清除backfill
日期范圍內的所有先前dag_run
和task_instances
。
如果使用rerun_failed_tasks
,則backfill
將自動重新運行backfill
日期范圍內的先前失敗的任務實例。
``cmd
airflow backfill [-h] [-t TASK_REGEX] [-s START_DATE] [-e END_DATE] [-m] [-l]
[-x] [-i] [-I] [-sd SUBDIR] [--pool POOL]
[--delay_on_limit DELAY_ON_LIMIT] [-dr] [-v] [-c CONF]
[--reset_dagruns] [--rerun_failed_tasks] [-B]
dag_id
此上下文中的日期范圍是start_date和可選的end_date,它們用於使用此dag中的任務實例填充運行計划。
```cmd
# 可選,在后台開啟一個web服務器
# airflow webserver --debug &
# 在一個時間范圍內開始你的 backfill
airflow backfill tutorial -s 2015-06-01 -e 2015-06-07