3.Airflow使用


1. airflow簡介

airflow是Airbnb公司於2014年開始開發的一個工作流調度器.不同於其它調度器使用XML或者text文件方式定義工作流,airflow通過python文件作流,用戶可以通過代碼完全自定義自己的工作流。airflow的主要功能:工作流定義、任務調度、任務依賴、變量、池、分布式執行任務等。

2. 相關概念

2.1 服務進程

2.1.1. web server

web server是airflow的顯示與管理工具,在頁面中能看到任務及執行情況,還能配置變量、池等

2.1.2. scheduler

調度器用來監控任務執行時間並提交任務給worker執行。在airflow中scheduler做為獨立的服務來啟動。

2.1.3. worker

工作進程,負責任務的的執行。worker進程會創建SequentialExecutor、LocalExecutor、CeleryExecutor之一來執行任務。在airflow中作為獨立服務啟動。

2.1.4. celery flower

celery flower用來監控celery executor的信息。
url:http://host:5555

2.2 相關概念

2.2.1. dag

  • 主dag
    即有向無圖,相當於azkban中的project。dag中定義的了任務類型、任務依賴、調度周期等.dag由task組中,task定義了任務的類型、任務腳本等,dag定義task之間的依賴。airflow中的任務表現為一個個的dag.此外還有subdag,在dag中嵌套一個dag(具體作用需進一步研究)。

  • subdag
    相當於azkban中project 中的flow.將dag中的某些task合並到一個子dag中,將這個子dag做為一個執行單元。

使用subdag時要注意:
1)by convention, a SubDAG’s dag_id should be prefixed by its parent and a dot. As in 'parent.child' 。
引用子dag時要加上父dag前綴,parent.child

2)share arguments between the main DAG and the SubDAG by passing arguments to the SubDAG operator (as demonstrated above)
通過向子dag的operator傳入參數來實現在父dag和子dag信息共享。

3)SubDAGs must have a schedule and be enabled. If the SubDAG’s schedule is set to None or @once, the SubDAG will succeed without having done anything
子dag必須要設置scheduler,如果沒有設置或者設置為@once,則子dag直接返回執行成功,但是不會執行任務操作

4)clearing a SubDagOperator also clears the state of the tasks within
清除子dag(的狀態?)也會清除其中的task狀態

5)marking success on a SubDagOperator does not affect the state of the tasks within
將子dag的狀態標記為success不會影響所包含的task的狀態

6)refrain from using depends_on_past=True! in tasks within the SubDAG as this can be confusing
不要在dag中使用depends_on_past=True!

7)it is possible to specify an executor for the SubDAG. It is common to use the SequentialExecutor if you want to run the SubDAG in-process and effectively limit its parallelism to one. Using LocalExecutor can be problematic as it may over-subscribe your worker, running multiple tasks in a single slot
使用SequentialExecutor來運行子dag,其它的executor執行子dag會出問題

2.2.2.task

task定義任務的類型、任務內容、任務所依賴的dag等。dag中每個task都要有不同的task_id.

dag = DAG('testFile', default_args=default_args)

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(   #任務類型是bash
    task_id='echoDate', #任務id
    bash_command='echo date > /home/datefile', #任務命令
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,[]()
    dag=dag)

t2.set_upstream(t1) #定義任務信賴,任務2依賴於任務1

任務之間通過task.set_upstream\task.set_downstream來設置依賴,也可以用位運算:
t1>>t2<<t3 表示t2依賴於t1和t3.不建議用該種方式。

2.2.3.Operator

操作器,定義任務該以哪種方式執行。airflow有多種operator,如BashOperator、DummyOperator、MySqlOperator、HiveOperator以及社區貢獻的operator等,其中BaseOperator是所有operator的基礎operator。

BaseOperator 基礎operator,設置baseoperator會影響所有的operator
BashOperator executes a bash command
DummyOperator 空操作
PythonOperator calls an arbitrary Python function
EmailOperator sends an email
HTTPOperator sends an HTTP request
SqlOperator executes a SQL command
Sensor waits for a certain time, file, database row, S3 key, etc…
t1 = BashOperator(   #任務類型是bash
    task_id='echoDate', #任務id
    bash_command='echo date > /home/datefile', #任務命令
    dag=dag)

2.2.4 scheduler

scheduler監控dag的狀態,啟動滿足條件的dag,並將任務提交給具體的executor執行。dag通過scheduler來設置執行周期。

1.何時執行
注意:當使用schedule_interval來調度一個dag,假設執行周期為1天,startdate=2016-01-01,則會在2016-01-01T23:59后執行這個任務。 airflow只會在執行周期的結尾執行任務。

2.設置dag執行周期
在dag中設置schedule_interval來定義調度周期。該參數可以接收cron 表達式datetime.timedelta對象,另外airflow還預置了一些調度周期。

preset Run once a year at midnight of January 1 cron
None Don’t schedule, use for exclusively “externally triggered” DAGs
@once Schedule once and only once
@hourly Run once an hour at the beginning of the hour 0 * * * *
@daily Run once a day at midnight 0 0 * * *
@weekly Run once a week at midnight on Sunday morning 0 0 * * 0
@monthly Run once a month at midnight of the first day of the month 0 0 1 * *
@yearly Run once a year at midnight of January 1 0 0 1 1 *

3.backfill和catchup
backfill:填充任務,手動重跑過去失敗的任務(指定日期)。
catchup:如果歷史任務出錯,調度器嘗試按調度順序重跑歷史任務(而不是按照當前時間執行當前任務)。可以在dag中設置dag.catchup = False或者參數文件中設置catchup_by_default = False來禁用這個功能。

4.External Triggers
我還沒整明白(等我翻下書再告訴你啊~)

2.2.5.worker

worker指工作節點,類似於yarn中的nodemanager。work負責啟動機器上的executor來執行任務。使用celeryExecutor后可以在多個機器上部署worker服務。

2.2.6.executor

執行任務的進程,dag中的task由executor來執行。有三個executor:SequentialExecutor(順序執行)、LocalExecutor(本地執行)、CeleryExecutor(遠程執行)。

2.2.7.Task Instances

dag中被實例化的任務。

2.2.8.pool

池用來控制同個pool的task並行度。

aggregate_db_message_job = BashOperator(
    task_id='aggregate_db_message_job',
    execution_timeout=timedelta(hours=3),
    pool='ep_data_pipeline_db_msg_agg',
    bash_command=aggregate_db_message_job_cmd,
    dag=dag)
aggregate_db_message_job.set_upstream(wait_for_empty_queue)

上例中,aggregate_db_message_job設置了pool,如果pool的最大並行度為1,當其它任務也設置該池時,如果aggregate_db_message_job在運行,則其它任務必須等待。

2.2.9.connection

定義對airflow之外的連接,如對mysql hive hdfs等工具的連接。airflow中預置了一些連接類型,如mysql hive hdfs postgrey等。

2.2.10.Hooks

Hooks 是對外的connection接口,通過自定義hooks實現connection中不支持的連接。

2.2.11.Queues

airflow中的隊列嚴格來說不叫Queues,叫"lebal"更為合適。在operator中,可以設置queue參數如queue=spark,然后在啟動worker時:airflow worker -q spark,那么該worker只會執行spark任務。相當於節點標簽。、

2.2.12.XComs

默認情況下,dag與dag之間 、task與task之間信息是無法共享的。如果想在dag、task之間實現信息共享,要使用XComs,通過設置在一個dag(task)中設置XComs參數在另一個中讀取來實現信息共享。

2.2.13.Variables

在airflow中可以設置一些變量,在dag和task中可以引用這些變量:

from airflow.models import Variable
foo = Variable.get("foo")
bar = Variable.get("bar", deserialize_json=True)

設置變量:

此外,airflow預置了一些變量:

具體參考:http://airflow.incubator.apache.org/code.html#macros

2.2.14.Branching

dag中的任務可以選擇分支! BranchPythonOperator允許用戶通過函數返回下一步要執行的task的id,從而根據條件選擇執行的分支。azkaban沒有該功能。注意,BranchPythonOperator下級task是被"selected"或者"skipped"的分支。

2.2.15.SLAs (Service Level Agreements)

SLAs指在一段時間內應該完全的操作,比如在一個小時內dag應該執行成功,如果達不目標可以執行其它任務比如發郵件發短信等。

2.2.16.Trigger Rules

Trigger Rules定義了某個task在何種情況下執行。默認情況下,某個task是否執行,依賴於其父task(直接上游任務)全部執行成功。airflow允許創建更復雜的依賴。通過設置operator中的trigger_rule參數來控制:

  • all_success: (default) all parents have succeeded 父task全failed
  • all_failed: all parents are in a failed or upstream_failed state 父task全failed或者upstream_failed狀態
  • all_done: all parents are done with their execution 父task全執行過,不管success or failed
  • one_failed: fires as soon as at least one parent has failed, it does not wait for all parents to be done 當父task中有一個是failed狀態時執行,不必等到所有的父task都執行
  • one_success: fires as soon as at least one parent succeeds, it does not wait for all parents to be done 當父task中有一個是success狀態時執行,不必等到所有的父task都執行
  • dummy: dependencies are just for show, trigger at will 無條件執行

該參數可以和depends_on_past結合使用,當設置為true時,如果上一次沒有執行成功,這一次無論如何都不會執行。

2.2.17 宏

airflow中內置了一些宏,可以在代碼中引用。
通用宏:

airflow特定的宏:

airflow.macros.ds_add(ds, days)
airflow.macros.ds_format(ds, input_format, output_format)
airflow.macros.random() → x in the interval [0, 1)
airflow.macros.hive.closest_ds_partition(table, ds, before=True, schema='default', metastore_conn_id='metastore_default')
airflow.macros.hive.max_partition(table, schema='default', field=None, filter=None, metastore_conn_id='metastore_default')

詳細說明:
http://airflow.incubator.apache.org/code.html#macros

2.2.18 jinja2

airflow支持jinja2語法。Jinja2是基於python的模板引擎,功能比較類似於於PHP的smarty,J2ee的Freemarker和velocity。關於jinja2:
http://10.32.1.149:7180/cmf/login

2.2.19 Latest Run Only

這個太復雜,待近一步研究

3. 命令行

  • airflow命令的語法結構:

    airflow 子命令 [參數1][參數2]….
    如 airflow test example_dag print_date 2017-05-06

  • 子命令
    子命令包括:

resetdb Burn down and rebuild the metadata database
render Render a task instance’s template(s)
variables CRUD operations on variables
connections List/Add/Delete connections
pause Pause a DAG
task_failed_deps Returns the unmet dependencies for a task instance from the perspective of the scheduler
version Show the version
trigger_dag Trigger a DAG run
initdb Initialize the metadata database
test Test a task instance. This will run a task without checking for dependencies or recording it’s state in the database.
unpause Resume a paused DAG
dag_state Get the status of a dag run
run Run a single task instance
list_tasks List the tasks within a DAG
backfill Run subsections of a DAG for a specified date range
list_dags List all the DAGs
kerberos Start a kerberos ticket renewer
worker Start a Celery worker node
webserver Start a Airflow webserver instance
flower Start a Celery Flower
scheduler Start a scheduler instance
task_state Get the status of a task instance
pool CRUD operations on pools
serve_logs Serve logs generate by worker
clear Clear a set of task instance, as if they never ran
upgradedb Upgrade the metadata database to latest version

使用:

[bqadm@sitbqbm1~]$ airflow webserver -p 8080

詳細命令參考:
http://airflow.incubator.apache.org/cli.html#

4. API

airflow的api分為Operator、Macros、Modles、Hooks、Executors幾個部分,主要關注Operator、Modles這兩部分

詳細API文檔:
http://airflow.incubator.apache.org/code.html

5. 使用

5.1 創建dag

1.創建一個pthon文件testBashOperator.py:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'yangxw',
    'depends_on_past': False,
    'start_date': datetime(2017, 5, 9),
    'email': ['xiaowen.yang@bqjr.cn'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('printDate', default_args=default_args,schedule_interval='*/1 * * * *')

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='datefile',
    bash_command='date > /home/bqadm/datefile',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

t2.set_upstream(t1)

2.編譯該文件
把文件放到$AIRFLOW_HIME/dags下,然后執行:

[bqadm@bqdpsit1 dags]$ python testFile.py 
[2017-05-18 10:04:17,422] {__init__.py:57} INFO - Using executor CeleryExecutor

這樣dag就被創建了

3.啟動dag
在web上,點擊最左邊按鈕,將off切換為on

這樣dag就啟動了。dag啟后,會根據自生的調度情況執行。上列中的dag每分鍾執行一次,將時間寫入/home/bqadm/datafile里。

如果執行出錯還會發郵件通知:

5.2 示例dag

airflow內置了16個示例dag,通過學習這些dag的源碼可掌握operator、調度、任務依賴的知識,能快速入門。

6. 總結

airflow是功能強大並且極其靈活的pipeline工具,通過python腳本能控制ETL中各個環節,其缺點是使用比較復雜,需要一定的編程水平。此外,當一個dag中有數十個task時,python文件將變的非常長導致維護不便。airflow在國內並未廣泛使用,面臨一定的技術風險






免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM