1. Overview
DataX is Alibaba's open-source tool for offline synchronization between heterogeneous data sources. It provides stable, efficient data synchronization across relational databases (MySQL, Oracle, etc.), HDFS, Hive, ODPS, HBase, FTP, and other sources.
DataX Design
To solve the heterogeneous-source synchronization problem, DataX replaces the complex mesh of point-to-point sync links with a star topology: DataX sits in the middle as the transport hub that connects every data source. To bring in a new data source, you only need to connect it to DataX, and it can then synchronize seamlessly with all the sources already supported.


DataX Framework Design

- Reader: the data collection module. It reads data from the source and hands it to the Framework.
- Writer: the data writing module. It continuously pulls data from the Framework and writes it to the destination.
- Framework: connects the Reader and the Writer, serves as the data transfer channel between them, and handles core concerns such as buffering, flow control, concurrency, and data conversion.
How DataX Runs
- Job: the management node for a single job. It handles data cleanup, splits the job into sub-tasks, and monitors and manages the TaskGroups.
- Task: produced by splitting the Job; the smallest unit of a DataX job. Each Task synchronizes one portion of the data.
- Schedule: packs Tasks into TaskGroups; a single TaskGroup runs 5 Tasks concurrently (see the worked example after this list).
- TaskGroup: responsible for starting the Tasks.
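To make the split concrete with assumed numbers (an illustration, not a figure from the original write-up): if a job asks for 20 concurrent channels, the Schedule module creates 20 / 5 = 4 TaskGroups, and each TaskGroup then runs its share of the Tasks with a concurrency of 5.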
2. DataX Installation
Official documentation
Environment requirements
- Linux
- JDK (1.8 or later; 1.8 recommended)
- Python (Python 2.6.x recommended)
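You can confirm the environment with java -version and python -V before installing.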
Installation
1. Upload the downloaded datax.tar.gz to /opt/software on hadoop201
[hui@hadoop201 software]$ ll datax.tar.gz
-rw-rw-r--. 1 hui hui 829372407 Jan 20 13:27 datax.tar.gz
2. Extract it to the target directory
tar -zxvf datax.tar.gz -C /opt/module/
3. Run the self-check job
[hui@hadoop201 software]$ cd /opt/module/datax/bin/
[hui@hadoop201 bin]$ python datax.py /opt/module/datax/job/job.json
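If the environment is set up correctly, this bundled job/job.json (in the stock distribution it is a simple stream-reader-to-stream-writer smoke test) ends with a statistics summary reporting zero read/write failures.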
3. Examples
MySQL to HDFS
Official documentation
[hui@hadoop201 bin]$ python /opt/module/datax/bin/datax.py -r mysqlreader -w hdfswriter

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.

Please refer to the mysqlreader document:
    https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md

Please refer to the hdfswriter document:
    https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md

Please save the following configuration as a json file and use
    python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json
to run the job.

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": [],
                        "connection": [
                            {
                                "jdbcUrl": [],
                                "table": []
                            }
                        ],
                        "password": "",
                        "username": "",
                        "where": ""
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [],
                        "compress": "",
                        "defaultFS": "",
                        "fieldDelimiter": "",
                        "fileName": "",
                        "fileType": "",
                        "path": "",
                        "writeMode": ""
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}
mysqlreader parameter reference
"reader": {
    "name": "mysqlreader",
    "parameter": {
        "column": [],
        "connection": [
            {
                "jdbcUrl": [],
                "table": [],
                【"querySql": []】
            }
        ],
        "password": "",
        "username": "",
        【"where": ""】,
        【"splitPk": ""】
    }
}
Note: parameters wrapped in 【】 are optional.
- name: the reader name
- column: the set of columns to synchronize, described as a JSON array; * means all columns
- jdbcUrl: JDBC connection information for the source database, described as a JSON array; multiple connection URLs are supported
- table: the table(s) to synchronize; multiple tables are supported
- querySql: a custom SQL statement; once configured, mysqlreader ignores table, column, and where
- password: the password for the database user
- username: the database user name
- where: a filter condition
- splitPk: the field used to split the data into shards, usually the primary key; only integer types are supported
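As an illustration only, a reader fragment that combines where and splitPk might look like the sketch below; the host, credentials, and filter value are placeholders, not values from the example database used later:
"reader": {
    "name": "mysqlreader",
    "parameter": {
        "column": ["id", "name", "author"],
        "connection": [
            {
                "jdbcUrl": ["jdbc:mysql://hostname:3306/test"],
                "table": ["book_info"]
            }
        ],
        "username": "username",
        "password": "password",
        "where": "id >= 1002",
        "splitPk": "id"
    }
}
With splitPk set, DataX splits the read into ranges on that column so several Tasks can pull data in parallel, and the where condition is applied to every generated query.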
hdfswriter parameter reference
"writer": {
    "name": "hdfswriter",
    "parameter": {
        "column": [],
        "compress": "",
        "defaultFS": "",
        "fieldDelimiter": "",
        "fileName": "",
        "fileType": "",
        "path": "",
        "writeMode": ""
    }
}
- name: the writer name
- column: the fields to write; name gives the field name, type gives the field type
- compress: compression type for the HDFS files; leaving it empty means no compression
- defaultFS: the address of the HDFS NameNode, in the form hdfs://ip:port
- fieldDelimiter: the field delimiter
- fileName: the name of the file to write
- fileType: the file type; currently only "text" and "orc" are supported
- path: the path on HDFS where the data is stored
- writeMode: how hdfswriter handles existing data before writing:
  (1) append: no pre-processing; hdfswriter writes directly using fileName and guarantees the file names do not collide.
  (2) nonConflict: if the directory already contains files prefixed with fileName, report an error.
Data preparation
CREATE DATABASE test;
USE test;
CREATE TABLE book_info(
    id INT,
    NAME VARCHAR(20),
    author VARCHAR(20)
);
SELECT * FROM book_info;
INSERT INTO book_info VALUES(1001,'俠客行','金庸'),(1002,'孔雀翎','古龍'),(1003,'萍蹤俠影','梁羽生');
Writing the MySQL-to-HDFS job file
[hui@hadoop201 job]$ pwd
/opt/module/datax/job
[hui@hadoop201 job]$ cat mysql2hdfs.json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": [
                            "id",
                            "name",
                            "author"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull"
                                ],
                                "table": [
                                    "book_info"
                                ]
                            }
                        ],
                        "username": "username",
                        "password": "password"
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
                            { "name": "id", "type": "int" },
                            { "name": "name", "type": "string" },
                            { "name": "author", "type": "string" }
                        ],
                        "defaultFS": "hdfs://localhost:8020",
                        "fieldDelimiter": "\t",
                        "fileName": "book_info.txt",
                        "fileType": "text",
                        "path": "/",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}
Run the job
[hui@hadoop201 datax]$ bin/datax.py job/mysql2hdfs.json
2022-03-06 01:09:12.111 [job-0] INFO JobContainer -
Job start time            : 2022-03-06 01:08:57
Job end time              : 2022-03-06 01:09:12
Total elapsed time        : 15s
Average traffic           : 2B/s
Record write speed        : 0rec/s
Records read              : 3
Read/write failures       : 0
Check the result
[hui@hadoop201 datax]$ hadoop fs -ls /
/book_info.txt__cfcd2ce4_a6dd_40c4_b449_4392774af189
Note: at run time, HdfsWriter appends a random suffix to the configured file name, and each writing thread uses the suffixed name as the actual file it writes.
Depending on business needs, you can also extract data with a custom SQL statement (querySql) that picks specific columns and applies filter conditions, as in the following job:
[hui@hadoop201 job]$ less base_province.json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://hostname:3306/test?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull"
                                ],
                                "querySql": [
                                    "select id,name,region_id,area_code,iso_code,iso_3166_2 from base_province where id>=0"
                                ]
                            }
                        ],
                        "password": "password",
                        "username": "username"
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
                            { "name": "id", "type": "bigint" },
                            { "name": "name", "type": "string" },
                            { "name": "region_id", "type": "string" },
                            { "name": "area_code", "type": "string" },
                            { "name": "iso_code", "type": "string" },
                            { "name": "iso_3166_2", "type": "string" }
                        ],
                        "compress": "gzip",
                        "defaultFS": "hdfs://hostname:8020",
                        "fieldDelimiter": "\t",
                        "fileName": "base_province",
                        "fileType": "text",
                        "path": "/base_province",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}
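Two practical notes on this job (based on general hdfswriter behaviour, not on the original write-up): hdfswriter expects the target path to already exist, so create /base_province first if necessary (for example with hadoop fs -mkdir /base_province); and because compress is gzip, the files landed under /base_province are gzip-compressed text, which you can spot-check with something like hadoop fs -cat /base_province/* | zcat.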
HDFS to MySQL
1. Rename the output file from the previous example
[hui@hadoop201 datax]$ hadoop fs -mv /book_info.txt__cfcd2ce4_a6dd_40c4_b449_4392774af189 /book_info.txt
Clear the test data in MySQL:
TRUNCATE TABLE book_info;
2. View the official template
[hui@hadoop201 datax]$ python bin/datax.py -r hdfsreader -w mysqlwriter

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.

Please refer to the hdfsreader document:
    https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md

Please refer to the mysqlwriter document:
    https://github.com/alibaba/DataX/blob/master/mysqlwriter/doc/mysqlwriter.md

Please save the following configuration as a json file and use
    python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json
to run the job.

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "hdfsreader",
                    "parameter": {
                        "column": [],
                        "defaultFS": "",
                        "encoding": "UTF-8",
                        "fieldDelimiter": ",",
                        "fileType": "orc",
                        "path": ""
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "column": [],
                        "connection": [
                            {
                                "jdbcUrl": "",
                                "table": []
                            }
                        ],
                        "password": "",
                        "preSql": [],
                        "session": [],
                        "username": "",
                        "writeMode": ""
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}
3. Write the HDFS-to-MySQL job file
[hui@hadoop201 datax]$ less job/hdfs2mysql.json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "hdfsreader",
                    "parameter": {
                        "column": ["*"],
                        "defaultFS": "hdfs://hadoop201:8020",
                        "encoding": "UTF-8",
                        "fieldDelimiter": "\t",
                        "fileType": "text",
                        "path": "/book_info.txt"
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "column": [
                            "id",
                            "name",
                            "author"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://hadoop201:3306/test?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull",
                                "table": ["book_info"]
                            }
                        ],
                        "password": "password",
                        "username": "username",
                        "writeMode": "insert"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}
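The writeMode here is insert, i.e. plain INSERT INTO statements. The mysqlwriter document linked above also describes replace and update modes (REPLACE INTO and ON DUPLICATE KEY UPDATE respectively), which are worth considering when a job may be re-run against a table that already contains the same keys.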
4. Run the job
[hui@hadoop201 datax]$ bin/datax.py job/hdfs2mysql.json
2022-03-06 01:31:57.812 [job-0] INFO JobContainer -
Job start time            : 2022-03-06 01:31:43
Job end time              : 2022-03-06 01:31:57
Total elapsed time        : 14s
Average traffic           : 2B/s
Record write speed        : 0rec/s
Records read              : 3
Read/write failures       : 0
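As a final check, SELECT * FROM book_info; in MySQL should now return the three rows (1001 to 1003) that the job reports as read.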