官網:
http://airflow.apache.org/installation.html
原理:
https://www.cnblogs.com/cord/p/9450910.html
原理介紹:
DAG:有向無環圖,有方向但沒有循環
airflow 的守護進程
airflow 系統在運行時有許多守護進程,它們提供了 airflow 的全部功能。守護進程包括 Web服務器-webserver、調度程序-scheduler、執行單元-worker、消息隊列監控工具-Flower等。下面是 apache-airflow 集群、高可用部署的主要守護進程。
webserver
webserver 是一個守護進程,它接受 HTTP 請求,允許你通過 Python Flask Web 應用程序與 airflow 進行交互,webserver 提供以下功能: 中止、恢復、觸發任務。 監控正在運行的任務,斷點續跑任務。 執行 ad-hoc 命令或 SQL 語句來查詢任務的狀態,日志等詳細信息。 配置連接,包括不限於數據庫、ssh 的連接等。
webserver 守護進程使用 gunicorn 服務器(相當於 java 中的 tomcat )處理並發請求,可通過修改{AIRFLOW_HOME}/airflow.cfg文件中 workers 的值來控制處理並發請求的進程數。 例如:
workers = 4 #表示開啟4個gunicorn worker(進程)處理web請求
啟動 webserver 守護進程:
$ airfow webserver -D
scheduler
scheduler 是一個守護進程,它周期性地輪詢任務的調度計划,以確定是否觸發任務執行。 啟動的 scheduler 守護進程:
$ airfow scheduler -D
worker
worker 是一個守護進程,它啟動 1 個或多個 Celery 的任務隊列,負責執行具體 的 DAG 任務。
當設置 airflow 的 executors 設置為 CeleryExecutor 時才需要開啟 worker 守護進程。推薦你在生產環境使用 CeleryExecutor :
executor = CeleryExecutor
啟動一個 worker守護進程,默認的隊列名為 default:
$ airfow worker -D
flower
flower 是一個守護進程,用於是監控 celery 消息隊列。啟動守護進程命令如下:
$ airflow flower -D
默認的端口為 5555,您可以在瀏覽器地址欄中輸入 "http://hostip:5555" 來訪問 flower ,對 celery 消息隊列進行監控。
這個監控隊列里有多少任務被schudle吊啟, worker去消費隊列里的任務
並發控制參數解釋:
parallelism:這個參數指定了整個Airflow系統,在任何一刻能同時運行的Task Instance的數量,這個數量跟DAG無關,只跟Executor和Task有關。舉個例子:如果parallelism=15, 這時你有兩個DAG,A和B,如果A需要同時開跑10個Task,B也要同時開跑10個Task,兩個DAG同時觸發,那么這時候同時在跑的Task數量只能是15,其余的5個會等之前的Task運行完了觸發,這時的狀態不會顯示在web上。而且在這種情況下,觸發的順序是不確定的。
concurrency :這個用來控制 每個dag運行過程中最大可同時運行的task實例數。如果你沒有設置這個值的話,scheduler 會從airflow.cfg里面讀取默認值 dag_concurrency
max_active_runs : 這個是用來控制在同一時間可以運行的最多的dag runs 數量。這里需要解釋一下dag runs ,比如你的dag設置的每天運行,那么在天的時間段內運行某個dag就算是一個dag runs 。按道理每天只會執行一次,但是保不齊,你前天和大前天的dag都沒運行,那么就需要補跑,或者你在某一次定時dag觸發了之后,又手動觸發了,那么就存在,同一個時間點有多個dag runs 。這個參數就是控制這個最大的dag runs
sql_alchemy_pool_size: 默認使用連接數據庫的最大連接,設置為0表示沒有限制
worker_concurrency = 80 #所有works的總task數
parallelism = 160 #所有DAG的總task數
#同時運行的schedule的線程:
當定義的dag文件過多的時候,airflow的scheduler節點運行效率緩慢
[scheduler]
# The scheduler can run multiple threads in parallel to schedule dags.
# This defines how many threads will run.
#默認是2這里改為100
max_threads = 20
查看:
ps -ef |grep schedu