【dataX】阿里開源ETL工具——dataX簡單上手


一、概述

  1.是什么?

  DataX 是阿里巴巴集團內被廣泛使用的離線數據同步工具/平台,實現包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各種異構數據源之間高效的數據同步功能。

  開源地址https://github.com/alibaba/DataX

二、簡介

  1.設計架構

  

 

  數據交換通過DataX進行中轉,任何數據源只要和DataX連接上即可以和已實現的任意數據源同步

   2.框架結構

  

  核心組件:

      Reader:數據采集模塊,負責從源采集數據

    Writer:數據寫入模塊,負責寫入目標庫

    Framework:數據傳輸通道,負責處理數據緩沖等

    以上只需要重寫Reader與Writer插件,即可實現新數據源支持

  支持主流數據源,詳見:https://github.com/alibaba/DataX/blob/master/introduction.md

  從一個JOB來理解datax的核心模塊組件:

    datax完成單個數據同步的作業,稱為Job,job會負責數據清理、任務切分等工作;

    任務啟動后,Job會根據不同源的切分策略,切分成多個Task並發執行,Task就是執行作業的最小單元

    切分完成后,根據Scheduler模塊,將Task組合成TaskGroup,每個group負責一定的並發和分配Task

三、入門

  1.安裝

    參考官方文檔

    提取為安裝隨筆https://www.cnblogs.com/jiangbei/p/10901201.html

  2.使用

    核心就是編寫配置文件(當前版本使用JSON)

    在datax服務器上運行:

 

python bin/datax.py -r mysqlreader -w hdfswriter

 

即可獲取配置模板

 

    【配置文件】

      先看一個示例:

{
  "job": {
    "setting": {
      "speed": {
        "channel": 1
      }
    },
    "content": [{
      "reader": {
        "name": "mysqlreader",
        "parameter": {
          "username": "",
          "password": "",
          "connection": [{
            "querySql": [
              "SELECT Id,CopId,BrandId,PayType,TradeNo,OutTradeNo,TotalFee,PayTotalFee,PayCashFee,PayCouponFee,RefundFee,ShopId,VipOldCode,VipId,UserCode,PayStatus,IsReverse,CreateDate,UNIX_TIMESTAMP(LastModifiedDate) as LastModifiedDate FROM mall_out_sales_order_pay where brandid=63 order by createDate asc;"
            ],
            "jdbcUrl": [
              "jdbc:mysql://192.168.12.41:3306/ezp-pay"
            ]
          }]
        }
      },
      "writer": {
        "name": "mysqlwriter",
        "parameter": {
          "writeMode": "insert",
          "username": "",
          "password": "",
          "dateFormat": "YYYY-MM-dd hh:mm:ss",
          "column": [
            "Id",
            "CopId",
            "BrandId",
            "PayType",
            "TradeNo",
            "OutTradeNo",
            "TotalFee",
            "PayTotalFee",
            "PayCashFee",
            "PayCouponFee",
            "RefundFee",
            "ShopId",
            "VipOldCode",
            "VipId",
            "UserCode",
            "PayStatus",
            "IsReverse",
            "CreateDate",
            "LastModifiedDate"
          ],
          "session": [
            "set session sql_mode='ANSI'"
          ],
          "preSql": [
            "delete from pay_order where brandid=63"
          ],
          "connection": [{
            "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/ezp-pay",
            "table": [
              "pay_order"
            ]
          }]
        }
      }
    }]
  }
}
JSON配置文件

    整個配置文件是一個Job配置文件,以Job為起手根元素,Job下面兩個子元素配置項:setting和content,

  其中,setting描述任務本身的信息,content描述源(reader)和目的端(writer)的信息:

  

   其中,content下又分為reader和writer兩塊,分別對應源端和目的端:

  

 

 各reader與writer插件的文檔直接點擊對應的文件夾進入doc即可!

    

     3.示例

      編寫一個Mysql到本地打印的Job:

      根據Mysqlreader插件編寫配置文件:

{
    "job": {
        "setting": {
            "speed": {
                 "channel": 3
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "root",
                        "column": [
                            "id",
                            "age",
                            "name"
                        ],
                        "connection": [
                            {
                                "table": [
                                    "girl"
                                ],
                                "jdbcUrl": [
     "jdbc:mysql://192.168.19.129:3306/mysql"
                                ]
                            }
                        ]
                    }
                },
               "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print":true
                    }
                }
            }
        ]
    }
}

    根據自檢腳本(文件顏色淺綠色),可以知道需要是可執行文件,添加權限:

chmod +x mysqltest.json

    使用運行命令,運行即可:(進入bin目錄運行命令如下)

python datax.py ../job/mysqltest.json

     有時是希望靈活一點的自定義的配置,則可參考如下json配置:

{
    "job": {
        "setting": {
            "speed": {
                 "channel":1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "root",
                        "connection": [
                            {
                                "querySql": [
                                    "select db_id,on_line_flag from db_info where db_id < 10;"
                                ],
                                "jdbcUrl": [
                                    "jdbc:mysql://bad_ip:3306/database",
                                    "jdbc:mysql://127.0.0.1:bad_port/database",
                                    "jdbc:mysql://127.0.0.1:3306/database"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print": false,
                        "encoding": "UTF-8"
                    }
                }
            }
        ]
    }
}
View Code

 四、拓展

  1.調度

    任務很少,情景簡單的情況下,使用Linux自帶的corntab即可,當然,正常的時候推薦使用調度平台

    這里推薦airflow(其他相關的azkaban、oozie等調度不展開)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM