教程:https://airflow.apache.org/docs/stable/index.html
官網: http://airflow.incubator.apache.org/index.html
airflow源碼:https://github.com/apache/incubator-airflow
參考資料:http://www.open-open.com/lib/view/open1452002876105.html
簡介:http://www.cnblogs.com/xianzhedeyu/p/8047828.html
重要參數介紹:http://www.cnblogs.com/skyrim/p/7456166.html
http://blog.csdn.net/permike/article/details/52184621
FAQ :http://blog.csdn.net/yingkongshi99/article/details/52658660
容器:docker pull puckel/docker-airflow
啟動dag調度器, 注意啟動調度器, 並不意味着dag會被馬上觸發, dag觸發需要符合它自己的schedule規則
如果缺省了END_DATE參數, END_DATE等同於START_DATE.
使用 DummyOperator 來匯聚分支
使用 ShortCircuitOperator/BranchPythonOperator 做分支
使用 SubDagOperator 嵌入一個子dag
使用 TriggerDagRunOperator 直接trigger 另一個dag
在創建MyBashOperator的實例時候, 為on_failure_callback和on_success_callback參數設置兩個回調函數, 我們在回調函數中, 將success或failed狀態記錄到自己的表中.
DAG的schedule_interval參數設置成None, 表明這個DAG始終是由外部觸發。
如果將default_args
字典傳遞給DAG,DAG將會將字典應用於其內部的任何Operator上。這很容易的將常用參數應用於多個Operator,而無需多次鍵入。
default_args=dict( start_date=datetime(2016, 1, 1), owner='Airflow') dag = DAG('my_dag', default_args=default_args) op = DummyOperator(task_id='dummy', dag=dag) print(op.owner) # Airflow
initdb,初始化元數據DB,元數據包括了DAG本身的信息、運行信息等;
resetdb,清空元數據DB;
list_dags,列出所有DAG;
list_tasks,列出某DAG的所有task;
test,測試某task的運行狀況;
backfill,測試某DAG在設定的日期區間的運行狀況;
webserver,開啟webserver服務;
scheduler,用於監控與觸發DAG。
$ cd ${AIRFLOW_HOME}/dags $ python test_import.py # 保證代碼無語法錯誤 $ airflow list_dags # 查看dag是否成功加載 airflow list_tasks test_import_dag –tree # 查看dag的樹形結構是否正確 $ airflow test test_import_dag \ test_import_task 2016-3-7 # 測試具體的dag的某個task在某個時間的運行是否正常 $ airflow backfill test_import_dag -s 2016-3-4 \ -e 2016-3-7 # 對dag進行某段時間內的完整測試
# print the list of active DAGs
airflow list_dags
# prints the list of tasks the "tutorial" dag_id
airflow list_tasks tutorial
# prints the hierarchy of tasks in the tutorial DAG
airflow list_tasks tutorial --tree
請注意,airflow test命令在本地運行任務實例,將其日志輸出到stdout(屏幕上),不會影響依賴關系,並且不會將狀態(運行,成功,失敗,...)發送到數據庫。 它只是允許簡單的測試單個任務實例。
如果使用depends_on_past = True,則單個任務實例將取決於上一個任務實例的成功與否,如果指定本身的start_date,則忽略此依賴關系
# start your backfill on a date range
airflow backfill tutorial -s 2015-06-01 -e 2015-06-07
使用Xcom在task之間傳參
可以直接使用jinja模板語言,在{{}}中調用ti的xcom_push和xcom_pull方法,下面的例子為t1使用xcom_push推出了一個kv,t2通過taskid和key來接收
dag = DAG( dag_id='xcomtest', default_args=default_args, schedule_interval='*/2 * ** *') t1 = BashOperator( task_id='xcom', bash_command='''''{{ ti.xcom_push(key='aaa', value='bbb') }}''', dag=dag) t2 = BashOperator( task_id='xcom2', bash_command='''''echo"{{ ti.xcom_pull(key='aaa', task_ids='xcom') }}" ''', dag=dag) t2.set_upstream(t1)
airflow提供了很多Macros Variables,可以直接使用jinja模板語言調用宏變量
execution_date並不是task的真正執行時間,而是上一周期task的執行時間。
我們在airflow上看到一個任務是6am執行的,而且interval=4hours,那么execution_date的值是2am,而不是6am
暫時無法hold或pause某個task,只支持以dag為單位pause
當使用BashOperator時,command需要調用腳本時,腳本后需要有個空格,否則報錯,暫時不清楚原因,但加空格后可以正常執行,如下例,run.sh后需加空格
Airflow為Operator提供許多常見任務,包括:
BashOperator - 執行bash命令
PythonOperator - 調用任意的Python函數
EmailOperator - 發送郵件
HTTPOperator - 發送 HTTP 請求
SqlOperator - 執行 SQL 命令
Sensor - 等待一定時間,文件,數據庫行,S3鍵等...
除了這些基本的構建塊之外,還有更多的特定Operator:DockerOperator,HiveOperator,S3FileTransferOperator,PrestoToMysqlOperator,SlackOperator
使用supervisord進行deamon
airflow本身沒有deamon模式,所以直接用supervisord就ok了,我們只要寫4行代碼
[program:airflow_web] command=/home/kimi/env/athena/bin/airflow webserver -p 8080 [program:airflow_scheduler] command=/home/kimi/env/athena/bin/airflow scheduler 作者:yin1941 鏈接:https://www.jianshu.com/p/59d69981658a 來源:簡書 著作權歸作者所有。商業轉載請聯系作者獲得授權,非商業轉載請注明出處。
airflow 執行的命令或這種消息是支持 jinja2 模板語言;{{ ds }}是一種宏,表示當前的日期,
形如2016-12-16,支持的宏在
https://airflow.incubator.apache.org/code.html#macros
test: 用於測試特定的某個task,不需要依賴滿足
run: 用於執行特定的某個task,需要依賴滿足
backfill: 執行某個DAG,會自動解析依賴關系,按依賴順序執行
unpause: 將一個DAG啟動為例行任務,默認是關的,所以編寫完DAG文件后一定要執行這和要命令,相反命令為pause
scheduler: 這是整個 airflow 的調度程序,一般是在后台啟動
clear: 清除一些任務的狀態,這樣會讓scheduler來執行重跑
============================
前面的腳本里用到了{{ ds }}
變量,每個DAG在執行時都會傳入一個具體的時間(datetime對象), 這個ds
就會在 render 命令時被替換成對應的時間。這里要特別強調一下, 對於周期任務,airflow傳入的時間是上一個周期的時間(划重點),比如你的任務是每天執行, 那么今天傳入的是昨天的日期,如果是周任務,那傳入的是上一周今天的值
==========================
executor
SequentialExecutor:表示單進程順序執行,通常只用於測試
LocalExecutor:表示多進程本地執行,它用python的多進程庫從而達到多進程跑任務的效果。
CeleryExecutor:表示使用celery作為執行器,只要配置了celery,就可以分布式地多機跑任務,一般用於生產環境。
sql_alchemy_conn :這個配置讓你指定 airflow 的元信息用何種方式存儲,默認用sqlite,如果要部署到生產環境,推薦使用 mysql。
smtp :如果你需要郵件通知或用到 EmailOperator 的話,需要配置發信的 smtp 服務器
======================
觸發條件有兩個維度, 以T1&T2->T3 這樣的dag為例:
一個維度是: 要根據dag上次運行T3的狀態確定本次T3是否被調用, 由DAG的default_args.depends_on_past參數控制, 為True時, 只有上次T3運行成功, 這次T3才會被觸發
另一個維度是: 要根據前置T1和T2的狀態確定本次T3是否被調用, 由T3.trigger_rule參數控制, 有下面6種情形, 缺省是all_success.
all_success: (default) all parents have succeeded
all_failed: all parents are in a failed or upstream_failed state
all_done: all parents are done with their execution
one_failed: fires as soon as at least one parent has failed, it does not wait for all parents to be done
one_success: fires as soon as at least one parent succeeds, it does not wait for all parents to be done
dummy: dependencies are just for show, trigger at will
========================
airflow有兩個基於PythonOperator的Operator來支持dag分支功能.
ShortCircuitOperator, 用來實現流程的判斷. Task需要基於ShortCircuitOperator, 如果本Task返回為False的話, 其下游Task將被skip; 如果為True的話, 其下游Task將會被正常執行. 尤其適合用在其下游都是單線節點的場景.
BranchPythonOperator, 用來實現Case分支. Task需要基於BranchPythonOperator, airflow會根據本task的返回值(返回值是某個下游task的id),來確定哪個下游Task將被執行, 其他下游Task將被skip.
======================
connection 表:
我們的Task往往需要通過jdbc/ftp/http/webhdfs方式訪問其他資源, 一般地訪問資源時候都需要一些簽證, airflow允許我們將這些connection以及鑒證存放在connection表中. 可以現在WebUI的Admin->Connections管理這些連接, 在代碼中使用這些連接.
MySQL 應該使用 mysqlclient 包, 我簡單試了mysql-connector-python 有報錯
LocalExecutor 和 CeleryExecutor 都可用於生產環境, CeleryExecutor 將使用 Celery 作為Task執行的引擎, 擴展性很好, 當然配置也更復雜, 需要先setup Celery的backend(包括RabbitMQ, Redis)等. 其實真正要求擴展性的場景並不多, 所以LocalExecutor 是一個很不錯的選擇了.
1. 配置OS環境變量 AIRFLOW_HOME, AIRFLOW_HOME缺省為 ~/airflow
2. 運行下面命令初始化一個Sqlite backend DB, 並生成airflow.cfg文件
your_python ${AIRFLOW_HOME}\bin\airflow initdb
3. 如果需要修改backend DB類型, 修改$AIRFLOW_HOME/airflow.cfg文件 sql_alchemy_conn后, 然后重新運行 airflow initdb .
官方推薦使用MySQL/PostgreSQL做DB Server.
有下面3個參數用於控制Task的並發度,
parallelism, 一個Executor同時運行task實例的個數
dag_concurrency, 一個dag中某個task同時運行的實例個數
max_active_runs_per_dag: 一個dag同時啟動的實例個數
start_date 有點特別,如果你設置了這個參數,那么airflow就會從start_date開始以 schedule_interval 的規則開始執行,例如設置成3天前每小時執行一次,那么在調度正常啟動時,就會立即調度 24*3 次,但注意,腳本執行環境的時間還是當前的系統時間,而不會說真是把系統時間模擬成3天前,所以感覺這個功能應用場景比較好限。
===========================
dags_folder目錄支持子目錄和軟連接,因此不同的dag可以分門別類的存儲起來
schedule_interval=timedelta(minutes=1) 或者 crontab格式
crontab格式的介紹:https://www.cnblogs.com/chenshishuo/p/5152068.html http://blog.csdn.net/liguohanhaha/article/details/52261192
sql_alchemy_conn = mysql://ct:152108@localhost/airflow
對應字段解釋如下: dialect+driver://username:password@host:port/database
當遇到不符合常理的情況時考慮清空 airflow backend的數據庫, 可使用airflow resetdb清空。
刪除dag文件后,webserver中可能還會存在相應信息,這時需要重啟webserver並刷新網頁。
關閉webserver: ps -ef|grep -Ei '(airflow-webserver)'| grep master | awk '{print $2}'|xargs -i kill {}
界面的時候看起來比較蛋疼, utc-0的時間,
修改.../python2.7/site-packages/airflow/www/templates/admin/master.html如下(注釋掉UCTSeconds,新增一行UTCSeconds), 這樣時間就是本地時間了。
驗證腳本是否有問題:python xxx.py
看是否能查詢出新增的dags嗎:airflow list_dags
啟動schedule :airflow scheduler
這里有的 start_date 有點特別,如果你設置了這個參數,那么airflow就會從start_date開始以 schedule_interval 的規則開始執行,例如設置成3天前每小時執行一次,那么在調度正常啟動時,就會立即調度 24*3 次,但注意,腳本執行環境的時間還是當前的系統時間,而不會說真是把系統時間模擬成3天前,所以感覺這個功能應用場景比較好限
在centos6.8上裝特別順利(運行時貌似一切都正常,就是任務一直處於running狀態---debug了一番源代碼, 發現內存要必需夠大,發現必需用非root身份運行airflow worker, 務必保證核數夠用,否則需要調低dag_concurrency, max_active_runs_per_dag,max_threads,parallelism, 否則worker出現莫名其妙的問題)
airflow跑着跑着就掛了,一看內存還夠用(可能需要不要錢的加內存),如果你到處找不到想要的錯誤日志。那么看看AIRFLOW_HOME下面是不是莫名其妙的多了幾個 .err/.out 的文件,進去看看會有收獲。
在需要運行作業的機器上的安裝airflow airflow[celery] celery[redis] 模塊后,啟動airflow worker即可.這樣作業就能運行在多個節點上.
安裝主模塊
[airflow@airflow ~]$ pip install airflow
2.4.2 安裝數據庫模塊、密碼模塊
[airflow@airflow ~]$ pip install "airflow[postgres,password]"