一. DataX3.0概覽
DataX 是一個異構數據源離線同步工具,致力於實現包括關系型數據庫(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各種異構數據源之間穩定高效的數據同步功能。
(這是一個單機多任務的ETL工具)
下載地址:http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
1、設計理念
為了解決異構數據源同步問題,DataX將復雜的網狀的同步鏈路變成了星型數據鏈路,DataX作為中間傳輸載體負責連接各種數據源。當需要接入一個新的數據源的時候,只需要將此數據源對接到DataX,便能跟已有的數據源做到無縫數據同步。
2、當前使用現狀
DataX在阿里巴巴集團內被廣泛使用,承擔了所有大數據的離線同步業務,並已持續穩定運行了6年之久。目前每天完成同步8w多道作業,每日傳輸數據量超過300TB。
此前已經開源DataX1.0版本,此次介紹為阿里雲開源全新版本DataX3.0,有了更多更強大的功能和更好的使用體驗。Github主頁地址:https://github.com/alibaba/DataX
二、DataX3.0框架設計
DataX本身作為離線數據同步框架,采用Framework + plugin架構構建。將數據源讀取和寫入抽象成為Reader/Writer插件,納入到整個同步框架中。
-
Reader:Reader為數據采集模塊,負責采集數據源的數據,將數據發送給Framework。
-
Writer: Writer為數據寫入模塊,負責不斷向Framework取數據,並將數據寫入到目的端。
-
Framework:Framework用於連接reader和writer,作為兩者的數據傳輸通道,並處理緩沖,流控,並發,數據轉換等核心技術問題。
三. DataX3.0插件體系
DataX目前已經有了比較全面的插件體系,主流的RDBMS數據庫、NOSQL、大數據計算系統都已經接入,目前支持數據如下圖
四、DataX3.0核心架構
DataX 3.0 開源版本支持單機多線程模式完成同步作業運行,本小節按一個DataX作業生命周期的時序圖,從整體架構設計非常簡要說明DataX各個模塊相互關系。
核心模塊介紹:
-
DataX完成單個數據同步的作業,我們稱之為Job,DataX接受到一個Job之后,將啟動一個進程來完成整個作業同步過程。DataX Job模塊是單個作業的中樞管理節點,承擔了數據清理、子任務切分(將單一作業計算轉化為多個子Task)、TaskGroup管理等功能。
-
DataXJob啟動后,會根據不同的源端切分策略,將Job切分成多個小的Task(子任務),以便於並發執行。Task便是DataX作業的最小單元,每一個Task都會負責一部分數據的同步工作。
-
切分多個Task之后,DataX Job會調用Scheduler模塊,根據配置的並發數據量,將拆分成的Task重新組合,組裝成TaskGroup(任務組)。每一個TaskGroup負責以一定的並發運行完畢分配好的所有Task,默認單個任務組的並發數量為5。
-
每一個Task都由TaskGroup負責啟動,Task啟動后,會固定啟動Reader—>Channel—>Writer的線程來完成任務同步工作。
-
DataX作業運行起來之后, Job監控並等待多個TaskGroup模塊任務完成,等待所有TaskGroup任務完成后Job成功退出。否則,異常退出,進程退出值非0
1、DataX調度流程:
舉例來說,用戶提交了一個DataX作業,並且配置了20個並發,目的是將一個100張分表的mysql數據同步到odps里面。 DataX的調度決策思路是:
-
DataXJob根據分庫分表切分成了100個Task。
-
根據20個並發,DataX計算共需要分配4個TaskGroup。
-
4個TaskGroup平分切分好的100個Task,每一個TaskGroup負責以5個並發共計運行25個Task。
五、DataX 3.0六大核心優勢
1、可靠的數據質量監控
-
完美解決數據傳輸個別類型失真問題 DataX舊版對於部分數據類型(比如時間戳)傳輸一直存在毫秒階段等數據失真情況,新版本DataX3.0已經做到支持所有的強數據類型,每一種插件都有自己的數據類型轉換策略,讓數據可以完整無損的傳輸到目的端。
-
提供作業全鏈路的流量、數據量運行時監控 DataX3.0運行過程中可以將作業本身狀態、數據流量、數據速度、執行進度等信息進行全面的展示,讓用戶可以實時了解作業狀態。並可在作業執行過程中智能判斷源端和目的端的速度對比情況,給予用戶更多性能排查信息。
-
提供臟數據探測 在大量數據的傳輸過程中,必定會由於各種原因導致很多數據傳輸報錯(比如類型轉換錯誤),這種數據DataX認為就是臟數據。DataX目前可以實現臟數據精確過濾、識別、采集、展示,為用戶提供多種的臟數據處理模式,讓用戶准確把控數據質量大關!
2、豐富的數據轉換功能
DataX作為一個服務於大數據的ETL工具,除了提供數據快照搬遷功能之外,還提供了豐富數據轉換的功能,讓數據在傳輸過程中可以輕松完成數據脫敏,補全,過濾等數據轉換功能,另外還提供了自動groovy函數,讓用戶自定義轉換函數。詳情請看DataX3的transformer詳細介紹。
3、精准的速度控制
還在為同步過程對在線存儲壓力影響而擔心嗎?新版本DataX3.0提供了包括通道(並發)、記錄流、字節流三種流控模式,可以隨意控制你的作業速度,讓你的作業在庫可以承受的范圍內達到最佳的同步速度。
"speed": {
"channel": 8, ----並發數限速(根據自己CPU合理控制並發數)
"byte": 524288, ----字節流限速(根據自己的磁盤和網絡合理控制字節數)
"record": 10000 ----記錄流限速(根據數據合理空行數)
}
4、強勁的同步性能
DataX3.0每一種讀插件都有一種或多種切分策略,都能將作業合理切分成多個Task並行執行,單機多線程執行模型可以讓DataX速度隨並發成線性增長。在源端和目的端性能都足夠的情況下,單個作業一定可以打滿網卡。另外,DataX團隊對所有的已經接入的插件都做了極致的性能優化,並且做了完整的性能測試。
5、健壯的容錯機制
DataX作業是極易受外部因素的干擾,網絡閃斷、數據源不穩定等因素很容易讓同步到一半的作業報錯停止。因此穩定性是DataX的基本要求,在DataX 3.0的設計中,重點完善了框架和插件的穩定性。目前DataX3.0可以做到線程級別、進程級別(暫時未開放)、作業級別多層次局部/全局的重試,保證用戶的作業穩定運行。 線程內部重試
DataX的核心插件都經過團隊的全盤review,不同的網絡交互方式都有不同的重試策略。
6、線程級別重試
目前DataX已經可以實現TaskFailover,針對於中間失敗的Task,DataX框架可以做到整個Task級別的重新調度。
第二章、datax實戰
0.環境
1.JDK安裝配置【完美】
##### 卸載默認環境
yum -y remove java-1.8.0-openjdk*
yum -y remove tzdata-java*
# 下載安裝依賴
yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
# 查到系統默認安裝jdk的位置/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.222.b10-0.el7_6.x86_64/jre/bin/java
[root@ansible-server randolph]# ls -lrt /usr/bin/java
lrwxrwxrwx 1 root root 22 8月 23 13:42 /usr/bin/java -> /etc/alternatives/java
[root@ansible-server randolph]# ls -lrt /etc/alternatives/java
lrwxrwxrwx 1 root root 73 8月 23 13:42 /etc/alternatives/java -> /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.222.b10-0.el7_6.x86_64/jre/bin/java
# 加到環境變量
vim /etc/profile
# jdk
export JAVA_HOME=/usr/lib/jvm/java-1.8.0
export JRE_HOME=$JAVA_HOME/jre
export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
source /etc/profile
# 檢測
java -version
javac
2.Maven安裝配置——用於編譯git clone下來的源碼
cd /opt/
wget https://mirrors.tuna.tsinghua.edu.cn/apache/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz
tar -zxvf apache-maven-3.6.3-bin.tar.gz
mv apache-maven-3.6.3/ maven # 改名方便操作
vim /etc/profile # 修改配置文件,並在末尾添加
# maven
M2_HOME=/opt/maven # 這里的路徑注意下
export PATH=${M2_HOME}/bin:${PATH}
# 重載文件立即生效
source /etc/profile
檢查maven安裝是否成功
mvn -v
檢查maven已經安裝
[root@ansible-server opt]# mvn -v
Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
Maven home: /opt/maven
Java version: 1.8.0_222, vendor: Oracle Corporation, runtime: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.222.b10-0.el7_6.x86_64/jre
Default locale: zh_CN, platform encoding: UTF-8
OS name: "linux", version: "3.10.0-957.27.2.el7.x86_64", arch: "amd64", family: "unix"
3.檢查系統版本、python版本
實際上有python2/3都OK,后面datax的bin文件夾下的三個py文件datax.py也有python2/3版本,在執行命令的時候只需要對應即可。
[root@client-1 ~]# cat /etc/redhat-release
CentOS Linux release 7.6.1810 (Core)
[root@ansible-server DataX]# java -version
openjdk version "1.8.0_232"
OpenJDK Runtime Environment (build 1.8.0_232-b09)
OpenJDK 64-Bit Server VM (build 25.232-b09, mixed mode)
[root@ansible-server ~]# python -V
Python 2.7.15
[root@ansible-server ~]# python3 -V
Python 3.6.8
1.DataX安裝部署
git clone https://github.com/alibaba/DataX.git
cd DataX
mvn -U clean package assembly:assembly -Dmaven.test.skip=true # maven打包
等待編譯好久… 竟然編譯了39分鍾!!!
留個紀念:
[INFO] datax/lib/slf4j-api-1.7.10.jar already added, skipping
[INFO] datax/lib/logback-classic-1.0.13.jar already added, skipping
[INFO] datax/lib/logback-core-1.0.13.jar already added, skipping
[INFO] datax/lib/commons-math3-3.1.1.jar already added, skipping
[INFO] datax/lib/hamcrest-core-1.3.jar already added, skipping
[WARNING] Assembly file: /opt/DataX/target/datax is not a regular file (it may be a directory). It cannot be attached to the project build for installation or deployment.
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary for datax-all 0.0.1-SNAPSHOT:
[INFO]
[INFO] datax-all .......................................... SUCCESS [03:23 min]
[INFO] datax-common ....................................... SUCCESS [ 22.458 s]
[INFO] datax-transformer .................................. SUCCESS [ 25.734 s]
[INFO] datax-core ......................................... SUCCESS [01:02 min]
[INFO] plugin-rdbms-util .................................. SUCCESS [ 20.741 s]
[INFO] mysqlreader ........................................ SUCCESS [ 1.096 s]
[INFO] drdsreader ......................................... SUCCESS [ 2.539 s]
[INFO] sqlserverreader .................................... SUCCESS [ 1.885 s]
[INFO] postgresqlreader ................................... SUCCESS [ 5.029 s]
[INFO] oraclereader ....................................... SUCCESS [ 1.047 s]
[INFO] odpsreader ......................................... SUCCESS [ 43.033 s]
[INFO] otsreader .......................................... SUCCESS [ 31.965 s]
[INFO] otsstreamreader .................................... SUCCESS [ 16.498 s]
[INFO] plugin-unstructured-storage-util ................... SUCCESS [03:08 min]
[INFO] txtfilereader ...................................... SUCCESS [ 5.172 s]
[INFO] hdfsreader ......................................... SUCCESS [06:13 min]
[INFO] streamreader ....................................... SUCCESS [ 1.028 s]
[INFO] ossreader .......................................... SUCCESS [ 12.157 s]
[INFO] ftpreader .......................................... SUCCESS [ 6.172 s]
[INFO] mongodbreader ...................................... SUCCESS [ 9.056 s]
[INFO] rdbmsreader ........................................ SUCCESS [ 1.186 s]
[INFO] hbase11xreader ..................................... SUCCESS [04:06 min]
[INFO] hbase094xreader .................................... SUCCESS [02:57 min]
[INFO] tsdbreader ......................................... SUCCESS [ 7.664 s]
[INFO] opentsdbreader ..................................... SUCCESS [02:45 min]
[INFO] cassandrareader .................................... SUCCESS [ 35.874 s]
[INFO] mysqlwriter ........................................ SUCCESS [ 0.861 s]
[INFO] drdswriter ......................................... SUCCESS [ 1.111 s]
[INFO] odpswriter ......................................... SUCCESS [ 1.895 s]
[INFO] txtfilewriter ...................................... SUCCESS [ 3.672 s]
[INFO] ftpwriter .......................................... SUCCESS [ 3.045 s]
[INFO] hdfswriter ......................................... SUCCESS [ 6.932 s]
[INFO] streamwriter ....................................... SUCCESS [ 0.863 s]
[INFO] otswriter .......................................... SUCCESS [ 1.609 s]
[INFO] oraclewriter ....................................... SUCCESS [ 1.247 s]
[INFO] sqlserverwriter .................................... SUCCESS [ 0.855 s]
[INFO] postgresqlwriter ................................... SUCCESS [ 1.073 s]
[INFO] osswriter .......................................... SUCCESS [ 3.064 s]
[INFO] mongodbwriter ...................................... SUCCESS [ 3.227 s]
[INFO] adswriter .......................................... SUCCESS [ 20.076 s]
[INFO] ocswriter .......................................... SUCCESS [ 37.687 s]
[INFO] rdbmswriter ........................................ SUCCESS [ 1.196 s]
[INFO] hbase11xwriter ..................................... SUCCESS [ 6.453 s]
[INFO] hbase094xwriter .................................... SUCCESS [ 4.315 s]
[INFO] hbase11xsqlwriter .................................. SUCCESS [03:24 min]
[INFO] hbase11xsqlreader .................................. SUCCESS [02:09 min]
[INFO] elasticsearchwriter ................................ SUCCESS [ 21.244 s]
[INFO] tsdbwriter ......................................... SUCCESS [ 1.477 s]
[INFO] adbpgwriter ........................................ SUCCESS [ 24.980 s]
[INFO] gdbwriter .......................................... SUCCESS [01:30 min]
[INFO] cassandrawriter .................................... SUCCESS [ 5.859 s]
[INFO] hbase20xsqlreader .................................. SUCCESS [02:00 min]
[INFO] hbase20xsqlwriter .................................. SUCCESS [ 1.659 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 39:28 min
[INFO] Finished at: 2019-12-12T14:39:51+08:00
[INFO] ------------------------------------------------------------------------
[root@ansible-server DataX]#
4.DataX使用/踩坑
1.配置一個簡單例子
做什么好呢,正好手上有三個虛機absible-server client-1 client-2; 我們將absible-server mysql數據庫test.user同步到client-1數據庫test.user吧;
配置的任務是json格式的,我們假設任務的配置文件叫做mysql2mysql.json 需要用datax.py執行,看一眼datax.py就知道項目是python2的; mysql2mysql.json位置在上一層目錄job文件夾下存放;
1.如何跑任務呢:
python /opt/DataX/target/datax/datax/bin/datax.py /opt/DataX/target/datax/datax/job/mysql2mysql.json
成功的話會是這樣的:
但是我相信你需要仔細看一下任務的json文件該如何去配置
2.配置文件格式說明:
可以看到job分為reader和writer兩部分,正體現了datax的架構特征: 阿里雲開源離線同步工具DataX3.0介紹
部分參數說明:
jdbcUrl jdbc:mysql://127.0.0.1:3306/test?characterEncoding=utf8 里面需要配置源IP/目標IP:數據庫端口/數據庫名?characterEncoding=utf8 控制編碼格式,防止寫入中文出現錯誤 username 這個用戶,需要在數據庫中創建,並賦予其增刪改查等權限 創建的可以用來讀取主庫/修改從庫的用戶,我直接使用的root用戶,並且后面試錯后給root用戶【允許所有遠程機器訪問】的權限 參考MySQL用戶授權(GRANT) /opt/DataX/target/datax/datax/job/mysql2mysql.json :
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
"id",
"name"
],
"connection": [
{
"jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/test?characterEncoding=utf8"],
"table": ["user"]
}
],
"password": "asdf",
"username": "root"
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"column": [
"id",
"name"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://192.168.255.134:3306/test?characterEncoding=utf8",
"table": ["user"]
}
],
"password": "asdf",
"username": "root"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
上面的文件配置完,就可以用datax.py跑這個json文件了,當然,我第一次跑,遇到了幾處錯誤
3.跑任務出錯與測試
1.Host ‘ansible-server’ is not allowed to connect to this MySQL server 原因:mysql服務器出於安全考慮,默認只允許本地登錄數據庫服務器 因此主服務器133不能用root訪問從服務器數據庫:
[root@ansible-server job]# mysql -h192.168.255.134 -uroot -p
Enter password:
ERROR 1130 (HY000): Host '192.168.255.133' is not allowed to connect to this MySQL server
從服務器將mysql的root用戶的host從"localhost"改成"%",允許所有遠程端訪問: 主服務器的root用戶也作同樣修改,進入mysql庫,看下root的host,然后改成%,確認下:
mysql> use mysql;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
Database changed
mysql> select host, user from user;
+-----------+---------------+
| host | user |
+-----------+---------------+
| localhost | mysql.session |
| localhost | mysql.sys |
| localhost | root |
+-----------+---------------+
3 rows in set (0.28 sec)
mysql> update user set host = '%' where user = 'root';
Query OK, 1 row affected (0.59 sec)
Rows matched: 1 Changed: 1 Warnings: 0
mysql> flush privileges;
Query OK, 0 rows affected (0.14 sec)
mysql> select host, user from user;
+-----------+---------------+
| host | user |
+-----------+---------------+
| % | root |
| localhost | mysql.session |
| localhost | mysql.sys |
+-----------+---------------+
3 rows in set (0.00 sec)
2.跑同步任務成功【但中文插入錯誤】
檢查下有沒有將主庫的test.user表同步到從庫134的test.user表: 出現了錯誤,將中文數據顯示錯誤,jdbcurl末尾加上?characterEncoding=utf8 主庫test.user表插入一條數據:
[root@ansible-server job]# python /opt/DataX/target/datax/datax/bin/datax.py /opt/DataX/target/datax/datax/job/mysql2mysql.json
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.
2019-12-12 17:04:18.115 [main] INFO VMInfo - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl
2019-12-12 17:04:18.122 [main] INFO Engine - the machine info =>
osInfo: Oracle Corporation 1.8 25.232-b09
jvmInfo: Linux amd64 3.10.0-957.27.2.el7.x86_64
cpu num: 4
totalPhysicalMemory: -0.00G
freePhysicalMemory: -0.00G
maxFileDescriptorCount: -1
currentOpenFileDescriptorCount: -1
GC Names [PS MarkSweep, PS Scavenge]
MEMORY_NAME | allocation_size | init_size
PS Eden Space | 256.00MB | 256.00MB
Code Cache | 240.00MB | 2.44MB
Compressed Class Space | 1,024.00MB | 0.00MB
PS Survivor Space | 42.50MB | 42.50MB
PS Old Gen | 683.00MB | 683.00MB
Metaspace | -0.00MB | 0.00MB
2019-12-12 17:04:18.143 [main] INFO Engine -
{
"content":[
{
"reader":{
"name":"mysqlreader",
"parameter":{
"column":[
"id",
"name"
],
"connection":[
{
"jdbcUrl":[
"jdbc:mysql://127.0.0.1:3306/test"
],
"table":[
"user"
]
}
],
"password":"****",
"username":"root"