一、DataX3.0概述
DataX 是一個異構數據源離線同步工具,致力於實現包括關系型數據庫(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各種異構數據源之間穩定高效的數據同步功能。
請看下圖:
設計理念:
為了解決異構數據源同步問題,DataX將復雜的網狀的同步鏈路變成了星型數據鏈路,DataX作為中間傳輸載體負責連接各種數據源。當需要接入一個新的數據源的時候,只需要將此數據源對接到DataX,便能跟已有的數據源做到無縫數據同步。
當前使用狀況:
DataX在阿里巴巴集團內被廣泛使用,承擔了所有大數據的離線同步業務,並已持續穩定運行了6年之久。目前每天完成同步8w多道作業,每日傳輸數據量超過300TB。

DataX本身作為離線數據同步框架,采用Framework + plugin架構構建。將數據源讀取和寫入抽象成為Reader/Writer插件,納入到整個同步框架中。
1、Reader:Reader為數據采集模塊,負責采集數據源的數據,將數據發送給Framework。
2、Writer: Writer為數據寫入模塊,負責不斷向Framework取數據,並將數據寫入到目的端。
3、Framework:Framework用於連接reader和writer,作為兩者的數據傳輸通道,並處理緩沖,流控,並發,數據轉換等核心技術問題。
三、插件體系
DataX目前已經有了比較全面的插件體系,主流的RDBMS數據庫、NOSQL、大數據計算系統都已經接入。
DataX目前支持數據如下:
四、DataX3.0核心架構
DataX 3.0 開源版本支持單機多線程模式完成同步作業運行,按一個DataX作業生命周期的時序圖,從整體架構設計非常簡要說明DataX各個模塊相互關系。
1、DataX完成單個數據同步的作業,我們稱之為Job,DataX接受到一個Job之后,將啟動一個進程來完成整個作業同步過程。
DataX Job模塊是單個作業的中樞管理節點,承擔了數據清理、子任務切分(將單一作業計算轉化為多個子Task)、TaskGroup管理等功能。
2、DataXJob啟動后,會根據不同的源端切分策略,將Job切分成多個小的Task(子任務),以便於並發執行。Task便是DataX作業的最小單元,每一個Task都會負責一部分數據的同步工作。
3、切分多個Task之后,DataX Job會調用Scheduler模塊,根據配置的並發數據量,將拆分成的Task重新組合,組裝成TaskGroup(任務組)。每一個TaskGroup負責以一定的並發運行完畢分配好的所有Task,默認單個任務組的並發數量為5。
4、每一個Task都由TaskGroup負責啟動,Task啟動后,會固定啟動Reader—>Channel—>Writer的線程來完成任務同步工作。
5、DataX作業運行起來之后, Job監控並等待多個TaskGroup模塊任務完成,等待所有TaskGroup任務完成后Job成功退出。否則,異常退出,進程退出值非0。
五、DataX調度流程:
一個DataX Job會切分成多個Task,每個Task會按TaskGroup進行分組,一個Task內部會有一組Reader->Channel->Writer。Channel是連接Reader和Writer的數據交換通道,所有的數據都會經由Channel進行傳輸
六、Datax3.0安裝部署
1、環境准備
Linux
jdk 1.8
python 2.7.5(datax是由python2開發的)
2、datax下載地址
https://github.com/alibaba/DataX?spm=a2c4e.11153940.blogcont59373.11.7a684c4fvubOe1
查看安裝成功:在bin目錄下執行 python datax.py ../job/job.json
3、查看配置文件
在bin目錄下已經給出了樣例配置,但不同的數據源配置文件不一樣。通過命令查看配置模板
# python datax.py -r {YOUR_READER} -w {YOUR_WRITER}
示例:[xxx@xxxbin]$ python datax.py -r mysqlreader -w hdfswriter
七、json配置文件模板說明
{
"job": { "content": [ { "reader": { "name": "mysqlreader", "parameter": { "querySql": "", #自定義sql,支持多表關聯,當用戶配置querySql時,直接忽略table、column、where條件的配置。 "fetchSize": "", #默認1024,該配置項定義了插件和數據庫服務器端每次批量數據獲取條數,該值決定了DataX和服務器端的網絡交互次數,能夠較大的提升數據抽取性能,注意,該值過大(>2048)可能造成DataX進程OOM "splitPk": "db_id", #僅支持整形型數據切分;如果指定splitPk,表示用戶希望使用splitPk代表的字段進行數據分片,如果該值為空,代表不切分,使用單通道進行抽取 "column": [], #"*"默認所有列,支持列裁剪,列換序 "connection": [ { "jdbcUrl": ["jdbc:mysql://IP:3306/database?useUnicode=true&characterEncoding=utf8"], "table": [] #支持多張表同時抽取 } ], "password": "", "username": "", "where": "" #指定的column、table、where條件拼接SQL,可以指定limit 10,也可以增量數據同步,如果該值為空,代表同步全表所有的信息 } }, "writer": { "name": "hdfswriter", "parameter": { "column": [], #必須指定字段名,字段類型,{"name":"","tpye":""} "compress": "", #hdfs文件壓縮類型,默認不填寫意味着沒有壓縮。其中:text類型文件支持壓縮類型有gzip、bzip2;orc類型文件支持的壓縮類型有NONE、SNAPPY(需要用戶安裝SnappyCodec)。 "defaultFS": "", #Hadoop hdfs文件系統namenode節點地址。 "fieldDelimiter": "", #需要用戶保證與創建的Hive表的字段分隔符一致 "fileName": "", #HdfsWriter寫入時的文件名,需要指定表中所有字段名和字段類型,其中:name指定字段名,type指定字段類型。 "fileType": "", #目前只支持用戶配置為”text”或”orc” "path": "", #存儲到Hadoop hdfs文件系統的路徑信息,hive表在hdfs上的存儲路徑 "hadoopConfig": {} #hadoopConfig里可以配置與Hadoop相關的一些高級參數,比如HA的配置。 "writeMode": "" #append,寫入前不做任何處理,文件名不沖突;nonConflict,如果目錄下有fileName前綴的文件,直接報錯。 } } } ], "setting": { "speed": { #流量控制 "byte": 1048576, #控制傳輸速度,單位為byte/s,DataX運行會盡可能達到該速度但是不超過它 "channel": "" #控制同步時的並發數 } "errorLimit": { #臟數據控制 "record": 0 #對臟數據最大記錄數閾值(record值)或者臟數據占比閾值(percentage值,當數量或百分比,DataX Job報錯退出 } } } }
{ "job":{ "setting":{ "speed":{ "channel":1 } }, "content":[ { "reader":{ "name":"sqlserverreader", "parameter":{ "username":"xxxx", "password":"xxxx", "column":[ "UserGroupId", "Name" ], "connection":[ { "table": [ "UserGroups" ], "jdbcUrl":[ "jdbc:sqlserver://xxxx:1433;DatabaseName=TEST" ] } ] } }, "writer":{ "name":"mysqlwriter", "parameter":{ "username":"xxxx", "password":"xxxx", "column":[ "user_group_id", "user_group_name" ], "connection":[ { "jdbcUrl": "jdbc:mysql://xxxx:3306/test_recruit", "table": ["gcp_user_groups"] } ], "visible":false, "encoding":"UTF-8" } } } ] } }
八、datax-web安裝
1、參考官方的安裝,包可以這里下載
https://github.com/WeiYe-Jing/datax-web/blob/master/doc/datax-web/datax-web-deploy.md
2、在選定的安裝目錄,解壓安裝包
tar -zxvf datax-web-{VERSION}.tar.gz
3、執行安裝腳本(需要安裝數據庫mysql)
[root@roobbin datax-web-2.1.2]# ./bin/install.sh 2020-10-17 10:00:09.430 [INFO] (22745) Creating directory: [/usr/local/datax-web-2.1.2/bin/../modules]. 2020-10-17 10:00:09.459 [INFO] (22745) ####### Start To Uncompress Packages ###### 2020-10-17 10:00:09.462 [INFO] (22745) Uncompressing.... Do you want to decompress this package: [datax-admin_2.1.2_1.tar.gz]? (Y/N)y 2020-10-17 10:00:17.298 [INFO] (22745) Uncompress package: [datax-admin_2.1.2_1.tar.gz] to modules directory Do you want to decompress this package: [datax-executor_2.1.2_1.tar.gz]? (Y/N)
按照提示輸入數據庫地址,端口號,用戶名,密碼以及數據庫名稱,大部分情況下即可快速完成初始化。 如果服務上並沒有安裝mysql命令,則可以取用目錄下/bin/db/datax-web.sql腳本去手動執行,完成后修改相關配置文件
vi ./modules/datax-admin/conf/bootstrap.properties #Database #DB_HOST= #DB_PORT= #DB_USERNAME= #DB_PASSWORD= #DB_DATABASE=
在項目目錄下/modules/datax-execute/bin/env.properties 指定PYTHON_PATH的路徑 vi ./modules/{module_name}/bin/env.properties ### 執行datax的python腳本地址 PYTHON_PATH= ### 保持和datax-admin服務的端口一致;默認是9527,如果沒改datax-admin的端口,可以忽略 DATAX_ADMIN_PORT=
4、啟動DataX_web
./bin/start-all.sh
進入可視化界面
http://ip:9527/index.html
登陸用戶名admin 密碼123456
over!