Airflow custom plugins: extracting data with DataX


Airflow custom plugins

A big part of Airflow's popularity is its plugin mechanism: mature Python libraries make it easy to pull in all kinds of plugins. In real work we inevitably hit cases where the official plugins don't cover our needs, and then we can write our own. You don't need to understand the internals, or even be very fluent in Python; I wrote mine largely by guesswork.

Plugin types

Airflow plugins come in two kinds: Operators and Sensors. An Operator is a plugin that performs a concrete task, while a Sensor is a condition probe: when a task depends on some external condition, a sensor checks whether that condition has been met.

Airflow's plugin support

Plugins are just Python files, so the system has to load them before it can run them. Airflow provides a simple plugin manager that scans $AIRFLOW_HOME/plugins and loads whatever it finds there.

So all we need to do is drop our finished plugins into that directory.

Plugin syntax

Both Operators and Sensors declare the parameters they need. An Operator runs by having its execute method called; a Sensor confirms its condition via poke. Let's take an Operator as the example.
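For contrast, a Sensor's poke is just a method that returns True once its condition holds; the scheduler calls it again every poke_interval seconds until it succeeds or times out. A minimal sketch of that contract in plain Python (no Airflow dependency; FileExistsSensor is a made-up name for illustration):

```python
import os
import time


class FileExistsSensor:
    """Toy stand-in for an Airflow sensor: poke() answers 'is the condition met yet?'."""

    def __init__(self, filepath, poke_interval=60):
        self.filepath = filepath
        self.poke_interval = poke_interval

    def poke(self, context):
        # a real sensor would subclass BaseSensorOperator and log via self.log
        return os.path.exists(self.filepath)

    def run(self, context):
        # conceptually what the scheduler does with a sensor, minus timeout handling
        while not self.poke(context):
            time.sleep(self.poke_interval)
```

A real sensor only has to implement poke; Airflow's BaseSensorOperator supplies the retry loop, timeout, and logging.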

A plugin is used through this chain:

dag  -> operator -> hook

The Hook is where the actual work of the task happens.

An Operator inherits from BaseOperator, which binds it to DAG-related attributes; a Hook inherits from BaseHook, which wraps access to system configuration and resources.
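That division of labor can be sketched without Airflow at all: the Operator only gathers parameters and delegates, and the Hook owns the actual I/O. The class names here are purely illustrative:

```python
class GreetingHook:
    """Plays the role of a BaseHook subclass: owns the actual work/I/O."""

    def __init__(self, name):
        self.name = name

    def send(self):
        # a real hook would call an external service here
        return "hello, " + self.name


class GreetingOperator:
    """Plays the role of a BaseOperator subclass: binds params, delegates in execute()."""

    def __init__(self, task_id, name):
        self.task_id = task_id
        self.name = name

    def execute(self, context):
        # same shape as a real operator's execute: build the hook, hand off
        hook = GreetingHook(name=self.name)
        return hook.send()


op = GreetingOperator(task_id="greet", name="airflow")
print(op.execute({}))  # → hello, airflow
```

Keeping the hook separate means the same connection/request logic can be reused from several operators, or called directly from a callback.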

Writing a notification plugin: NotifyOperator

A previous post (https://www.cnblogs.com/woshimrf/p/airflow-dag.html) mentioned implementing multi-channel task alerts with a custom notification; below is a demo.

The file layout is as follows:

plugins
│   ├── hooks
│   └── operators

NotifyOperator

First, create an Operator under the operators directory.

# -*- coding: utf-8 -*-
#

from hooks.notify_hook import NotifyHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

class NotifyOperator(BaseOperator):
    """
    Send a notification via the notification service

    :param message: message body
    :type message: str or dict
    :param receivers: comma-separated Compass accounts
    :type receivers: str
    :param subject: mail subject
    :type subject: str
    """
    template_fields = ('message', 'subject')

    @apply_defaults
    def __init__(self,
                 subject=None,
                 message=None,
                 receivers=None,
                 *args,
                 **kwargs):
        super().__init__(*args, **kwargs)
        self.message = message
        self.receivers = receivers
        self.subject = subject

    def execute(self, context):
        self.log.info('Sending notify message. receivers:{}  message:{}'.format(self.receivers, self.message))
        hook = NotifyHook(
            subject=self.subject,
            message=self.message,
            receivers=self.receivers
        )
        hook.send()
  • Inherits from BaseOperator
  • Imports NotifyHook, which doesn't exist yet; we create it next
  • template_fields: a field must be declared in template_fields if it should support template variable substitution, e.g. {{ ds }}
  • When the Operator runs, Airflow calls its execute method; that is where the work happens

As you can see, the Operator is essentially an interface declaration.

NotifyHook

Create the NotifyHook under the hooks directory.

# -*- coding: utf-8 -*-
#

import json
import requests
from airflow import AirflowException
from airflow.hooks.http_hook import HttpHook

class NotifyHook(HttpHook):
    """
    Send a notification via the notification service

    :param send_type: notification channel, one or more of MAIL, DINGDING, SMS;
        separate multiple values with commas
    :type send_type: str
    :param message: message body
    :type message: str or dict
    :param receivers: comma-separated accounts
    :type receivers: str
    :param subject: mail subject
    :type subject: str
    """

    def __init__(self,
                 notify_conn_id='notify_default',
                 send_type='MAIL',
                 subject=None,
                 message=None,
                 receivers=None,
                 *args,
                 **kwargs
                 ):
        super().__init__(http_conn_id=notify_conn_id, *args, **kwargs)
        self.send_type = send_type
        self.message = message
        self.subject = subject
        self.receivers = receivers


    def _build_message(self):
        """
        Build the request payload
        """
        data = {
                "content": self.message,
                "contentType": "HTML",
                "receivers": self.receivers,
                "sendType": self.send_type,
                "sender": '【Airflow】',
                "subject": '【Airflow】' + self.subject
            }
        return json.dumps(data)

    def get_conn(self, headers=None):
        """
        Override HttpHook.get_conn because we only need base_url and headers,
        not the generic connection params

        :param headers: additional headers to be passed through as a dictionary
        :type headers: dict
        """
        self.base_url = 'http://notify.ryan-miao.com'
        session = requests.Session()
        if headers:
            session.headers.update(headers)
        return session

    def send(self):
        """
        Send Notify message
        """
        data = self._build_message()
        self.log.info('Sending message: %s',  data)
        resp = self.run(endpoint='/api/v2/notify/send',
                        data=data,
                        headers={'Content-Type': 'application/json',
                                 'app-id': 'ryan',
                                 'app-key': '123456'})
        if int(resp.json().get('retCode')) != 0:
            raise AirflowException('Send notify message failed, received error '
                                   'message: {}'.format(resp.text))
        self.log.info('Successfully sent notify message')
  • This calls my own notification service API. Since it is an HTTP request, inheriting from HttpHook to send it is enough.
  • http_conn_id is normally used to read the host from a connection configured in the database; here we override get_conn and hard-code our notification service's address.
  • Raising an exception is how the task is aborted

How to use it

Put the two files above into airflow's plugins directory and airflow loads them automatically. Then use it as a task type:

from operators.notify_operator import NotifyOperator

notification = NotifyOperator(
                    task_id="we_are_done",
                    subject='send mail',
                    message='content',
                    receivers='ryanmiao'
                )

It can also be executed directly. For example, the custom failure-alert callback mentioned earlier:

from datetime import datetime

from airflow import DAG
from operators.notify_operator import NotifyOperator

def mail_failure_callback(receivers):
    """
    Mail notification on failure
    :receivers  recipients; separate multiple recipients with commas
    """

    def mail_back(context):
        subject = "[FAILED] DAG {} TASK {} ds {}".format(
                                                context['task_instance'].dag_id,
                                                context['task_instance'].task_id,
                                                context['ds'])
        message = "[FAILED] DAG: {};<br/> TASK: {};<br/> ds: {};<br/> reason: {}.<br/>" \
                "See: http://airflow.ryan-miao.com/admin/airflow/tree?dag_id={}" \
                .format(
                context['task_instance'].dag_id,
                context['task_instance'].task_id,
                context['ds'],
                context['exception'],
                context['task_instance'].dag_id)
            
        return NotifyOperator(
                    task_id="mail_failed_notify_callback",
                    subject=subject,
                    message=message,
                    receivers=receivers
                ).execute(context)

    return mail_back

default_args = {
    'owner': 'ryanmiao',
    'depends_on_past': False,
    'start_date': datetime(2019, 5, 1, 9),
    'on_failure_callback': mail_failure_callback(receivers='ryanmiao'),
    'retries': 0
}

dag = DAG(
    'example', default_args=default_args, schedule_interval=None)

Writing an RDBMS2Hive plugin

A common job in our scheduler is extracting data into Hive. Let's build a plugin for it: one that reads from a relational database and stores the data in Hive. With it, a user only has to configure the source database and table and the target hive table in airflow to get daily data loads.

There are many tools for transferring and converting data between heterogeneous stores. The simplest is the database's native dump tool: dump the data out, then import it into the other database.

For example, a postgres dump.

Export the columns selected by ${sql} to the file ${export_data_file}:

psql -h$SRC_HOST_IP -U$SRC_USER_NAME -d$SRC_DB -p$SRC_HOST_PORT -c  "\copy (${sql}) to '${export_data_file}' WITH NULL AS ''"

Then load it into hive:

LOAD DATA LOCAL INPATH '${export_data_file}' INTO TABLE $TAR_TABLENAME PARTITION (BIZDATE='$BIZ_DATE')

For postgres, copy is the fastest option, but the data may contain escape characters such as \t and \n, which scramble the exported txt or csv file, so those characters need to be escaped accordingly.
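One way to guard against that, assuming tab-delimited output, is to escape the delimiter, newline, and backslash characters in each field before writing the dump line; a sketch:

```python
def sanitize_field(value):
    """Escape characters that would break a tab-delimited dump line."""
    if value is None:
        return ''  # represent NULL as an empty string, as the copy command does
    return (str(value)
            .replace('\\', '\\\\')   # escape backslashes first, before adding new ones
            .replace('\t', '\\t')
            .replace('\n', '\\n')
            .replace('\r', '\\r'))


# join a row of raw fields into one safe tab-delimited line
row = ['a\tb', 'line1\nline2', None]
line = '\t'.join(sanitize_field(v) for v in row)
```

The loader on the Hive side then has to reverse the same escapes, which is exactly the kind of fragile bookkeeping that pushed me toward a dedicated tool.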

Likewise, mysql can query the data out directly:

cat search.sql | mysql -h"$SRC_HOST_IP" -u"$SRC_USER_NAME" -p"$SRC_USER_PWD" -P"$SRC_HOST_PORT" -D"$SRC_DB" --default-character-set=${mysql_charset} -N -s | sed "s/NULL/\\\\N/ig;s/\\\\\\\\n//ig" > result.txt

The advantage of these command lines is speed; the drawback is the fragility of shell commands and their poor error handling. In the end I chose an integrated data transfer tool, DataX. DataX is an open-source tool from Alibaba for synchronizing heterogeneous data sources; it no longer looks actively maintained, but it is fine for simple use. https://github.com/alibaba/DataX

DataX is fairly simple to use: configure the source and target data sources per the documentation, then invoke it. It can be used as a command-line tool.
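A DataX job is described by a single JSON file holding a reader and a writer; a run is just `python datax.py job.json`. As a minimal illustration, here is a job that reads from mysql and prints rows to stdout via DataX's streamwriter (host, credentials, table, and paths are placeholders):

```python
import json
import tempfile

# minimal DataX job: one mysqlreader, one streamwriter (prints rows to stdout);
# all connection values below are placeholders, not a real database
job = {
    "job": {
        "setting": {"speed": {"channel": 1}},
        "content": [{
            "reader": {
                "name": "mysqlreader",
                "parameter": {
                    "username": "user",
                    "password": "pass",
                    "connection": [{
                        "querySql": ["select id, name from demo_table"],
                        "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/demo_db"]
                    }]
                }
            },
            "writer": {"name": "streamwriter", "parameter": {"print": True}}
        }]
    }
}

# write the job file; a real run would then be:
#   python /path/to/datax/bin/datax.py <job_file>
job_file = tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False)
json.dump(job, job_file, indent=2)
job_file.close()
```

The plugin below does the same thing programmatically: it builds this JSON from an Airflow connection, swapping streamwriter for hdfswriter.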

Combined with airflow, we can implement a datax plugin ourselves: read the data source credentials from connections, generate datax's JSON config file, and invoke datax. Below is a plugin that reads from pg or mysql and loads into hive.

The main steps are:

  1. create a directory on hdfs
  2. generate the datax config file
  3. run datax with that config to extract the data to hdfs
  4. load from hdfs with the hive command line

RDBMS2HiveOperator

# -*- coding: utf-8 -*-
#
#

"""
Load postgres or mysql data into hdfs, then into hive
"""
import os
import signal

from hooks.rdbms_to_hive_hook import RDBMS2HiveHook
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class RDBMS2HiveOperator(BaseOperator):
    """
    Transfer pg data to hive
    https://github.com/alibaba/DataX

    :param conn_id: pg connection id
    :param query_sql : pg query statement
    :param split_pk  : pg split primary key; None means no splitting, setting it
        lets datax split across multiple threads for faster transfer
    :param hive_db   : hive database
    :param hive_table: hive table
    :param hive_table_column: column list, column={name:a, type: int}, or a
        comma-separated string, column=a,b,c
    :param hive_table_partition: value for the bizdate partition
    """
    template_fields = ('query_sql',  'hive_db', 'hive_table','hive_table_partition')
    ui_color = '#edd5f1'

    @apply_defaults
    def __init__(self,
                 conn_id,
                 query_sql,
                 hive_db,
                 hive_table,
                 hive_table_column,
                 hive_table_partition,
                 split_pk=None,
                 *args,
                 **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.query_sql = query_sql
        self.split_pk = split_pk
        self.hive_db = hive_db
        self.hive_table = hive_table
        self.hive_table_column = hive_table_column
        self.hive_table_partition = hive_table_partition
        

    def execute(self, context):
        """
        Execute 
        """
        task_id = context['task_instance'].dag_id + "#" + context['task_instance'].task_id

        self.hook = RDBMS2HiveHook(
                        task_id = task_id,
                        conn_id = self.conn_id,
                        query_sql = self.query_sql, 
                        split_pk=self.split_pk, 
                        hive_db=self.hive_db, 
                        hive_table=self.hive_table,
                        hive_table_column=self.hive_table_column,
                        hive_table_partition=self.hive_table_partition
                        )
        self.hook.execute(context=context)

        
    def on_kill(self):
        self.log.info('Sending SIGTERM signal to bash process group')
        os.killpg(os.getpgid(self.hook.sp.pid), signal.SIGTERM)

RDBMS2HiveHook

# -*- coding: utf-8 -*-
#

"""
Load data into hive with DataX
"""
import subprocess
import uuid
import json
import os

from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook


class RDBMS2HiveHook(BaseHook):
    """
    DataX runner
    """

    def __init__(self, 
                 task_id,
                 conn_id,
                 query_sql,
                 hive_db,
                 hive_table,
                 hive_table_column,
                 hive_table_partition,
                 split_pk=None):
        self.task_id = task_id
        self.conn = self.get_connection(conn_id)
        self.query_sql = query_sql
        self.split_pk = split_pk
        self.hive_db = hive_db
        self.hive_table = hive_table
        self.hive_table_partition = hive_table_partition
        self.log.info("Using connection to: {}:{}/{}".format(self.conn.host, self.conn.port, self.conn.schema))
         
        self.hive_table_column = hive_table_column 
        if isinstance(hive_table_column, str):
            self.hive_table_column = []
            cl = hive_table_column.split(',') 
            for item in cl:
                hive_table_column_item = {
                    "name": item,
                    "type": "string"
                }
                self.hive_table_column.append(hive_table_column_item)

    
    def Popen(self, cmd, **kwargs):
        """
        Run a subprocess and stream its output to the task log

        :param cmd: command to execute
        :param kwargs: extra arguments to Popen (see subprocess.Popen)
        :return: handle to subprocess
        """
        self.sp = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            **kwargs)

        for line in self.sp.stdout:
            self.log.info(line.strip().decode('utf-8'))

        self.sp.wait()

        self.log.info("Command exited with return code %s", self.sp.returncode)

        if self.sp.returncode:
            raise AirflowException("Execute command failed")



    def generate_setting(self):
        """
        DataX speed and error-limit settings
        """
        self.setting= {
            "speed": {
                 "byte": 104857600
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        }
        return self.setting
    
    def generate_reader(self):
        """
        datax reader
        """
        conn_type = 'mysql'
        reader_name = 'mysqlreader'
        if(self.conn.conn_type == 'postgres'):
            conn_type = 'postgresql'
            reader_name = 'postgresqlreader'
        
        self.jdbcUrl =  "jdbc:"+conn_type+"://"+self.conn.host.strip()+":"+str(self.conn.port)+"/"+ self.conn.schema.strip()
        self.reader =  {
            "name": reader_name,
            "parameter": {
                "username": self.conn.login.strip(),
                "password": self.conn.password.strip(),
                "connection": [
                    {
                        "querySql": [
                            self.query_sql
                        ],
                        "jdbcUrl": [
                            self.jdbcUrl
                        ]
                    }
                ]
            }
        }
        
        return self.reader

    def generate_writer(self):
        """
        DataX hdfs writer
        """
        self.file_type = "text"
        self.hdfs_path = "/datax/"+self.hive_db+"/"+self.hive_table+"/"+self.hive_table_partition
        self.log.info("Temporary storage path: {}".format(self.hdfs_path))
        self.writer = {
                    "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": "hdfs://nameservice1",
                        "hadoopConfig": {
                            "dfs.nameservices": "nameservice1",
                            "dfs.ha.automatic-failover.enabled.nameservice1": True,
                            "ha.zookeeper.quorum": "bigdata2-prod-nn01.ryan-miao.com:2181,bigdata2-prod-nn02.ryan-miao.com:2181,bigdata2-prod-nn03.ryan-miao.com:2181",
                            "dfs.ha.namenodes.nameservice1": "namenode117,namenode124",
                            "dfs.namenode.rpc-address.nameservice1.namenode117": "bigdata2-prod-nn01.ryan-miao.com:8020",
                            "dfs.namenode.servicerpc-address.nameservice1.namenode117": "bigdata2-prod-nn01.ryan-miao.com:8022",
                            "dfs.namenode.http-address.nameservice1.namenode117": "bigdata2-prod-nn01.ryan-miao.com:50070",
                            "dfs.namenode.https-address.nameservice1.namenode117": "bigdata2-prod-nn01.ryan-miao.com:50470",
                            "dfs.namenode.rpc-address.nameservice1.namenode124": "bigdata2-prod-nn02.ryan-miao.com:8020",
                            "dfs.namenode.servicerpc-address.nameservice1.namenode124": "bigdata2-prod-nn02.ryan-miao.com:8022",
                            "dfs.namenode.http-address.nameservice1.namenode124": "bigdata2-prod-nn02.ryan-miao.com:50070",
                            "dfs.namenode.https-address.nameservice1.namenode124": "bigdata2-prod-nn02.ryan-miao.com:50470",
                            "dfs.replication": 3,
                            "dfs.client.failover.proxy.provider.nameservice1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
                        },
                        "fileType": self.file_type,
                        "path": self.hdfs_path,
                        "fileName": self.task_id,
                        "column": self.hive_table_column,
                        "writeMode": "nonConflict",
                        "fieldDelimiter": "\t"
                    }
            }
        return self.writer

    def generate_config(self):
        content = [{
            "reader": self.generate_reader(),
            "writer": self.generate_writer()
        }]

        job = {
            "setting": self.generate_setting(),
            "content": content
        }

        config = {
            "job": job
        }

        self.target_json = json.dumps(config)

        # write the config json to a temp file
        self.json_file = '/tmp/datax_json_' + self.task_id + uuid.uuid1().hex
        with open(self.json_file, "w") as fo:
            fo.write(self.target_json)
        self.log.info("write config json {}".format(self.json_file))
        return self.json_file

    

    def execute(self, context):
        self.generate_config()
        # check hdfs_path
        hdfs_path = self.hdfs_path
        if(not hdfs_path.startswith('/datax/')):
            raise AirflowException("Bad hdfs path: not under the /datax directory")

        # create the directory
        cmd = ['hadoop', 'fs', '-mkdir', '-p', hdfs_path]
        self.Popen(cmd)

        # remove any leftover files so the directory is clean
        files_path = hdfs_path + "/*"
        try:
            cmd = ['hadoop', 'fs', '-rm', files_path]
            self.Popen(cmd)
        except Exception:
            self.log.info('ignore err, just make sure the dir is clean')
        
        
        # run datax to extract the data onto hdfs
        datax_home = '/data/opt/datax/bin'
        cmd = ['python', datax_home + '/datax.py', self.json_file]
        self.Popen(cmd)
        # remove the config file
        os.remove(self.json_file)

        # load the hdfs data into hive
        hql = "LOAD DATA INPATH '" + hdfs_path + "' OVERWRITE INTO TABLE " \
              + self.hive_db + "." + self.hive_table + " PARTITION (bizdate=" + self.hive_table_partition + ")"
        cmd = ['hive', '-e', "\"" + hql + "\""]
        self.Popen(cmd)

How to use it

  1. log in to airflow as admin
  2. configure a connection for the pg or mysql database
  3. adjust the hdfs cluster configuration
  4. create a DAG
from airflow import DAG

from operators.rdbms_to_hive_operator import RDBMS2HiveOperator
from datetime import datetime, timedelta
from dag_utils import compass_utils


default_args = {
    'owner': 'ryanmiao',
    'depends_on_past': False,
    'start_date': datetime(2019, 5, 1, 9),
    'on_failure_callback': compass_utils.failure_callback(dingding_conn_id='dingding_bigdata', receivers='ryanmiao'),
    # 'on_success_callback': compass_utils.success_callback(dingding_conn_id='dingding_bigdata', receivers='ryanmiao'),
    'retries': 0
}

dag = DAG(
    'example_pg2hive', default_args=default_args, schedule_interval=None)

# CREATE TABLE test.pg2hive_test(
#      ftime int,
#      raw_cp_count int,
#      raw_to_delete_cp_count bigint,
#      create_time timestamp
#      )
#  COMMENT 'test table for DataX'
#  PARTITIONED BY (bizdate int)
# ROW FORMAT DELIMITED
# FIELDS TERMINATED BY '\t'
# LINES TERMINATED BY '\n'
#  STORED AS TEXTFILE;
 
hive_table_column = "ftime,raw_cp_count,raw_to_delete_cp_count,create_time"

t1 = RDBMS2HiveOperator(
    task_id='pg2hive',
    conn_id='pg_rdb_poi',
    query_sql='select ftime, raw_cp_count, raw_to_delete_cp_count, create_time from tbl_poi_report limit 1000',
    hive_db='test',
    hive_table='pg2hive_test',
    hive_table_column=hive_table_column,
    hive_table_partition="{{ ds_nodash }}",
    dag=dag
)

