1. What is DataX
DataX is an offline data synchronization tool for heterogeneous data sources, open-sourced by Alibaba. It aims to provide stable and efficient data synchronization between a wide range of heterogeneous data sources, including relational databases (MySQL, Oracle, etc.), HDFS, Hive, ODPS, HBase, FTP, and more.
https://github.com/kris-2018/DataX
2. The Design of DataX
To solve the problem of synchronizing heterogeneous data sources, DataX turns the complex mesh of point-to-point synchronization links into a star-shaped data link, with DataX acting as the intermediate transport hub that connects the various data sources. When a new data source needs to be integrated, it only has to be connected to DataX, and it can then synchronize data seamlessly with every data source already supported.
3. Framework Design
As an offline data synchronization framework, DataX is built on a Framework + plugin architecture. Reading from and writing to data sources are abstracted into Reader/Writer plugins, which are integrated into the overall synchronization framework.
Reader: the data collection module. It reads data from the source and sends it to the Framework.
Writer: the data writing module. It continuously pulls data from the Framework and writes it to the destination.
Framework: connects the Reader and the Writer, acting as the data transfer channel between them and handling core concerns such as buffering, flow control, concurrency, and data conversion. A sketch of a job file reflecting this split follows.
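This plugin split is mirrored directly in a DataX job description: every job names one reader plugin, one writer plugin, and a block of framework-level settings. The skeleton below only sketches that shape; the placeholder plugin names are stand-ins for the concrete readers and writers (streamreader, mysqlreader, hdfswriter, and so on) used later in this article.

{
    "job": {
        "content": [
            {
                "reader": { "name": "<some reader plugin>", "parameter": {} },
                "writer": { "name": "<some writer plugin>", "parameter": {} }
            }
        ],
        "setting": {
            "speed": { "channel": 1 }
        }
    }
}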
4. How It Runs
1) A single data synchronization job in DataX is called a Job. When DataX receives a Job, it starts one process to carry out the whole synchronization. The DataX Job module is the central management node of a single job; it is responsible for data cleanup, splitting the job into sub-tasks (turning one job into multiple Tasks), TaskGroup management, and so on.
2) After the DataX Job starts, it splits the Job into multiple small Tasks (sub-tasks) according to the source-side splitting strategy, so that they can run concurrently. A Task is the smallest unit of a DataX job, and each Task is responsible for synchronizing a portion of the data.
3) After the Tasks are split out, the DataX Job calls the Scheduler module, which regroups the Tasks into TaskGroups according to the configured amount of concurrency. Each TaskGroup is responsible for running all of the Tasks assigned to it with a certain degree of concurrency; by default a single TaskGroup runs with a concurrency of 5 (a worked example follows this list).
4) Every Task is started by its TaskGroup. Once started, a Task runs a fixed Reader -> Channel -> Writer thread pipeline to carry out its share of the synchronization work.
5) Once the DataX job is running, the Job monitors and waits for all TaskGroup modules to finish. When every TaskGroup has completed, the Job exits successfully; otherwise it exits abnormally with a non-zero exit code.
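A small worked example of the scheduling described above (the channel count below is made up for illustration; the per-group default of 5 comes from the description): if the job-level concurrency is set to 20 channels, the Scheduler forms 20 / 5 = 4 TaskGroups, each running its assigned Tasks with a concurrency of 5. The concurrency is configured in the job file's setting block:

{
    "job": {
        "setting": {
            "speed": {
                "channel": 20
            }
        }
    }
}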
5. Quick Start
Prerequisites
- Linux
- JDK (1.8 or later; 1.8 recommended)
- Python (Python 2.6.x recommended)
Installation
1) Upload the downloaded datax.tar.gz to /opt/software on hadoop101
[kris@hadoop101 software]$ ls
datax.tar.gz
2) Extract datax.tar.gz to /opt/module
[kris@hadoop101 software]$ tar -zxvf datax.tar.gz -C /opt/module/
3) Run the self-check script
[kris@hadoop101 bin]$ cd /opt/module/datax/bin/
[kris@hadoop101 bin]$ python datax.py /opt/module/datax/job/job.json

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.

2019-07-14 07:51:25.661 [main] INFO  VMInfo - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl
2019-07-14 07:51:25.676 [main] INFO  Engine - the machine info  =>
    osInfo:  Oracle Corporation 1.8 25.144-b01
    jvmInfo: Linux amd64 2.6.32-642.el6.x86_64
    cpu num: 8
    totalPhysicalMemory:            -0.00G
    freePhysicalMemory:             -0.00G
    maxFileDescriptorCount:         -1
    currentOpenFileDescriptorCount: -1
    GC Names [PS MarkSweep, PS Scavenge]

    MEMORY_NAME              | allocation_size | init_size
    PS Eden Space            | 256.00MB        | 256.00MB
    Code Cache               | 240.00MB        | 2.44MB
    Compressed Class Space   | 1,024.00MB      | 0.00MB
    PS Survivor Space        | 42.50MB         | 42.50MB
    PS Old Gen               | 683.00MB        | 683.00MB
    Metaspace                | -0.00MB         | 0.00MB

2019-07-14 07:51:25.708 [main] INFO  Engine -
{
    "content": [
        {
            "reader": {
                "name": "streamreader",
                "parameter": {
                    "column": [
                        { "type": "string", "value": "DataX" },
                        { "type": "long", "value": 19890604 },
                        { "type": "date", "value": "1989-06-04 00:00:00" },
                        { "type": "bool", "value": true },
                        { "type": "bytes", "value": "test" }
                    ],
                    "sliceRecordCount": 100000
                }
            },
            "writer": {
                "name": "streamwriter",
                "parameter": {
                    "encoding": "UTF-8",
                    "print": false
                }
            }
        }
    ],
    "setting": {
        "errorLimit": { "percentage": 0.02, "record": 0 },
        "speed": { "byte": 10485760 }
    }
}
2019-07-14 07:51:35.892 [job-0] INFO  HookInvoker - No hook invoked, because base dir not exists or is a file: /opt/module/datax/hook
2019-07-14 07:51:35.896 [job-0] INFO  JobContainer -
[total cpu info] =>
    averageCpu | maxDeltaCpu | minDeltaCpu
    -1.00%     | -1.00%      | -1.00%

[total gc info] =>
    NAME         | totalGCCount | maxDeltaGCCount | minDeltaGCCount | totalGCTime | maxDeltaGCTime | minDeltaGCTime
    PS MarkSweep | 0            | 0               | 0               | 0.000s      | 0.000s         | 0.000s
    PS Scavenge  | 0            | 0               | 0               | 0.000s      | 0.000s         | 0.000s

2019-07-14 07:51:35.897 [job-0] INFO  JobContainer - PerfTrace not enable!
2019-07-14 07:51:35.898 [job-0] INFO  StandAloneJobContainerCommunicator - Total 100000 records, 2600000 bytes | Speed 253.91KB/s, 10000 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.018s | All Task WaitReaderTime 0.047s | Percentage 100.00%
2019-07-14 07:51:35.899 [job-0] INFO  JobContainer -
Job start time        : 2019-07-14 07:51:25
Job end time          : 2019-07-14 07:51:35
Total elapsed time    : 10s
Average throughput    : 253.91KB/s
Record write speed    : 10000rec/s
Total records read    : 100000
Read/write failures   : 0
6. Usage Examples
① Read data from a stream and print it to the console
1) View the configuration template
[kris@hadoop101 bin]$ python datax.py -r streamreader -w streamwriter

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.

Please refer to the streamreader document:
    https://github.com/alibaba/DataX/blob/master/streamreader/doc/streamreader.md

Please refer to the streamwriter document:
    https://github.com/alibaba/DataX/blob/master/streamwriter/doc/streamwriter.md

Please save the following configuration as a json file and use
    python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json
to run the job.

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader",
                    "parameter": {
                        "column": [],
                        "sliceRecordCount": ""
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "encoding": "",
                        "print": true
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}
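To actually run case ①, the template has to be filled in and saved as a json file. The file name, column values, and record count below are illustrative only (the structure is the template's); printing is switched on so that the generated records appear on the console:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader",
                    "parameter": {
                        "column": [
                            { "type": "string", "value": "hello DataX" },
                            { "type": "long", "value": 2019 }
                        ],
                        "sliceRecordCount": 10
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "encoding": "UTF-8",
                        "print": true
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}

Saved, for example, as /opt/module/datax/job/stream2stream.json, it can be run with:

[kris@hadoop101 bin]$ python datax.py /opt/module/datax/job/stream2stream.json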
② Import from and export to MySQL
②.1 Read data from MySQL and store it in HDFS
View the official template

[kris@hadoop101 job]$ python /opt/module/datax/bin/datax.py -r mysqlreader -w hdfswriter

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.

Please refer to the mysqlreader document:
    https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md

Please refer to the hdfswriter document:
    https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md

Please save the following configuration as a json file and use
    python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json
to run the job.

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": [],
                        "connection": [
                            {
                                "jdbcUrl": [],
                                "table": []
                            }
                        ],
                        "password": "",
                        "username": "",
                        "where": ""
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [],
                        "compress": "",
                        "defaultFS": "",
                        "fieldDelimiter": "",
                        "fileName": "",
                        "fileType": "",
                        "path": "",
                        "writeMode": ""
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}
Write the configuration file and run it:
[kris@hadoop101 job]$ vim mysql2hdfs

{ "job": { "content": [ { "reader": { "name": "mysqlreader", "parameter": { "column": [ "id", "name" ], "connection": [ { "jdbcUrl": [ "jdbc:mysql://hadoop101:3306/test" ], "table": [ "stu1" ] } ], "username": "root", "password": "123456" } }, "writer": { "name": "hdfswriter", "parameter": { "column": [ { "name": "id", "type": "INT" }, { "name": "name", "type": "STRING" } ], "defaultFS": "hdfs://hadoop101:9000", "fieldDelimiter": "\t", "fileName": "student.txt", "fileType": "text", "path": "/", "writeMode": "append" } } } ], "setting": { "speed": { "channel": "2" } } } }
2019-07-14 09:02:55.924 [job-0] INFO  JobContainer -
[total cpu info] =>
    averageCpu | maxDeltaCpu | minDeltaCpu
    -1.00%     | -1.00%      | -1.00%

[total gc info] =>
    NAME         | totalGCCount | maxDeltaGCCount | minDeltaGCCount | totalGCTime | maxDeltaGCTime | minDeltaGCTime
    PS MarkSweep | 1            | 1               | 1               | 0.071s      | 0.071s         | 0.071s
    PS Scavenge  | 1            | 1               | 1               | 0.072s      | 0.072s         | 0.072s

2019-07-14 09:02:55.924 [job-0] INFO  JobContainer - PerfTrace not enable!
2019-07-14 09:02:55.925 [job-0] INFO  StandAloneJobContainerCommunicator - Total 3 records, 30 bytes | Speed 3B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00%
2019-07-14 09:02:55.927 [job-0] INFO  JobContainer -
Job start time        : 2019-07-14 09:02:43
Job end time          : 2019-07-14 09:02:55
Total elapsed time    : 12s
Average throughput    : 3B/s
Record write speed    : 0rec/s
Total records read    : 3
Read/write failures   : 0
Result: the data was successfully written from MySQL into HDFS.
Note: when HdfsWriter actually runs, it appends a random suffix to the configured file name; the name plus suffix is the actual file written by each thread.
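A quick way to see this is to list the root directory and read the file back with a wildcard (the suffix itself is random):

[kris@hadoop101 job]$ hadoop fs -ls /
[kris@hadoop101 job]$ hadoop fs -cat /student.txt*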
The official template for reading from MongoDB and writing to HDFS (used in case ③.1 below) can be viewed in the same way:

[kris@hadoop101 job]$ python /opt/module/datax/bin/datax.py -r mongodbreader -w hdfswriter

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.

Please refer to the mongodbreader document:
    https://github.com/alibaba/DataX/blob/master/mongodbreader/doc/mongodbreader.md

Please refer to the hdfswriter document:
    https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md

Please save the following configuration as a json file and use
    python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json
to run the job.

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mongodbreader",
                    "parameter": {
                        "address": [],
                        "collectionName": "",
                        "column": [],
                        "dbName": "",
                        "userName": "",
                        "userPassword": ""
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [],
                        "compress": "",
                        "defaultFS": "",
                        "fieldDelimiter": "",
                        "fileName": "",
                        "fileType": "",
                        "path": "",
                        "writeMode": ""
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}
②.2 Read data from HDFS and write it to MySQL
Rename the file uploaded in the previous case
[kris@hadoop101 datax]$ hadoop fs -mv /student.txt* /student.txt
View the official template
[kris@hadoop101 datax]$ python bin/datax.py -r hdfsreader -w mysqlwriter
Create the configuration file job/hdfs2mysql.json

{ "job": { "content": [ { "reader": { "name": "hdfsreader", "parameter": { "column": ["*"], "defaultFS": "hdfs://hadoop102:9000", "encoding": "UTF-8", "fieldDelimiter": "\t", "fileType": "text", "path": "/student.txt" } }, "writer": { "name": "mysqlwriter", "parameter": { "column": [ "id", "name" ], "connection": [ { "jdbcUrl": "jdbc:mysql://hadoop102:3306/datax", "table": ["student2"] } ], "password": "000000", "username": "root", "writeMode": "insert" } } } ], "setting": { "speed": { "channel": "1" } } } }
Create the table student2 in MySQL's datax database
mysql> use datax;
mysql> create table student2(id int, name varchar(20));
Execute the job and check that the data has been written to the database:
[kris@hadoop101 datax]$ bin/datax.py job/hdfs2mysql.json
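To verify, query the target table defined in the configuration above:

mysql> use datax;
mysql> select * from student2;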
③ MongoDB import and export cases
③.1 Read data from MongoDB and import it into HDFS
Write the configuration file
[kris@hadoop101 datax]$ vim job/mongdb2hdfs.json

{ "job": { "content": [ { "reader": { "name": "mongodbreader", "parameter": { "address": ["127.0.0.1:27017"], "collectionName": "atguigu", "column": [ { "name":"name", "type":"string" }, { "name":"url", "type":"string" } ], "dbName": "test", } }, "writer": { "name": "hdfswriter", "parameter": { "column": [ { "name":"name", "type":"string" }, { "name":"url", "type":"string" } ], "defaultFS": "hdfs://hadoop102:9000", "fieldDelimiter": "\t", "fileName": "mongo.txt", "fileType": "text", "path": "/", "writeMode": "append" } } } ], "setting": { "speed": { "channel": "1" } } } }
Explanation of the mongodbreader parameters (a sketch that uses the optional credential fields follows this list):
address: the address of the MongoDB data source. Because MongoDB may be a cluster, the ip:port entries must be given as a JSON array. [Required]
userName: the MongoDB user name. [Optional]
userPassword: the MongoDB password. [Optional]
collectionName: the MongoDB collection name. [Required]
column: the field names of the MongoDB documents. [Required]
name: the name of the column. [Required]
type: the type of the column. [Optional]
splitter: MongoDB supports array types, but the DataX framework itself does not, so array values read from MongoDB are joined into a single string using this separator. [Optional]
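For reference, a mongodbreader parameter block that also supplies the optional userName/userPassword might look like the sketch below; the credentials, database, and collection names are placeholders rather than values from this tutorial:

"reader": {
    "name": "mongodbreader",
    "parameter": {
        "address": ["127.0.0.1:27017"],
        "userName": "mongoUser",
        "userPassword": "mongoPassword",
        "dbName": "someDb",
        "collectionName": "someCollection",
        "column": [
            { "name": "name", "type": "string" },
            { "name": "url", "type": "string" }
        ]
    }
}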
Execute the job:
[kris@hadoop101 datax]$ bin/datax.py job/mongdb2hdfs.json
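A quick way to check the result is to read the file back from HDFS (HdfsWriter again appends a random suffix to mongo.txt, hence the wildcard):

[kris@hadoop101 datax]$ hadoop fs -cat /mongo.txt*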
③.2 Read data from MongoDB and import it into MySQL
Create the table in MySQL:
mysql> create table kris(name varchar(20),url varchar(20));
Write the DataX configuration file
[kris@hadoop101 datax]$ vim job/mongodb2mysql.json

{ "job": { "content": [ { "reader": { "name": "mongodbreader", "parameter": { "address": ["127.0.0.1:27017"], "collectionName": "kris", "column": [ { "name":"name", "type":"string" }, { "name":"url", "type":"string" } ], "dbName": "test", } }, "writer": { "name": "mysqlwriter", "parameter": { "column": ["*"], "connection": [ { "jdbcUrl": "jdbc:mysql://hadoop101:3306/test", "table": ["kris"] } ], "password": "123456", "username": "root", "writeMode": "insert" } } } ], "setting": { "speed": { "channel": "1" } } } }
Execute:
[kris@hadoop101 datax]$ bin/datax.py job/mongodb2mysql.json
View the result:
mysql> select * from kris;
③.3 Read data from MongoDB and import it into HBase
vim mongodb2hbase.json

{ "job": { "content": [ { "reader": { "name": "mongodbreader", "parameter": { "address": ["x.x.x.x:28888"], "userName": "root", "userPassword": "xxxxt", "collectionName": "shop", "column": [ { "name":"_id", "type":"string" }, { "name":"shopNo", "type":"string" } ], "dbName": "posB", } }, "writer": { "name": "hbase11xwriter", "parameter": { "hbaseConfig": { "hbase.rootdir": "hdfs://hadoop101:9000/hbase", "hbase.cluster.distributed": "true", "hbase.zookeeper.quorum": "hadoop101,hadoop102,hadoop103" }, "table": "NS:shop", "mode": "normal", "rowkeyColumn": [ { "index":0, "type":"string" } ], "column": [ { "index":0, "name": "cf1:id", "type": "string" }, { "index":1, "name": "cf1:shopNo", "type": "string" } ], "versionColumn":{ "index": "-1", "value":"123456789" }, "encoding": "utf-8" } } } ], "setting": { "speed": { "channel": "1" } } } }