airfkow的queue(隊列)的使用


Airflow本身是一個綜合平台,它兼容多種組件,所以在使用的時候有多種方案可以選擇。比如最關鍵的執行器就有四種選擇:

SequentialExecutor:單進程順序執行任務,默認執行器,通常只用於測試

LocalExecutor:多進程本地執行任務

CeleryExecutor:分布式調度,生產常用

DaskExecutor :動態任務調度,主要用於數據分析

在當前項目使用CeleryExecutor作為執行器。

celery是一個分布式調度框架,其本身無隊列功能,需要使用第三方組件,比如redis或者rabbitmq,當前項目使用的是rabbitmq,系統整體結構如下所示:

其中:

turing為外部系統

GDags服務幫助拼接成dag

master節點webui管理dags、日志等信息

scheduler負責調度,只支持單節點

worker負責執行具體dag中的task, worker支持多節點

在整個調度系統中,節點之間的傳遞介質是消息,而消息的本質內容是執行腳本的命令,也就是說,工作節點的dag文件必須和master節點的dag文件保持一致,不然任務的執行會出問題。

隊列的使用前提就是選擇CeleryExecutor執行器

以下如內容來自airflow官網

使用CeleryExecutor時,可以指定將任務發送到的Celery隊列。隊列是BaseOperator的屬性,因此可以將任何任務分配給任何隊列。在airflow.cfg的celery-> default_queue中定義了環境的默認隊列。這定義了未指定任務時分配給其的隊列,以及Airflow的worker在啟動時偵聽的隊列。

worker可以聽一個或多個任務隊列。啟動工作程序時(使用命令airflow worker),可以指定一組用逗號分隔的隊列名稱(例如airflow worker -q spark)。然后,此worker將僅拾取連接到指定隊列的任務。

如果從資源的角度(例如非常輕巧的任務,其中一個worker可以毫無問題地執行數千個任務)或從環境的角度(您需要一個在Spark集群中運行的worker)來需要專門的worker,這可能很有用。本身,因為它需要非常具體的環境和安全權限)。http://airflow.apache.org/docs/stable/concepts.html#queues

 

 

airflow中的隊列嚴格來說不叫Queues,叫"lebal"更為合適。在operator中,可以設置queue參數如queue=spark,然后在啟動worker時:airflow worker -q spark,那么該worker只會執行spark任務。相當於節點標簽。

實踐測試

測試地址 http://118.178.129.100:8888/admin/

測試DAG:cc_ods.test

 

根據官網內容對code進行修改

# -*- coding: utf8 -*-
import pprint from datetime import timedelta, datetime from os import sys, path import airflow from airflow import DAG from airflow.operators.dummy_operator import DummyOperator from airflow.operators.python_operator import PythonOperator reload(sys) sys.setdefaultencoding('utf-8') sys.path.append(path.dirname(path.abspath(__file__))) sys.path.append(path.dirname(path.dirname(path.abspath(__file__)))) sys.path.append(path.dirname(path.dirname(path.dirname(path.abspath(__file__))))) from utils import basic_util, send_email, sms_service, env_conf, alarm_util import hallo pp = pprint.PrettyPrinter(indent=4) str_today = (datetime.today()).strftime('%Y-%m-%d') ## Define the DAG object default_args = { 'owner': 'yifan.hu', 'depends_on_past': True, 'start_date': airflow.utils.dates.days_ago(2), 'retries': 4, 'retry_delay': timedelta(minutes=1), 'priority_weight': 10, 'execution_timeout': timedelta(minutes=360), 'queue': 'test_spark' } dag_id = basic_util.get_dag_id(path.abspath(__file__)) dag = DAG(dag_id, default_args=default_args, schedule_interval='0 0 * * *', concurrency=2) # 元數據系統生成創建表的sql文件 t_meta_create_table_statements = PythonOperator( task_id='meta_create_table_statements', python_callable=hallo.test_task1, default_args=default_args, dag=dag )

在py文件中的default_args添加了'queue''test_spark',給這個任務指定了queue

然后重新調起這個任務

 

在修改work上的隊列標志之前,這個任務一直顯示queued,因為沒有任何一個work的隊列標簽是test_spark,所以,沒有work可以去執行它,

然后在啟動worker的腳本中添加了 -q test_spark

 

修改完成后重啟這個節點上的worker(這里我修改的是節點test31)

重啟worker以后,任務立刻執行完畢,可以看到執行的節點就是test31(多次重復實驗均是在test31上進行)

 

 

在官網中又一段話是

worker可以聽一個或多個任務隊列。啟動工作程序時(使用命令airflow worker),可以指定一組用逗號分隔的隊列名稱(例如airflow worker -q spark)。然后,此worker將僅拾取連接到指定隊列的任務。

也就是說我們在啟動這個節點上的worker的時候,加一個參數 -q test_spark,但是在airflow.cfg中不做修改,被標志test_spark的任務將會在這個節點上執行,並且在元數據中,queue列也會顯示test_spark,而其他沒有定義隊列的task,將會按照airflow.cfg中default_queue參數配置進行,也就是說並不是這個worker打了test_spark的標簽就只能執行帶有test_spark隊列標簽的任務,只是這樣的任務一定會在這個worker執行,其他task會按照airflow.cfg中定義的進行。

下圖是元數據所示

 

airflow.cfg中default_queue參數配置


 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM