1, 簡介
Airflow是一個可編程,調度和監控的工作流平台,基於有向無環圖(DAG),airflow可以定義一組有依賴的任務,按照依賴依次執行。airflow提供了豐富的命令行工具用於系統管控,而其web管理界面同樣也可以方便的管控調度任務,並且對任務運行狀態進行實時監控,方便了系統的運維和管理。
2,執行器(Executor)
Airflow本身是一個綜合平台,它兼容多種組件,所以在使用的時候有多種方案可以選擇。比如最關鍵的執行器就有四種選擇:
SequentialExecutor:單進程順序執行任務,默認執行器,通常只用於測試
LocalExecutor:多進程本地執行任務
CeleryExecutor:分布式調度,生產常用
DaskExecutor :動態任務調度,主要用於數據分析
在當前項目使用CeleryExecutor
作為執行器。
celery是一個分布式調度框架,其本身無隊列功能,需要使用第三方組件,比如redis或者rabbitmq,當前項目使用的是rabbitmq,系統整體結構如下所示:
其中:
turing為外部系統
GDags服務幫助拼接成dag
master節點webui管理dags、日志等信息
scheduler負責調度,只支持單節點
worker負責執行具體dag中的task, worker支持多節點
在整個調度系統中,節點之間的傳遞介質是消息,而消息的本質內容是執行腳本的命令,也就是說,工作節點的dag文件必須和master節點的dag文件保持一致,不然任務的執行會出問題。
3,任務處理器
airflow內置了豐富的任務處理器,用於實現不同類型的任務:
BashOperator : 執行bash命令
PythonOperator : 調用python代碼
EmailOperator : 發送郵件
HTTPOperator : 發送 HTTP 請求
SqlOperator : 執行 SQL 命令
除了這些基本的構建塊之外,還有更多的特定處理器:DockerOperator
,HiveOperator
,S3FileTransferOperator
,PrestoToMysqlOperator
,SlackOperator
...
在當前項目使用了HTTPOperator
作為執行器,用於調用JAVA服務,整體結構圖如下:
關於airflow的環境搭建可以參考另外一篇博客: https://www.cnblogs.com/cord/p/9226608.html
4,基本使用
4.1,常用命令
$ airflow webserver -D 守護進程運行webserver
$ airflow scheduler -D 守護進程運行調度器
$ airflow worker -D 守護進程運行調度器
$ airflow worker -c 1 -D 守護進程運行celery worker並指定任務並發數為1
$ airflow pause dag_id 暫停任務
$ airflow unpause dag_id 取消暫停,等同於在管理界面打開off按鈕
$ airflow list_tasks dag_id 查看task列表
$ airflow clear dag_id 清空任務實例
$ airflow trigger_dag dag_id -r RUN_ID -e EXEC_DATE 運行整個dag文件
$ airflow run dag_id task_id execution_date 運行task
4.2,web管控界面的使用
啟動web管控界面需要執行airflow webserver -D
命令,默認訪問端口是8080
http://110.55.63.51:8080/admin/
(1) 任務啟動暫停開關
(2) 任務運行狀態
(3) 待執行,未分發的任務
(4) 手動觸發執行任務
(5) 任務管控界面
選擇對應dag欄目,點擊(5) Graph View即可進入任務管控界面
點擊對應的任務,會彈出一個任務管控台,主要幾個功能如下:
View Log : 查看任務日志
Run : 運行選中任務
Clear:清空任務隊列
Mark Success : 標記任務為成功狀態
4.3 通過定義DAG文件實現創建定時任務
1) 普通任務
from datetime import timedelta, datetime
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
default_args = { #默認參數
'owner': 'jifeng.si', #dag擁有者,用於權限管控
'depends_on_past': False, #是否依賴上游任務
'start_date': datetime(2018, 5, 2), #任務開始時間,默認utc時間
'email': ['123456789@qq.com'], #告警通知郵箱地址
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'example_hello_world_dag', #dag的id
default_args=default_args,
description='my first DAG', #描述
schedule_interval='*/25 * * * *', # crontab
start_date=datetime(2018, 5, 28) #開始時間,覆蓋默認參數
)
def print_hello():
return 'Hello world!'
dummy_operator = DummyOperator(task_id='dummy_task', dag=dag)
hello_operator = BashOperator( #通過BashOperator定義執行bash命令的任務
task_id='sleep_task',
depends_on_past=False,
bash_command='echo `date` >> /home/py/test.txt',
dag=dag
)
dummy_operator >> hello_operator #設置任務依賴關系
#dummy_operator.set_downstream(hello_operator)
2) 定義http任務並使用本地時間
import os
from datetime import timedelta, datetime
import pytz
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.models import DAG
default_args = {
'owner': 'cord',
# 'depends_on_past': False,
'depends_on_past': True,
'wait_for_downstream': True,
'execution_timeout': timedelta(minutes=3),
'email': ['123456789@qq.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
#將本地時間轉換為utc時間,再設置為start_date
tz = pytz.timezone('Asia/Shanghai')
dt = datetime(2018, 7, 26, 12, 20, tzinfo=tz)
utc_dt = dt.astimezone(pytz.utc).replace(tzinfo=None)
os.environ['AIRFLOW_CONN_HTTP_TEST']='http://localhost:9090'
dag = DAG(
'bm01',
default_args=default_args,
description='my DAG',
schedule_interval='*/2 * * * *',
start_date=utc_dt
)
#通過SimpleHttpOperator定義http任務
task1 = SimpleHttpOperator(
task_id='get_op1',
http_conn_id='http_test',
method='GET',
endpoint='test1',
data={},
headers={},
dag=dag)
task2 = SimpleHttpOperator(
task_id='get_op2',
http_conn_id='http_test',
method='GET',
endpoint='test2',
data={},
headers={},
dag=dag)
task1 >> task2
4.4 crontab語法
crontab
格式如下所示:
# ┌───────────── minute (0 - 59)
# │ ┌───────────── hour (0 - 23)
# │ │ ┌───────────── day of month (1 - 31)
# │ │ │ ┌───────────── month (1 - 12)
# │ │ │ │ ┌───────────── day of week (0 - 6) (Sunday to Saturday;
# │ │ │ │ │ 7 is also Sunday on some systems)
# │ │ │ │ │
# │ │ │ │ │
# * * * * * command to execute
域 | 是否必須 | 取值范圍 | 可用特殊符號 | 備注 |
---|---|---|---|---|
Minutes | Yes | 0–59 | * , - |
|
Hours | Yes | 0–23 | * , - |
|
Day of month | Yes | 1–31 | * , - ? L W |
? L W 部分實現可用 |
Month | Yes | 1–12 or JAN–DEC | * , - |
|
Day of week | Yes | 0–6 or SUN–SAT | * , - ? L # |
? L W 部分實現可用 |
Year | No | 1970–2099 | * , - |
標准實現里無這一項 |
特殊符號功能說明:
逗號(,
)
逗號用於分隔一個列表里的元素,比如 "MON,WED,FRI" 在第五域(day of week)表示Mondays, Wednesdays and Fridays。
連字符(-
)
連字符用於表示范圍,比如2000–2010表示2000到2010之間的每年,包括這兩年(閉區間)。
百分號(%
)
用於命令(command)中的格式化
L
表示last
,最后一個,比如第五域,5L
表示當月最后一個星期五
W
W
表示weekday(Monday-Friday),指離指定日期附近的工作日,比如第三域設置為15L
,這表示臨近當月15附近的工作日,假如15號是星期六,那么定時器會在14號執行,如果15號是星期天,那么定時器會在16號執行,也就是說只會在離指定日期最近的那天執行。
井號#
#
用於第五域(day of week),#后面跟着一個1~5之間的數字,這個用於表示第幾個星期,比如5#3
表示第三個星期五
?
在有些實現里面,?
與*
的功能相同,還有一些實現里面?
表示cron的啟動時間,比如 當cron服務在8:25am啟動,則? ? * * * *
會更新為25 8 * * * *
, 直到下一次cron服務重新啟動,定時器會再次更新。
/
/
一般與*
組合使用,后面跟着一個數字,表示頻率,比如在第一域(Minutes)中*/5
表示每5分鍾,是普通列表表示5,10,15,20,25,30,35,40,45,50,55,00的縮寫
參考鏈接:
https://segmentfault.com/a/1190000012803744?utm_source=tuicool&utm_medium=referral