Airflow 入門及使用

什么是 Airflow?

Airflow 是一個使用 python 語言編寫的 data pipeline 調度和監控工作流的平台。 Airflow 是通過 DAG(Directed acyclic graph 有向無環圖)來管理任務流程的任務調度工具, 不需要知道業務數據的具體內容,設置任務的依賴關系即可實現任務調度。

這個平台擁有和 Hive、Presto、MySQL、HDFS、Postgres 等數據源之間交互的能力,並且提供了鈎子(hook)使其擁有很好地擴展性。 除了使用命令行,該工具還提供了一個 WebUI 可以可視化的查看依賴關系、監控進度、觸發任務等。

Airflow 的架構

在一個可擴展的生產環境中,Airflow 含有以下組件:

  • 元數據庫:這個數據庫存儲有關任務狀態的信息。
  • 調度器:Scheduler 是一種使用 DAG 定義結合元數據中的任務狀態來決定哪些任務需要被執行以及任務執行優先級的過程。調度器通常作為服務運行。
  • 執行器:Executor 是一個消息隊列進程,它被綁定到調度器中,用於確定實際執行每個任務計划的工作進程。有不同類型的執行器,每個執行器都使用一個指定工作進程的類來執行任務。例如,LocalExecutor 使用與調度器進程在同一台機器上運行的並行進程執行任務。 其他像 CeleryExecutor 的執行器使用存在於獨立的工作機器集群中的工作進程執行任務。
  • Workers:這些是實際執行任務邏輯的進程,由正在使用的執行器確定。
                                       

Airflow 解決哪些問題

通常,在一個運維系統,數據分析系統,或測試系統等大型系統中,我們會有各種各樣的依賴需求。包括但不限於:

  • 時間依賴:任務需要等待某一個時間點觸發。
  • 外部系統依賴:任務依賴外部系統需要調用接口去訪問。
  • 任務間依賴:任務 A 需要在任務 B 完成后啟動,兩個任務互相間會產生影響。 資源環境依賴:任務消耗資源非常多, 或者只能在特定的機器上執行。

crontab 可以很好地處理定時執行任務的需求,但僅能管理時間上的依賴。

Airflow 是一種 WMS,即:它將任務以及它們的依賴看作代碼,按照那些計划規范任務執行,並在實際工作進程之間分發需執行的任務。 Airflow 提供了一個用於顯示當前活動任務和過去任務狀態的優秀 UI,並允許用戶手動管理任務的執行和狀態。

Airflow 中的工作流是具有方向性依賴的任務集合。 具體說就是 Airflow 的核心概念 DAG(有向無環圖) —— 來表現工作流。 dag中的每個節點都是一個任務,dag中的邊表示的是任務之間的依賴(強制為有向無環,因此不會出現循環依賴,從而導致無限執行循環)。

Airflow 在 ETL 上的實踐

ETL,是英文 Extract,Transform,Load 的縮寫,用來描述將數據從來源端經過抽取(extract)、轉換(transform)、加載(load)至目的端的過程。 ETL 一詞較常用在數據倉庫,Airflow 在解決 ETL 任務各種依賴問題上的能力恰恰是我們所需要的。

在現階段的實踐中,我們使用 Airflow 來同步各個數據源數據到數倉,同時定時執行一些批處理任務及帶有數據依賴、資源依賴關系的計算腳本。

本文立意於科普介紹,故在后面的用例中只介紹了 BashOperatorPythonOperator 這倆個最為易用且在我們日常使用中最為常見的 operator。

Airflow 同時也具有不錯的集群擴展能力,可使用 CeleryExecuter 以及多個 Pool 來提高任務並發度。

Airflow 在 CeleryExecuter 下可以使用不同的用戶啟動 Worker,不同的 Worker 監聽不同的 Queue,這樣可以解決用戶權限依賴問題。 Worker 也可以啟動在多個不同的機器上,解決機器依賴的問題。

Airflow 可以為任意一個 Task 指定一個抽象的 Pool,每個 Pool 可以指定一個 Slot 數。 每當一個 Task 啟動時,就占用一個 Slot,當 Slot 數占滿時,其余的任務就處於等待狀態。這樣就解決了資源依賴問題。

Airflow 安裝及初始化

假設:你已經安裝好了 python 及配置好了其包管理工具 pip

pip install apache-airflow

# 初始化數據庫
airflow initdb

# 上面的命令默認在家目錄下創建 airflow 文件夾和相關配置文件
# 也可以使用以下命令來指定目錄
export AIRFLOW_HOME={yourpath}/airflow

# 配置數據庫
# vim airflow/airflow.cfg
# 修改 sql_alchemy_conn

# 守護進程運行webserver, 默認端口為8080,也可以通過`-p`來指定端口
airflow webserver -D  

# 守護進程運行調度器     
airflow scheduler -D

定義第一個DAG

在 AIRFLOW_HOME 目錄下新建 dags 文件夾,后面的所有 dag 文件都要存儲在這個目錄。

新建 demo.py,語句含義見注釋

from datetime import datetime, timedelta

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator


def default_options():
    default_args = {
        'owner': 'airflow',  # 擁有者名稱
        'start_date': days_ago(1),  # 第一次開始執行的時間,為 UTC 時間(注意不要設置為當前時間)
        'retries': 1,  # 失敗重試次數
        'retry_delay': timedelta(seconds=5)  # 失敗重試間隔
    }
    return default_args


# 定義DAG
def test1(dag):
    t = "echo 'hallo world'"
    # operator 支持多種類型, 這里使用 BashOperator
    task = BashOperator(
        task_id='test1',  # task_id
        bash_command=t,  # 指定要執行的命令
        dag=dag  # 指定歸屬的dag
    )
    return task


def hello_world_1():
    current_time = str(datetime.today())
    print('hello world at {}'.format(current_time))


def test2(dag):
    # PythonOperator
    task = PythonOperator(
        task_id='test2',
        python_callable=hello_world_1,  # 指定要執行的函數
        dag=dag)
    return task


def test3(dag):
    # DummyOperator
    task = DummyOperator(
        task_id='test3',
        dag=dag)
    return task


with DAG(
        'test_task',  # dag_id
        default_args=default_options(),  # 指定默認參數
        schedule_interval="@once"  # 執行周期
) as d:
    task1 = test1(d)
    task2 = test2(d)
    task3 = test3(d)
    task1 >> task2 >> task3  # 指定執行順序

寫完后執行 python $AIRFLOW_HOME/dags/demo.py 檢查是否有錯誤,如果命令行沒有報錯,就表示沒問題。

Web UI

打開 localhost:8080

主視圖

Airflow 的 WebUI 是其任務調度可視化的體現,可以再這個 WebUI 上監控幾乎所有任務調度運行的實時及歷史數據。 一些命令如 TriggerClear 均可以在 WebUI 上完成;一些全局參數也可以在主頁面導航欄 Admin 下配置。

點擊 dag_name, 進入任務預覽:

任務圖視

 

 

 
任務樹視圖

 

 

其他常用命令

# 測試任務,格式:airflow test dag_id task_id execution_time
airflow test test_task test1 2019-09-10

# 查看生效的 dags
airflow list_dags -sd $AIRFLOW_HOME/dags

# 開始運行任務(同web界面點trigger按鈕)
airflow trigger_dag test_task    

# 暫停任務
airflow pause dag_id      

# 取消暫停,等同於在web管理界面打開off按鈕
airflow unpause dag_id     

# 查看task列表
airflow list_tasks dag_id  查看task列表

# 清空任務狀態
airflow clear dag_id       

# 運行task
airflow run dag_id task_id execution_date

Airflow 核心原理分析

概念及發展

  • JOB:最上層的工作。分為 SchedulerJob、BackfillJob 和 LocalTaskJob。SchedulerJob 由 Scheduler 創建,BackfillJob 由 Backfill 創建,LocalTaskJob 由前面兩種 Job 創建。
  • DAG:有向無環圖,用來表示工作流。
  • DAG Run:工作流實例,表示某個工作流的一次運行(狀態)。
  • Task:任務,工作流的基本組成部分。
  • TaskInstance:任務實例,表示某個任務的一次運行(狀態)。

在早期版本 Airflow 中,DAG 執行主要有兩種完全獨立的執行途徑:SchedulerJob 和 BackfillJob。在一次較大的重構中增加了 DagRun 方式,以跟蹤 DAG 的執行狀態。

 

 

 

結構關

DagRun 執行流程描述

DagRuns 表示某個時間點 DAG 的狀態(也稱為 DagInstances)。 要運行 DAG 或管理 DAG 的執行,必須首先創建一個 DagRun 實例。 但是僅創建 DagRun 不足以實際運行 DAG(就像創建 TaskInstance 與實際運行任務並不一樣)。 因此需要一種機制來實現上述流程

刷新 dags

  • 收集新的 DagRuns
  • 執行 DagRuns(包括更新 DagRuns 的狀態為成功或失敗)
  • 喚醒 executor/心跳檢查

Scheduler 的調度邏輯

調度器實際上就是一個 airflow.jobs.SchedulerJob 實例 job 持續運行 run 方法。job.run() 在開始時將自身的信息加入到 job 表中,並維護狀態和心跳,預期能夠正常結束,將結束時間也更新到表中。 但是實際上往往因為異常中斷,導致結束時間為空。不管是如何進行的退出,SchedulerJob 退出時會關閉所有子進程。

這里主要介紹下 Scheduler 的調度邏輯:

  • 遍歷 dags 路徑下的所有 dag 文件, 啟動一定數量的進程(進程池),並且給每個進程指派一個 dag 文件。 每個 DagFileProcessor 解析分配給它的dag文件,並根據解析結果在 DB中創建 DagRuns 和 TaskInstance。
  • 在 scheduler_loop 中,檢查與活動 DagRun 關聯的 TaskInstance 的狀態,解析 TaskInstance 之間的任何依賴,標識需要被執行的 TaskInstance,然后將它們添加至 executor 隊列,將新排列的 TaskInstance 狀態更新為QUEUED狀態。
  • 每個可用的 executor 從隊列中取一個 TaskInstance,然后開始執行它,將此 TaskInstance 的數據庫記錄更新為SCHEDULED
  • 當一個 TaskInstance 完成運行,關聯的 executor 就會報告到隊列並更新數據庫中的 TaskInstance 的狀態(例如“完成”、“失敗”等)。
  • 一旦所有的dag處理完畢后,就會進行下一輪循環處理。這里還有一個細節就是上一輪的某個dag的處理時間可能很長,導致到下一輪處理的時候這個dag還沒有處理完成。 Airflow 的處理邏輯是在這一輪不為這個dag創建進程,這樣就不會阻塞進程去處理其余dag。

Scheduler 模塊代碼結構

DagFileProcessor 在子進程中解析 DAG 定義文件。對於發現的 DAG,檢查 DagRun 和 TaskInstance 的狀態。如果有 TaskInstance 可以運行,將狀態標記為 SCHEDULED。 為每個 dag 文件分配一個進程,同時在 DagFileProcessorManager 中保存有 dag 和 processor的映射表。在 dag 沒有被任何 processor 處理的時候,才會給它創建新的處理進程。

DagFileProcessorManager 控制 DagFileProcessors 如何啟動。它追蹤哪些文件應該被處理並且確保一旦有一個 DagFileProcessor 完成解析,下一個 dag 文件應該得到處理。並且控制 DagFileProcessors 的數量。

SchedulerJob 通過 agent 獲取 manager 的 DAG 定義文件解析結果,並且將 SCHEDULED 狀態的 TaskInstance 發送給executor執行。

DagFileProcessorAgent 作為一個采集代理,scheduler可以借助 agent 獲取 manager 獲取到的 DAG 解析結果,並且可以控制 manager 的行為。

核心類分析

1.Dag

method

  • following_schedule() 計算當前dag的下一次調度時間
  • previous_schedule() 計算當前dag的上一次調度時間
  • get_dagrun() 返回給定執行日期的dagrun(如果存在)
  • create_dagrun() 創建一個包括與此dag相關任務的dagrun
  • ckear() 清除指定日期范圍內與當前dag相關的一組任務實例
  • run() 實例化為 BackfillJob 同時調用job.run()

2.DagRun

model

ID_PREFIX = 'scheduled__'
ID_FORMAT_PREFIX = ID_PREFIX + '{0}'

id = Column(Integer, primary_key=True)
dag_id = Column(String(ID_LEN))
execution_date = Column(UtcDateTime, default=timezone.utcnow)
start_date = Column(UtcDateTime, default=timezone.utcnow)
end_date = Column(UtcDateTime)
_state = Column('state', String(50), default=State.RUNNING)
run_id = Column(String(ID_LEN))
external_trigger = Column(Boolean, default=True)
conf = Column(PickleType)

method

  • get_dag() 返回與當前 DagRun 相關的 Dag
  • get_task_instances() 返回與當前 DagRun 的所有 TaskInstances
  • update_state() 根據 TaskInstances 的狀態確定 DagRun 的總體狀態
  • get_latest_runs() 返回每個 Dag 的最新一次 DagRun

