一、Airflow是什么
airflow 是一個編排、調度和監控workflow的平台,由Airbnb開源,現在在Apache Software Foundation 孵化。airflow 將workflow編排為由tasks組成的DAGs(有向無環圖),調度器在一組workers上按照指定的依賴關系執行tasks。同時,airflow 提供了豐富的命令行工具和簡單易用的用戶界面以便用戶查看和操作,並且airflow提供了監控和報警系統。
二、Airflow的核心概念
- DAGs:即有向無環圖(Directed Acyclic Graph),將所有需要運行的tasks按照依賴關系組織起來,描述的是所有tasks執行的順序。
- Operators:airflow內置了很多operators,如BashOperator 執行一個bash 命令,PythonOperator 調用任意的Python 函數,EmailOperator 用於發送郵件,HTTPOperator 用於發送HTTP請求, SqlOperator 用於執行SQL命令...同時,用戶可以自定義Operator,這給用戶提供了極大的便利性。可以理解為用戶需要的一個操作,是Airflow提供的類
- Tasks:Task 是 Operator的一個實例
- Task Instance:由於Task會被重復調度,每次task的運行就是不同的task instance了。Task instance 有自己的狀態,包括"running", "success", "failed", "skipped", "up for retry"等。
- Task Relationships:DAGs中的不同Tasks之間可以有依賴關系
三、使用AirFlow完成天級的任務調度
說了這么多抽象的概念,估計看官還是雲里霧里,下面就直接舉個例子來說明吧。
1. 安裝airflow
Airflow可以約等於只支持linux和mac,Windows上極其難裝,筆者放棄了.
安裝也很簡單,以下代碼來自官方文檔,使用了Python的pip管理:
# airflow needs a home, ~/airflow is the default,
# but you can lay foundation somewhere else if you prefer
# (optional)
export AIRFLOW_HOME=~/airflow
# install from pypi using pip
pip install apache-airflow
# initialize the database
airflow initdb
# start the web server, default port is 8080
airflow webserver -p 8080
# start the scheduler
airflow scheduler
# visit localhost:8080 in the browser and enable the example dag in the home page
安裝好了以后訪問localhost:8080即可訪問ui界面
2. 基本配置
- 需要創建
~/airflow/dags
目錄,這個目錄是默認的存放DAG的地方,想修改的話可以修改~/airflow/airflow.cfg
文件 - 修改airflow的數據庫
airflow會使用sqlite作為默認的數據庫,此情況下airflow進行調度的任務都只能單個的執行.在調度任務量不大的情況下,可以使用sqlite作為backend.如果想scale out的話,需要修改配置文件,官方推薦使用mysql或者postgresql作為backend數據庫.
3. 使用PostgresOperator執行SQL完成ETL任務
通過搜集信息,了解到PostgresOperator能執行SQL,並且還支持傳參數.能解決大多數ETL任務中的傳參問題.傳參使用的是Python的Jinjia模塊.
- 創建DAG
首先創建一個test_param_sql.py文件.內容如下:
from datetime import datetime, timedelta
import airflow
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import Variable
args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2019, 7, 26), #start_date會決定這個DAG從哪天開始生效
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
# Variable是Airflow提供的用戶自定義變量的功能,在UI界面的Admin -> Variable下可以進行增刪改查,此處筆者定義了sql_path作為存放sql文件的地方
tmpl_search_path = Variable.get("sql_path")
dag = airflow.DAG(
'test_param_sql',
schedule_interval=timedelta(days=1), # schedule_interval是調度的頻率
template_searchpath=tmpl_search_path,
default_args=args,
max_active_runs=1)
test_param_sql = PostgresOperator(
task_id='test_param_sql',
postgres_conn_id='postgres_default',
sql='param_sql.sql',
dag=dag,
params={'period': '201905'},
pool='pricing_pool')
match_finish = DummyOperator(
task_id='match_finish',
dag=dag
)
test_param_sql >> match_finish
- 准備要執行的Sql文件
創建test_sql.sql文件.
SQL文件會被Jinjia解析,可以使用一些宏來實現時間的替換 例
{{ ds }} 會被轉換為當天的 YYYY-MM-DD 格式的日期
{{ ds_nodash }} 會被轉換為當天的 YYYYMMDD的格式的日期
在本例里則是通過{{params.period}} 取到了 DAG上傳入的參數,
insert into test.param_sql_test
select * from test.dm_input_loan_info_d
where period = {{params.period}};
-
整體的目錄結構如下
dags/
test_param_sql.py
sql/
test_sql.sql -
測試dag是否正確
可以使用airflow test dag_id task_id date
進行測試,測試會執行Operator,Operator指定的行為會進行調度. 但是不會將執行的行為記錄到Airflow的數據庫里 -
發布
把文件放到~/airflow/dags目錄下,sql文件不要放在dags目錄下,可以找其他地方(比如同級目錄),配置好上文說到的Variable,能找到即可.筆者的理解是,airflow會掃描dags目錄下的內容,並嘗試解析成dag,如果有不能成功解析的內容,ui界面上會有錯誤提示,導致dag顯示不出來等問題.
其他有用的信息
-
如何在dag.py里引入其他的本地python模塊
需要把本地的python模塊放到一個zip文件里,例如:
my_dag1.py
my_dag2.py
package1/init.py
package1/functions.py
然后把這個zip文件放到dags目錄下,才能被正確解析 -
pooling可以控制任務的並行度,如果給DAG指定了一個不存在的pooling,任務會一直處於scheduled的狀態,不繼續進行