一、概述
1.是什么?
DataX 是阿里巴巴集團內被廣泛使用的離線數據同步工具/平台,實現包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各種異構數據源之間高效的數據同步功能。
開源地址:https://github.com/alibaba/DataX
二、簡介
1.設計架構

數據交換通過DataX進行中轉,任何數據源只要和DataX連接上即可以和已實現的任意數據源同步
2.框架結構

核心組件:
Reader:數據采集模塊,負責從源采集數據
Writer:數據寫入模塊,負責寫入目標庫
Framework:數據傳輸通道,負責處理數據緩沖等
以上只需要重寫Reader與Writer插件,即可實現新數據源支持
支持主流數據源,詳見:https://github.com/alibaba/DataX/blob/master/introduction.md
從一個JOB來理解datax的核心模塊組件:
datax完成單個數據同步的作業,稱為Job,job會負責數據清理、任務切分等工作;
任務啟動后,Job會根據不同源的切分策略,切分成多個Task並發執行,Task就是執行作業的最小單元
切分完成后,根據Scheduler模塊,將Task組合成TaskGroup,每個group負責一定的並發和分配Task
三、入門
1.安裝
參考官方文檔
提取為安裝隨筆:https://www.cnblogs.com/jiangbei/p/10901201.html
2.使用
核心就是編寫配置文件(當前版本使用JSON)
在datax服務器上運行:
python bin/datax.py -r mysqlreader -w hdfswriter
即可獲取配置模板
【配置文件】:
先看一個示例:
{ "job": { "setting": { "speed": { "channel": 1 } }, "content": [{ "reader": { "name": "mysqlreader", "parameter": { "username": "", "password": "", "connection": [{ "querySql": [ "SELECT Id,CopId,BrandId,PayType,TradeNo,OutTradeNo,TotalFee,PayTotalFee,PayCashFee,PayCouponFee,RefundFee,ShopId,VipOldCode,VipId,UserCode,PayStatus,IsReverse,CreateDate,UNIX_TIMESTAMP(LastModifiedDate) as LastModifiedDate FROM mall_out_sales_order_pay where brandid=63 order by createDate asc;" ], "jdbcUrl": [ "jdbc:mysql://192.168.12.41:3306/ezp-pay" ] }] } }, "writer": { "name": "mysqlwriter", "parameter": { "writeMode": "insert", "username": "", "password": "", "dateFormat": "YYYY-MM-dd hh:mm:ss", "column": [ "Id", "CopId", "BrandId", "PayType", "TradeNo", "OutTradeNo", "TotalFee", "PayTotalFee", "PayCashFee", "PayCouponFee", "RefundFee", "ShopId", "VipOldCode", "VipId", "UserCode", "PayStatus", "IsReverse", "CreateDate", "LastModifiedDate" ], "session": [ "set session sql_mode='ANSI'" ], "preSql": [ "delete from pay_order where brandid=63" ], "connection": [{ "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/ezp-pay", "table": [ "pay_order" ] }] } } }] } }
整個配置文件是一個Job配置文件,以Job為起手根元素,Job下面兩個子元素配置項:setting和content,
其中,setting描述任務本身的信息,content描述源(reader)和目的端(writer)的信息:

其中,content下又分為reader和writer兩塊,分別對應源端和目的端:

各reader與writer插件的文檔直接點擊對應的文件夾進入doc即可!

3.示例
編寫一個Mysql到本地打印的Job:
根據Mysqlreader插件編寫配置文件:
{ "job": { "setting": { "speed": { "channel": 3 }, "errorLimit": { "record": 0, "percentage": 0.02 } }, "content": [ { "reader": { "name": "mysqlreader", "parameter": { "username": "root", "password": "root", "column": [ "id", "age", "name" ], "connection": [ { "table": [ "girl" ], "jdbcUrl": [ "jdbc:mysql://192.168.19.129:3306/mysql" ] } ] } }, "writer": { "name": "streamwriter", "parameter": { "print":true } } } ] } }
根據自檢腳本(文件顏色淺綠色),可以知道需要是可執行文件,添加權限:
chmod +x mysqltest.json
使用運行命令,運行即可:(進入bin目錄運行命令如下)
python datax.py ../job/mysqltest.json
有時是希望靈活一點的自定義的配置,則可參考如下json配置:
{ "job": { "setting": { "speed": { "channel":1 } }, "content": [ { "reader": { "name": "mysqlreader", "parameter": { "username": "root", "password": "root", "connection": [ { "querySql": [ "select db_id,on_line_flag from db_info where db_id < 10;" ], "jdbcUrl": [ "jdbc:mysql://bad_ip:3306/database", "jdbc:mysql://127.0.0.1:bad_port/database", "jdbc:mysql://127.0.0.1:3306/database" ] } ] } }, "writer": { "name": "streamwriter", "parameter": { "print": false, "encoding": "UTF-8" } } } ] } }
四、拓展
1.調度
任務很少,情景簡單的情況下,使用Linux自帶的corntab即可,當然,正常的時候推薦使用調度平台
這里推薦airflow(其他相關的azkaban、oozie等調度不展開)