3.TaskInstance

model

__tablename__ = "task_instance"

task_id = Column(String(ID_LEN), primary_key=True)
dag_id = Column(String(ID_LEN), primary_key=True)
execution_date = Column(UtcDateTime, primary_key=True)
start_date = Column(UtcDateTime)
end_date = Column(UtcDateTime)
duration = Column(Float)
state = Column(String(20))
_try_number = Column('try_number', Integer, default=0)
max_tries = Column(Integer)
hostname = Column(String(1000))
unixname = Column(String(1000))
job_id = Column(Integer)
pool = Column(String(50), nullable=False)
queue = Column(String(256))
priority_weight = Column(Integer)
operator = Column(String(1000))
queued_dttm = Column(UtcDateTime)
pid = Column(Integer)
executor_config = Column(PickleType(pickler=dill))

method

  • get_dagrun() 返回當前 TaskInstance 的 DagRun
  • run() TaskInstance run
  • get_template_context() 通過 Jinja2 模板獲取上下文
  • xcom_push() 創建一個 XCom 可用於task發送參數
  • xcom_pull() 創建一個 XCom 可用於task接收參數

4.SchedulerJob

def _execute(self):
    """
    The actual scheduler loop. The main steps in the loop are:
        #. Harvest DAG parsing results through DagFileProcessorAgent
        #. Find and queue executable tasks
            #. Change task instance state in DB
            #. Queue tasks in executor
        #. Heartbeat executor
            #. Execute queued tasks in executor ake_aware(execution_date,
                                                     self.task.dag.timezone)
    """
    self.processor_agent = DagFileProcessorAgent()  # 通過檢查當前processor數量來控制進程個數
    self.executor.start()

    # Start after resetting orphaned tasks to avoid stressing out DB.
    self.processor_agent.start()  # 在解析dag文件時,只會對最近修改過的文件進行解析
    execute_start_time = timezone.utcnow()

    # For the execute duration, parse and schedule DAGs
    while (timezone.utcnow() - execute_start_time).total_seconds() < \
            self.run_duration or self.run_duration < 0:
        # Starting Loop...

        self.processor_agent.heartbeat()  # 控制 DagFileProcessor 解析 DAG 文件的速度

        # Harvesting DAG parsing results
        simple_dags = self.processor_agent.harvest_simple_dags()

        if len(simple_dags) > 0:
            self._execute_task_instances()
        ...

        # Call heartbeats
        self.executor.heartbeat()
        # heartbeat()中根據parallelism得出當前可用的slots數量,
        # 決定execute_async多少個task

        # Process events from the executor
        self._process_executor_events(simple_dag_bag)

        # Ran scheduling loop for all tasks done
        ...

    # Stop any processors
    self.processor_agent.terminate()

    # Verify that all files were processed, and if so, deactivate DAGs that
    # haven't been touched by the scheduler as they likely have been
    # deleted.
    ...

    self.executor.end()

method

  • create_dag_run() 根據調度周期檢查是否需要為 DAG 創建新的 DagRun。如果已調度,則返回 DagRun,否則返回 None
  • process_file() 解析 DAG 定義文件
  • _execute_task_instances() 嘗試執行調度器調度過的 TaskInstances TaskInstances in the executor.
  • reduce_in_chunks() 用來進行小的分批處理

總結

本文在第一部分着重介紹了 Airflow 的理念、使用場景及其一般架構。 提供了相對簡單易懂的安裝及操作命令,並附帶了一個使用案例用來介紹代碼如何編排以及 WebUI 的使用。

在第二部分開篇介紹了 Airflow 任務創建、調度和管理的一些基礎概念,以及 Airflow 版本迭代的一些重要變化。 Airflow 目前還是處於快速開發中,當前版本有很多遺留問題,版本升級也不是向后兼容的,變動很大。

Scheduler 毫無疑問是整個 Airflow 的核心模塊,邏輯結構復雜。 本文從 Scheduler 模塊的主要邏輯入手,分析了控制循環和代碼結構,重點分析了從 dag.py 代碼文件到可調度執行的 TaskInstances 所經歷的階段; 以及介紹了並發控制的實現和性能優化。

最后結合源碼介紹了 Airflow 核心類的模型定義和主要方法,以了解各個類所扮演的角色及其實現的功能。

 

參考 : https://zhuanlan.zhihu.com/p/90282578?utm_source=ZHShareTargetIDMore