一、全量同步
1.簡單字段同步
本文以mysql -> mysql為示例:
本次測試的表為mysql的系統庫-sakila中的actor表,由於不支持目的端自動建表,此處預先建立目的表:
CREATE TABLE `actor_copy` ( `actor_id` smallint(5) unsigned NOT NULL AUTO_INCREMENT, `first_name` varchar(45) NOT NULL, `last_name` varchar(45) NOT NULL, `last_update` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`actor_id`), KEY `idx_actor_last_name` (`last_name`) ) ENGINE=InnoDB AUTO_INCREMENT=201 DEFAULT CHARSET=utf8;
通過官方快速開始提供的命令,可以查看配置模板:
python datax.py -r {YOUR_READER} -w {YOUR_WRITER} python datax.py -r streamreader -w streamwriter
打開dataX的mysqlreader以及mysqlwriter文檔,編寫JSON配置文件:(此處經過試驗,即使是自增主鍵,同樣需要配置,否則會報輸入輸出不匹配的錯),加上JSON配置文件的x權限:
{
"job": {
"setting": {
"speed": {
"channel": 3
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "Zcc170821#",
"column": [
"actor_id",
"first_name",
"last_name",
"last_update"
],
"splitPk": "actor_id",
"connection": [
{
"table": [
"actor"
],
"jdbcUrl": [
"jdbc:mysql://192.168.19.129:3306/sakila"
]
}
]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert",
"username": "root",
"password": "Zcc170821#",
"column": [
"actor_id",
"first_name",
"last_name",
"last_update"
],
"preSql": [
"truncate table actor_copy"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://192.168.19.129:3306/sakila",
"table": [
"actor_copy"
]
}
]
}
}
}
]
}
}
這樣,單表的最基本全量同步就完成了!
通過python 命令運行即可:
python datax.py ../job/mysqltest.json
2.增加常量與插入時間字段
原表正常字段,目標表多出兩列:來源部門,插入時間。json配置如下:
常量使用單引號,時間暫時未摸索到變量如何使用(以下通過啟動腳本已更新方式),通過時間函數實現
{ "job": { "setting": { "speed": { "channel": 3 }, "errorLimit": { "record": 0, "percentage": 0.02 } }, "content": [ { "reader": { "name": "mysqlreader", "parameter": { "username": "root", "password": "root", "column": [ "actor_id", "first_name", "last_name", "last_update", "'自動生成'", "NOW()" ], "splitPk": "actor_id", "connection": [ { "table": [ "actor" ], "jdbcUrl": [ "jdbc:mysql://hadoop01:3306/sakila" ] } ] } }, "writer": { "name": "mysqlwriter", "parameter": { "writeMode": "insert", "username": "root", "password": "root", "column": [ "actor_id", "first_name", "last_name", "last_update", "src", "load_time" ], "preSql": [ "truncate table actor_copy" ], "connection": [ { "jdbcUrl": "jdbc:mysql://hadoop01:3306/sakila", "table": [ "actor_copy" ] } ] } } } ] } }
2020.1.11,更新通過啟動腳本控制時間戳:
首先Json配置更改為變量:(注意變量有個單引號!)
{ "job": { "setting": { "speed": { "channel": 3 }, "errorLimit": { "record": 0, "percentage": 0.02 } }, "content": [ { "reader": { "name": "mysqlreader", "parameter": { "username": "root", "password": "root", "column": [ "actor_id", "first_name", "last_name", "last_update", "'${src}'", "'${systime}'" ], "splitPk": "actor_id", "connection": [ { "table": [ "actor" ], "jdbcUrl": [ "jdbc:mysql://hadoop01:3306/sakila" ] } ] } }, "writer": { "name": "mysqlwriter", "parameter": { "writeMode": "insert", "username": "root", "password": "root", "column": [ "actor_id", "first_name", "last_name", "last_update", "src", "load_time" ], "preSql": [ "truncate table actor_copy" ], "connection": [ { "jdbcUrl": "jdbc:mysql://hadoop01:3306/sakila", "table": [ "actor_copy" ] } ] } } } ] } }
在datax的srcipts文件下新建一個啟動腳本:
#coding:UTF-8
from datetime import datetime import os import sys configFilePath = sys.argv[1] src = '自動生成' currentTime = format(datetime.now(), '%Y-%m-%d %H:%M:%S') script2execute = "python /opt/datax/bin/datax.py {0} -p \"-Dsrc='{1}' -Dsystime='{2}'\"".format( configFilePath, src, currentTime) os.system(script2execute)
在srcipts下的啟動命令為:
python ./datax_start.py '/opt/datax/job/mysql_actor_copy_arg.json'
二、增量同步
增量同步的核心思路是時間戳,需要同步的表中要有Update_time字段:
參考實現:https://www.jianshu.com/p/34b3a084d7d8
https://blog.csdn.net/quadimodo/article/details/82186788
增量數據和全量數據如何合並?使用full join
https://blog.csdn.net/kx306_csdn/article/details/89508323
當然如果有例如更新時間,修改時間字段,可以直接將增量表INTO入昨日全量,然后根據ID去重,取最新時間也是可以的