airflow實戰總結


airflow 介紹

  • airflow是一款開源的,分布式任務調度框架,它將一個具有上下級依賴關系的工作流,組裝成一個有向無環圖。
  • 特點:
  • 分布式任務調度:允許一個工作流的task在多台worker上同時執行
  • 可構建任務依賴:以有向無環圖的方式構建任務依賴關系
  • task原子性:工作流上每個task都是原子可重試的,一個工作流某個環節的task失敗可自動或手動進行重試,不必從頭開始任務
  • 工作流示意圖

  • 一個dag表示一個定時的工作流,包含一個或者多個具有依賴關系的task
  • task依賴圖

  • 架構圖及集群角色

  • webserver : 提供web端服務,以及會定時生成子進程去掃描對應的目錄下的dags,並更新數據庫
  • scheduler : 任務調度服務,根據dags生成任務,並提交到消息中間件隊列中 (redis或rabbitMq)
  • celery worker : 分布在不同的機器上,作為任務真正的的執行節點。通過監聽消息中間件: redis或rabbitMq 領取任務
  • flower : 監控worker進程的存活性,啟動或關閉worker進程,查看運行的task

實戰

構建docker鏡像

  • 采用的airflow是未發行的1.10.0版本,原因是從1.10.0開始,支持時區的設置,而不是統一的UTC
    //self.registry.domain 為docker私有鏡像倉庫
    //self.mvn.registry.com maven 私有鏡像倉庫
    //data0 為數據目錄,data1為日志目錄,運維統一配置日志清楚策略
    #docker build --network host -t self.registry.domain/airflow_base_1.10.7:1.0.0 .
    FROM self.registry.domain/airflow/centos_base_7.4.1708:1.0.0
    LABEL AIRFLOW=1.10.7
    
    ARG CELERY_REDIS=4.1.1
    ARG DOCKER_VERSION=1.13.1
    ARG AIRFLOW_VERSION=1.10.7
    
    ADD sbin /data0/airflow/sbin
    
    ENV SLUGIFY_USES_TEXT_UNIDECODE=yes \
        #如果構建鏡像的機器需要代理才能連接外網的話,配置https_proxy
        https_proxy=https://ip:port 

    RUN curl http://self.mvn.registry.com/python/python-3.5.6.jar -o /tmp/Python-3.5.6.tgz && \
        curl http://self.mvn.registry.com/airflow/${AIRFLOW_VERSION}/airflow-${AIRFLOW_VERSION}.jar -o /tmp/incubator-airflow-${AIRFLOW_VERSION}.tar.gz && \
        curl http:/self.mvn.registry.com/docker/${DOCKER_VERSION}/docker-${DOCKER_VERSION}.jar -o /tmp/docker-${DOCKER_VERSION}.tar.gz && \
        tar zxf /tmp/docker-${DOCKER_VERSION}.tar.gz -C /data0/software && \
        tar zxf /tmp/Python-3.5.6.tgz -C /data0/software && \
        tar zxf /tmp/incubator-airflow-${AIRFLOW_VERSION}.tar.gz -C /data0/software && \
        yum install -y libtool-ltdl policycoreutils-python && \
        rpm -ivh --force --nodeps /data0/software/docker-${DOCKER_VERSION}/docker-engine-selinux-${DOCKER_VERSION}-1.el7.centos.noarch.rpm && \
        rpm -ivh --force --nodeps /data0/software/docker-${DOCKER_VERSION}/docker-engine-${DOCKER_VERSION}-1.el7.centos.x86_64.rpm && \
        yum -y install gcc && yum -y install gcc-c++ && yum -y install make && \
        yum -y install zlib-devel mysql-devel python-devel cyrus-sasl-devel cyrus-sasl-lib libxml2-devel libxslt-devel && \
        cd /data0/software/Python-3.5.6 && ./configure && make && make install && \
        ln -sf /usr/local/bin/pip3 /usr/local/bin/pip && \
        ln -sf /usr/local/bin/python3 /usr/local/bin/python && \
        cd /data0/software/incubator-airflow-${AIRFLOW_VERSION} && python setup.py install && \
        pip install -i https://pypi.douban.com/simple/ apache-airflow[crypto,celery,hive,jdbc,mysql,hdfs,password,redis,devel_hadoop] && \
        pip install -i https://pypi.douban.com/simple/ celery[redis]==$CELERY_REDIS && \
        pip install -i https://pypi.douban.com/simple/ docutils && \
        ln -sf /usr/local/lib/python3.5/site-packages/apache_airflow-1.10.0-py3.5.egg/airflow /data0/software/airflow && \
        mkdir -p /data0/airflow/bin && \
        ln -sf /data0/airflow/sbin/airflow-200.sh /data0/airflow/bin/200.sh && \
        ln -sf /data0/airflow/sbin/airflow-503.sh /data0/airflow/bin/503.sh && \
        chown -R root:root /data0/software/ && \
        chown -R root:root /data0/airflow/ && \
        chmod -R 775 /data0/airflow/sbin/* && \
        chmod -R 775 /data0/airflow/bin/* && \
        echo 'source /data0/airflow/sbin/init-airflow.sh' >> ~/.bashrc && \
        rm -rf /tmp/* /data0/software/Python-3.5.6 /data0/software/incubator-airflow-${AIRFLOW_VERSION} /data0/software/docker-${DOCKER_VERSION}
        
    ENV PATH=$PATH:/data0/software/jdk/bin:/data0/software/airflow/bin:/data0/airflow/sbin/:/data0/airflow/sbin/airflow/:/data0/airflow/bin/

    WORKDIR /data0/airflow/bin/
 
  • 通過docker 啟動容器的話需要暴露幾個端口
    webserver: 8081
    worker: 8793
    flower: 5555
    //啟動示例
    docker run --name airflow -it -d --privileged --net=host -p 8081:8081 -p 5555:5555 -p 8793:8793 -v /var/run/docker.sock:/var/run/docker.sock -v /data1:/data1 -v /data0/airflow:/data0/airflow self.registry.domain/airflow_1.10.7:1.0.0
 
  • airflow 升級到未release的1.10.0的版本
  • 如果之前用的是低版本的話,需要執行airflow upgradedb 來更新遷移數據庫的schema
  • 由於1.10.0版本對一些參數配置名稱進行了修改,需要重新進行對號入座,如:
    celeryd_concurrency -> worker_concurrency
    celery_result_backend -> result_backend
  • 1.10.0版本開始支持自定義時區,而不需要再統一使用默認的UTC時區,但是界面上的顯示還是統一UTC,所以需要對代碼進行定制化,以支持界面按照我們想要的時區顯示
  //airflow.cfg
  default_timezone = Etc/GMT-8

  //views.py
  def utc2local(utc):
        epoch = time.mktime(utc.timetuple())
        offset = datetime.fromtimestamp(epoch) - datetime.utcfromtimestamp(epoch)
        return utc + offset
  //dags.html
  utc2local(last_run.execution_date).strftime("%Y-%m-%d %H:%M")
  utc2local(last_run.start_date).strftime("%Y-%m-%d %H:%M")
 

airflow plugins 定制化開發

  • 官方plugins文檔
  • 由於dag的刪除現在官方沒有暴露直接的api,而完整的刪除又牽扯到多個表,總結出刪除dag的sql如下
  set @dag_id = 'BAD_DAG';
    delete from airflow.xcom where dag_id = @dag_id;
    delete from airflow.task_instance where dag_id = @dag_id;
    delete from airflow.sla_miss where dag_id = @dag_id;
    delete from airflow.log where dag_id = @dag_id;
    delete from airflow.job where dag_id = @dag_id;
    delete from airflow.dag_run where dag_id = @dag_id;
    delete from airflow.dag where dag_id = @dag_id;
 

實現的200和503腳本,用於集群統一的上下線操作

  • 200腳本用於統一操作集群服務上線
 #!/usr/bin/env bash
    function usage() {
        echo -e "\n A tool used for starting airflow services
    Usage: 200.sh {webserver|worker|scheduler|flower}
    "
    }

    PORT=8081
    ROLE=webserver
    ENV_ARGS=""
    check_alive() {
        PID=`netstat -nlpt | grep $PORT | awk '{print $7}' | awk -F "/" '{print $1}'`
        [ -n "$PID" ] && return 0 || return 1
    }

    check_scheduler_alive() {
        PIDS=`ps -ef | grep "/usr/local/bin/airflow scheduler" | grep "python" | awk '{print $2}'`
        [ -n "$PIDS" ] && return 0 || return 1
    }

    function get_host_ip(){
        local host=$(ifconfig | grep "inet " | grep "\-\->" | awk '{print $2}' | tail -1)
        if [[ -z "$host" ]]; then
            host=$(ifconfig | grep "inet " | grep "broadcast" | awk '{print $2}' | tail -1)
        fi
        echo "${host}"
    }

    start_service() {
        if [ $ROLE = 'scheduler' ];then
            check_scheduler_alive
        else
            check_alive
        fi
        if [ $? -ne 0 ];then
            nohup airflow $ROLE $ENV_ARGS > $BASE_LOG_DIR/$ROLE/$ROLE.log 2>&1 &
            sleep 5
            if [ $ROLE = 'scheduler' ];then
                check_scheduler_alive
            else
                check_alive
            fi
            if [ $? -ne 0 ];then
                echo "service start error"
                exit 1
            else
                echo "service start success"
                exit 0
            fi
        else
            echo "service alreay started"
            exit 0
        fi
    }

    function main() {
        if [ -z "${POOL}" ]; then
            echo "the environment variable POOL cannot be empty"
            exit 1
        fi
        source /data0/hcp/sbin/init-hcp.sh
        case "$1" in
            webserver)
                echo "starting airflow webserver"
                ROLE=webserver
                PORT=8081
                start_service
                ;;
            worker)
                echo "starting airflow worker"
                ROLE=worker
                PORT=8793
                local host_ip=$(get_host_ip)
                ENV_ARGS="-cn ${host_ip}@${host_ip}"
                start_service
                ;;
            flower)
                echo "starting airflow flower"
                ROLE=flower
                PORT=5555
                start_service
                ;;
            scheduler)
                echo "starting airflow scheduler"
                ROLE=scheduler
                start_service
                ;;     
            *)
                usage
                exit 1
        esac
    }


    main "$@"
 
  • 503腳本用於統一操作集群服務下線
#!/usr/bin/env bash
    function usage() {
        echo -e "\n A tool used for stop airflow services
    Usage: 200.sh {webserver|worker|scheduler|flower}
    "
    }

    function get_host_ip(){
        local host=$(ifconfig | grep "inet " | grep "\-\->" | awk '{print $2}' | tail -1)
        if [[ -z "$host" ]]; then
            host=$(ifconfig | grep "inet " | grep "broadcast" | awk '{print $2}' | tail -1)
        fi
        echo "${host}"
    }

    function main() {
        if [ -z "${POOL}" ]; then
            echo "the environment variable POOL cannot be empty"
            exit 1
        fi
        source /data0/hcp/sbin/init-hcp.sh
        case "$1" in
            webserver)
                echo "stopping airflow webserver"
                cat $AIRFLOW_HOME/airflow-webserver.pid | xargs kill -9
                ;;
            worker)
                echo "stopping airflow worker"
                PORT=8793
                PID=`netstat -nlpt | grep $PORT | awk '{print $7}' | awk -F "/" '{print $1}'`
                kill -9 $PID
                local host_ip=$(get_host_ip)
                ps -ef | grep celeryd | grep ${host_ip}@${host_ip} | awk '{print $2}' | xargs kill -9
                ;;
            flower)
                echo "stopping airflow flower"
                PORT=5555
                PID=`netstat -nlpt | grep $PORT | awk '{print $7}' | awk -F "/" '{print $1}'`
                kill -9 $PID
                start_service
                ;;
            scheduler)
                echo "stopping airflow scheduler"
                PID=`ps -ef | grep "/usr/local/bin/airflow scheduler" | grep "python" | awk '{print $2}'`
                kill -9 $PID
                ;;     
            *)
                usage
                exit 1
        esac
    }

    main "$@"
 

All in docker

為了使宿主機盡量clean,woker/master 節點便捷的橫向擴展,我們所有服務(airflow worker/master/具體task) 都通過docker 的形式發布/調度。同時這樣做也便於結合我們內部的代碼構建發布工具

通過gitlab CI 規范dag發布流程

隨着任務托管規模的擴大,使用業務方的增多,不規范的dag提交流程(上機器直接操作目錄)可能導致所有任務災難性不可恢復。於是dag的提交應該規范起來。

因為我們是通過gitlab對代碼進行托管的,而某種程度上,dag的管理也是一種代碼托管。所以我們把dag放在了gitlab上,結合gitlab CI工具,業務方提交dag之后需經過上一級的review之后合入主分支,此時觸發一個事件,通知airflow master上的dag更新服務對dag目錄進行更新。整個過程如下圖

自定義Spark流/批任務托管插件

由於團隊主要使用Spark計算框架進行離線和實時計算,Spark一般以Cluster模式提交任務。

對於批任務而言,此時計算結果是異步的,在airflow中的表現是上游任務提交完就成功退出,下游任務開始執行。如果在airflow中兩個spark任務有明顯的上下游數據依賴關系的話,會導致下游任務雪崩式的失敗,很明顯達不到任務托管的目的。當然,如果spark最終寫入hive table的話還可以用airlfow自帶的Sensor,但是我們只是產出到某個目錄,所以此方案行不通。

於是我們定制了數據spark任務提交以及數據質量檢查插件,在提交任務后不斷輪詢任務狀態以及最后產出路徑數據狀態,當數據質量校驗通過后,才成功退出當前任務;否則根據超時機制和重試次數對任務進行重跑。提交示例如下(通過response_check回調函數對數據質量進行校驗):

對於流任務而言,雖然airflow不太適合托管常駐型任務,但我們想的是統一流批任務的提交,管理,重試機制的方式,所以也通過插件的方式定制化。提交示例如下:

遇到的坑以及定制化解決方案

問題1: airflow worker 角色不能使用根用戶啟動

  • 原因:不能用根用戶啟動的根本原因,在於airflow的worker直接用的celery,而celery 源碼中有參數默認不能使用ROOT啟動,否則將報錯, 源碼鏈接
    C_FORCE_ROOT = os.environ.get('C_FORCE_ROOT', False)

    ROOT_DISALLOWED = """\
    Running a worker with superuser privileges when the
    worker accepts messages serialized with pickle is a very bad idea!

    If you really want to continue then you have to set the C_FORCE_ROOT
    environment variable (but please think about this before you do).

    User information: uid={uid} euid={euid} gid={gid} egid={egid}
    """

    ROOT_DISCOURAGED = """\
    You're running the worker with superuser privileges: this is
    absolutely not recommended!

    Please specify a different user using the --uid option.

    User information: uid={uid} euid={euid} gid={gid} egid={egid}
    """
  • 解決方案一:修改airlfow源碼,在celery_executor.py中強制設置C_FORCE_ROOT
	from celery import Celery, platforms 
	在app = Celery(…)后新增 
	platforms.C_FORCE_ROOT = True
	重啟即可 
  • 解決方案二:在容器初始化環境變量的時候,設置C_FORCE_ROOT參數,以零侵入的方式解決問題
 強制celery worker運行采用root模式
 export C_FORCE_ROOT=True
 

問題2: docker in docker

  • 在dags中以docker方式調度任務時,為了container的輕量化,不做重型的docker pull等操作,我們利用了docker cs架構的設計理念,只需要將宿主機的/var/run/docker.sock文件掛載到容器目錄下即可 docker in docker 資料

問題3: 多個worker節點進行調度反序列化dag執行的時候,報找不到module的錯誤

  • 當時考慮到文件更新的一致性,采用所有worker統一執行master下發的序列化dag的方案,而不依賴worker節點上實際的dag文件,開啟這一特性操作如下
	worker節點上: airflow worker -cn=ip@ip -p //-p為開關參數,意思是以master序列化的dag作為執行文件,而不是本地dag目錄中的文件
	master節點上: airflow scheduler -p
 
  • 錯誤原因: 遠程的worker節點上不存在實際的dag文件,反序列化的時候對於當時在dag中定義的函數或對象找不到module_name
  • 解決方案一:在所有的worker節點上同時發布dags目錄,缺點是dags一致性成問題
  • 解決方案二:修改源碼中序列化與反序列化的邏輯,主體思路還是替換掉不存在的module為main。修改如下:
//models.py 文件,對 class DagPickle(Base) 定義修改
import dill
class DagPickle(Base):
    id = Column(Integer, primary_key=True)
    # 修改前: pickle = Column(PickleType(pickler=dill))
    pickle = Column(LargeBinary)
    created_dttm = Column(UtcDateTime, default=timezone.utcnow)
    pickle_hash = Column(Text)

    __tablename__ = "dag_pickle"
    def __init__(self, dag):
        self.dag_id = dag.dag_id
        if hasattr(dag, 'template_env'):
            dag.template_env = None
        self.pickle_hash = hash(dag)
        raw = dill.dumps(dag)
	# 修改前: self.pickle = dag
        reg_str = 'unusual_prefix_\w*{0}'.format(dag.dag_id)
        result = re.sub(str.encode(reg_str), b'__main__', raw)
        self.pickle =result

	//cli.py 文件反序列化邏輯 run(args, dag=None) 函數
	// 直接通過dill來反序列化二進制文件,而不是通過PickleType 的result_processor做中轉
	修改前: dag = dag_pickle.pickle
	修改后:dag = dill.loads(dag_pickle.pickle) 
  • 解決方案三:源碼零侵入,使用python的types.FunctionType重新創建一個不帶module的function,這樣序列化與反序列化的時候不會有問題
new_func = types.FunctionType((lambda df: df.iloc[:, 0].size == xx).__code__, {})

問題4: 在master節點上,通過webserver無法查看遠程執行的任務日志

  • 原因:由於airflow在master查看task執行日志是通過各個節點的http服務獲取的,但是存入task_instance表中的host_name不是ip,可見獲取hostname的方式有問題.
  • 解決方案:修改airflow/utils/net.py 中get_hostname函數,添加優先獲取環境變量中設置的hostname的邏輯
	//models.py TaskInstance
	self.hostname = get_hostname()
	//net.py 在get_hostname里面加入一個獲取環境變量的邏輯
	import os
	def get_hostname():
    	"""
    	Fetch the hostname using the callable from the config or using
    	`socket.getfqdn` as a fallback.
    	"""
		# 嘗試獲取環境變量
    	if 'AIRFLOW_HOST_NAME' in os.environ:
        	return os.environ['AIRFLOW_HOST_NAME']
    	# First we attempt to fetch the callable path from the config.
    	try:
        	callable_path = conf.get('core', 'hostname_callable')
    	except AirflowConfigException:
        	callable_path = None

    	# Then we handle the case when the config is missing or empty. This is the
    	# default behavior.
    	if not callable_path:
        	return socket.getfqdn()

    	# Since we have a callable path, we try to import and run it next.
    	module_path, attr_name = callable_path.split(':')
    	module = importlib.import_module(module_path)
    	callable = getattr(module, attr_name)
    	return callable()
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM