之前試用了azkaban一小段時間,雖然上手快速方便,但是功能還是太簡單,不夠靈活。
Airflow使用代碼來管理任務,這樣應該是最靈活的,決定試一下。
我是python零基礎,在使用airflow的過程中可謂吃盡了苦頭。。好歹最后實現所有要求,兩三周的時間沒有白費
看完這篇文章,可以達到如下目標:
- 安裝airflow
- 如何修改界面右上角顯示的時間到當前時區
- 如何添加任務
- 調試任務python代碼
- 如何啟動spark任務
- 如何限定任務同時執行的個數
- 如何手動觸發任務時傳入參數
- 如何在airflow界面上重新運行任務
- 如何查看任務log及所有任務的運行記錄
- 如何在任務失敗時發郵件(騰訊企業郵箱)
- 如何在任務失敗時發消息到企業微信
以下過程已經過去了有一段時間,當時記錄的也不一定很全面,如果有的不能執行,請留言告知。
安裝airflow
系統:Ubuntu 16
python: 3.7
airflow版本:1.10.10
保持pip3到最新版本
pip3 install --upgrade pip
安裝使用pip3
切換到root用戶執行: pip3 install apache-airflow
你以為敲完這條命令就可以去把個妹或者撩個漢再回來就裝好了,請坐下。
我碰到的錯誤:
Python.h not found
運行
sudo apt-get install python3.7-dev
某些依賴版本不對:
ERROR: pendulum 1.4.4 has requirement python-dateutil<3.0.0.0,>=2.6.0.0, but you'll have python-dateutil 2.4.2 which is incompatible.
ERROR: pandas 0.25.3 has requirement python-dateutil>=2.6.1, but you'll have python-dateutil 2.4.2 which is incompatible.
運行
pip install python-dateutil --upgrade
哪個包版本不對,更新哪個
數據庫使用mysql
相信你看這個文章的時候應該不會還沒有嘗試裝過airflow,所以airflow.cfg這個文件已經有了,在哪也很清楚
修改airflow.cfg:
sql_alchemy_conn = mysql://airflow:password@jjh1810:3306/airflow
使用root用戶連接到mysql:
create user 'airflow'@'%' identified by '123';
grant all privileges on airflow.* to 'airflow'@'%';
flush privileges;
set explicit_defaults_for_timestamp = 1; --這一行至關重要
再使用airflow用戶登錄mysql:
create database airflow CHARACTER SET = utf8mb4;
初始化數據庫
airflow initdb
這時候會報mysql依賴問題,如:
No module named '_mysql'
安裝python的mysql依賴:
No module named MySQLdb
python3: mysql錯誤:ModuleNotFoundError: No module named 'ConfigParser'
這個時候終於可以啟動airflow了:
啟的時候不要使用root用戶,回到普通用戶
airflow webserver -p 8080
airflow scheduler
如何修改界面右上角顯示的時間到當前時區
相信應該所有人都會干這個事情:
喲?airflow里有個時區的配置,改了應該就好了
default_timezone = Asia/Shanghai
然后去刷一下頁面
還是UTC嘛,這配置騙人的嗎?
那么看這一篇文章吧:
Airflow 修改時區
這個不僅改掉了界面上的時區顯示,而且schedule里的時間一起改掉了,絕對是居家旅行,編碼調試必備方案。
改的時候注意: python的代碼是根據縮進來區別代碼塊的,所以拷代碼的時候一定要注意縮進沒有問題
如何添加任務
在~/airflow下創建dags文件夾,把.py文件放入即可
airflow啟動了一個叫 DagFileProcessorManager 的進程來掃描dags目錄,一但有文件個數變更,或者內容變更都會很快被檢測到
這個進程有相應的log文件,可以提供一些文件處理錯誤信息
調試任務python代碼
關閉schedule
這個時候已經開始寫任務的python代碼了,對於python小白與剛開始接觸airflow的小哥或老哥來說,簡直就是痛不欲生
有一個配置在調試的時候比較實用,就是關掉任務的schudle,只有手動觸發才會執行。
把dag的schedule_interval設置為None
schedule_interval=None
python小白實用技巧
還有python代碼里單引號和雙引號是等價的,如果有引號嵌套可以分開使用,避免麻煩的轉義,如:
hour = '{{ dag_run.conf["hour"] }}'
Jinja template
反正我第一眼看到這個東西,特別是官方教程里那一大塊的模板文本的時候,心里只有一個字: WTF?!
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
其實也不是很復雜,這個玩意理解了以后還是比較方便的。除了在代碼中使用普通的python變量或者airflow內置變量之外,很多時候在字符串中也需要使用airflow的內置變量會比較靈活和方便,Jinja template提供的就是這個功能。
內置變量說明見:Macros reference
如何啟動spark任務
airflow是很強大很靈活的,官方提供了功能豐富的各種插件,各種JobOperator。來,簡單好用童叟無欺的SparkSubmitOperator了解一下?
我的需求很簡單,可以提交任務到不同的spark集群。這樣就要求不能使用機器默認的hadoop和spark環境變量,必須為每個任務指定獨立的配置文件。不知道是不是有大牛一次性成功的,反正我是試了無數次,一句話在心里不停的重復:“這什么吊東西!”
小可愚鈍,google能搜出來的都看過了,怎么都不行,死活都不行,主要是環境變量不對。
調用linux腳本執行spark-submit是最靈活方便的辦法
轉念一想,還是傳統的spark提交方式最好用啊,就是執行sh腳本利用spark-submit進行提交,這樣spark就與airflow無關了,而且不管是環境變量還是參數配置都最靈活,傳入參數也很方便。
這樣只要使用普通的BashOperator就可以了,而且airflow應該專注如何調度任務,而不是還要兼顧任務的配置,就算SparkSubmitOperator可以工作,也是使用sh腳本的方式更好。
如何限定任務同時執行的個數
像spark任務通常使用的資源都會比較多,如果dag執行開始時間到當前時間間隔很長,或是暫停很長時間再開啟,那么一開啟的時候schedule會瞬間創建大量任務,提交到默認的pool,這個pool默認的大小是128。這樣肯定是大家不希望看到的。
一個解決辦法,為每個spark任務創建單獨的pool,大小設置為1,這樣一個spark任務一次就只能有一個在運行狀態,后面都排隊。
界面上操作:[Admin] -> [Pools],slots設為1。
然后在spark task的operator里添加參數:pool='PoolName'
如何手動觸發任務時傳入參數
假設任務是每小時觸發一次,處理24小時前的數據,也就是今天8點處理昨天8點這一個小時的數據。除了schedule自動執行的邏輯,我們還希望可以手動觸發任務,並且指定某個小時重新處理。
注: 這個功能只有1.10.10才支持,就是在界面上點擊 [Trigger DAG] 的時候可以填入參數(固定為Json格式)。
先來看一下最終的結果
hour='{{ dag_run.conf["hour"] if dag_run.conf["hour"] else (execution_date - macros.timedelta(hours=24)).strftime("%Y%m%d%H") }}'
這里使用了Jinja template,通過dag_run對象可以獲取界面上傳入的參數,如果沒有這個參數(也就是通過schedule觸發),那么執行默認的邏輯(這里是24之前),並且格式化時間與界面輸入保持一致。
如何在airflow界面上重新運行任務
這個功能默認的Executor是不支持的,需要安裝CeleryExecutor,而CeleryExecutor需要一個存放消息的框架,這里我們選擇rabbitmq。
假定rabbitmq已經裝好。
安裝請看官方文檔:Celery Executor
配置
executor = CeleryExecutor
borker_url = amqp://user:password@host:port
注: 如果rabbitmq是集群模式,這里也是挑一台出來使用。指定所有節點我還沒有配置成功,如果有會配置的,請留言告知。
如何在界面上重跑任務呢?
界面上點擊dag進入dag管理界面,點擊[Tree View]。
Task每次運行都會用一個小方塊來表示,點擊小方塊,再點擊 [Run] 按鈕就可以了。
注: Tree View 這里最多只顯示固定數量的歷史記錄,如果再早的時間只能通過點擊 [Trigger DAG] 再指定參數運行。
任務運行時間的問題
這里有一個關鍵的問題,在界面上點擊8個小時以前任務執行,那么任務觸發的時候,運行的是8個小時之前的時間,還是當前時間呢?
如果我們是通過之前的hour變量的來指定時間的,那任務運行的時間就是8個小時之前,任務當時觸發的時間。為什么呢?
我們在Jinja template里使用的變量 dag_run, execute_date這個並不是運行時變量,每次task觸發,相關的上下文信息都會存到數據庫里。所以8個小時之后我們再重新運行task的時候,是從數據庫中讀取當時的上下文信息,而不是現在的信息。
如何查看任務log及所有任務的運行記錄
查看所有任務的運行記錄
- DAG界面里的 [Graph View] -> 點擊任務 -> [Task Instances]
- 主菜單里的 [Browser] -> [Task Instances]
查看log
這就比較簡單了
- 點擊 [Tree View] 里的小方塊,可以查看log
- Task Intances 列表最后一列,也可以查看log
如何在任務失敗時發郵件(騰訊企業郵箱)
首先DAG的default_args需要配置
'email':['name@mail.com'],
'email_on_failure': True
修改airflow.cfg
smtp_host = smtp.exmail.qq.com
smtp_starttls = False
smtp_ssl = True
smtp_port = 465
smtp_mail_from = name@mail.com
smtp_user = name@mail.com
smtp_password = password
首先 smtp_ssl = True, smtp_port = 465 是一個重點。再次smtp_mail_from和smtp_user都使用同一個有效的郵箱地址。
如何在任務失敗時發消息到企業微信
有時候覺得發郵件可能還不夠,想把失敗消息發到企業微信,這樣更能及時的發現問題。
添加企業微信依賴
airflow官方支持釘釘的通知,企業微信通知就是根據這個改動而來,代碼有兩個py文件:airflow企業微信支持
把這兩個py文件放到 dags 目錄,也就是和dag的py文件放在一起。
在企業微信群中創建機器人
-
右鍵點擊群
-
選擇 [Add Group Robot],並創建
-
獲取機器人的key:右鍵 [View Information],可以得到一個URL
https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxxxxx-xx-xx
這個key的值就是機器人的ID -
在airflow中創建企業微信的連接:[主菜單] -> [Admin] -> [Connections],配置填寫:
Conn Id: wechat_robot
Conn Type: HTTP
Host: https://qyapi.weixin.qq.com
Password: 前面得到的key值,也就是機器人的ID
在代碼中使用
-
代碼中import WechatOperator
from wechat_operator import WechatOperator
-
創建 failure call 方法:
def failure_callback(context):
dagConf = context['dag_run'].conf
taskInst = context['task_instance']
hour = None
if 'hour' in dagConf:
hour = dagConf['hour']
else:
hour = (taskInst.execution_date - timedelta(hours=24)).strftime('%Y%m%d%H')
message = 'Airflow任務失敗:\n' \
'DAG: {}\n' \
'Task: {}\n' \
'時間: {}\n' \
.format(taskInst.dag_id,
taskInst.task_id,
hour)
return WechatOperator(
task_id='wechat_task',
wechat_conn_id='wechat_robot',
message_type='text',
message=message,
at_all=True,
).execute(context)
這個代碼應該還是很好懂的,主要是為了創建 WechatOperator 對象。
有個邏輯來重新獲取執行時間(這里必須使用代碼,而不能直接使用Jinja template),為的是在通知里面可以直接看到是哪個時間出錯了。
- default_args添加 failure callback配置
'on_failure_callbak': failure_callback
結束語
到這里,總算是搭建好一個可以正式投入生產使用的環境了。
Airflow雖然很靈活,但是想真正滿足生產需求,還是經歷了不少痛苦。特別是要求會使用python,加上airflow官方文檔也不是很詳細,這兩點導致入門曲線太陡峭了。