Airflow 調度基礎


1. Airflow

Airflow是一個調度、監控工作流的平台。用於將一個工作流制定為一組任務的有向無環圖(DAG),並指派到一組計算節點上,根據相互之間的依賴關系,有序執行。

 

2. 安裝

pip安裝airflow

pip3 install apache-airflow

 

初始化db

airflow initdb

 

啟動web server

airflow webserver -p 8081

 

啟動scheduler

airflow scheduler

 

3. 例子

下面是一個基本的管道定義,接下來我們會對它們進行詳細解釋:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
   
'owner': 'tang-airflow',
   
'depends_on_past': False,
   
'start_date': datetime(2019, 6, 23),
   
'email': ['xxxxxxx@qq.com'],
   
'email_on_failure': False,
   
'email_on_retry': False,
   
'retries': 1,
   
'retry_delay': timedelta(minutes=5),
   
# 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG(
'first', default_args=default_args, schedule_interval=timedelta(days=1))

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
   
task_id='print_date',
   
bash_command='date',
   
dag=dag)

t2 = BashOperator(
   
task_id='sleep',
   
bash_command='sleep 5',
   
retries=3,
   
dag=dag)

templated_command =
"""
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
"""

t3 = BashOperator(
   
task_id='templated',
   
bash_command=templated_command,
   
params={'my_param': 'Parameter I passed in'},
   
dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t1)

 

它是一個DAG定義文件

一件必須要注意的一件事是:Airflow Python腳本僅僅是一個配置文件,以代碼的方式指定了DAG的結構。而真正執行的任務會以不同的上下文執行,不是以這個腳本的上下文。

對於這個DAG定義文件來說,它們並不執行任何真正的數據處理,它也不是用於此用途。這個腳本的目的是:定義一個DAG對象。它需要很快地執行(秒級別,而不是分級別),因為scheduler會定期執行它,以反映出任何變化(如果有的話)

 

引入模塊

一個Airflow pipeline 僅僅是一個Python腳本,用於定義一個Airflow DAG對象。首先我們需要import需要的庫:

# DAG對象;我們需要它實例化一個DAG
from airflow import DAG

# Operators;我們需要它去做操作
from airflow.operators.bash_operator import BashOperator

 

默認參數

我們接下來會創建一個DAG以及一些tasks任務,並且可以顯式地傳遞一組參數到每個task的構造器中(但是此操作會有些重復工作)。另外一種更好的方法是:我們可以定義一個默認參數的字典,在創建task時使用。

from datetime import datetime, timedelta

default_args = {
   
'owner': 'tang-airflow',
   
'depends_on_past': False,
   
'start_date': datetime(2019, 6, 23),
   
'email': ['402877015@qq.com'],
   
'email_on_failure': False,
   
'email_on_retry': False,
   
'retries': 1,
   
'retry_delay': timedelta(minutes=5),
   
# 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

 

以上僅是一組參數定義示范,在實際應用中可以有更多且多樣的參數配置。

 

實例化一個DAG

我們需要一個DAG對象用於放置tasks。這里我們傳遞一個String定義dag_id,作為DAG的唯一標識符。我們也會將之前定義的參數字典傳遞給此方法,並定義調度DAG的間隔為1天(schedule_interval)。

dag = DAG('first', default_args=default_args, schedule_interval=timedelta(days=1))

 

Tasks

Task任務是在實例化operator對象時生成的。從operator實例化的對象稱為constructor。第一個參數task_id作為task的唯一標志符。

t1 = BashOperator(
   
task_id='print_date',
   
bash_command='date',
   
dag=dag)

t2 = BashOperator(
   
task_id='sleep',
   
bash_command='sleep 5',
   
retries=3,
   
dag=dag)

 

這里我們使用的是BashOperator,執行bash命令,參數部分較為簡單。在一個task中,使用的參數優先級為:

1.     顯式傳遞的參數值

2.     default_args 字典中存在的參數值

3.     operator的默認值(如果有的話)

一個task必須包含的兩個參數為:task_id以及owner,否則Airflow會拋出異常。

 

使用Jinja構建模版

JinjaPython設計的一種模板語言。Airflow使用Jinja模板語言,為pipeline編寫者提供了一組內置的的參數與宏。同時,它也提供了hooks,讓用戶定義它們自己的參數、宏、以及模板。

提供的例子僅片面地介紹了在Airflow使用模板語言,不過提供這個例子的主要的目的有兩個:1.讓讀者知道模板這個功能是存在的;2. 讓讀者了解雙花括號的使用,以及最常見的模板變量: {{ ds }} (今天的”data stamp”)

templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
"""

 

t3 = BashOperator(
   
task_id='templated',
   
bash_command=templated_command,
   
params={'my_param': 'Parameter I passed in'},
   
dag=dag)

 

需要注意的是 templated_command 的代碼邏輯包含在{% %} 塊中,引用的參數為{{ ds }}。調用的方法例如 {{ macros.ds_add(ds, 7) }},並且在 {{ params.my_param }} 中引用了一個用戶定義的參數。

BashOperator 中的params hook,允許你傳遞一個參數字典、以及/或對象到你的模板中。這里需要仔細看一下傳遞參數時的對應映射關系。

文件也可以作為參數傳遞給bash_command,例如 bash_command=’templated_command.sh’,文件的地址為pipeline文件(這里是tutorial.py)所在文件夾的相對地址。這個功能對於很多場景是有用的,例如將腳本邏輯與pipeline代碼分離、允許執行其他語言的代碼文件、以及構建pipeline更多的靈活性等。也可以在DAG構造器調用中定義你的template_searchpath,指向任何目錄地址。

使用同樣的DAG構造器調用,也可以定義user_defined_macros,指定你自己的變量。例如,傳遞dict(foo=’bar’)到這個參數,可以讓你在模板中使用{{ foo }}。此外,指定user_defined_filters,可以注冊自定義的過濾器。例如,傳遞dict(hello=lambda name: ‘Hello %s’ % name) 到這個變量,可以讓你在模板中使用{{ ‘world’ | hello }}。對於更多的用戶自定義過濾器,可以閱讀以下Jinja官方文檔:

http://jinja.pocoo.org/docs/dev/api/#writing-filters

 

對於更多有關可在模板中使用的變量與宏的信息,可以參考以下文檔:

https://airflow.apache.org/macros.html

 

設置依賴關系

現在我們有三個taskst1, t2 t3。它們之間並沒有相互依賴關系。下面是幾種可以用於定義它們之間依賴的方法:

t1.set_downstream(t2)

# This means that t2 will depend on t1
# running successfully to run.
# It is equivalent to:
t2.set_upstream(t1)

# The bit shift operator can also be
# used to chain operations:
t1 >> t2

# And the upstream dependency with the
# bit shift operator:
t2 << t1

# Chaining multiple dependencies becomes
# concise with the bit shift operator:
t1 >> t2 >> t3

# A list of tasks can also be set as
# dependencies. These operations
# all have the same effect:
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1

 

需要注意的是,在執行腳本時,如果Airflow發現在DAG中有回環、或是一個依賴被引用超過一次,會拋出異常。

 

4. 測試

我們將以上代碼保存在文件tutorial.py中,保存位置為airflow.cfg文件中定義的DAGs目錄。默認的DAGs目錄地址為~/airflow/dags

# The folder where your airflow pipelines live, most likely a

# subfolder in a code repository

# This path must be absolute

dags_folder = /home/hadoop/airflow/dags

執行腳本:

python3 ~/airflow/dags/tutorial.py

 

命令行驗證元數據

執行腳本后,我們執行幾個命令進一步驗證腳本:

# 打印出activeDAGs

> airflow list_dags

tutorial

 

# 打印 tutorial DAGtasks

> airflow list_tasks tutorial

print_date

sleep

templated

 

# 打印tutorial DAG tasks 的樹狀結構

> airflow list_tasks tutorial --tree

<Task(BashOperator): sleep>

    <Task(BashOperator): print_date>

<Task(BashOperator): templated>

    <Task(BashOperator): print_date>

 

測試

我們可以通過執行task實例進行測試,這里除了傳入task外,還需要傳入一個date(日期)。這里的date在執行上下文中是一個execution_date,模擬了scheduler在某個特定時間點(data + time)執行task

# command layout: command subcommand dag_id task_id date

# testing print_date

> airflow test tutorial print_date 2019-02-02

[2019-06-25 03:51:36,370] {bash_operator.py:90} INFO - Exporting the following env vars:

AIRFLOW_CTX_DAG_ID=tutorial

AIRFLOW_CTX_TASK_ID=print_date

AIRFLOW_CTX_EXECUTION_DATE=2019-02-02T00:00:00+00:00

[2019-06-25 03:51:36,370] {bash_operator.py:104} INFO - Temporary script location: /tmp/airflowtmpc9ntvif0/print_datehrv9r95p

[2019-06-25 03:51:36,370] {bash_operator.py:114} INFO - Running command: date

[2019-06-25 03:51:36,374] {bash_operator.py:123} INFO - Output:

[2019-06-25 03:51:36,376] {bash_operator.py:127} INFO - Tue 25 Jun 03:51:36 UTC 2019

[2019-06-25 03:51:36,376] {bash_operator.py:131} INFO - Command exited with return code 0

 

# testing sleep
> airflow test tutorial sleep 2019-02-02
 

[2019-06-25 03:53:15,203] {bash_operator.py:90} INFO - Exporting the following env vars:

AIRFLOW_CTX_DAG_ID=tutorial

AIRFLOW_CTX_TASK_ID=sleep

AIRFLOW_CTX_EXECUTION_DATE=2019-02-02T00:00:00+00:00

[2019-06-25 03:53:15,203] {bash_operator.py:104} INFO - Temporary script location: /tmp/airflowtmp175xwnf8/sleepdsa5lg3t

[2019-06-25 03:53:15,203] {bash_operator.py:114} INFO - Running command: sleep 5

[2019-06-25 03:53:15,207] {bash_operator.py:123} INFO - Output:

 

[2019-06-25 03:53:20,209] {bash_operator.py:131} INFO - Command exited with return code 0

 

# testing 模板

> airflow test tutorial templated 2019-02-02

...

[2019-06-25 05:00:21,412] {bash_operator.py:114} INFO - Running command:

 

    echo "2019-02-02"

    echo "2019-02-09"

    echo "Parameter I passed in"

 

    echo "2019-02-02"

    echo "2019-02-09"

    echo "Parameter I passed in"

 

    echo "2019-02-02"

    echo "2019-02-09"

    echo "Parameter I passed in"

 

    echo "2019-02-02"

    echo "2019-02-09"

    echo "Parameter I passed in"

 

    echo "2019-02-02"

    echo "2019-02-09"

echo "Parameter I passed in"

...

需要注意的是,airflow test 命令是在本地運行task實例,將輸出打印到stdout,並沒有依賴考慮,也沒有與數據庫溝通狀態(running, success, failed, …)。此命令僅測試一個單task實例。

 

Backfill

從本地運行來看,未出現任何問題,現在我們運行一個backfillBackfill可以測試某個DAG在設定的日期區間的運行狀況。它會考慮到task之間的依賴、寫入日志文件、與數據庫交互並記錄狀態信息。如果啟動了一個webserver,則可以在webserver上跟蹤它的進度。

需要注意的是,如果使用depends_on_past=True,則單個task實例的運行取決於它的上游task實例的成功運行。

在這個上下文中,時間區間是start_date,以及一個可選的end_date

# optional, start a web server in debug mode in the background
# airflow webserver --debug &

# start your backfill on a date range
airflow backfill tutorial -s 2019-02-02 -e 2019-02-09

 

執行之后可在Web Server 界面跟蹤它們的執行狀態。

 

 

 

References:

https://airflow.apache.org/tutorial.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM