airflow原理


官網:

http://airflow.apache.org/installation.html

原理:

https://www.cnblogs.com/cord/p/9450910.html

原理介紹:

DAG:有向無環圖,有方向但沒有循環

airflow 的守護進程
airflow 系統在運行時有許多守護進程,它們提供了 airflow 的全部功能。守護進程包括 Web服務器-webserver、調度程序-scheduler、執行單元-worker、消息隊列監控工具-Flower等。下面是 apache-airflow 集群、高可用部署的主要守護進程。

webserver
webserver 是一個守護進程,它接受 HTTP 請求,允許你通過 Python Flask Web 應用程序與 airflow 進行交互,webserver 提供以下功能: 中止、恢復、觸發任務。 監控正在運行的任務,斷點續跑任務。 執行 ad-hoc 命令或 SQL 語句來查詢任務的狀態,日志等詳細信息。 配置連接,包括不限於數據庫、ssh 的連接等。

webserver 守護進程使用 gunicorn 服務器(相當於 java 中的 tomcat )處理並發請求,可通過修改{AIRFLOW_HOME}/airflow.cfg文件中 workers 的值來控制處理並發請求的進程數。 例如:

workers = 4 #表示開啟4個gunicorn worker(進程)處理web請求
啟動 webserver 守護進程:

$ airfow webserver -D
scheduler
scheduler 是一個守護進程,它周期性地輪詢任務的調度計划,以確定是否觸發任務執行。 啟動的 scheduler 守護進程:

$ airfow scheduler -D
worker
worker 是一個守護進程,它啟動 1 個或多個 Celery 的任務隊列,負責執行具體 的 DAG 任務。

當設置 airflow 的 executors 設置為 CeleryExecutor 時才需要開啟 worker 守護進程。推薦你在生產環境使用 CeleryExecutor :

executor = CeleryExecutor
啟動一個 worker守護進程,默認的隊列名為 default:

$ airfow worker -D
flower
flower 是一個守護進程,用於是監控 celery 消息隊列。啟動守護進程命令如下:

$ airflow flower -D
默認的端口為 5555,您可以在瀏覽器地址欄中輸入 "http://hostip:5555" 來訪問 flower ,對 celery 消息隊列進行監控。

這個監控隊列里有多少任務被schudle吊啟, worker去消費隊列里的任務

 

並發控制參數解釋:

parallelism:這個參數指定了整個Airflow系統,在任何一刻能同時運行的Task Instance的數量,這個數量跟DAG無關,只跟Executor和Task有關。舉個例子:如果parallelism=15, 這時你有兩個DAG,A和B,如果A需要同時開跑10個Task,B也要同時開跑10個Task,兩個DAG同時觸發,那么這時候同時在跑的Task數量只能是15,其余的5個會等之前的Task運行完了觸發,這時的狀態不會顯示在web上。而且在這種情況下,觸發的順序是不確定的。

concurrency :這個用來控制 每個dag運行過程中最大可同時運行的task實例數。如果你沒有設置這個值的話,scheduler 會從airflow.cfg里面讀取默認值 dag_concurrency

max_active_runs : 這個是用來控制在同一時間可以運行的最多的dag runs 數量。這里需要解釋一下dag runs ,比如你的dag設置的每天運行,那么在天的時間段內運行某個dag就算是一個dag runs 。按道理每天只會執行一次,但是保不齊,你前天和大前天的dag都沒運行,那么就需要補跑,或者你在某一次定時dag觸發了之后,又手動觸發了,那么就存在,同一個時間點有多個dag runs 。這個參數就是控制這個最大的dag runs

sql_alchemy_pool_size:  默認使用連接數據庫的最大連接,設置為0表示沒有限制

 

worker_concurrency = 80  #所有works的總task數
parallelism = 160               #所有DAG的總task數

 

#同時運行的schedule的線程: 

當定義的dag文件過多的時候,airflow的scheduler節點運行效率緩慢

[scheduler]
# The scheduler can run multiple threads in parallel to schedule dags.
# This defines how many threads will run.
#默認是2這里改為100
max_threads = 20

查看:

 ps -ef |grep schedu

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM