2. 相關概念
2.1 服務進程
2.1.1. web server
2.1.2. scheduler
2.1.3. worker
2.1.4. celery flower
2.2 相關概念
2.2.1. dag
2.2.2.task
2.2.3.Operator
2.2.4 scheduler
2.2.5.worker
2.2.6.executor
2.2.7.Task Instances
2.2.8.pool
2.2.9.connection
2.2.10.Hooks
2.2.11.Queues
2.2.12.XComs
2.2.13.Variables
2.2.14.Branching
2.2.15.SLAs (Service Level Agreements)
2.2.16.Trigger Rules
2.2.17 宏
2.2.18 jinja2
2.2.19 Latest Run Only
3. 命令行
4. API
5. 使用
5.1 創建dag
5.2 示例dag
6. 總結
1. airflow簡介
airflow是Airbnb公司於2014年開始開發的一個工作流調度器.不同於其它調度器使用XML或者text文件方式定義工作流,airflow通過python文件作流,用戶可以通過代碼完全自定義自己的工作流。airflow的主要功能:工作流定義、任務調度、任務依賴、變量、池、分布式執行任務等。
2. 相關概念
2.1 服務進程
2.1.1. web server
web server是airflow的顯示與管理工具,在頁面中能看到任務及執行情況,還能配置變量、池等
2.1.2. scheduler
調度器用來監控任務執行時間並提交任務給worker執行。在airflow中scheduler做為獨立的服務來啟動。
2.1.3. worker
工作進程,負責任務的的執行。worker進程會創建SequentialExecutor、LocalExecutor、CeleryExecutor之一來執行任務。在airflow中作為獨立服務啟動。
2.1.4. celery flower
celery flower用來監控celery executor的信息。
url:http://host:5555
2.2 相關概念
2.2.1. dag
主dag
即有向無圖,相當於azkban中的project。dag中定義的了任務類型、任務依賴、調度周期等.dag由task組中,task定義了任務的類型、任務腳本等,dag定義task之間的依賴。airflow中的任務表現為一個個的dag.此外還有subdag,在dag中嵌套一個dag(具體作用需進一步研究)。
subdag
相當於azkban中project 中的flow.將dag中的某些task合並到一個子dag中,將這個子dag做為一個執行單元。
使用subdag時要注意:
1)by convention, a SubDAG’s dag_id should be prefixed by its parent and a dot. As in 'parent.child' 。
引用子dag時要加上父dag前綴,parent.child
2)share arguments between the main DAG and the SubDAG by passing arguments to the SubDAG operator (as demonstrated above)
通過向子dag的operator傳入參數來實現在父dag和子dag信息共享。
3)SubDAGs must have a schedule and be enabled. If the SubDAG’s schedule is set to None or @once, the SubDAG will succeed without having done anything
子dag必須要設置scheduler,如果沒有設置或者設置為@once,則子dag直接返回執行成功,但是不會執行任務操作
4)clearing a SubDagOperator also clears the state of the tasks within
清除子dag(的狀態?)也會清除其中的task狀態
5)marking success on a SubDagOperator does not affect the state of the tasks within
將子dag的狀態標記為success不會影響所包含的task的狀態
6)refrain from using depends_on_past=True! in tasks within the SubDAG as this can be confusing
不要在dag中使用depends_on_past=True!
7)it is possible to specify an executor for the SubDAG. It is common to use the SequentialExecutor if you want to run the SubDAG in-process and effectively limit its parallelism to one. Using LocalExecutor can be problematic as it may over-subscribe your worker, running multiple tasks in a single slot
使用SequentialExecutor來運行子dag,其它的executor執行子dag會出問題
2.2.2.task
task定義任務的類型、任務內容、任務所依賴的dag等。dag中每個task都要有不同的task_id.
dag = DAG('testFile', default_args=default_args)
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator( #任務類型是bash
task_id='echoDate', #任務id
bash_command='echo date > /home/datefile', #任務命令
dag=dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,[]()
dag=dag)
t2.set_upstream(t1) #定義任務信賴,任務2依賴於任務1
任務之間通過task.set_upstream\task.set_downstream來設置依賴,也可以用位運算:
t1>>t2<<t3 表示t2依賴於t1和t3.不建議用該種方式。
2.2.3.Operator
操作器,定義任務該以哪種方式執行。airflow有多種operator,如BashOperator、DummyOperator、MySqlOperator、HiveOperator以及社區貢獻的operator等,其中BaseOperator是所有operator的基礎operator。
BaseOperator | 基礎operator,設置baseoperator會影響所有的operator |
BashOperator | executes a bash command |
DummyOperator | 空操作 |
PythonOperator | calls an arbitrary Python function |
EmailOperator | sends an email |
HTTPOperator | sends an HTTP request |
SqlOperator | executes a SQL command |
Sensor | waits for a certain time, file, database row, S3 key, etc… |
t1 = BashOperator( #任務類型是bash
task_id='echoDate', #任務id
bash_command='echo date > /home/datefile', #任務命令
dag=dag)
2.2.4 scheduler
scheduler監控dag的狀態,啟動滿足條件的dag,並將任務提交給具體的executor執行。dag通過scheduler來設置執行周期。
1.何時執行
注意:當使用schedule_interval
來調度一個dag,假設執行周期為1天,startdate=2016-01-01,則會在2016-01-01T23:59后執行這個任務。 airflow只會在執行周期的結尾執行任務。
2.設置dag執行周期
在dag中設置schedule_interval
來定義調度周期。該參數可以接收cron 表達式
和datetime.timedelta
對象,另外airflow還預置了一些調度周期。
preset | Run once a year at midnight of January 1 | cron |
---|---|---|
None |
Don’t schedule, use for exclusively “externally triggered” DAGs | |
@once |
Schedule once and only once | |
@hourly |
Run once an hour at the beginning of the hour | 0 * * * * |
@daily |
Run once a day at midnight | 0 0 * * * |
@weekly |
Run once a week at midnight on Sunday morning | 0 0 * * 0 |
@monthly |
Run once a month at midnight of the first day of the month | 0 0 1 * * |
@yearly |
Run once a year at midnight of January 1 | 0 0 1 1 * |
3.backfill和catchup
backfill:填充任務,手動重跑過去失敗的任務(指定日期)。
catchup:如果歷史任務出錯,調度器嘗試按調度順序重跑歷史任務(而不是按照當前時間執行當前任務)。可以在dag中設置dag.catchup = False
或者參數文件中設置catchup_by_default = False
來禁用這個功能。
4.External Triggers
我還沒整明白(等我翻下書再告訴你啊~)
2.2.5.worker
worker指工作節點,類似於yarn中的nodemanager。work負責啟動機器上的executor來執行任務。使用celeryExecutor后可以在多個機器上部署worker服務。
2.2.6.executor
執行任務的進程,dag中的task由executor來執行。有三個executor:SequentialExecutor(順序執行)、LocalExecutor(本地執行)、CeleryExecutor(遠程執行)。
2.2.7.Task Instances
dag中被實例化的任務。
2.2.8.pool
池用來控制同個pool的task並行度。
aggregate_db_message_job = BashOperator(
task_id='aggregate_db_message_job',
execution_timeout=timedelta(hours=3),
pool='ep_data_pipeline_db_msg_agg',
bash_command=aggregate_db_message_job_cmd,
dag=dag)
aggregate_db_message_job.set_upstream(wait_for_empty_queue)
上例中,aggregate_db_message_job設置了pool,如果pool的最大並行度為1,當其它任務也設置該池時,如果aggregate_db_message_job在運行,則其它任務必須等待。
2.2.9.connection
定義對airflow之外的連接,如對mysql hive hdfs等工具的連接。airflow中預置了一些連接類型,如mysql hive hdfs postgrey等。
2.2.10.Hooks
Hooks 是對外的connection接口,通過自定義hooks實現connection中不支持的連接。
2.2.11.Queues
airflow中的隊列嚴格來說不叫Queues,叫"lebal"更為合適。在operator中,可以設置queue參數如queue=spark,然后在啟動worker時:airflow worker -q spark,那么該worker只會執行spark任務。相當於節點標簽。、
2.2.12.XComs
默認情況下,dag與dag之間 、task與task之間信息是無法共享的。如果想在dag、task之間實現信息共享,要使用XComs,通過設置在一個dag(task)中設置XComs參數在另一個中讀取來實現信息共享。
2.2.13.Variables
在airflow中可以設置一些變量,在dag和task中可以引用這些變量:
from airflow.models import Variable
foo = Variable.get("foo")
bar = Variable.get("bar", deserialize_json=True)
設置變量:
此外,airflow預置了一些變量:
具體參考:http://airflow.incubator.apache.org/code.html#macros
2.2.14.Branching
dag中的任務可以選擇分支! BranchPythonOperator允許用戶通過函數返回下一步要執行的task的id,從而根據條件選擇執行的分支。azkaban沒有該功能。注意,BranchPythonOperator下級task是被"selected"或者"skipped"的分支。
2.2.15.SLAs (Service Level Agreements)
SLAs指在一段時間內應該完全的操作,比如在一個小時內dag應該執行成功,如果達不目標可以執行其它任務比如發郵件發短信等。
2.2.16.Trigger Rules
Trigger Rules定義了某個task在何種情況下執行。默認情況下,某個task是否執行,依賴於其父task(直接上游任務)全部執行成功。airflow允許創建更復雜的依賴。通過設置operator中的trigger_rule參數來控制:
all_success
: (default) all parents have succeeded 父task全failed
all_failed
: all parents are in afailed
orupstream_failed
state 父task全failed
或者upstream_failed
狀態all_done
: all parents are done with their execution 父task全執行過,不管success or failedone_failed
: fires as soon as at least one parent has failed, it does not wait for all parents to be done 當父task中有一個是failed
狀態時執行,不必等到所有的父task都執行one_success
: fires as soon as at least one parent succeeds, it does not wait for all parents to be done 當父task中有一個是success
狀態時執行,不必等到所有的父task都執行dummy
: dependencies are just for show, trigger at will 無條件執行
該參數可以和depends_on_past
結合使用,當設置為true時,如果上一次沒有執行成功,這一次無論如何都不會執行。
2.2.17 宏
airflow中內置了一些宏,可以在代碼中引用。
通用宏:
airflow特定的宏:
airflow.macros.ds_add(ds, days) |
airflow.macros.ds_format(ds, input_format, output_format) |
airflow.macros.random() → x in the interval [0, 1) |
airflow.macros.hive.closest_ds_partition(table, ds, before=True, schema='default', metastore_conn_id='metastore_default') |
airflow.macros.hive.max_partition(table, schema='default', field=None, filter=None, metastore_conn_id='metastore_default') |
詳細說明:
http://airflow.incubator.apache.org/code.html#macros
2.2.18 jinja2
airflow支持jinja2語法。Jinja2是基於python的模板引擎,功能比較類似於於PHP的smarty,J2ee的Freemarker和velocity。關於jinja2:
http://10.32.1.149:7180/cmf/login
2.2.19 Latest Run Only
這個太復雜,待近一步研究
3. 命令行
airflow命令的語法結構:
airflow 子命令 [參數1][參數2]….
如 airflow test example_dag print_date 2017-05-06子命令
子命令包括:
resetdb | Burn down and rebuild the metadata database |
render | Render a task instance’s template(s) |
variables | CRUD operations on variables |
connections | List/Add/Delete connections |
pause | Pause a DAG |
task_failed_deps | Returns the unmet dependencies for a task instance from the perspective of the scheduler |
version | Show the version |
trigger_dag | Trigger a DAG run |
initdb | Initialize the metadata database |
test | Test a task instance. This will run a task without checking for dependencies or recording it’s state in the database. |
unpause | Resume a paused DAG |
dag_state | Get the status of a dag run |
run | Run a single task instance |
list_tasks | List the tasks within a DAG |
backfill | Run subsections of a DAG for a specified date range |
list_dags | List all the DAGs |
kerberos | Start a kerberos ticket renewer |
worker | Start a Celery worker node |
webserver | Start a Airflow webserver instance |
flower | Start a Celery Flower |
scheduler | Start a scheduler instance |
task_state | Get the status of a task instance |
pool | CRUD operations on pools |
serve_logs | Serve logs generate by worker |
clear | Clear a set of task instance, as if they never ran |
upgradedb | Upgrade the metadata database to latest version |
使用:
[bqadm@sitbqbm1~]$ airflow webserver -p 8080
詳細命令參考:
http://airflow.incubator.apache.org/cli.html#
4. API
airflow的api分為Operator、Macros、Modles、Hooks、Executors幾個部分,主要關注Operator、Modles這兩部分
詳細API文檔:
http://airflow.incubator.apache.org/code.html
5. 使用
5.1 創建dag
1.創建一個pthon文件testBashOperator.py:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'yangxw',
'depends_on_past': False,
'start_date': datetime(2017, 5, 9),
'email': ['xiaowen.yang@bqjr.cn'],
'email_on_failure': True,
'email_on_retry': True,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG('printDate', default_args=default_args,schedule_interval='*/1 * * * *')
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='datefile',
bash_command='date > /home/bqadm/datefile',
dag=dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)
t2.set_upstream(t1)
2.編譯該文件
把文件放到$AIRFLOW_HIME/dags下,然后執行:
[bqadm@bqdpsit1 dags]$ python testFile.py
[2017-05-18 10:04:17,422] {__init__.py:57} INFO - Using executor CeleryExecutor
這樣dag就被創建了
3.啟動dag
在web上,點擊最左邊按鈕,將off切換為on
這樣dag就啟動了。dag啟后,會根據自生的調度情況執行。上列中的dag每分鍾執行一次,將時間寫入/home/bqadm/datafile里。
如果執行出錯還會發郵件通知:
5.2 示例dag
airflow內置了16個示例dag,通過學習這些dag的源碼可掌握operator、調度、任務依賴的知識,能快速入門。
6. 總結
airflow是功能強大並且極其靈活的pipeline工具,通過python腳本能控制ETL中各個環節,其缺點是使用比較復雜,需要一定的編程水平。此外,當一個dag中有數十個task時,python文件將變的非常長導致維護不便
。airflow在國內並未廣泛使用,面臨一定的技術風險
。