1. Airflow
Airflow是一個調度、監控工作流的平台。用於將一個工作流制定為一組任務的有向無環圖(DAG),並指派到一組計算節點上,根據相互之間的依賴關系,有序執行。
2. 安裝
pip安裝airflow:
pip3 install apache-airflow
初始化db:
airflow initdb
啟動web server:
airflow webserver -p 8081
啟動scheduler:
airflow scheduler
3. 例子
下面是一個基本的管道定義,接下來我們會對它們進行詳細解釋:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'tang-airflow',
'depends_on_past': False,
'start_date': datetime(2019, 6, 23),
'email': ['xxxxxxx@qq.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG('first', default_args=default_args, schedule_interval=timedelta(days=1))
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
task_id='templated',
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag)
t2.set_upstream(t1)
t3.set_upstream(t1)
它是一個DAG定義文件
一件必須要注意的一件事是:Airflow Python腳本僅僅是一個配置文件,以代碼的方式指定了DAG的結構。而真正執行的任務會以不同的上下文執行,不是以這個腳本的上下文。
對於這個DAG定義文件來說,它們並不執行任何真正的數據處理,它也不是用於此用途。這個腳本的目的是:定義一個DAG對象。它需要很快地執行(秒級別,而不是分級別),因為scheduler會定期執行它,以反映出任何變化(如果有的話)
引入模塊
一個Airflow pipeline 僅僅是一個Python腳本,用於定義一個Airflow DAG對象。首先我們需要import需要的庫:
# DAG對象;我們需要它實例化一個DAG
from airflow import DAG
# Operators;我們需要它去做操作
from airflow.operators.bash_operator import BashOperator
默認參數
我們接下來會創建一個DAG以及一些tasks任務,並且可以顯式地傳遞一組參數到每個task的構造器中(但是此操作會有些重復工作)。另外一種更好的方法是:我們可以定義一個默認參數的字典,在創建task時使用。
from datetime import datetime, timedelta
default_args = {
'owner': 'tang-airflow',
'depends_on_past': False,
'start_date': datetime(2019, 6, 23),
'email': ['402877015@qq.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
以上僅是一組參數定義示范,在實際應用中可以有更多且多樣的參數配置。
實例化一個DAG
我們需要一個DAG對象用於放置tasks。這里我們傳遞一個String定義dag_id,作為DAG的唯一標識符。我們也會將之前定義的參數字典傳遞給此方法,並定義調度DAG的間隔為1天(schedule_interval)。
dag = DAG('first', default_args=default_args, schedule_interval=timedelta(days=1))
Tasks
Task任務是在實例化operator對象時生成的。從operator實例化的對象稱為constructor。第一個參數task_id作為task的唯一標志符。
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)
這里我們使用的是BashOperator,執行bash命令,參數部分較為簡單。在一個task中,使用的參數優先級為:
1. 顯式傳遞的參數值
2. 在default_args 字典中存在的參數值
3. operator的默認值(如果有的話)
一個task必須包含的兩個參數為:task_id以及owner,否則Airflow會拋出異常。
使用Jinja構建模版
Jinja為Python設計的一種模板語言。Airflow使用Jinja模板語言,為pipeline編寫者提供了一組內置的的參數與宏。同時,它也提供了hooks,讓用戶定義它們自己的參數、宏、以及模板。
提供的例子僅片面地介紹了在Airflow使用模板語言,不過提供這個例子的主要的目的有兩個:1.讓讀者知道模板這個功能是存在的;2. 讓讀者了解雙花括號的使用,以及最常見的模板變量: {{ ds }} (今天的”data stamp”):
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
task_id='templated',
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag)
需要注意的是 templated_command 的代碼邏輯包含在{% %} 塊中,引用的參數為{{ ds }}。調用的方法例如 {{ macros.ds_add(ds, 7) }},並且在 {{ params.my_param }} 中引用了一個用戶定義的參數。
在BashOperator 中的params hook,允許你傳遞一個參數字典、以及/或對象到你的模板中。這里需要仔細看一下傳遞參數時的對應映射關系。
文件也可以作為參數傳遞給bash_command,例如 bash_command=’templated_command.sh’,文件的地址為pipeline文件(這里是tutorial.py)所在文件夾的相對地址。這個功能對於很多場景是有用的,例如將腳本邏輯與pipeline代碼分離、允許執行其他語言的代碼文件、以及構建pipeline更多的靈活性等。也可以在DAG構造器調用中定義你的template_searchpath,指向任何目錄地址。
使用同樣的DAG構造器調用,也可以定義user_defined_macros,指定你自己的變量。例如,傳遞dict(foo=’bar’)到這個參數,可以讓你在模板中使用{{ foo }}。此外,指定user_defined_filters,可以注冊自定義的過濾器。例如,傳遞dict(hello=lambda name: ‘Hello %s’ % name) 到這個變量,可以讓你在模板中使用{{ ‘world’ | hello }}。對於更多的用戶自定義過濾器,可以閱讀以下Jinja官方文檔:
http://jinja.pocoo.org/docs/dev/api/#writing-filters
對於更多有關可在模板中使用的變量與宏的信息,可以參考以下文檔:
https://airflow.apache.org/macros.html
設置依賴關系
現在我們有三個tasks:t1, t2 和 t3。它們之間並沒有相互依賴關系。下面是幾種可以用於定義它們之間依賴的方法:
t1.set_downstream(t2)
# This means that t2 will depend on t1
# running successfully to run.
# It is equivalent to:
t2.set_upstream(t1)
# The bit shift operator can also be
# used to chain operations:
t1 >> t2
# And the upstream dependency with the
# bit shift operator:
t2 << t1
# Chaining multiple dependencies becomes
# concise with the bit shift operator:
t1 >> t2 >> t3
# A list of tasks can also be set as
# dependencies. These operations
# all have the same effect:
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1
需要注意的是,在執行腳本時,如果Airflow發現在DAG中有回環、或是一個依賴被引用超過一次,會拋出異常。
4. 測試
我們將以上代碼保存在文件tutorial.py中,保存位置為airflow.cfg文件中定義的DAGs目錄。默認的DAGs目錄地址為~/airflow/dags:
# The folder where your airflow pipelines live, most likely a
# subfolder in a code repository
# This path must be absolute
dags_folder = /home/hadoop/airflow/dags
執行腳本:
python3 ~/airflow/dags/tutorial.py
命令行驗證元數據
執行腳本后,我們執行幾個命令進一步驗證腳本:
# 打印出active的DAGs
> airflow list_dags
tutorial
# 打印 tutorial DAG的tasks
> airflow list_tasks tutorial
print_date
sleep
templated
# 打印tutorial DAG中 tasks 的樹狀結構
> airflow list_tasks tutorial --tree
<Task(BashOperator): sleep>
<Task(BashOperator): print_date>
<Task(BashOperator): templated>
<Task(BashOperator): print_date>
測試
我們可以通過執行task實例進行測試,這里除了傳入task外,還需要傳入一個date(日期)。這里的date在執行上下文中是一個execution_date,模擬了scheduler在某個特定時間點(data + time)執行task:
# command layout: command subcommand dag_id task_id date
# testing print_date
> airflow test tutorial print_date 2019-02-02
[2019-06-25 03:51:36,370] {bash_operator.py:90} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_ID=tutorial
AIRFLOW_CTX_TASK_ID=print_date
AIRFLOW_CTX_EXECUTION_DATE=2019-02-02T00:00:00+00:00
[2019-06-25 03:51:36,370] {bash_operator.py:104} INFO - Temporary script location: /tmp/airflowtmpc9ntvif0/print_datehrv9r95p
[2019-06-25 03:51:36,370] {bash_operator.py:114} INFO - Running command: date
[2019-06-25 03:51:36,374] {bash_operator.py:123} INFO - Output:
[2019-06-25 03:51:36,376] {bash_operator.py:127} INFO - Tue 25 Jun 03:51:36 UTC 2019
[2019-06-25 03:51:36,376] {bash_operator.py:131} INFO - Command exited with return code 0
# testing sleep
> airflow test tutorial sleep 2019-02-02
[2019-06-25 03:53:15,203] {bash_operator.py:90} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_ID=tutorial
AIRFLOW_CTX_TASK_ID=sleep
AIRFLOW_CTX_EXECUTION_DATE=2019-02-02T00:00:00+00:00
[2019-06-25 03:53:15,203] {bash_operator.py:104} INFO - Temporary script location: /tmp/airflowtmp175xwnf8/sleepdsa5lg3t
[2019-06-25 03:53:15,203] {bash_operator.py:114} INFO - Running command: sleep 5
[2019-06-25 03:53:15,207] {bash_operator.py:123} INFO - Output:
[2019-06-25 03:53:20,209] {bash_operator.py:131} INFO - Command exited with return code 0
# testing 模板
> airflow test tutorial templated 2019-02-02
...
[2019-06-25 05:00:21,412] {bash_operator.py:114} INFO - Running command:
echo "2019-02-02"
echo "2019-02-09"
echo "Parameter I passed in"
echo "2019-02-02"
echo "2019-02-09"
echo "Parameter I passed in"
echo "2019-02-02"
echo "2019-02-09"
echo "Parameter I passed in"
echo "2019-02-02"
echo "2019-02-09"
echo "Parameter I passed in"
echo "2019-02-02"
echo "2019-02-09"
echo "Parameter I passed in"
...
需要注意的是,airflow test 命令是在本地運行task實例,將輸出打印到stdout,並沒有依賴考慮,也沒有與數據庫溝通狀態(running, success, failed, …)。此命令僅測試一個單task實例。
Backfill
從本地運行來看,未出現任何問題,現在我們運行一個backfill。Backfill可以測試某個DAG在設定的日期區間的運行狀況。它會考慮到task之間的依賴、寫入日志文件、與數據庫交互並記錄狀態信息。如果啟動了一個webserver,則可以在webserver上跟蹤它的進度。
需要注意的是,如果使用depends_on_past=True,則單個task實例的運行取決於它的上游task實例的成功運行。
在這個上下文中,時間區間是start_date,以及一個可選的end_date。
# optional, start a web server in debug mode in the background
# airflow webserver --debug &
# start your backfill on a date range
airflow backfill tutorial -s 2019-02-02 -e 2019-02-09
執行之后可在Web Server 界面跟蹤它們的執行狀態。
References: