【原創】Airflow 簡介&如何部署一個健壯的 apache-airflow 調度系統


聲明

本文摘錄了很多前輩的文章,原文如下:

https://www.jianshu.com/p/2ecef979c606


Airflow 簡介

Airflow是一個可編程,調度和監控的工作流平台,基於有向無環圖(DAG),airflow可以定義一組有依賴的任務,按照依賴依次執行。airflow提供了豐富的命令行工具用於系統管控,而其web管理界面同樣也可以方便的管控調度任務,並且對任務運行狀態進行實時監控,方便了系統的運維和管理。

基本概念

airflow守護進程

airflow 系統在運行時有許多守護進程,它們提供了 airflow 的全部功能。守護進程包括 Web服務器-webserver、調度程序-scheduler、執行單元-worker、消息隊列監控工具-Flower等。下面是 apache-airflow 集群、高可用部署的主要守護進程。

webserver

webserver 是一個守護進程,它接受 HTTP 請求,允許你通過 Python Flask Web 應用程序與 airflow 進行交互,webserver 提供以下功能:

  • 中止、恢復、觸發任務。
  • 監控正在運行的任務,斷點續跑任務。
  • 執行 ad-hoc 命令或 SQL 語句來查詢任務的狀態,日志等詳細信息。
  • 配置連接,包括不限於數據庫、ssh 的連接等。

webserver 守護進程使用 gunicorn 服務器(相當於 java 中的 tomcat )處理並發請求,可通過修改{AIRFLOW_HOME}/airflow.cfg文件中 workers 的值來控制處理並發請求的進程數。

例如:

workers = 4 #表示開啟4個gunicorn worker(進程)處理web請求

啟動 webserver 守護進程:

$ airflow webserver -D

scheduler

scheduler 是一個守護進程,它周期性地輪詢任務的調度計划,以確定是否觸發任務執行。

啟動的 scheduler 守護進程:

$ airflow scheduler -D

worker

worker 是一個守護進程,它啟動 1 個或多個 Celery 的任務隊列,負責執行具體 的 DAG 任務。

當設置 airflow 的 executors 設置為 CeleryExecutor 時才需要開啟 worker 守護進程。

推薦你在生產環境使用 CeleryExecutor :

executor = CeleryExecutor

啟動一個 worker守護進程,默認的隊列名為 default:

$ airflow worker -D

flower

flower 是一個守護進程,用於是監控 celery 消息隊列。啟動守護進程命令如下:

$ airflow flower -D

默認的端口為 5555,您可以在瀏覽器地址欄中輸入 "http://hostip:5555" 來訪問 flower ,對 celery 消息隊列進行監控。

執行器(Executor)

Airflow本身是一個綜合平台,它兼容多種組件,所以在使用的時候有多種方案可以選擇。比如最關鍵的執行器就有四種選擇:

  • SequentialExecutor:單進程順序執行任務,默認執行器,通常只用於測試
  • LocalExecutor:多進程本地執行任務
  • CeleryExecutor:分布式調度,生產常用
  • DaskExecutor :動態任務調度,主要用於數據分析

生產環境建議使用CeleryExecutor作為執行器。

celery是一個分布式調度框架,其本身無隊列功能,需要使用第三方組件,比如redis或者rabbitmq。

核心概念

  • DAGs:即有向無環圖(Directed Acyclic Graph),將所有需要運行的tasks按照依賴關系組織起來,描述的是所有tasks執行順序。
  • Operators:可以簡單理解為一個class,描述了DAG中某個的task具體要做的事。其中,airflow內置了很多operators,如BashOperator 執行一個bash 命令,PythonOperator 調用任意的Python 函數,EmailOperator 用於發送郵件,HTTPOperator 用於發送HTTP請求, SqlOperator 用於執行SQL命令等等,同時,用戶可以自定義Operator,這給用戶提供了極大的便利性。
  • Tasks:Task 是 Operator的一個實例,也就是DAGs中的一個node。
  • Task Instance:task的一次運行。Web 界面中可以看到task instance 有自己的狀態,包括"running", "success", "failed", "skipped", "up for retry"等。
  • Task Relationships:DAGs中的不同Tasks之間可以有依賴關系,如 Task1 >> Task2,表明Task2依賴於Task2了。

通過將DAGs和Operators結合起來,用戶就可以創建各種復雜的 工作流(workflow)。

操作符-Operators

DAG 定義一個作業流,Operators 則定義了實際需要執行的作業。airflow 提供了許多 Operators 來指定我們需要執行的作業:

  • BashOperator - 執行 bash 命令或腳本。
  • SSHOperator - 執行遠程 bash 命令或腳本(原理同 paramiko 模塊)。
  • PythonOperator - 執行 Python 函數。
  • EmailOperator - 發送 Email。
  • HTTPOperator - 發送一個 HTTP 請求。
  • MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator, 等,執行 SQL 任務。
  • DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator 你懂得。除了以上這些 Operators 還可以方便的自定義 Operators 滿足個性化的任務需求。

airflow 的守護進程是如何一起工作的?

需要注意的是 airflow的守護進程彼此之間是獨立的,他們並不相互依賴,也不相互感知。每個守護進程在運行時只處理分配到自己身上的任務,他們在一起運行時,提供了 airflow 的全部功能。

調度器 scheduler 會間隔性的去輪詢元數據庫(Metastore)已注冊的 DAG(有向無環圖,可理解為作業流)是否需要被執行。如果一個具體的 DAG 根據其調度計划需要被執行,scheduler 守護進程就會先在元數據庫創建一個 DagRun 的實例,並觸發 DAG 內部的具體 task(任務,可以這樣理解:DAG 包含一個或多個task),觸發其實並不是真正的去執行任務,而是推送 task 消息至消息隊列(即 broker)中,每一個 task 消息都包含此 task 的 DAG ID,task ID,及具體需要被執行的函數。如果 task 是要執行 bash 腳本,那么 task 消息還會包含 bash 腳本的代碼。
用戶可能在 webserver 上來控制 DAG,比如手動觸發一個 DAG 去執行。當用戶這樣做的時候,一個DagRun 的實例將在元數據庫被創建,scheduler 使同 #1 一樣的方法去觸發 DAG 中具體的 task 。
worker 守護進程將會監聽消息隊列,如果有消息就從消息隊列中取出消息,當取出任務消息時,它會更新元數據中的 DagRun 實例的狀態為正在運行,並嘗試執行 DAG 中的 task,如果 DAG 執行成功,則更新任 DagRun 實例的狀態為成功,否則更新狀態為失敗。

airflow單節點部署

將以上所有守護進程運行在同一台機器上即可完成 airflow 的單結點部署,架構如下圖所示:

clip_image001

airflow多節點(集群)部署

在穩定性要求較高的場景,如金融交易系統中,一般采用集群、高可用的方式來部署。Apache Airflow 同樣支持集群、高可用的部署,airflow 的守護進程可分布在多台機器上運行,架構如下圖所示:

clip_image002

這樣做有以下好處

高可用

如果一個 worker 節點崩潰或離線時,集群仍可以被控制的,其他 worker 節點的任務仍會被執行。

分布式處理

如果你的工作流中有一些內存密集型的任務,任務最好是分布在多台機器上運行以便得到更快的執行。

擴展 worker 節點

水平擴展

你可以通過向集群中添加更多 worker 節點來水平地擴展集群,並使這些新節點指向同一個元數據庫,從而分發處理過程。由於 worker 不需要在任何守護進程注冊即可執行任務,因此worker 節點可以在不停機,不重啟服務下的情況進行擴展,也就是說可以隨時擴展。

垂直擴展

你可以通過增加單個 worker 節點的守護進程數來垂直擴展集群。可以通過修改 airflow 的配置文件-{AIRFLOW_HOME}/airflow.cfg 中 celeryd_concurrency 的值來實現,例如:

celeryd_concurrency = 30

#您可以根據實際情況,如集群上運行的任務性質,CPU 的內核數量等,增加並發進程的數量以滿足實際需求。

擴展 Master 節點

還可以向集群中添加更多主節點,以擴展主節點上運行的服務。您可以擴展 webserver 守護進程,以防止太多的 HTTP 請求出現在一台機器上,或者您想為 webserver 的服務提供更高的可用性。

需要注意的一點是,每次只能運行一個 scheduler 守護進程。如果您有多個 scheduler運行,那么就有可能一個任務被執行多次。這可能會導致您的工作流因重復運行而出現一些問題。

下圖為擴展 Master 節點的架構圖:

clip_image003

scheduler failover(HA)

看到這里,可能有人會問,scheduler 不能同時運行兩個,那么運行 scheduler 的節點一旦出了問題,任務不就完全不運行了嗎?

答案: 這是個非常好的問題,不過已經有解決方案了,我們可以在兩台機器上部署 scheduler ,只運行一台機器上的 scheduler 守護進程 ,一旦運行 scheduler 守護進程的機器出現故障,立刻啟動另一台機器上的 scheduler 即可。我們可以借助第三方組件 airflow-scheduler-failover-controller 實現 scheduler 的高可用。

具體步驟如下所示:

下載 failover

git clone https://github.com/teamclairvoyant/airflow-scheduler-failover-controller

使用 pip 進行安裝

cd{AIRFLOW_FAILOVER_CONTROLLER_HOME}

pip install -e .

初始化 failover

scheduler_failover_controller init

注:初始化時,會向airflow.cfg中追加內容,因此需要先安裝 airflow 並初始化。

clip_image004

更改 failover 配置

scheduler_nodes_in_cluster= host1,host2

注:host name 可以通過scheduler_failover_controller get_current_host命令獲得

配置安裝 failover 的機器之間的免密登錄,配置完成后,可以使用如下命令進行驗證:

scheduler_failover_controller test_connection

clip_image005

啟動 failover

scheduler_failover_controller start

或者:nohup scheduler_failover_controller start > /softwares/airflow/logs/scheduler_failover/scheduler_failover_run.log &

因此更健壯的架構圖如下所示:

clip_image006

airflow 集群部署的具體步驟

前提條件

節點運行的守護進程如下:

節點

運行服務

master1

webserver, scheduler
master2 webserver
worker1 worker
worker2 worker

隊列服務處於運行中. (RabbitMQ, Redis, etc)

安裝 RabbitMQ 方法參見: http://site.clairvoyantsoft.com/installing-rabbitmq/
如果正在使用 RabbitMQ, 推薦 RabbitMQ 也做成高可用的集群部署,並為 RabbitMQ 實例配置負載均衡。

步驟

  • 在所有需要運行守護進程的機器上安裝 Apache Airflow。具體安裝方法可參考airflow 安裝,部署,填坑
  • 修改 {AIRFLOW_HOME}/airflow.cfg 文件,確保所有機器使用同一份配置文件。
  • 修改 Executor 為 CeleryExecutor

       executor = CeleryExecutor

  • 指定元數據庫(metestore)

       sql_alchemy_conn = mysql://{USERNAME}:{PASSWORD}@{MYSQL_HOST}:3306/airflow

  • 設置中間人(broker)

       如果使用 RabbitMQ

       broker_url = amqp://guest:guest@{RABBITMQ_HOST}:5672/

       如果使用 Redis

       broker_url = redis://{REDIS_HOST}:6379/0 #使用數據庫 0

       設定結果存儲后端 backend

       result_backend = db+mysql://{USERNAME}:{PASSWORD}@{MYSQL_HOST}:3306/airflow

       #當然您也可以使用 Redis :result_backend =redis://{REDIS_HOST}:6379/1

       #密碼認證使用:broker_url = redis://:{yourpassword}@{REDIS_HOST}:6489/db

  • 在 master1 和 master2 上部署您的工作流(DAGs)。
  • 在 master 1,初始 airflow 的元數據庫

       airflow initdb

  • 在 master1, 啟動相應的守護進程

       airflow webserver –D

       airflow scheduler -D

  • 在 master2,啟動 Web Server

       airflow webserver -D

  • 在 worker1 和 worker2 啟動 worker

       airflow worker -D

  • 使用負載均衡處理 webserver

       可以使用 nginx,AWS 等服務器處理 webserver 的負載均衡,不在此詳述

至此,所有均已集群或高可用部署,apache-airflow 系統已堅不可摧。


官方文檔如下:

Documentation: https://airflow.incubator.apache.org/

Install Documentation: https://airflow.incubator.apache.org/installation.html

GitHub Repo: https://github.com/apache/incubator-airflow


如果您覺得此文章對您有幫助,請點擊右下方【推薦】讓更多人看到,thanks!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM