Airflow本身是一個綜合平台,它兼容多種組件,所以在使用的時候有多種方案可以選擇。比如最關鍵的執行器就有四種選擇:
SequentialExecutor:單進程順序執行任務,默認執行器,通常只用於測試
LocalExecutor:多進程本地執行任務
CeleryExecutor:分布式調度,生產常用
DaskExecutor :動態任務調度,主要用於數據分析
在當前項目使用CeleryExecutor
作為執行器。
celery是一個分布式調度框架,其本身無隊列功能,需要使用第三方組件,比如redis或者rabbitmq,當前項目使用的是rabbitmq,系統整體結構如下所示:
其中:
turing為外部系統
GDags服務幫助拼接成dag
master節點webui管理dags、日志等信息
scheduler負責調度,只支持單節點
worker負責執行具體dag中的task, worker支持多節點
在整個調度系統中,節點之間的傳遞介質是消息,而消息的本質內容是執行腳本的命令,也就是說,工作節點的dag文件必須和master節點的dag文件保持一致,不然任務的執行會出問題。
隊列的使用前提就是選擇CeleryExecutor執行器
以下如內容來自airflow官網
使用CeleryExecutor時,可以指定將任務發送到的Celery隊列。隊列是BaseOperator的屬性,因此可以將任何任務分配給任何隊列。在airflow.cfg的celery-> default_queue中定義了環境的默認隊列。這定義了未指定任務時分配給其的隊列,以及Airflow的worker在啟動時偵聽的隊列。
worker可以聽一個或多個任務隊列。啟動工作程序時(使用命令airflow worker),可以指定一組用逗號分隔的隊列名稱(例如airflow worker -q spark)。然后,此worker將僅拾取連接到指定隊列的任務。
如果從資源的角度(例如非常輕巧的任務,其中一個worker可以毫無問題地執行數千個任務)或從環境的角度(您需要一個在Spark集群中運行的worker)來需要專門的worker,這可能很有用。本身,因為它需要非常具體的環境和安全權限)。http://airflow.apache.org/docs/stable/concepts.html#queues
airflow中的隊列嚴格來說不叫Queues,叫"lebal"更為合適。在operator中,可以設置queue參數如queue=spark,然后在啟動worker時:airflow worker -q spark,那么該worker只會執行spark任務。相當於節點標簽。
實踐測試
測試地址 http://118.178.129.100:8888/admin/
測試DAG:cc_ods.test
根據官網內容對code進行修改
# -*- coding: utf8 -*-
import pprint from datetime import timedelta, datetime from os import sys, path import airflow from airflow import DAG from airflow.operators.dummy_operator import DummyOperator from airflow.operators.python_operator import PythonOperator reload(sys) sys.setdefaultencoding('utf-8') sys.path.append(path.dirname(path.abspath(__file__))) sys.path.append(path.dirname(path.dirname(path.abspath(__file__)))) sys.path.append(path.dirname(path.dirname(path.dirname(path.abspath(__file__))))) from utils import basic_util, send_email, sms_service, env_conf, alarm_util import hallo pp = pprint.PrettyPrinter(indent=4) str_today = (datetime.today()).strftime('%Y-%m-%d') ## Define the DAG object default_args = { 'owner': 'yifan.hu', 'depends_on_past': True, 'start_date': airflow.utils.dates.days_ago(2), 'retries': 4, 'retry_delay': timedelta(minutes=1), 'priority_weight': 10, 'execution_timeout': timedelta(minutes=360), 'queue': 'test_spark' } dag_id = basic_util.get_dag_id(path.abspath(__file__)) dag = DAG(dag_id, default_args=default_args, schedule_interval='0 0 * * *', concurrency=2) # 元數據系統生成創建表的sql文件 t_meta_create_table_statements = PythonOperator( task_id='meta_create_table_statements', python_callable=hallo.test_task1, default_args=default_args, dag=dag )
在py文件中的default_args添加了'queue': 'test_spark',給這個任務指定了queue
然后重新調起這個任務
在修改work上的隊列標志之前,這個任務一直顯示queued,因為沒有任何一個work的隊列標簽是test_spark,所以,沒有work可以去執行它,
然后在啟動worker的腳本中添加了 -q test_spark
修改完成后重啟這個節點上的worker(這里我修改的是節點test31)
重啟worker以后,任務立刻執行完畢,可以看到執行的節點就是test31(多次重復實驗均是在test31上進行)
在官網中又一段話是
worker可以聽一個或多個任務隊列。啟動工作程序時(使用命令airflow worker),可以指定一組用逗號分隔的隊列名稱(例如airflow worker -q spark)。然后,此worker將僅拾取連接到指定隊列的任務。
也就是說我們在啟動這個節點上的worker的時候,加一個參數 -q test_spark,但是在airflow.cfg中不做修改,被標志test_spark的任務將會在這個節點上執行,並且在元數據中,queue列也會顯示test_spark,而其他沒有定義隊列的task,將會按照airflow.cfg中default_queue參數配置進行,也就是說並不是這個worker打了test_spark的標簽就只能執行帶有test_spark隊列標簽的任務,只是這樣的任務一定會在這個worker執行,其他task會按照airflow.cfg中定義的進行。
下圖是元數據所示
airflow.cfg中default_queue參數配置