Python工作流-Airflow


Python工作流-Airflow 

Apache Airflow 是一個用於編排復雜計算工作流和數據處理流水線的開源工具。 如果您發現自己運行的是執行時間超長的 cron 腳本任務,或者是大數據的批處理任務,Airflow可能是能幫助您解決目前困境的神器。本文將為那些想要尋找新的工具或者說不知道有這款工具的同學了解 Airflow 編寫工作線提供入門教程。

Airflow 工作流設計稱為有向非循環圖(DAG)。這意味着,在編寫工作流時,您應該考慮如何將你的任務划分為多個可獨立執行的任務,然后,將這些任務合並為一個邏輯整體,將其組合成一個圖,從而實現我們的工作流結果,例如:

airflow-example-dag.png

圖的形狀決定了你工作流的整體邏輯。 Airflow DAG 可以包括多個分支,您可以決定在工作流執行時要走哪些分支或者說跳過哪一個分支。這就為我們的工作創建了一個非常有彈性的設計,因為如果發生錯誤,每個任務可以重試多次,甚至可以完全停止,並通過重新啟動最后一個未完成的任務來恢復運行的工作流程。

在設計 Airflow 操作節點時,務必要記住,它們可能被執行不止一次。 每個任務應該是冪等的,即具有多次應用的能力,而不會產生意想不到的后果。

Airflow 術語

以下是設計Airflow工作流程時使用的一些術語的簡要概述:

  • Airflow DAGs 是很多 Tasks 的組合.
  • 每個 Task 都被實現為一個 Operator
  • 當一個 DAG 啟動的時候,Airflow 都將在數據庫中創建一個 DagRun 記錄
  • 當一個任務執行的時候,實際上是創建了一個 Task實例運行,它運行在 DagRun 的上下文中。
  • AIRFLOW_HOME 是 Airflow 尋找 DAG 和插件的基准目錄。

環境准備

Airflow 是使用 Python 語言編寫的,這讓我們可以非常簡單得在機器上安裝。我這里使用的是 Python3.5 版本的 Python,還在使用 Python2 的兄弟們,趕緊出坑吧,3 會讓你對 Python 更加痴迷的。雖然 Airflow 是支持 Python2 版本的,好像最低可以支持到 Python2.6,但是我牆裂推薦大家使用 Python3.接下來,我將使用 virtualenv 來管理開發環境,並且進行后續的一系列實驗。

image_180.png

安裝 Airflow

為了方便,我這里單獨創建了一個 airflow 的用戶用於實驗,同時使用這個用戶的 home 目錄 /home/airflow 作為 airflow 的工作目錄,如果你希望和我看到一樣的效果,那么我希望是跟着我的步驟一起來:

image_181.png

這里只是進入 virtualenv 環境,接下來才是安裝 airflow 的步驟,截止到我寫博客的時候,airflow 的最新版本是 1.8,所以我這里就使用 1.8 的版本:

經過一段稍長的等待時間之后,我們的 airflow 應該是安裝成功了,在安裝過程我們可以看到,airflow 依賴於大量的其他庫,這個我們后續都會慢慢道來。現在是是否配置 airflow 的環境了。

第一個需要配置的就是 AIRFLOW_HOME 環境變量,這個是 airflow 工作的基礎,后續的 DAG 和 Plugin 都將以此為基礎展開,因為他們都是以 AIRFLOW_HOME 作為根目錄進行查找。根據我們之前的描述,我們的 HOME 目錄應該是 /home/airflow,所以可以這么設置:

image_183.png

哈哈,到這里我們可以說一個最簡單的配置就算是完成了,來看點有用的吧,嘗試一下輸入 airflow version 命令看看:

image_184.png

如果你看到了上面一般的輸出,那么說明我們的 airflow 是安裝和配置成功的,同時,我們使用 ls -al 命令看看,應該在 /home/airflow 目錄上能夠發現以下兩個文件:

image_186.png

打開 airflow.cfg 文件你會發現里面有很多配置項,而且大部分都是有默認值的,這個文件就是 airflow 的配置文件,在以后的詳解中我們會接觸到很多需要修改的配置,目前就以默認的配置來進行試驗。如果你現在就迫不及待得想自己修改着玩玩,那么 Airflow Configure 這篇文檔可以幫助你了解各個配置項的含義。

初始化 Airflow 數據庫

可能你會有點震驚了,為啥要初始化數據庫?是的,因為 airflow 需要維護 DAG 內部的狀態,需要保存任務執行的歷史信息,這些都是放在數據庫里面的,也就是說我們需要先在數據庫中創建表,但是,因為使用的是 Python,我們不需要自己使用原始的 SQL 來創建,airflow 為我們提供了方便的命令行,只需要簡單得執行:

這里值得注意的是,默認的配置使用的是 SQLite,所以初始化知道會在本地出現一個 airflow.db 的數據庫文件,如果你想要使用其他數據庫(例如 MySQL),都是可以的,只需要修改一下配置,我們后續會講到:

image_187.png

Airflow Web 界面

Airflow 提供了多種交互方式,主要使用到的有兩種,分別是:命令行 和 Web UI。Airflow 的 Web UI 是通過 Flask 編寫的,要啟動起來也是很簡單,直接在 AIRFLOW_HOME 目錄運行這條命令:

然后你就可以通過瀏覽器看到效果了,默認的訪問端口是:8080,所以打開瀏覽器,訪問以下 URL:http://localhost:8080/admin,神奇的事情就這么發生了,你將看到類似這樣的頁面:

image_185.png

第一個 DAG

從一開始就說了, Airflow 的兩個重大功能就是 DAG 和 Plugin,但是直到現在我們才開始講解 DAG。DAG 是離散數學中的一個概念,全稱我們稱之為:有向非循環圖(directed acyclic graphs)。圖的概念是由節點組成的,有向的意思就是說節點之間是有方向的,轉成工業術語我們可以說節點之間有依賴關系;非循環的意思就是說節點直接的依賴關系只能是單向的,不能出現 A 依賴於 B,B 依賴於 C,然后 C 又反過來依賴於 A 這樣的循環依賴關系。

那么在 Airflow 中,圖的每個節點都是一個任務,可以是一條命令行(BashOperator),可以是一段 Python 腳本(PythonOperator)等等,然后這些節點根據依賴關系構成了一條流程,一個圖,稱為一個 DAG,每個 Dag 都是唯一的 DagId。

創建一個 DAG 也是很簡單得,首先需要在 AIRFLOW_HOME 目錄下創建一個 dags 目錄,airflow 將會在這個目錄下去查找 DAG,所以,這里我們先創建一個,創建完之后新建一個 tutorial.py 文件:

image_189.png

然后,再來看下我們的 DAG 文件是怎么寫的:

image_191.png

我們可以從 Web UI 上看到這個 DAG 的依賴情況:

image_192.png

這就定義了幾個任務節點,然后組成了一個 DAG,同時也可以發現,依賴關系是通過 set_downstream 來實現的,這只是一種方式,在后面我們將會看到一個更加簡便的方式。

讓 DAG 跑起來

為了讓 DAG 能夠運行,我們需要觸發 DAG 任務,這里有幾種觸發的方式,但是,最天然的當屬定時器了,例如,在我們上面的任務中,可以發現設置了一個參數:schedule_interval,也就是任務觸發的周期。但是,你光設置了周期是沒有用的,我們還需要有個調度器讓他調度起來,所以需要運行調度器:

image_193.png

我這里使用的是 LocalExecutor, Airflow 目前有三種執行器,分別是:

  • SequentialExecutor:順序得指定 DAG
  • LocalExecutor:使用本地進程執行 DAG
  • CeleryExecutor:使用 Celery 執行 DAG

其中第一種 SequentialExecutor 可以用來在開發調試階段使用,千萬不要在生成環境中使用。第二種和第三種可以用於生產也可以用於開發測試,但是,對於任務較多的,推薦使用第三種: CeleryExecutor。

總結

本文從 Airflow 的環境安裝出發,簡單得介紹了一下如何使用 Airflow,但是本文的定位始終是一篇入門文章,對於 Airflow 的高級特性,在本博客中將會有大量的后續文章進行介紹,請大家自行搜索了解。

Reference

  1. Airflow Tutorial
  2. A Summer Intern’s Journey into Airflow @ Agari
  3. Get started developing workflows with Apache Airflow
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM