聲明
本文摘錄了很多前輩的文章,原文如下:
https://www.jianshu.com/p/2ecef979c606
Airflow 簡介
Airflow是一個可編程,調度和監控的工作流平台,基於有向無環圖(DAG),airflow可以定義一組有依賴的任務,按照依賴依次執行。airflow提供了豐富的命令行工具用於系統管控,而其web管理界面同樣也可以方便的管控調度任務,並且對任務運行狀態進行實時監控,方便了系統的運維和管理。
基本概念
airflow守護進程
airflow 系統在運行時有許多守護進程,它們提供了 airflow 的全部功能。守護進程包括 Web服務器-webserver、調度程序-scheduler、執行單元-worker、消息隊列監控工具-Flower等。下面是 apache-airflow 集群、高可用部署的主要守護進程。
webserver
webserver 是一個守護進程,它接受 HTTP 請求,允許你通過 Python Flask Web 應用程序與 airflow 進行交互,webserver 提供以下功能:
- 中止、恢復、觸發任務。
- 監控正在運行的任務,斷點續跑任務。
- 執行 ad-hoc 命令或 SQL 語句來查詢任務的狀態,日志等詳細信息。
- 配置連接,包括不限於數據庫、ssh 的連接等。
webserver 守護進程使用 gunicorn 服務器(相當於 java 中的 tomcat )處理並發請求,可通過修改{AIRFLOW_HOME}/airflow.cfg文件中 workers 的值來控制處理並發請求的進程數。
例如:
workers = 4 #表示開啟4個gunicorn worker(進程)處理web請求
啟動 webserver 守護進程:
$ airflow webserver -D
scheduler
scheduler 是一個守護進程,它周期性地輪詢任務的調度計划,以確定是否觸發任務執行。
啟動的 scheduler 守護進程:
$ airflow scheduler -D
worker
worker 是一個守護進程,它啟動 1 個或多個 Celery 的任務隊列,負責執行具體 的 DAG 任務。
當設置 airflow 的 executors 設置為 CeleryExecutor 時才需要開啟 worker 守護進程。
推薦你在生產環境使用 CeleryExecutor :
executor = CeleryExecutor
啟動一個 worker守護進程,默認的隊列名為 default:
$ airflow worker -D
flower
flower 是一個守護進程,用於是監控 celery 消息隊列。啟動守護進程命令如下:
$ airflow flower -D
默認的端口為 5555,您可以在瀏覽器地址欄中輸入 "http://hostip:5555" 來訪問 flower ,對 celery 消息隊列進行監控。
執行器(Executor)
Airflow本身是一個綜合平台,它兼容多種組件,所以在使用的時候有多種方案可以選擇。比如最關鍵的執行器就有四種選擇:
- SequentialExecutor:單進程順序執行任務,默認執行器,通常只用於測試
- LocalExecutor:多進程本地執行任務
- CeleryExecutor:分布式調度,生產常用
- DaskExecutor :動態任務調度,主要用於數據分析
生產環境建議使用CeleryExecutor作為執行器。
celery是一個分布式調度框架,其本身無隊列功能,需要使用第三方組件,比如redis或者rabbitmq。
核心概念
- DAGs:即有向無環圖(Directed Acyclic Graph),將所有需要運行的tasks按照依賴關系組織起來,描述的是所有tasks執行順序。
- Operators:可以簡單理解為一個class,描述了DAG中某個的task具體要做的事。其中,airflow內置了很多operators,如BashOperator 執行一個bash 命令,PythonOperator 調用任意的Python 函數,EmailOperator 用於發送郵件,HTTPOperator 用於發送HTTP請求, SqlOperator 用於執行SQL命令等等,同時,用戶可以自定義Operator,這給用戶提供了極大的便利性。
- Tasks:Task 是 Operator的一個實例,也就是DAGs中的一個node。
- Task Instance:task的一次運行。Web 界面中可以看到task instance 有自己的狀態,包括"running", "success", "failed", "skipped", "up for retry"等。
- Task Relationships:DAGs中的不同Tasks之間可以有依賴關系,如 Task1 >> Task2,表明Task2依賴於Task2了。
通過將DAGs和Operators結合起來,用戶就可以創建各種復雜的 工作流(workflow)。
操作符-Operators
DAG 定義一個作業流,Operators 則定義了實際需要執行的作業。airflow 提供了許多 Operators 來指定我們需要執行的作業:
- BashOperator - 執行 bash 命令或腳本。
- SSHOperator - 執行遠程 bash 命令或腳本(原理同 paramiko 模塊)。
- PythonOperator - 執行 Python 函數。
- EmailOperator - 發送 Email。
- HTTPOperator - 發送一個 HTTP 請求。
- MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator, 等,執行 SQL 任務。
- DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator 你懂得。除了以上這些 Operators 還可以方便的自定義 Operators 滿足個性化的任務需求。
airflow 的守護進程是如何一起工作的?
需要注意的是 airflow的守護進程彼此之間是獨立的,他們並不相互依賴,也不相互感知。每個守護進程在運行時只處理分配到自己身上的任務,他們在一起運行時,提供了 airflow 的全部功能。
調度器 scheduler 會間隔性的去輪詢元數據庫(Metastore)已注冊的 DAG(有向無環圖,可理解為作業流)是否需要被執行。如果一個具體的 DAG 根據其調度計划需要被執行,scheduler 守護進程就會先在元數據庫創建一個 DagRun 的實例,並觸發 DAG 內部的具體 task(任務,可以這樣理解:DAG 包含一個或多個task),觸發其實並不是真正的去執行任務,而是推送 task 消息至消息隊列(即 broker)中,每一個 task 消息都包含此 task 的 DAG ID,task ID,及具體需要被執行的函數。如果 task 是要執行 bash 腳本,那么 task 消息還會包含 bash 腳本的代碼。
用戶可能在 webserver 上來控制 DAG,比如手動觸發一個 DAG 去執行。當用戶這樣做的時候,一個DagRun 的實例將在元數據庫被創建,scheduler 使同 #1 一樣的方法去觸發 DAG 中具體的 task 。
worker 守護進程將會監聽消息隊列,如果有消息就從消息隊列中取出消息,當取出任務消息時,它會更新元數據中的 DagRun 實例的狀態為正在運行,並嘗試執行 DAG 中的 task,如果 DAG 執行成功,則更新任 DagRun 實例的狀態為成功,否則更新狀態為失敗。
airflow單節點部署
將以上所有守護進程運行在同一台機器上即可完成 airflow 的單結點部署,架構如下圖所示:
airflow多節點(集群)部署
在穩定性要求較高的場景,如金融交易系統中,一般采用集群、高可用的方式來部署。Apache Airflow 同樣支持集群、高可用的部署,airflow 的守護進程可分布在多台機器上運行,架構如下圖所示:
這樣做有以下好處
高可用
如果一個 worker 節點崩潰或離線時,集群仍可以被控制的,其他 worker 節點的任務仍會被執行。
分布式處理
如果你的工作流中有一些內存密集型的任務,任務最好是分布在多台機器上運行以便得到更快的執行。
擴展 worker 節點
水平擴展
你可以通過向集群中添加更多 worker 節點來水平地擴展集群,並使這些新節點指向同一個元數據庫,從而分發處理過程。由於 worker 不需要在任何守護進程注冊即可執行任務,因此worker 節點可以在不停機,不重啟服務下的情況進行擴展,也就是說可以隨時擴展。
垂直擴展
你可以通過增加單個 worker 節點的守護進程數來垂直擴展集群。可以通過修改 airflow 的配置文件-{AIRFLOW_HOME}/airflow.cfg 中 celeryd_concurrency 的值來實現,例如:
celeryd_concurrency = 30
#您可以根據實際情況,如集群上運行的任務性質,CPU 的內核數量等,增加並發進程的數量以滿足實際需求。
擴展 Master 節點
還可以向集群中添加更多主節點,以擴展主節點上運行的服務。您可以擴展 webserver 守護進程,以防止太多的 HTTP 請求出現在一台機器上,或者您想為 webserver 的服務提供更高的可用性。
需要注意的一點是,每次只能運行一個 scheduler 守護進程。如果您有多個 scheduler運行,那么就有可能一個任務被執行多次。這可能會導致您的工作流因重復運行而出現一些問題。
下圖為擴展 Master 節點的架構圖:
scheduler failover(HA)
看到這里,可能有人會問,scheduler 不能同時運行兩個,那么運行 scheduler 的節點一旦出了問題,任務不就完全不運行了嗎?
答案: 這是個非常好的問題,不過已經有解決方案了,我們可以在兩台機器上部署 scheduler ,只運行一台機器上的 scheduler 守護進程 ,一旦運行 scheduler 守護進程的機器出現故障,立刻啟動另一台機器上的 scheduler 即可。我們可以借助第三方組件 airflow-scheduler-failover-controller 實現 scheduler 的高可用。
具體步驟如下所示:
下載 failover
git clone https://github.com/teamclairvoyant/airflow-scheduler-failover-controller
使用 pip 進行安裝
cd{AIRFLOW_FAILOVER_CONTROLLER_HOME}
pip install -e .
初始化 failover
scheduler_failover_controller init
注:初始化時,會向airflow.cfg中追加內容,因此需要先安裝 airflow 並初始化。
更改 failover 配置
scheduler_nodes_in_cluster= host1,host2
注:host name 可以通過scheduler_failover_controller get_current_host命令獲得
配置安裝 failover 的機器之間的免密登錄,配置完成后,可以使用如下命令進行驗證:
scheduler_failover_controller test_connection
啟動 failover
scheduler_failover_controller start
或者:nohup scheduler_failover_controller start > /softwares/airflow/logs/scheduler_failover/scheduler_failover_run.log &
因此更健壯的架構圖如下所示:
airflow 集群部署的具體步驟
前提條件
節點運行的守護進程如下:
節點 | 運行服務 |
master1 |
webserver, scheduler |
master2 | webserver |
worker1 | worker |
worker2 | worker |
隊列服務處於運行中. (RabbitMQ, Redis, etc)
安裝 RabbitMQ 方法參見: http://site.clairvoyantsoft.com/installing-rabbitmq/
如果正在使用 RabbitMQ, 推薦 RabbitMQ 也做成高可用的集群部署,並為 RabbitMQ 實例配置負載均衡。
步驟
- 在所有需要運行守護進程的機器上安裝 Apache Airflow。具體安裝方法可參考airflow 安裝,部署,填坑
- 修改 {AIRFLOW_HOME}/airflow.cfg 文件,確保所有機器使用同一份配置文件。
- 修改 Executor 為 CeleryExecutor
executor = CeleryExecutor
- 指定元數據庫(metestore)
sql_alchemy_conn = mysql://{USERNAME}:{PASSWORD}@{MYSQL_HOST}:3306/airflow
- 設置中間人(broker)
如果使用 RabbitMQ
broker_url = amqp://guest:guest@{RABBITMQ_HOST}:5672/
如果使用 Redis
broker_url = redis://{REDIS_HOST}:6379/0 #使用數據庫 0
設定結果存儲后端 backend
result_backend = db+mysql://{USERNAME}:{PASSWORD}@{MYSQL_HOST}:3306/airflow
#當然您也可以使用 Redis :result_backend =redis://{REDIS_HOST}:6379/1
#密碼認證使用:broker_url = redis://:{yourpassword}@{REDIS_HOST}:6489/db
- 在 master1 和 master2 上部署您的工作流(DAGs)。
- 在 master 1,初始 airflow 的元數據庫
airflow initdb
- 在 master1, 啟動相應的守護進程
airflow webserver –D
airflow scheduler -D
- 在 master2,啟動 Web Server
airflow webserver -D
- 在 worker1 和 worker2 啟動 worker
airflow worker -D
- 使用負載均衡處理 webserver
可以使用 nginx,AWS 等服務器處理 webserver 的負載均衡,不在此詳述
至此,所有均已集群或高可用部署,apache-airflow 系統已堅不可摧。
官方文檔如下:
Documentation: https://airflow.incubator.apache.org/
Install Documentation: https://airflow.incubator.apache.org/installation.html
GitHub Repo: https://github.com/apache/incubator-airflow
如果您覺得此文章對您有幫助,請點擊右下方【推薦】讓更多人看到,thanks!