airflow 介紹
- airflow是一款開源的,分布式任務調度框架,它將一個具有上下級依賴關系的工作流,組裝成一個有向無環圖。
- 特點:
- 分布式任務調度:允許一個工作流的task在多台worker上同時執行
- 可構建任務依賴:以有向無環圖的方式構建任務依賴關系
- task原子性:工作流上每個task都是原子可重試的,一個工作流某個環節的task失敗可自動或手動進行重試,不必從頭開始任務
- 工作流示意圖
- 一個dag表示一個定時的工作流,包含一個或者多個具有依賴關系的task
- task依賴圖
- 架構圖及集群角色
- webserver : 提供web端服務,以及會定時生成子進程去掃描對應的目錄下的dags,並更新數據庫
- scheduler : 任務調度服務,根據dags生成任務,並提交到消息中間件隊列中 (redis或rabbitMq)
- celery worker : 分布在不同的機器上,作為任務真正的的執行節點。通過監聽消息中間件: redis或rabbitMq 領取任務
- flower : 監控worker進程的存活性,啟動或關閉worker進程,查看運行的task
實戰
構建docker鏡像
- 采用的airflow是未發行的1.10.0版本,原因是從1.10.0開始,支持時區的設置,而不是統一的UTC
//self.registry.domain 為docker私有鏡像倉庫
//self.mvn.registry.com maven 私有鏡像倉庫
//data0 為數據目錄,data1為日志目錄,運維統一配置日志清楚策略
#docker build --network host -t self.registry.domain/airflow_base_1.10.7:1.0.0 .
FROM self.registry.domain/airflow/centos_base_7.4.1708:1.0.0
LABEL AIRFLOW=1.10.7
ARG CELERY_REDIS=4.1.1
ARG DOCKER_VERSION=1.13.1
ARG AIRFLOW_VERSION=1.10.7
ADD sbin /data0/airflow/sbin
ENV SLUGIFY_USES_TEXT_UNIDECODE=yes \
#如果構建鏡像的機器需要代理才能連接外網的話,配置https_proxy
https_proxy=https://ip:port
RUN curl http://self.mvn.registry.com/python/python-3.5.6.jar -o /tmp/Python-3.5.6.tgz && \
curl http://self.mvn.registry.com/airflow/${AIRFLOW_VERSION}/airflow-${AIRFLOW_VERSION}.jar -o /tmp/incubator-airflow-${AIRFLOW_VERSION}.tar.gz && \
curl http:/self.mvn.registry.com/docker/${DOCKER_VERSION}/docker-${DOCKER_VERSION}.jar -o /tmp/docker-${DOCKER_VERSION}.tar.gz && \
tar zxf /tmp/docker-${DOCKER_VERSION}.tar.gz -C /data0/software && \
tar zxf /tmp/Python-3.5.6.tgz -C /data0/software && \
tar zxf /tmp/incubator-airflow-${AIRFLOW_VERSION}.tar.gz -C /data0/software && \
yum install -y libtool-ltdl policycoreutils-python && \
rpm -ivh --force --nodeps /data0/software/docker-${DOCKER_VERSION}/docker-engine-selinux-${DOCKER_VERSION}-1.el7.centos.noarch.rpm && \
rpm -ivh --force --nodeps /data0/software/docker-${DOCKER_VERSION}/docker-engine-${DOCKER_VERSION}-1.el7.centos.x86_64.rpm && \
yum -y install gcc && yum -y install gcc-c++ && yum -y install make && \
yum -y install zlib-devel mysql-devel python-devel cyrus-sasl-devel cyrus-sasl-lib libxml2-devel libxslt-devel && \
cd /data0/software/Python-3.5.6 && ./configure && make && make install && \
ln -sf /usr/local/bin/pip3 /usr/local/bin/pip && \
ln -sf /usr/local/bin/python3 /usr/local/bin/python && \
cd /data0/software/incubator-airflow-${AIRFLOW_VERSION} && python setup.py install && \
pip install -i https://pypi.douban.com/simple/ apache-airflow[crypto,celery,hive,jdbc,mysql,hdfs,password,redis,devel_hadoop] && \
pip install -i https://pypi.douban.com/simple/ celery[redis]==$CELERY_REDIS && \
pip install -i https://pypi.douban.com/simple/ docutils && \
ln -sf /usr/local/lib/python3.5/site-packages/apache_airflow-1.10.0-py3.5.egg/airflow /data0/software/airflow && \
mkdir -p /data0/airflow/bin && \
ln -sf /data0/airflow/sbin/airflow-200.sh /data0/airflow/bin/200.sh && \
ln -sf /data0/airflow/sbin/airflow-503.sh /data0/airflow/bin/503.sh && \
chown -R root:root /data0/software/ && \
chown -R root:root /data0/airflow/ && \
chmod -R 775 /data0/airflow/sbin/* && \
chmod -R 775 /data0/airflow/bin/* && \
echo 'source /data0/airflow/sbin/init-airflow.sh' >> ~/.bashrc && \
rm -rf /tmp/* /data0/software/Python-3.5.6 /data0/software/incubator-airflow-${AIRFLOW_VERSION} /data0/software/docker-${DOCKER_VERSION}
ENV PATH=$PATH:/data0/software/jdk/bin:/data0/software/airflow/bin:/data0/airflow/sbin/:/data0/airflow/sbin/airflow/:/data0/airflow/bin/
WORKDIR /data0/airflow/bin/
- 通過docker 啟動容器的話需要暴露幾個端口
webserver: 8081
worker: 8793
flower: 5555
//啟動示例
docker run --name airflow -it -d --privileged --net=host -p 8081:8081 -p 5555:5555 -p 8793:8793 -v /var/run/docker.sock:/var/run/docker.sock -v /data1:/data1 -v /data0/airflow:/data0/airflow self.registry.domain/airflow_1.10.7:1.0.0
- airflow 升級到未release的1.10.0的版本
- 如果之前用的是低版本的話,需要執行airflow upgradedb 來更新遷移數據庫的schema
- 由於1.10.0版本對一些參數配置名稱進行了修改,需要重新進行對號入座,如:
celeryd_concurrency -> worker_concurrency
celery_result_backend -> result_backend
- 1.10.0版本開始支持自定義時區,而不需要再統一使用默認的UTC時區,但是界面上的顯示還是統一UTC,所以需要對代碼進行定制化,以支持界面按照我們想要的時區顯示
//airflow.cfg
default_timezone = Etc/GMT-8
//views.py
def utc2local(utc):
epoch = time.mktime(utc.timetuple())
offset = datetime.fromtimestamp(epoch) - datetime.utcfromtimestamp(epoch)
return utc + offset
//dags.html
utc2local(last_run.execution_date).strftime("%Y-%m-%d %H:%M")
utc2local(last_run.start_date).strftime("%Y-%m-%d %H:%M")
airflow plugins 定制化開發
- 官方plugins文檔
- 由於dag的刪除現在官方沒有暴露直接的api,而完整的刪除又牽扯到多個表,總結出刪除dag的sql如下
set @dag_id = 'BAD_DAG';
delete from airflow.xcom where dag_id = @dag_id;
delete from airflow.task_instance where dag_id = @dag_id;
delete from airflow.sla_miss where dag_id = @dag_id;
delete from airflow.log where dag_id = @dag_id;
delete from airflow.job where dag_id = @dag_id;
delete from airflow.dag_run where dag_id = @dag_id;
delete from airflow.dag where dag_id = @dag_id;
實現的200和503腳本,用於集群統一的上下線操作
- 200腳本用於統一操作集群服務上線
#!/usr/bin/env bash
function usage() {
echo -e "\n A tool used for starting airflow services
Usage: 200.sh {webserver|worker|scheduler|flower}
"
}
PORT=8081
ROLE=webserver
ENV_ARGS=""
check_alive() {
PID=`netstat -nlpt | grep $PORT | awk '{print $7}' | awk -F "/" '{print $1}'`
[ -n "$PID" ] && return 0 || return 1
}
check_scheduler_alive() {
PIDS=`ps -ef | grep "/usr/local/bin/airflow scheduler" | grep "python" | awk '{print $2}'`
[ -n "$PIDS" ] && return 0 || return 1
}
function get_host_ip(){
local host=$(ifconfig | grep "inet " | grep "\-\->" | awk '{print $2}' | tail -1)
if [[ -z "$host" ]]; then
host=$(ifconfig | grep "inet " | grep "broadcast" | awk '{print $2}' | tail -1)
fi
echo "${host}"
}
start_service() {
if [ $ROLE = 'scheduler' ];then
check_scheduler_alive
else
check_alive
fi
if [ $? -ne 0 ];then
nohup airflow $ROLE $ENV_ARGS > $BASE_LOG_DIR/$ROLE/$ROLE.log 2>&1 &
sleep 5
if [ $ROLE = 'scheduler' ];then
check_scheduler_alive
else
check_alive
fi
if [ $? -ne 0 ];then
echo "service start error"
exit 1
else
echo "service start success"
exit 0
fi
else
echo "service alreay started"
exit 0
fi
}
function main() {
if [ -z "${POOL}" ]; then
echo "the environment variable POOL cannot be empty"
exit 1
fi
source /data0/hcp/sbin/init-hcp.sh
case "$1" in
webserver)
echo "starting airflow webserver"
ROLE=webserver
PORT=8081
start_service
;;
worker)
echo "starting airflow worker"
ROLE=worker
PORT=8793
local host_ip=$(get_host_ip)
ENV_ARGS="-cn ${host_ip}@${host_ip}"
start_service
;;
flower)
echo "starting airflow flower"
ROLE=flower
PORT=5555
start_service
;;
scheduler)
echo "starting airflow scheduler"
ROLE=scheduler
start_service
;;
*)
usage
exit 1
esac
}
main "$@"
- 503腳本用於統一操作集群服務下線
#!/usr/bin/env bash
function usage() {
echo -e "\n A tool used for stop airflow services
Usage: 200.sh {webserver|worker|scheduler|flower}
"
}
function get_host_ip(){
local host=$(ifconfig | grep "inet " | grep "\-\->" | awk '{print $2}' | tail -1)
if [[ -z "$host" ]]; then
host=$(ifconfig | grep "inet " | grep "broadcast" | awk '{print $2}' | tail -1)
fi
echo "${host}"
}
function main() {
if [ -z "${POOL}" ]; then
echo "the environment variable POOL cannot be empty"
exit 1
fi
source /data0/hcp/sbin/init-hcp.sh
case "$1" in
webserver)
echo "stopping airflow webserver"
cat $AIRFLOW_HOME/airflow-webserver.pid | xargs kill -9
;;
worker)
echo "stopping airflow worker"
PORT=8793
PID=`netstat -nlpt | grep $PORT | awk '{print $7}' | awk -F "/" '{print $1}'`
kill -9 $PID
local host_ip=$(get_host_ip)
ps -ef | grep celeryd | grep ${host_ip}@${host_ip} | awk '{print $2}' | xargs kill -9
;;
flower)
echo "stopping airflow flower"
PORT=5555
PID=`netstat -nlpt | grep $PORT | awk '{print $7}' | awk -F "/" '{print $1}'`
kill -9 $PID
start_service
;;
scheduler)
echo "stopping airflow scheduler"
PID=`ps -ef | grep "/usr/local/bin/airflow scheduler" | grep "python" | awk '{print $2}'`
kill -9 $PID
;;
*)
usage
exit 1
esac
}
main "$@"
All in docker
為了使宿主機盡量clean,woker/master 節點便捷的橫向擴展,我們所有服務(airflow worker/master/具體task) 都通過docker 的形式發布/調度。同時這樣做也便於結合我們內部的代碼構建發布工具
通過gitlab CI 規范dag發布流程
隨着任務托管規模的擴大,使用業務方的增多,不規范的dag提交流程(上機器直接操作目錄)可能導致所有任務災難性不可恢復。於是dag的提交應該規范起來。
因為我們是通過gitlab對代碼進行托管的,而某種程度上,dag的管理也是一種代碼托管。所以我們把dag放在了gitlab上,結合gitlab CI工具,業務方提交dag之后需經過上一級的review之后合入主分支,此時觸發一個事件,通知airflow master上的dag更新服務對dag目錄進行更新。整個過程如下圖
自定義Spark流/批任務托管插件
由於團隊主要使用Spark計算框架進行離線和實時計算,Spark一般以Cluster模式提交任務。
對於批任務而言,此時計算結果是異步的,在airflow中的表現是上游任務提交完就成功退出,下游任務開始執行。如果在airflow中兩個spark任務有明顯的上下游數據依賴關系的話,會導致下游任務雪崩式的失敗,很明顯達不到任務托管的目的。當然,如果spark最終寫入hive table的話還可以用airlfow自帶的Sensor,但是我們只是產出到某個目錄,所以此方案行不通。
於是我們定制了數據spark任務提交以及數據質量檢查插件,在提交任務后不斷輪詢任務狀態以及最后產出路徑數據狀態,當數據質量校驗通過后,才成功退出當前任務;否則根據超時機制和重試次數對任務進行重跑。提交示例如下(通過response_check回調函數對數據質量進行校驗):
對於流任務而言,雖然airflow不太適合托管常駐型任務,但我們想的是統一流批任務的提交,管理,重試機制的方式,所以也通過插件的方式定制化。提交示例如下:
遇到的坑以及定制化解決方案
問題1: airflow worker 角色不能使用根用戶啟動
- 原因:不能用根用戶啟動的根本原因,在於airflow的worker直接用的celery,而celery 源碼中有參數默認不能使用ROOT啟動,否則將報錯, 源碼鏈接
C_FORCE_ROOT = os.environ.get('C_FORCE_ROOT', False)
ROOT_DISALLOWED = """\
Running a worker with superuser privileges when the
worker accepts messages serialized with pickle is a very bad idea!
If you really want to continue then you have to set the C_FORCE_ROOT
environment variable (but please think about this before you do).
User information: uid={uid} euid={euid} gid={gid} egid={egid}
"""
ROOT_DISCOURAGED = """\
You're running the worker with superuser privileges: this is
absolutely not recommended!
Please specify a different user using the --uid option.
User information: uid={uid} euid={euid} gid={gid} egid={egid}
"""
- 解決方案一:修改airlfow源碼,在celery_executor.py中強制設置C_FORCE_ROOT
from celery import Celery, platforms
在app = Celery(…)后新增
platforms.C_FORCE_ROOT = True
重啟即可
- 解決方案二:在容器初始化環境變量的時候,設置C_FORCE_ROOT參數,以零侵入的方式解決問題
強制celery worker運行采用root模式
export C_FORCE_ROOT=True
問題2: docker in docker
- 在dags中以docker方式調度任務時,為了container的輕量化,不做重型的docker pull等操作,我們利用了docker cs架構的設計理念,只需要將宿主機的/var/run/docker.sock文件掛載到容器目錄下即可 docker in docker 資料
問題3: 多個worker節點進行調度反序列化dag執行的時候,報找不到module的錯誤
- 當時考慮到文件更新的一致性,采用所有worker統一執行master下發的序列化dag的方案,而不依賴worker節點上實際的dag文件,開啟這一特性操作如下
worker節點上: airflow worker -cn=ip@ip -p //-p為開關參數,意思是以master序列化的dag作為執行文件,而不是本地dag目錄中的文件
master節點上: airflow scheduler -p
- 錯誤原因: 遠程的worker節點上不存在實際的dag文件,反序列化的時候對於當時在dag中定義的函數或對象找不到module_name
- 解決方案一:在所有的worker節點上同時發布dags目錄,缺點是dags一致性成問題
- 解決方案二:修改源碼中序列化與反序列化的邏輯,主體思路還是替換掉不存在的module為main。修改如下:
//models.py 文件,對 class DagPickle(Base) 定義修改
import dill
class DagPickle(Base):
id = Column(Integer, primary_key=True)
# 修改前: pickle = Column(PickleType(pickler=dill))
pickle = Column(LargeBinary)
created_dttm = Column(UtcDateTime, default=timezone.utcnow)
pickle_hash = Column(Text)
__tablename__ = "dag_pickle"
def __init__(self, dag):
self.dag_id = dag.dag_id
if hasattr(dag, 'template_env'):
dag.template_env = None
self.pickle_hash = hash(dag)
raw = dill.dumps(dag)
# 修改前: self.pickle = dag
reg_str = 'unusual_prefix_\w*{0}'.format(dag.dag_id)
result = re.sub(str.encode(reg_str), b'__main__', raw)
self.pickle =result
//cli.py 文件反序列化邏輯 run(args, dag=None) 函數
// 直接通過dill來反序列化二進制文件,而不是通過PickleType 的result_processor做中轉
修改前: dag = dag_pickle.pickle
修改后:dag = dill.loads(dag_pickle.pickle)
- 解決方案三:源碼零侵入,使用python的types.FunctionType重新創建一個不帶module的function,這樣序列化與反序列化的時候不會有問題
new_func = types.FunctionType((lambda df: df.iloc[:, 0].size == xx).__code__, {})
問題4: 在master節點上,通過webserver無法查看遠程執行的任務日志
- 原因:由於airflow在master查看task執行日志是通過各個節點的http服務獲取的,但是存入task_instance表中的host_name不是ip,可見獲取hostname的方式有問題.
- 解決方案:修改airflow/utils/net.py 中get_hostname函數,添加優先獲取環境變量中設置的hostname的邏輯
//models.py TaskInstance
self.hostname = get_hostname()
//net.py 在get_hostname里面加入一個獲取環境變量的邏輯
import os
def get_hostname():
"""
Fetch the hostname using the callable from the config or using
`socket.getfqdn` as a fallback.
"""
# 嘗試獲取環境變量
if 'AIRFLOW_HOST_NAME' in os.environ:
return os.environ['AIRFLOW_HOST_NAME']
# First we attempt to fetch the callable path from the config.
try:
callable_path = conf.get('core', 'hostname_callable')
except AirflowConfigException:
callable_path = None
# Then we handle the case when the config is missing or empty. This is the
# default behavior.
if not callable_path:
return socket.getfqdn()
# Since we have a callable path, we try to import and run it next.
module_path, attr_name = callable_path.split(':')
module = importlib.import_module(module_path)
callable = getattr(module, attr_name)
return callable()