第一章 Airflow基本原理


一、Airflow簡介

airflow是Airbnb開源的一個用python編寫的調度工具,項目於2014年啟動,2015年春季開源,2016年加入Apache軟件基金會的孵化計划,使用Python編寫實現的任務管理、調度、監控工作流平台。
Airflow 是基於DAG(有向無環圖)的任務管理系統,可以簡單理解為是高級版的crontab,但是它解決了crontab無法解決的任務依賴問題。與crontab相比Airflow可以方便查看任務的執行狀況(執行是否成功、執行時間、執行依 賴等),可追蹤任務歷史執行情況,任務執行失敗時可以收到郵件通知,查看錯誤日志。

二、Airflow使用場景

在實際項目中,我們經常遇到以下場景:

1.運維人員,定時對服務器執行腳本某些腳本,最簡單的方式是添加一些crond任務,但如果想追溯各個任務的執行結果時?
2.在大數據場景下,每隔一段時間需導出線上數據、導入到大數據平台、觸發數據處理等多個子操作,且各個子操作含有依賴關系時?
3.在管理大量主機時,想要一個統一的作業管理平台,能在上面定義各種任務來管理下面的設備?

airflow通過DAG配置文件,能輕松定義各種任務及任務之間的依賴關系和調度執行,並一個可視化的操作web界面。

三、Airflow優勢

#1.自帶web管理界面,易上手;
#2.業務代碼和調度代碼完全解耦;
#3.通過python代碼定義子任務,並支持各種Operate操作器,靈活性大,能滿足用戶的各種需求;
#4.python開源項目,支持擴展operate等插件,便於二次開發;
#5.類似的工具有akzban,quart等;

四、Airflow基本架構

在一個可擴展的生產環境中,Airflow 含有以下組件:
#1.元數據庫:
	這個數據庫存儲有關任務狀態的信息。

#2.調度器:
	Scheduler 是一種使用 DAG 定義結合元數據中的任務狀態來決定哪些任務需要被執行以及任務執行優先級的過程。調度器通常作為服務運行。

#3.執行器:
	Executor 是一個消息隊列進程,它被綁定到調度器中,用於確定實際執行每個任務計划的工作進程。有不同類型的執行器,每個執行器都使用一個指定工作進程的類來執行任務。
最關鍵的執行器就有四種選擇:
SequentialExecutor:單進程順序執行任務,默認執行器,通常只用於測試
LocalExecutor:多進程本地執行任務,使用與調度器進程在同一台機器上運行的並行進程執行任務
CeleryExecutor:分布式調度,生產常用,使用存在於獨立的工作機器集群中的工作進程執行任務
DaskExecutor :動態任務調度,主要用於數據分析

#4.Workers:
	這些是實際執行任務邏輯的進程,由正在使用的執行器確定。

五、Airflow 基本概念

1.DAG

DAG(Directed Acyclic Graph)是Airflow的核心概念之一,DAG體現的是你的工作流,它由Python腳本定義,其中包含了你想要運行的一系列task,同時其中還定義了這些task的依賴關系。 DAG代表了一個供調度的工作流,它的主要配置項包括owner,schedule。DAG支持多種調度方式,你可以指定該DAG定時調度,如每天的5am,也可以指定它周期性調度,如每二十分鍾調度一次。 DAG由其中的task組成。例如,一個簡單的DAG可以包括三個任務:A,B,C。我們可以讓B依賴於A,在A成功執行之后運行B,而C可以在任意時候運行。需要注意的是,DAG本身並不關注A,B,C三個任務的具體內容,它關注的是三個任務的執行順序以及依賴條件。

2.DAG Run

DAG run是指DAG的實例,DAG run通常由Airflow的scheduler創建,在特定時間運行,並包含了DAG中定義的task的實例。

3.execution_date

execution_date是logical datetime,是DAG指定運行的時間,它可以是過去或將來的某個時間盡管DAG實際上是正在運行的。

4.Task

Task是DAG中定義工作的基本單位,它的地位等同於工作流中的一個節點。Task和Operator是一枚硬幣的正反兩面,Task代表工作的抽象概念,Operator定義了對應Task要做的具體任務。 同一個DAG中的Task之間一般有先后順序和依賴關系,考慮如下代碼:
with DAG('my_dag', start_date=datetime(2016, 1, 1)) as dag:
    task_1 = DummyOperator('task_1')
    task_2 = DummyOperator('task_2')
    task_1 >> task_2 # Define dependencies
這段代碼定義了一個包含兩個task的DAG,task_1是task_2的上游,而task_2依賴於task_1,當DAG run被創建時,task_1首先運行,task_2在task_1成功執行后才會執行。

5.Task Instance

Task Instance是Task的實例,其關系等同於DAG和DAG run之間的關系。

6.Task的生命周期

Task的生命周期有一下八種:

Task的正常流程包括一下幾個階段

No status (scheduler 創建了空的task實例)
Scheduled (scheduler對該task實例進行了調度)
Queued (scheduler將task實例傳給executor,放入執行隊列)
Running (task開始執行)
Success (task成功結束)

7.Operators

DAG定義了一個工作流如何執行,而Operator定義了一個task執行的具體任務,是Airflow中編寫具體任務的類。Operator包括很多種類,BashOperator用來執行Bash命令,PythonOperator可以執行Python函數,MySOperator可以操作MySQL數據庫執行相關操作,當然你也可以從BaseOperator中繼承並開發自己的Operator。

8.Scheduler

Scheduler監控所有的task和DAG,同時觸發依賴已經滿足的task。Scheduler在后端開啟子進程,和DAG文件夾同步,並周期性(可配置時長)收集DAG的解析結果來找到可以條件滿足的task。Scheduler會將可以運行的task交給我們配置好的executor執行。 Scheduler是Airflow環境中的頂層服務,簡單地在命令行運行airflow scheduler可以開啟。

9.Executor

Executor是task的執行器,它有多種配置方式,如SequetialExecutor串行運行task(適用於開發環境,是默認的配置),LocalExecutor可以在本地並發運行task,CeleryExecutor可以分布式地運行task。

六、Airflow常用命令

4.1,常用命令
$ airflow webserver -D		 守護進程運行webserver
$ airflow scheduler -D		 守護進程運行調度器
$ airflow worker -D		     守護進程運行調度器
$ airflow worker -c 1 -D 	 守護進程運行celery worker並指定任務並發數為1
$ airflow pause dag_id  	暫停任務
$ airflow unpause dag_id 	 取消暫停,等同於在管理界面打開off按鈕
$ airflow list_tasks dag_id  查看task列表
$ airflow clear dag_id       清空任務實例
$ airflow trigger_dag dag_id -r RUN_ID -e EXEC_DATE  運行整個dag文件
$ airflow run dag_id task_id execution_date       運行task

七、Airflow與同類產品的對比

系統名稱 介紹
Apache Oozie 使用XML配置, Oozie任務的資源文件都必須存放在HDFS上. 配置不方便同時也只能用於Hadoop.
Linkedin Azkaban web界面尤其很贊, 使用java properties文件維護任務依賴關系, 任務資源文件需要打包成zip, 部署不是很方便.
Airflow 具有自己的web任務管理界面,dag任務創建通過python代碼,可以保證其靈活性和適應性


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM