#
DataX
DataX 是阿里巴巴集團內被廣泛使用的離線數據同步工具/平台,實現包括 MySQL、SQL Server、Oracle、PostgreSQL、HDFS、Hive、HBase、OTS、ODPS 等各種異構數據源之間高效的數據同步功能。
Features
DataX本身作為數據同步框架,將不同數據源的同步抽象為從源頭數據源讀取數據的Reader插件,以及向目標端寫入數據的Writer插件,理論上DataX框架可以支持任意數據源類型的數據同步工作。同時DataX插件體系作為一套生態系統, 每接入一套新數據源該新加入的數據源即可實現和現有的數據源互通。
System Requirements
Linux
JDK(1.6以上,推薦1.6)
Python(推薦Python2.6.X)
Apache Maven 3.x (Compile DataX)
Quick Start
工具部署
方法一、直接下載DataX工具包:DataX
下載后解壓至本地某個目錄,進入bin目錄,即可運行同步作業:
$ cd {YOUR_DATAX_HOME}/bin
$ python datax.py {YOUR_JOB.json}
方法二、下載DataX源碼,自己編譯:DataX源碼
(1)、下載DataX源碼:
$ git clone git@github.com:alibaba/DataX.git
(2)、通過maven打包:
$ cd {DataX_source_code_home}
$ mvn -U clean package assembly:assembly -Dmaven.test.skip=true
打包成功,日志顯示如下:
[INFO] BUILD SUCCESS
[INFO] -----------------------------------------------------------------
[INFO] Total time: 08:12 min
[INFO] Finished at: 2015-12-13T16:26:48+08:00
[INFO] Final Memory: 133M/960M
[INFO] -----------------------------------------------------------------
打包成功后的DataX包位於 {DataX_source_code_home}/target/datax/datax/ ,結構如下:
$ cd {DataX_source_code_home}
$ ls ./target/datax/datax/
bin conf job lib log log_perf plugin script tmp
配置示例:從stream讀取數據並打印到控制台
第一步、創建創業的配置文件(json格式)
可以通過命令查看配置模板: python datax.py -r {YOUR_READER} -w {YOUR_WRITER}
$ cd {YOUR_DATAX_HOME}/bin
$ python datax.py -r streamreader -w streamwriter
DataX (UNKNOWN_DATAX_VERSION), From Alibaba !
Copyright (C) 2010-2015, Alibaba Group. All Rights Reserved.
Please refer to the streamreader document:
https://github.com/alibaba/DataX/blob/master/streamreader/doc/streamreader.md
Please refer to the streamwriter document:
https://github.com/alibaba/DataX/blob/master/streamwriter/doc/streamwriter.md
Please save the following configuration as a json file and use
python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json
to run the job.
{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column": [],
"sliceRecordCount": ""
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": ""
}
}
}
}
根據模板配置json如下:
#stream2stream.json
{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"sliceRecordCount": 10,
"column": [
{
"type": "long",
"value": "10"
},
{
"type": "string",
"value": "hello,你好,世界-DataX"
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": 5
}
}
}
}
第二步:啟動DataX
$ cd {YOUR_DATAX_DIR_BIN}
$ python datax.py ./stream2stream.json
同步結束,顯示日志如下:
...
2015-12-17 11:20:25.263 [job-0] INFO JobContainer -
任務啟動時刻 : 2015-12-17 11:20:15
任務結束時刻 : 2015-12-17 11:20:25
任務總計耗時 : 10s
任務平均流量 : 205B/s
記錄寫入速度 : 5rec/s
讀出記錄總數 : 50
讀寫失敗總數 : 0
Support Data Channels
目前DataX支持的數據源有:
Reader
Reader實現了從數據存儲系統批量抽取數據,並轉換為DataX標准數據交換協議,DataX任意Reader能與DataX任意Writer實現無縫對接,達到任意異構數據互通之目的。
RDBMS 關系型數據庫
�MysqlReader: 使用JDBC批量抽取Mysql數據集。
OracleReader: 使用JDBC批量抽取Oracle數據集。
SqlServerReader: 使用JDBC批量抽取SqlServer數據集
PostgresqlReader: 使用JDBC批量抽取PostgreSQL數據集
DrdsReader: 針對公有雲上DRDS的批量數據抽取工具。
數倉數據存儲
ODPSReader: 使用ODPS Tunnel SDK批量抽取ODPS數據。
NoSQL數據存儲
OTSReader: 針對公有雲上OTS的批量數據抽取工具。
HBaseReader: 針對 HBase 0.94版本的在線數據抽取工具
無結構化數據存儲
TxtFileReader: 讀取(遞歸/過濾)本地文件。
FtpReader: 讀取(遞歸/過濾)遠程ftp文件。
HdfsReader: 針對Hdfs文件系統中textfile和orcfile文件批量數據抽取工具。
OssReader: 針對公有雲OSS產品的批量數據抽取工具。
StreamReader
Writer
Writer實現了從DataX標准數據交換協議,翻譯為具體的數據存儲類型並寫入目的數據存儲。DataX任意Writer能與DataX任意Reader實現無縫對接,達到任意異構數據互通之目的。
RDBMS 關系型數據庫
MysqlWriter: 使用JDBC(Insert,Replace方式)寫入Mysql數據庫
OracleWriter: 使用JDBC(Insert方式)寫入Oracle數據庫
PostgresqlWriter: 使用JDBC(Insert方式)寫入PostgreSQL數據庫
SqlServerWriter: 使用JDBC(Insert方式)寫入sqlserver數據庫
DrdsWriter: 使用JDBC(Replace方式)寫入Drds數據庫
數倉數據存儲
ODPSWriter: 使用ODPS Tunnel SDK向ODPS寫入數據。
ADSWriter: 使用ODPS中轉將數據導入ADS。
NoSQL數據存儲
OTSWriter: 使用OTS SDK向OTS Public模型的表中導入數據。
OCSWriter
MongoDBReader:MongoDBReader
MongoDBWriter:MongoDBWriter
無結構化數據存儲
TxtFileWriter: 提供寫入本地文件功能。
OssWriter: 使用OSS SDK寫入OSS數據。
HdfsWriter: 提供向Hdfs文件系統中寫入textfile文件和orcfile文件功能。
StreamWriter
###linux 批量執行job
exec.sh
find ./ -name "*.json" -exec python datax.py {} \;
