Airflow的第一個DAG
考慮了很久,要不要記錄airflow相關的東西, 應該怎么記錄. 官方文檔已經有比較詳細的介紹了,還有各種博客,我需要有一份自己的筆記嗎?
答案就從本文開始了.
本文將從一個陌生視角開始認知airflow,順帶勾勒出應該如何一步步搭建我們的數據調度系統.
現在是9102年9月上旬, Airflow最近的一個版本是1.10.5.
ps. 查資料發現自己好多文章被爬走,換了作者.所以,接下里的內容會隨機添加一些防偽標識,忽略即可.
什么數據調度系統?
中台這個概念最近比較火, 其中就有一個叫做數據中台, 文章數據中台到底是什么給出了一個概念.
我粗糙的理解, 大概就是: 收集各個零散的數據,標准化,然后服務化, 提供統一數據服務. 而要做到數據整理和處理,必然涉及數據調度,也就需要一個調度系統.[本文出自Ryan Miao]
數據調度系統可以將不同的異構數據互相同步,可以按照規划去執行數據處理和任務調度. Airflow就是這樣的一個任務調度平台.
前面Airflow1.10.4介紹與安裝已經
安裝好了我們的airflow, 可以直接使用了. 這是第一個DAG任務鏈.
創建一個任務Hello World
目標: 每天早上8點執行一個任務--打印
Hello World
在Linux上,我們可以在crontab插入一條記錄:
使用Springboot, 我們可以使用@Scheduled(cron="0 0 8 * * ?")
來定時執行一個method.
使用quartz, 我們可以創建一個CronTrigger
, 然后去執行對應的JobDetail.
CronTrigger trigger = (CronTrigger)TriggerBuilder.newTrigger()
.withIdentity("trigger1", "group1")
.withSchedule(CronScheduleBuilder.cronSchedule("0 0 8 * * ?"))
.build();
使用Airflow, 也差不多類似.
在docker-airflow中,我們將dag掛載成磁盤,現在只需要在dag目錄下編寫dag即可.
volumes:
- ./dags:/usr/local/airflow/dags
創建一個hello.py
"""
Airflow的第一個DAG
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime
default_args = {
"owner": "ryan.miao",
"start_date": datetime(2019, 9, 1)
}
dag = DAG("Hello-World",
description="第一個DAG",
default_args=default_args,
schedule_interval='0 8 * * *')
t1 = BashOperator(task_id="hello", bash_command="echo 'Hello World, today is {{ ds }}'", dag=dag)
這是一個Python腳本, 主要定義了兩個變量.
DAG
表示一個有向無環圖,一個任務鏈, 其id全局唯一. DAG是airflow的核心概念, 任務裝載到dag中, 封裝成任務依賴鏈條. DAG決定這些任務的執行規則,比如執行時間.這里設置為從9月1號開始,每天8點執行.
TASK
task表示具體的一個任務,其id在dag內唯一. task有不同的種類,通過各種Operator插件來區分任務類型. 這里是一個BashOperator, 來自airflow自帶的插件, airflow自帶了很多拆箱即用的插件.
ds
airflow內置的時間變量模板, 在渲染operator的時候,會注入一個當前執行日期的字符串. 后面會專門講解這個執行日期.
[本文出自Ryan Miao]
部署dag
將上述hello.py
上傳到dag目錄, airflow會自動檢測文件變化, 然后解析py文件,導入dag定義到數據庫.
訪問airflow地址,刷新即可看到我們的dag.
開啟dag, 進入dag定義, 可以看到已經執行了昨天的任務.
點擊任務實例, 點擊view log可以查看日志
我們的任務在這台機器上執行,並打印了hello, 注意, 這個打印的日期.
這樣就是一個基本的airflow任務單元了, 這個任務每天8點會執行.
理解調度系統的概念
任務定義
定義一個任務的具體內容,比如這里就是打印Hello World,today is {{ ds }}
.
任務實例
任務設定了運行時間,每次運行時會生成一個實例,即 dag-task-executiondate 標記一個任務實例.任務實例和任務當前代表的執行時間綁定. 本demo中,每天會生成一個任務實例.
執行日期
今天是2019-09-07, 但我們日志里打印的任務執行日期是2019-09-06.
執行日期是任務實例運行所代表的任務時間, 我們通常叫做execute-date或bizdate, 類似hive表的的分區.
為什么今天執行的任務,任務的時間變量是昨天呢?
因為任務實例是一個時間段的任務, 比如計算每天的訪問量, 我們只有6號這一天過去了才能計算6號這一天的的總量. 那這個任務最早要7號0點之后才能計算, 計算6號0點到7號0點之間的訪問量.所以,這個任務時間就代表任務要處理的數據時間, 就是6號. 任務真正執行時間不固定的, 可以7號, 也可以8號, 只要任務執行計算的數據區間是6號就可以了.
因此, 調度系統中的ds(execution date)通常是過去的一個周期, 即本周期執行上周期的任務.
任務依賴
最典型的任務模型etl(Extract & Transformation & Loading,即數據抽取,轉換,加載)最少也要分成3步. 對於每天要統計訪問量這個目標來說, 我必須要抽取訪問日志, 找到訪問量的字段, 計算累加. 這3個任務之間有先后順序,必須前一個執行完畢之后,后一個才可以執行. 這叫任務依賴. 不同的任務之間的依賴.在airflow里, 通過在關聯任務實現依賴.
還有同一個任務的時間依賴. 比如,計算新增用戶量, 我必須知道前天的數據和昨天的數據, 才能計算出增量. 那么, 這個任務就必須依賴於昨天的任務狀態. 在airflow里,通過設置depends_on_past
來決定.
任務補錄backfill
airflow里有個功能叫backfill, 可以執行過去時間的任務. 我們把這個操作叫做補錄或者補數,為了計算以前沒計算的數據.
我們的任務是按時間執行的, 今天創建了一個任務, 計算每天的用戶量, 那么明天會跑出今天的數據. 這時候,我想知道過去1個月每天的用戶增量怎么辦?
自己寫code, 只要查詢日期范圍的數據,然后分別計算就好. 但調度任務是固定的, 根據日期去執行的. 我們只能創建不同日期的任務實例去執行這些任務. backfill就是實現這種功能的.
任務重跑
讓跑過的任務再跑一次.
有時候, 我們的任務需要重跑. 比如, etl任務, 今天突然發現昨天抽取的數據任務有問題,少抽取一個app的數據, 那后面的計算用戶量就不准確, 我們就需要重新抽取,重新計算.
在airflow里, 通過點擊任務實例的clear按鈕, 刪除這個任務實例, 然后調度系統會再次創建並執行這個實例.
關於調度系統這個實現邏輯, 我們后面有機會來查看源碼了解.
后記
本文沒太實質性的任務具體介紹, 而是引出Hello World, 先跑起來,我們接下來繼續完善我們的dag.