1. DataX Introduction
DataX
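DataX is an offline data synchronization tool/platform widely used within Alibaba Group. It provides efficient data synchronization between heterogeneous data sources, including MySQL, Oracle, SQL Server, PostgreSQL, HDFS, Hive, ADS, HBase, TableStore (OTS), MaxCompute (ODPS), and DRDS.
As a data synchronization framework, DataX abstracts synchronization between different data sources into Reader plugins, which read data from the source, and Writer plugins, which write data to the target. In theory, the DataX framework can therefore support synchronization for any type of data source. The DataX plugin system also works as an ecosystem: each newly added data source can immediately exchange data with all existing data sources.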
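The Reader/Writer split can be illustrated with a small toy model. This is only a sketch of the idea in Python, not DataX's actual Java plugin API; the names `READERS`, `WRITERS`, `memory_reader`, `list_writer`, and `sync` are hypothetical and exist only for this illustration.

```python
from typing import Callable, Dict, Iterable, Iterator, List

Record = List[str]  # one row, as an ordered list of column values

# Hypothetical registries standing in for a plugin directory.
READERS: Dict[str, Callable[[], Iterator[Record]]] = {}
WRITERS: Dict[str, Callable[[Iterable[Record]], int]] = {}

SINK: List[Record] = []  # where the toy writer stores rows


def memory_reader() -> Iterator[Record]:
    """Toy reader: yields rows from a fixed in-memory source."""
    yield ["row1", "lxw"]
    yield ["row2", "lxw2"]


def list_writer(records: Iterable[Record]) -> int:
    """Toy writer: appends rows to SINK and returns how many were written."""
    count = 0
    for record in records:
        SINK.append(record)
        count += 1
    return count


READERS["memoryreader"] = memory_reader
WRITERS["listwriter"] = list_writer


def sync(reader_name: str, writer_name: str) -> int:
    """Connect any registered reader to any registered writer."""
    return WRITERS[writer_name](READERS[reader_name]())


written = sync("memoryreader", "listwriter")
print(written, SINK)
```

Because `sync` only depends on the shared record shape, registering one more reader makes it usable with every existing writer, which is the interoperability property described above.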
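Installation
Download DataX from the DataX download page.
After extracting the archive it is ready to use. Run a job with the following script: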
python27 datax.py ..\job\test.json
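When several jobs need to be launched, the same command can be assembled from Python. The following is a minimal sketch: the helper names `build_datax_command` and `run_job` are made up for this example, and it assumes the interpreter is available as `python27` (as in the command above) and that `datax.py` lives in the directory passed as `datax_dir`.

```python
import os
import subprocess
from typing import List


def build_datax_command(datax_dir: str, job_file: str,
                        python_exe: str = "python27") -> List[str]:
    """Build the argument list equivalent to: python27 datax.py <job_file>."""
    launcher = os.path.join(datax_dir, "datax.py")
    return [python_exe, launcher, job_file]


def run_job(datax_dir: str, job_file: str, python_exe: str = "python27") -> int:
    """Run one job and return the process exit code (0 normally means success)."""
    completed = subprocess.run(build_datax_command(datax_dir, job_file, python_exe))
    return completed.returncode


# Only builds the command here; call run_job(...) to actually start DataX.
command = build_datax_command("bin", "job/test.json")
print(command)
```

Passing the arguments as a list avoids shell quoting problems with paths that contain spaces or backslashes.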
2. DataX Data Synchronization
2.1 From MySQL to MySQL
Table creation statement
DROP TABLE IF EXISTS `tb_dmp_requser`;
CREATE TABLE `tb_dmp_requser` (
  `reqid` varchar(50) NOT NULL COMMENT 'activity ID',
  `exetype` varchar(50) DEFAULT NULL COMMENT 'execution type',
  `allnum` varchar(11) DEFAULT NULL COMMENT 'total number of target users',
  `exenum` varchar(11) DEFAULT NULL COMMENT 'executed target user data',
  `resv` varchar(50) DEFAULT NULL,
  `createtime` datetime DEFAULT NULL
);
Copy the tb_dmp_requser table in the dmp database to the tb_dmp_requser table in dota2_databank.
job_mysql_to_mysql.json is as follows:
{
  "job": {
    "content": [{
      "reader": {
        "name": "mysqlreader",
        "parameter": {
          "column": ["allnum", "reqid"],
          "connection": [{
            "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/dmp"],
            "table": ["tb_dmp_requser"]
          }],
          "password": "123456",
          "username": "root"
        }
      },
      "writer": {
        "name": "mysqlwriter",
        "parameter": {
          "column": ["allnum", "reqid"],
          "preSql": ["delete from tb_dmp_requser"],
          "connection": [{
            "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/dota2_databank",
            "table": ["tb_dmp_requser"]
          }],
          "password": "123456",
          "username": "root",
          "writeMode": "replace"
        }
      }
    }],
    "setting": {
      "speed": {
        "channel": "2"
      }
    }
  }
}
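Writing such a job by hand is repetitive, so it can help to generate it. The sketch below rebuilds the structure of the job above from a few arguments; `build_mysql_job` is a hypothetical helper, not part of DataX. Note that it keeps the shape used above, where the reader's `jdbcUrl` is a list and the writer's `jdbcUrl` is a single string.

```python
import json
from typing import Dict, List


def build_mysql_job(columns: List[str], table: str, source_url: str,
                    target_url: str, username: str, password: str,
                    channel: int = 2) -> Dict:
    """Return a MySQL-to-MySQL job dict shaped like the example above."""
    reader = {
        "name": "mysqlreader",
        "parameter": {
            "column": list(columns),
            # Reader side: jdbcUrl is a list, as in the example job.
            "connection": [{"jdbcUrl": [source_url], "table": [table]}],
            "username": username,
            "password": password,
        },
    }
    writer = {
        "name": "mysqlwriter",
        "parameter": {
            "column": list(columns),
            # Clear the target table before loading, as the example does.
            "preSql": ["delete from " + table],
            # Writer side: jdbcUrl is a plain string, as in the example job.
            "connection": [{"jdbcUrl": target_url, "table": [table]}],
            "username": username,
            "password": password,
            "writeMode": "replace",
        },
    }
    return {
        "job": {
            "content": [{"reader": reader, "writer": writer}],
            "setting": {"speed": {"channel": channel}},
        }
    }


job = build_mysql_job(
    ["allnum", "reqid"], "tb_dmp_requser",
    "jdbc:mysql://127.0.0.1:3306/dmp",
    "jdbc:mysql://127.0.0.1:3306/dota2_databank",
    "root", "123456",
)
print(json.dumps(job, indent=2))
```

The printed text can be saved as a `.json` file and passed to `datax.py` in the usual way.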
2.2 From Oracle to Oracle
Copy the test table owned by user scott to the test table owned by user test.
Table creation statement
drop table TEST;
CREATE TABLE TEST (
  ID NUMBER(32) NULL,
  NAME VARCHAR2(255 BYTE) NULL
)
LOGGING
NOCOMPRESS
NOCACHE;
job_oracle_oracle.json
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "oraclereader",
          "parameter": {
            "column": ["id", "name"],
            "connection": [
              {
                "jdbcUrl": ["jdbc:oracle:thin:@localhost:1521:ORCL"],
                "table": ["test"]
              }
            ],
            "password": "tiger",
            "username": "scott",
            "where": "rownum < 1000"
          }
        },
        "writer": {
          "name": "oraclewriter",
          "parameter": {
            "column": ["id", "name"],
            "connection": [
              {
                "jdbcUrl": "jdbc:oracle:thin:@localhost:1521:ORCL",
                "table": ["test"]
              }
            ],
            "password": "test",
            "username": "test"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 6
      }
    }
  }
}
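In the job above the reader and writer list the same columns in the same order. A quick pre-flight check can catch the case where the two lists drift apart after an edit. This is a sketch under the assumption that the two `column` lists are meant to line up one-to-one by position; `column_counts` and `mismatched_entries` are hypothetical helper names.

```python
from typing import Dict, List, Tuple


def column_counts(job: Dict) -> List[Tuple[int, int]]:
    """Return (reader_columns, writer_columns) for each entry in job.content."""
    counts = []
    for entry in job["job"]["content"]:
        reader_cols = entry["reader"]["parameter"]["column"]
        writer_cols = entry["writer"]["parameter"]["column"]
        counts.append((len(reader_cols), len(writer_cols)))
    return counts


def mismatched_entries(job: Dict) -> List[int]:
    """Return the indexes of content entries whose column counts differ."""
    return [i for i, (r, w) in enumerate(column_counts(job)) if r != w]


# Trimmed copy of the Oracle job above: only the fields this check reads.
oracle_job = {
    "job": {
        "content": [
            {
                "reader": {"name": "oraclereader",
                           "parameter": {"column": ["id", "name"]}},
                "writer": {"name": "oraclewriter",
                           "parameter": {"column": ["id", "name"]}},
            }
        ]
    }
}

print(mismatched_entries(oracle_job))
```

An empty result means every content entry has matching column counts; any index in the list points at an entry worth reviewing before the job is run.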
2.3 From HBase to local
Copy the HBase table "LXW" to the local path ../job/datax_hbase.
Create the table and add two rows of data:
hbase(main):046:0> create 'LXW','CF'
0 row(s) in 1.2120 seconds

=> Hbase::Table - LXW
hbase(main):047:0> put 'LXW','row1','CF:NAME','lxw'
0 row(s) in 0.0120 seconds

hbase(main):048:0> put 'LXW','row1','CF:AGE','18'
0 row(s) in 0.0080 seconds

hbase(main):049:0> put 'LXW','row1','CF:ADDRESS','BeijingYiZhuang'
0 row(s) in 0.0070 seconds

hbase(main):050:0> put 'LXW','row2','CF:ADDRESS','BeijingYiZhuang2'
0 row(s) in 0.0060 seconds

hbase(main):051:0> put 'LXW','row2','CF:AGE','18'
0 row(s) in 0.0050 seconds

hbase(main):052:0> put 'LXW','row2','CF:NAME','lxw2'
0 row(s) in 0.0040 seconds

hbase(main):053:0> exit
job_hbase_to_local.json
For the HBase high-availability cluster configuration, see https://www.cnblogs.com/Java-Starter/p/10756647.html
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "hbase11xreader",
          "parameter": {
            "hbaseConfig": {
              "hbase.zookeeper.quorum": "CentOS7Five:2181,CentOS7Six:2181,CentOS7Seven:2181"
            },
            "table": "LXW",
            "encoding": "utf-8",
            "mode": "normal",
            "column": [
              { "name": "rowkey", "type": "string" },
              { "name": "CF:NAME", "type": "string" },
              { "name": "CF:AGE", "type": "string" },
              { "name": "CF:ADDRESS", "type": "string" }
            ],
            "range": {
              "endRowkey": "",
              "isBinaryRowkey": false,
              "startRowkey": ""
            }
          }
        },
        "writer": {
          "name": "txtfilewriter",
          "parameter": {
            "dateFormat": "yyyy-MM-dd",
            "fieldDelimiter": "\t",
            "fileName": "LXW",
            "path": "../job/datax_hbase",
            "writeMode": "truncate"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 5
      }
    }
  }
}
The file LXW__e647d969_d2c6_47ad_9534_15c90d696099 is generated under ../job/datax_hbase.
Its content is as follows:
row1 lxw 18 BeijingYiZhuang
row2 lxw2 18 BeijingYiZhuang2
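Since the writer was configured with a tab as `fieldDelimiter`, the exported file can be read back by splitting each line on tabs and pairing the values with the reader's column order. A minimal sketch follows; `parse_export` is a hypothetical helper, and the sample text is typed in by hand from the output above rather than read from the real file.

```python
from typing import Dict, List

# Column order taken from the hbase11xreader "column" list in the job.
COLUMNS = ["rowkey", "CF:NAME", "CF:AGE", "CF:ADDRESS"]


def parse_export(text: str, delimiter: str = "\t") -> List[Dict[str, str]]:
    """Turn delimiter-separated export lines into one dict per row."""
    rows = []
    for line in text.splitlines():
        if not line:
            continue  # skip blank lines, e.g. a trailing newline
        values = line.split(delimiter)
        rows.append(dict(zip(COLUMNS, values)))
    return rows


sample = "row1\tlxw\t18\tBeijingYiZhuang\nrow2\tlxw2\t18\tBeijingYiZhuang2\n"
rows = parse_export(sample)
for row in rows:
    print(row["rowkey"], row["CF:NAME"], row["CF:AGE"], row["CF:ADDRESS"])
```

To check a real export, read the generated file with `open(path, encoding="utf-8").read()` and pass the text to `parse_export`.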
2.4 From local to HBase
Import a local file into the HBase table LXW.
Source data source.txt
row3,jjj1,150,BeijingYiZhuang3
row4,jjj2,150,BeijingYiZhuang4
job_local_to_hbase.json
{
  "job": {
    "setting": {
      "speed": {
        "channel": 5
      }
    },
    "content": [
      {
        "reader": {
          "name": "txtfilereader",
          "parameter": {
            "path": "../job/datax_hbase/source.txt",
            "encoding": "UTF-8",
            "column": [
              { "index": 0, "type": "String" },
              { "index": 1, "type": "string" },
              { "index": 2, "type": "string" },
              { "index": 3, "type": "string" }
            ],
            "fieldDelimiter": ","
          }
        },
        "writer": {
          "name": "hbase11xwriter",
          "parameter": {
            "hbaseConfig": {
              "hbase.zookeeper.quorum": "CentOS7Five:2181,CentOS7Six:2181,CentOS7Seven:2181"
            },
            "table": "LXW",
            "mode": "normal",
            "rowkeyColumn": [
              { "index": 0, "type": "string" }
            ],
            "column": [
              { "index": 1, "name": "CF:NAME", "type": "string" },
              { "index": 2, "name": "CF:AGE", "type": "string" },
              { "index": 3, "name": "CF:ADDRESS", "type": "string" }
            ],
            "versionColumn": {
              "index": -1,
              "value": "123456789"
            },
            "encoding": "utf-8"
          }
        }
      }
    ]
  }
}
After the import, the newly added data is visible:
hbase(main):241:0* get 'LXW','row3'
COLUMN                CELL
 CF:ADDRESS           timestamp=123456789, value=BeijingYiZhuang3
 CF:AGE               timestamp=123456789, value=150
 CF:NAME              timestamp=123456789, value=jjj1
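The index mapping in the writer configuration can be read as follows: field 0 of each source line becomes the rowkey, fields 1 to 3 become the CF:NAME, CF:AGE, and CF:ADDRESS cells, and the fixed `versionColumn` value shows up as the cell timestamp in the `get` output. The sketch below mimics that mapping in plain Python for illustration only; it does not talk to HBase, and `line_to_cells` is a hypothetical helper.

```python
from typing import Dict, Tuple

ROWKEY_INDEX = 0                      # from "rowkeyColumn"
COLUMN_MAP = {1: "CF:NAME", 2: "CF:AGE", 3: "CF:ADDRESS"}  # from "column"
VERSION = 123456789                   # from "versionColumn" value


def line_to_cells(line: str, delimiter: str = ",") -> Tuple[str, Dict[str, str]]:
    """Split one source line and map its fields to (rowkey, cells)."""
    fields = line.split(delimiter)
    rowkey = fields[ROWKEY_INDEX]
    cells = {name: fields[index] for index, name in COLUMN_MAP.items()}
    return rowkey, cells


rowkey, cells = line_to_cells("row3,jjj1,150,BeijingYiZhuang3")
for name in sorted(cells):
    print(rowkey, name, "timestamp=%d" % VERSION, "value=" + cells[name])
```

For the first line of source.txt this yields the same three cells that the `get 'LXW','row3'` command displays.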
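2.5 From local to HDFS/Hive
Exporting from HDFS to local does not support high availability, so that direction is not tested here.
For the Hive high-availability configuration, see https://www.cnblogs.com/Java-Starter/p/10756528.html
To import a local data file into HDFS/Hive, the table must first be created in Hive.
Because of path issues, this can only be run on Linux.
Source data source.txt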
3,1,150,33
4,2,150,44
Table creation statement
create table datax_test(
  col1 varchar(10),
  col2 varchar(10),
  col3 varchar(10),
  col4 varchar(10)
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS ORC;
fileType must be orc. The text type has to be compressed and may produce garbled characters.
job_local_to_hdfs.json
{
  "setting": {},
  "job": {
    "setting": {
      "speed": {
        "channel": 1
      }
    },
    "content": [
      {
        "reader": {
          "name": "txtfilereader",
          "parameter": {
            "path": ["../job/datax_hbase/source.txt"],
            "encoding": "UTF-8",
            "column": [
              { "index": 0, "type": "String" },
              { "index": 1, "type": "String" },
              { "index": 2, "type": "String" },
              { "index": 3, "type": "String" }
            ],
            "fieldDelimiter": ","
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "defaultFS": "hdfs://ns1/",
            "hadoopConfig": {
              "dfs.nameservices": "ns1",
              "dfs.ha.namenodes.ns1": "nn1,nn2",
              "dfs.namenode.rpc-address.ns1.nn1": "CentOS7One:9000",
              "dfs.namenode.rpc-address.ns1.nn2": "CentOS7Two:9000",
              "dfs.client.failover.proxy.provider.ns1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
            },
            "fileType": "orc",
            "path": "/user/hive/warehouse/datax_test",
            "fileName": "datax_test",
            "column": [
              { "name": "col1", "type": "VARCHAR" },
              { "name": "col2", "type": "VARCHAR" },
              { "name": "col3", "type": "VARCHAR" },
              { "name": "col4", "type": "VARCHAR" }
            ],
            "writeMode": "append",
            "fieldDelimiter": ",",
            "compress": "NONE"
          }
        }
      }
    ]
  }
}
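The keys inside `hadoopConfig` all follow from the nameservice name and the NameNode IDs, so they can be derived rather than typed out. The following sketch regenerates the block used above; `build_ha_hadoop_config` is a hypothetical helper, and the host names are simply the ones from this example cluster.

```python
from typing import Dict

FAILOVER_PROVIDER = (
    "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
)


def build_ha_hadoop_config(nameservice: str,
                           namenodes: Dict[str, str]) -> Dict[str, str]:
    """Build the HA client settings for one nameservice.

    namenodes maps a NameNode ID (e.g. "nn1") to its RPC address.
    """
    config = {
        "dfs.nameservices": nameservice,
        "dfs.ha.namenodes." + nameservice: ",".join(namenodes),
    }
    for nn_id, address in namenodes.items():
        key = "dfs.namenode.rpc-address.%s.%s" % (nameservice, nn_id)
        config[key] = address
    config["dfs.client.failover.proxy.provider." + nameservice] = FAILOVER_PROVIDER
    return config


hadoop_config = build_ha_hadoop_config(
    "ns1", {"nn1": "CentOS7One:9000", "nn2": "CentOS7Two:9000"}
)
for key, value in hadoop_config.items():
    print(key, "=", value)
```

The nameservice passed here must match the one in `defaultFS` (`hdfs://ns1/` in the job above), otherwise the client cannot resolve the logical name.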
After the import completes, check Hive:
Time taken: 0.105 seconds
hive>
    >
    >
    > select *from datax_test;
OK
3 1 150 33
4 2 150 44
Time taken: 0.085 seconds, Fetched: 2 row(s)
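2.6 From txt to Oracle
txt, dat, csv, and similar formats all work. The dat file used here is 16 GB and contains 180 million records.
Table creation statement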
CREATE TABLE T_CJYX_HOMECOUNT (
  "ACYC_ID" VARCHAR2(4000 BYTE) NULL,
  "ADDRESS_ID" VARCHAR2(4000 BYTE) NULL,
  "ADDRESS_NAME" VARCHAR2(4000 BYTE) NULL,
  "ADDRESS_LEVEL" VARCHAR2(4000 BYTE) NULL,
  "CHECK_TARGET_NUM" VARCHAR2(4000 BYTE) NULL,
  "CHECK_VALUE" VARCHAR2(4000 BYTE) NULL,
  "TARGET_PHONE" VARCHAR2(4000 BYTE) NULL,
  "NOTARGET_PHONE" VARCHAR2(4000 BYTE) NULL,
  "PARENT_ID" VARCHAR2(4000 BYTE) NULL,
  "BCYC_ID" VARCHAR2(4000 BYTE) NULL
)
The job_txt_to_oracle.json file is as follows:
{
  "setting": {},
  "job": {
    "setting": {
      "speed": {
        "channel": 11
      }
    },
    "content": [
      {
        "reader": {
          "name": "txtfilereader",
          "parameter": {
            "path": ["E:/opt/srcbigdata2/di_00121_20190427.dat"],
            "encoding": "UTF-8",
            "nullFormat": "",
            "column": [
              { "index": 0, "type": "string" },
              { "index": 1, "type": "string" },
              { "index": 2, "type": "string" },
              { "index": 3, "type": "string" },
              { "index": 4, "type": "string" },
              { "index": 5, "type": "string" },
              { "index": 6, "type": "string" },
              { "index": 7, "type": "string" },
              { "index": 8, "type": "string" },
              { "index": 9, "type": "string" }
            ],
            "fieldDelimiter": "$"
          }
        },
        "writer": {
          "name": "oraclewriter",
          "parameter": {
            "column": ["acyc_id", "address_id", "address_name", "address_level", "check_target_num", "check_value", "target_phone", "notarget_phone", "parent_id", "bcyc_id"],
            "connection": [
              {
                "jdbcUrl": "jdbc:oracle:thin:@localhost:1521:ORCL",
                "table": ["T_CJYX_HOMECOUNT"]
              }
            ],
            "password": "test",
            "username": "test"
          }
        }
      }
    ]
  }
}
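The reader's `column` list is ten near-identical entries, which is tedious and error-prone to type. A short sketch that generates it instead is shown below; `string_columns` is a hypothetical helper, and it assumes every field should be read as a string, as in this job.

```python
import json
from typing import Dict, List


def string_columns(count: int) -> List[Dict[str, object]]:
    """Return txtfilereader column entries for indexes 0..count-1, all strings."""
    return [{"index": i, "type": "string"} for i in range(count)]


columns = string_columns(10)
rendered = json.dumps(columns, indent=2)
print(rendered)
```

Because the list is serialized by `json.dumps`, the output is always valid JSON and can be pasted into the job file's `column` field directly.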
Script
python27 datax.py ../job/job_txt_to_oracle.json
This is much faster than Oracle's built-in sqlldr: DataX imported the 180 million records in only 117 minutes, whereas sqlldr needed 41 hours.
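To put those two timings on the same scale, the throughput can be worked out from the figures quoted above. The snippet only does the arithmetic; it assumes both runs loaded the same 180 million records.

```python
RECORDS = 180_000_000   # records in the dat file
DATAX_MINUTES = 117     # DataX import time
SQLLDR_HOURS = 41       # sqlldr import time

datax_seconds = DATAX_MINUTES * 60      # 7,020 s
sqlldr_seconds = SQLLDR_HOURS * 3600    # 147,600 s

datax_rate = RECORDS / datax_seconds    # records per second
sqlldr_rate = RECORDS / sqlldr_seconds
speedup = sqlldr_seconds / datax_seconds

print("DataX : %.0f records/s" % datax_rate)
print("sqlldr: %.0f records/s" % sqlldr_rate)
print("speedup: %.1fx" % speedup)
```

That works out to roughly 25,600 records per second for DataX against about 1,200 for sqlldr, a difference of about 21 times.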
2.7 From txt to txt
job_txt_to_txt.json is as follows:
{
  "setting": {},
  "job": {
    "setting": {
      "speed": {
        "channel": 2
      }
    },
    "content": [
      {
        "reader": {
          "name": "txtfilereader",
          "parameter": {
            "path": ["../job/data_txt/a.txt"],
            "encoding": "UTF-8",
            "column": [
              { "index": 0, "type": "string" },
              { "index": 1, "type": "string" }
            ],
            "fieldDelimiter": "$"
          }
        },
        "writer": {
          "name": "txtfilewriter",
          "parameter": {
            "path": "../job/data_txt/",
            "fileName": "luohw",
            "writeMode": "truncate",
            "format": "yyyy-MM-dd"
          }
        }
      }
    ]
  }
}
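Hand-edited job files easily pick up small syntax slips such as a stray trailing comma in a list. Whether DataX's own parser tolerates them is not covered here, so a conservative option is to run the file through a strict JSON parser before launching the job. A minimal sketch, with `check_job_text` as a hypothetical helper:

```python
import json
from typing import Optional


def check_job_text(text: str) -> Optional[str]:
    """Return None if text is a strict-JSON job, else a short error message."""
    try:
        job = json.loads(text)
    except json.JSONDecodeError as error:
        return "line %d, column %d: %s" % (error.lineno, error.colno, error.msg)
    if not isinstance(job, dict) or "job" not in job:
        return "missing top-level 'job' key"
    return None


good = '{"job": {"content": [{"index": 0}, {"index": 1}]}}'
bad = '{"job": {"content": [{"index": 0}, {"index": 1}, ]}}'  # trailing comma

print(check_job_text(good))
print(check_job_text(bad))
```

For a real file, pass `open(path, encoding="utf-8").read()` to `check_job_text`; the line and column in the message point at the spot that needs fixing.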
After the import completes, the following file is generated: