Airflow 入門及使用
Airflow 入門及使用
什么是 Airflow?
Airflow 是一個使用 python 語言編寫的 data pipeline 調度和監控工作流的平台。 Airflow 是通過 DAG(Directed acyclic graph 有向無環圖)來管理任務流程的任務調度工具, 不需要知道業務數據的具體內容,設置任務的依賴關系即可實現任務調度。
這個平台擁有和 Hive、Presto、MySQL、HDFS、Postgres 等數據源之間交互的能力,並且提供了鈎子(hook)使其擁有很好地擴展性。 除了一個命令行界面,該工具還提供了一個基於 Web 的用戶界面可以可視化管道的依賴關系、監控進度、觸發任務等。
Airflow 的架構
在一個可擴展的生產環境中,Airflow 含有以下組件:
- 元數據庫:這個數據庫存儲有關任務狀態的信息。
- 調度器:Scheduler 是一種使用 DAG 定義結合元數據中的任務狀態來決定哪些任務需要被執行以及任務執行優先級的過程。 調度器通常作為服務運行。
- 執行器:Executor 是一個消息隊列進程,它被綁定到調度器中,用於確定實際執行每個任務計划的工作進程。 有不同類型的執行器,每個執行器都使用一個指定工作進程的類來執行任務。 例如,LocalExecutor 使用與調度器進程在同一台機器上運行的並行進程執行任務。 其他像 CeleryExecutor 的執行器使用存在於獨立的工作機器集群中的工作進程執行任務。
- Workers:這些是實際執行任務邏輯的進程,由正在使用的執行器確定。

Airflow 解決哪些問題
通常,在一個運維系統,數據分析系統,或測試系統等大型系統中,我們會有各種各樣的依賴需求。包括但不限於: 時間依賴:任務需要等待某一個時間點觸發。 外部系統依賴:任務依賴外部系統需要調用接口去訪問。 任務間依賴:任務 A 需要在任務 B 完成后啟動,兩個任務互相間會產生影響。 資源環境依賴:任務消耗資源非常多, 或者只能在特定的機器上執行。
crontab 可以很好地處理定時執行任務的需求,但僅能管理時間上的依賴。
Airflow 的核心概念是 DAG (有向無環圖)。DAG 由一個或多個 task 組成,而這個 DAG 正是解決了上文所說任務間的依賴問題。 任務執行的先后依賴順序、多個 task 之間的依賴關系可以很好的用 DAG 表示完善。
Airflow 同樣完整的支持 crontab 表達式,也支持直接使用 python 的 datatime 模塊表述時間,還可以用 datatime 的 delta 表述時間差。
Airflow 安裝並運用
# 默認目錄在~/airflow,也可以使用以下命令來指定目錄 export AIRFLOW_HOME={yourpath}/airflow pip install apache-airflow # 配置文件中的 sql_alchemy_conn vim airflow/airflow.cfg # 初始化數據庫 airflow initdb
定義第一個DAG
在 $AIRFLOW_HOME 目錄下新建 dags 文件夾,后面的所有 dag 文件都要存儲在這個目錄。
新建 dag 文件 demo.py,語句含義見注釋
from datetime import datetime, timedelta from airflow import DAG from airflow.utils import dates from airflow.utils.helpers import chain from airflow.operators.bash_operator import BashOperator from airflow.operators.python_operator import PythonOperator def default_options(): default_args = { 'owner': 'airflow', # 擁有者名稱 'start_date': dates.days_ago(1), # 第一次開始執行的時間,為 UTC 時間 'retries': 1, # 失敗重試次數 'retry_delay': timedelta(seconds=5) # 失敗重試間隔 } return default_args # 定義DAG def test1(dag): t = "pwd" # operator 支持多種類型, 這里使用 BashOperator task = BashOperator( task_id='test1', # task_id bash_command=t, # 指定要執行的命令 dag=dag # 指定歸屬的dag ) return task def hello_world_1(): current_time = str(datetime.today()) print('hello world at {}'.format(current_time)) def test2(dag): # PythonOperator task = PythonOperator( task_id='test2', python_callable=hello_world_1, # 指定要執行的函數 dag=dag) return task def test3(dag): t = "date" task = BashOperator( task_id='test3', bash_command=t, dag=dag) return task with DAG( 'test_task', # dag_id default_args=default_options(), # 指定默認參數 schedule_interval="20 8 * * *" # 執行周期 ) as d: task1 = test1(d) task2 = test2(d) task3 = test3(d) chain(task1, task2, task3) # 指定執行順序
寫完后執行 python $AIRFLOW_HOME/dags/demo.py
檢查是否有錯誤,如果命令行沒有報錯,就表示沒問題。
命令行輸入 airflow list_dags -sd $AIRFLOW_HOME/dags
查看生效的 dags
-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
test_task
也可以用位位移指定任務執行順序
可以使用位移符號:
task1 >> task2 >> task3
等價於
task1.set_downstream(task2)
task2.set_downstream(task23)
Web UI
啟動命令 airflow webserver
任務圖視圖

任務樹視圖

# 測試任務,格式:airflow test dag_id task_id execution_time
airflow test test_task test1 2019-09-10 # 開始運行任務(這一步也可以在web界面點trigger按鈕) airflow trigger_dag test_task # 守護進程運行webserver, 默認端口為8080,也可以通過`-p`來指定 airflow webserver -D # 守護進程運行調度器 airflow scheduler -D # 守護進程運行調度器 airflow worker -D # 暫停任務 airflow pause dag_id # 取消暫停,等同於在web管理界面打開off按鈕 airflow unpause dag_id # 查看task列表 airflow list_tasks dag_id 查看task列表 # 清空任務狀態 airflow clear dag_id # 運行task airflow run dag_id task_id execution_date
本文源文路徑:https://zhuanlan.zhihu.com/p/84332879