datax入門到實戰及面試


第一章、datax入門

一. DataX3.0概覽

DataX 是一個異構數據源離線同步工具,致力於實現包括關系型數據庫(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各種異構數據源之間穩定高效的數據同步功能。

(這是一個單機多任務的ETL工具)

img

下載地址:http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz

1、設計理念

為了解決異構數據源同步問題,DataX將復雜的網狀的同步鏈路變成了星型數據鏈路,DataX作為中間傳輸載體負責連接各種數據源。當需要接入一個新的數據源的時候,只需要將此數據源對接到DataX,便能跟已有的數據源做到無縫數據同步。

2、當前使用現狀

DataX在阿里巴巴集團內被廣泛使用,承擔了所有大數據的離線同步業務,並已持續穩定運行了6年之久。目前每天完成同步8w多道作業,每日傳輸數據量超過300TB。

此前已經開源DataX1.0版本,此次介紹為阿里雲開源全新版本DataX3.0,有了更多更強大的功能和更好的使用體驗。Github主頁地址:https://github.com/alibaba/DataX

二、DataX3.0框架設計

DataX本身作為離線數據同步框架,采用Framework + plugin架構構建。將數據源讀取和寫入抽象成為Reader/Writer插件,納入到整個同步框架中。

 

img

  • Reader:Reader為數據采集模塊,負責采集數據源的數據,將數據發送給Framework。

  • Writer: Writer為數據寫入模塊,負責不斷向Framework取數據,並將數據寫入到目的端。

  • Framework:Framework用於連接reader和writer,作為兩者的數據傳輸通道,並處理緩沖,流控,並發,數據轉換等核心技術問題。

三. DataX3.0插件體系

DataX目前已經有了比較全面的插件體系,主流的RDBMS數據庫、NOSQL、大數據計算系統都已經接入,目前支持數據如下圖

四、DataX3.0核心架構

DataX 3.0 開源版本支持單機多線程模式完成同步作業運行,本小節按一個DataX作業生命周期的時序圖,從整體架構設計非常簡要說明DataX各個模塊相互關系。

 

img

核心模塊介紹:

  • DataX完成單個數據同步的作業,我們稱之為Job,DataX接受到一個Job之后,將啟動一個進程來完成整個作業同步過程。DataX Job模塊是單個作業的中樞管理節點,承擔了數據清理、子任務切分(將單一作業計算轉化為多個子Task)、TaskGroup管理等功能。

  • DataXJob啟動后,會根據不同的源端切分策略,將Job切分成多個小的Task(子任務),以便於並發執行。Task便是DataX作業的最小單元,每一個Task都會負責一部分數據的同步工作。

  • 切分多個Task之后,DataX Job會調用Scheduler模塊,根據配置的並發數據量,將拆分成的Task重新組合,組裝成TaskGroup(任務組)。每一個TaskGroup負責以一定的並發運行完畢分配好的所有Task,默認單個任務組的並發數量為5。

  • 每一個Task都由TaskGroup負責啟動,Task啟動后,會固定啟動Reader—>Channel—>Writer的線程來完成任務同步工作。

  • DataX作業運行起來之后, Job監控並等待多個TaskGroup模塊任務完成,等待所有TaskGroup任務完成后Job成功退出。否則,異常退出,進程退出值非0

1、DataX調度流程:

舉例來說,用戶提交了一個DataX作業,並且配置了20個並發,目的是將一個100張分表的mysql數據同步到odps里面。 DataX的調度決策思路是:

  • DataXJob根據分庫分表切分成了100個Task。

  • 根據20個並發,DataX計算共需要分配4個TaskGroup。

  • 4個TaskGroup平分切分好的100個Task,每一個TaskGroup負責以5個並發共計運行25個Task。

五、DataX 3.0六大核心優勢

1、可靠的數據質量監控
  1. 完美解決數據傳輸個別類型失真問題 DataX舊版對於部分數據類型(比如時間戳)傳輸一直存在毫秒階段等數據失真情況,新版本DataX3.0已經做到支持所有的強數據類型,每一種插件都有自己的數據類型轉換策略,讓數據可以完整無損的傳輸到目的端。

  2. 提供作業全鏈路的流量、數據量運行時監控 DataX3.0運行過程中可以將作業本身狀態、數據流量、數據速度、執行進度等信息進行全面的展示,讓用戶可以實時了解作業狀態。並可在作業執行過程中智能判斷源端和目的端的速度對比情況,給予用戶更多性能排查信息。

  3. 提供臟數據探測 在大量數據的傳輸過程中,必定會由於各種原因導致很多數據傳輸報錯(比如類型轉換錯誤),這種數據DataX認為就是臟數據。DataX目前可以實現臟數據精確過濾、識別、采集、展示,為用戶提供多種的臟數據處理模式,讓用戶准確把控數據質量大關!

2、豐富的數據轉換功能

DataX作為一個服務於大數據的ETL工具,除了提供數據快照搬遷功能之外,還提供了豐富數據轉換的功能,讓數據在傳輸過程中可以輕松完成數據脫敏,補全,過濾等數據轉換功能,另外還提供了自動groovy函數,讓用戶自定義轉換函數。詳情請看DataX3的transformer詳細介紹。

3、精准的速度控制

還在為同步過程對在線存儲壓力影響而擔心嗎?新版本DataX3.0提供了包括通道(並發)、記錄流、字節流三種流控模式,可以隨意控制你的作業速度,讓你的作業在庫可以承受的范圍內達到最佳的同步速度。

 

"speed": {
  "channel": 8,    ----並發數限速(根據自己CPU合理控制並發數)
  "byte": 524288,  ----字節流限速(根據自己的磁盤和網絡合理控制字節數)
  "record": 10000  ----記錄流限速(根據數據合理空行數)
}
4、強勁的同步性能

DataX3.0每一種讀插件都有一種或多種切分策略,都能將作業合理切分成多個Task並行執行,單機多線程執行模型可以讓DataX速度隨並發成線性增長。在源端和目的端性能都足夠的情況下,單個作業一定可以打滿網卡。另外,DataX團隊對所有的已經接入的插件都做了極致的性能優化,並且做了完整的性能測試。

5、健壯的容錯機制

DataX作業是極易受外部因素的干擾,網絡閃斷、數據源不穩定等因素很容易讓同步到一半的作業報錯停止。因此穩定性是DataX的基本要求,在DataX 3.0的設計中,重點完善了框架和插件的穩定性。目前DataX3.0可以做到線程級別、進程級別(暫時未開放)、作業級別多層次局部/全局的重試,保證用戶的作業穩定運行。 線程內部重試

DataX的核心插件都經過團隊的全盤review,不同的網絡交互方式都有不同的重試策略。

6、線程級別重試

目前DataX已經可以實現TaskFailover,針對於中間失敗的Task,DataX框架可以做到整個Task級別的重新調度。

第二章、datax實戰

0.環境

1.JDK安裝配置【完美】

##### 卸載默認環境

yum -y remove java-1.8.0-openjdk*        
yum -y remove tzdata-java*

# 下載安裝依賴

yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel

# 查到系統默認安裝jdk的位置/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.222.b10-0.el7_6.x86_64/jre/bin/java

[root@ansible-server randolph]# ls -lrt /usr/bin/java
lrwxrwxrwx 1 root root 22 8月  23 13:42 /usr/bin/java -> /etc/alternatives/java
[root@ansible-server randolph]# ls -lrt /etc/alternatives/java
lrwxrwxrwx 1 root root 73 8月  23 13:42 /etc/alternatives/java -> /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.222.b10-0.el7_6.x86_64/jre/bin/java

# 加到環境變量

vim /etc/profile

# jdk

export JAVA_HOME=/usr/lib/jvm/java-1.8.0
export JRE_HOME=$JAVA_HOME/jre  
export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib

source /etc/profile

# 檢測

java -version
javac

2.Maven安裝配置——用於編譯git clone下來的源碼

cd /opt/
wget https://mirrors.tuna.tsinghua.edu.cn/apache/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz
tar -zxvf apache-maven-3.6.3-bin.tar.gz
mv apache-maven-3.6.3/ maven # 改名方便操作
vim /etc/profile # 修改配置文件,並在末尾添加

# maven

M2_HOME=/opt/maven # 這里的路徑注意下
export PATH=${M2_HOME}/bin:${PATH}

# 重載文件立即生效

source /etc/profile

檢查maven安裝是否成功
mvn -v
檢查maven已經安裝
[root@ansible-server opt]# mvn -v
Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
Maven home: /opt/maven
Java version: 1.8.0_222, vendor: Oracle Corporation, runtime: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.222.b10-0.el7_6.x86_64/jre
Default locale: zh_CN, platform encoding: UTF-8
OS name: "linux", version: "3.10.0-957.27.2.el7.x86_64", arch: "amd64", family: "unix"

3.檢查系統版本、python版本

實際上有python2/3都OK,后面datax的bin文件夾下的三個py文件datax.py也有python2/3版本,在執行命令的時候只需要對應即可。

[root@client-1 ~]# cat /etc/redhat-release
CentOS Linux release 7.6.1810 (Core)
[root@ansible-server DataX]# java -version
openjdk version "1.8.0_232"
OpenJDK Runtime Environment (build 1.8.0_232-b09)
OpenJDK 64-Bit Server VM (build 25.232-b09, mixed mode)
[root@ansible-server ~]# python -V
Python 2.7.15
[root@ansible-server ~]# python3 -V
Python 3.6.8

1.DataX安裝部署

git clone https://github.com/alibaba/DataX.git
cd DataX
mvn -U clean package assembly:assembly -Dmaven.test.skip=true # maven打包

在這里插入圖片描述

等待編譯好久… 竟然編譯了39分鍾!!!

留個紀念:

[INFO] datax/lib/slf4j-api-1.7.10.jar already added, skipping
[INFO] datax/lib/logback-classic-1.0.13.jar already added, skipping
[INFO] datax/lib/logback-core-1.0.13.jar already added, skipping
[INFO] datax/lib/commons-math3-3.1.1.jar already added, skipping
[INFO] datax/lib/hamcrest-core-1.3.jar already added, skipping
[WARNING] Assembly file: /opt/DataX/target/datax is not a regular file (it may be a directory). It cannot be attached to the project build for installation or deployment.
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary for datax-all 0.0.1-SNAPSHOT:
[INFO]
[INFO] datax-all .......................................... SUCCESS [03:23 min]
[INFO] datax-common ....................................... SUCCESS [ 22.458 s]
[INFO] datax-transformer .................................. SUCCESS [ 25.734 s]
[INFO] datax-core ......................................... SUCCESS [01:02 min]
[INFO] plugin-rdbms-util .................................. SUCCESS [ 20.741 s]
[INFO] mysqlreader ........................................ SUCCESS [  1.096 s]
[INFO] drdsreader ......................................... SUCCESS [  2.539 s]
[INFO] sqlserverreader .................................... SUCCESS [  1.885 s]
[INFO] postgresqlreader ................................... SUCCESS [  5.029 s]
[INFO] oraclereader ....................................... SUCCESS [  1.047 s]
[INFO] odpsreader ......................................... SUCCESS [ 43.033 s]
[INFO] otsreader .......................................... SUCCESS [ 31.965 s]
[INFO] otsstreamreader .................................... SUCCESS [ 16.498 s]
[INFO] plugin-unstructured-storage-util ................... SUCCESS [03:08 min]
[INFO] txtfilereader ...................................... SUCCESS [  5.172 s]
[INFO] hdfsreader ......................................... SUCCESS [06:13 min]
[INFO] streamreader ....................................... SUCCESS [  1.028 s]
[INFO] ossreader .......................................... SUCCESS [ 12.157 s]
[INFO] ftpreader .......................................... SUCCESS [  6.172 s]
[INFO] mongodbreader ...................................... SUCCESS [  9.056 s]
[INFO] rdbmsreader ........................................ SUCCESS [  1.186 s]
[INFO] hbase11xreader ..................................... SUCCESS [04:06 min]
[INFO] hbase094xreader .................................... SUCCESS [02:57 min]
[INFO] tsdbreader ......................................... SUCCESS [  7.664 s]
[INFO] opentsdbreader ..................................... SUCCESS [02:45 min]
[INFO] cassandrareader .................................... SUCCESS [ 35.874 s]
[INFO] mysqlwriter ........................................ SUCCESS [  0.861 s]
[INFO] drdswriter ......................................... SUCCESS [  1.111 s]
[INFO] odpswriter ......................................... SUCCESS [  1.895 s]
[INFO] txtfilewriter ...................................... SUCCESS [  3.672 s]
[INFO] ftpwriter .......................................... SUCCESS [  3.045 s]
[INFO] hdfswriter ......................................... SUCCESS [  6.932 s]
[INFO] streamwriter ....................................... SUCCESS [  0.863 s]
[INFO] otswriter .......................................... SUCCESS [  1.609 s]
[INFO] oraclewriter ....................................... SUCCESS [  1.247 s]
[INFO] sqlserverwriter .................................... SUCCESS [  0.855 s]
[INFO] postgresqlwriter ................................... SUCCESS [  1.073 s]
[INFO] osswriter .......................................... SUCCESS [  3.064 s]
[INFO] mongodbwriter ...................................... SUCCESS [  3.227 s]
[INFO] adswriter .......................................... SUCCESS [ 20.076 s]
[INFO] ocswriter .......................................... SUCCESS [ 37.687 s]
[INFO] rdbmswriter ........................................ SUCCESS [  1.196 s]
[INFO] hbase11xwriter ..................................... SUCCESS [  6.453 s]
[INFO] hbase094xwriter .................................... SUCCESS [  4.315 s]
[INFO] hbase11xsqlwriter .................................. SUCCESS [03:24 min]
[INFO] hbase11xsqlreader .................................. SUCCESS [02:09 min]
[INFO] elasticsearchwriter ................................ SUCCESS [ 21.244 s]
[INFO] tsdbwriter ......................................... SUCCESS [  1.477 s]
[INFO] adbpgwriter ........................................ SUCCESS [ 24.980 s]
[INFO] gdbwriter .......................................... SUCCESS [01:30 min]
[INFO] cassandrawriter .................................... SUCCESS [  5.859 s]
[INFO] hbase20xsqlreader .................................. SUCCESS [02:00 min]
[INFO] hbase20xsqlwriter .................................. SUCCESS [  1.659 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  39:28 min
[INFO] Finished at: 2019-12-12T14:39:51+08:00
[INFO] ------------------------------------------------------------------------
[root@ansible-server DataX]#

 

4.DataX使用/踩坑

1.配置一個簡單例子

做什么好呢,正好手上有三個虛機absible-server client-1 client-2; 我們將absible-server mysql數據庫test.user同步到client-1數據庫test.user吧;

配置的任務是json格式的,我們假設任務的配置文件叫做mysql2mysql.json 需要用datax.py執行,看一眼datax.py就知道項目是python2的; mysql2mysql.json位置在上一層目錄job文件夾下存放;

1.如何跑任務呢:
python /opt/DataX/target/datax/datax/bin/datax.py /opt/DataX/target/datax/datax/job/mysql2mysql.json

成功的話會是這樣的:

在這里插入圖片描述

但是我相信你需要仔細看一下任務的json文件該如何去配置

2.配置文件格式說明:

可以看到job分為reader和writer兩部分,正體現了datax的架構特征: 阿里雲開源離線同步工具DataX3.0介紹

在這里插入圖片描述

部分參數說明:

jdbcUrl jdbc:mysql://127.0.0.1:3306/test?characterEncoding=utf8 里面需要配置源IP/目標IP:數據庫端口/數據庫名?characterEncoding=utf8 控制編碼格式,防止寫入中文出現錯誤 username 這個用戶,需要在數據庫中創建,並賦予其增刪改查等權限 創建的可以用來讀取主庫/修改從庫的用戶,我直接使用的root用戶,並且后面試錯后給root用戶【允許所有遠程機器訪問】的權限 參考MySQL用戶授權(GRANT) /opt/DataX/target/datax/datax/job/mysql2mysql.json :

{
   "job": {
       "content": [
          {
               "reader": {
                   "name": "mysqlreader",
                   "parameter": {
                       "column": [
                           "id",
                           "name"
                      ],
                       "connection": [
                          {
                               "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/test?characterEncoding=utf8"],
                               "table": ["user"]
                          }
                      ],
"password": "asdf",
"username": "root"
                  }
              },
               "writer": {
                   "name": "mysqlwriter",
                   "parameter": {
                       "column": [
                       "id",
                       "name"
                      ],
                       "connection": [
                          {
                               "jdbcUrl": "jdbc:mysql://192.168.255.134:3306/test?characterEncoding=utf8",
                               "table": ["user"]
                          }
                      ],
"password": "asdf",
      "username": "root"
                  }
              }
          }
      ],
       "setting": {
           "speed": {
               "channel": "1"
          }
      }
  }
}

上面的文件配置完,就可以用datax.py跑這個json文件了,當然,我第一次跑,遇到了幾處錯誤

3.跑任務出錯與測試

1.Host ‘ansible-server’ is not allowed to connect to this MySQL server 原因:mysql服務器出於安全考慮,默認只允許本地登錄數據庫服務器 因此主服務器133不能用root訪問從服務器數據庫:

[root@ansible-server job]# mysql -h192.168.255.134 -uroot -p
Enter password:
ERROR 1130 (HY000): Host '192.168.255.133' is not allowed to connect to this MySQL server

從服務器將mysql的root用戶的host從"localhost"改成"%",允許所有遠程端訪問: 主服務器的root用戶也作同樣修改,進入mysql庫,看下root的host,然后改成%,確認下:

mysql> use mysql;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
mysql> select host, user from user;
+-----------+---------------+
| host     | user         |
+-----------+---------------+
| localhost | mysql.session |
| localhost | mysql.sys     |
| localhost | root         |
+-----------+---------------+
3 rows in set (0.28 sec)

mysql> update user set host = '%' where user = 'root';
Query OK, 1 row affected (0.59 sec)
Rows matched: 1  Changed: 1  Warnings: 0

mysql> flush privileges;
Query OK, 0 rows affected (0.14 sec)

mysql> select host, user from user;
+-----------+---------------+
| host     | user         |
+-----------+---------------+
| %         | root         |
| localhost | mysql.session |
| localhost | mysql.sys     |
+-----------+---------------+
3 rows in set (0.00 sec)

 

2.跑同步任務成功【但中文插入錯誤】

檢查下有沒有將主庫的test.user表同步到從庫134的test.user表: 出現了錯誤,將中文數據顯示錯誤,jdbcurl末尾加上?characterEncoding=utf8 主庫test.user表插入一條數據:

在這里插入圖片描述

[root@ansible-server job]# python /opt/DataX/target/datax/datax/bin/datax.py /opt/DataX/target/datax/datax/job/mysql2mysql.json

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


2019-12-12 17:04:18.115 [main] INFO VMInfo - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl
2019-12-12 17:04:18.122 [main] INFO Engine - the machine info  =>

osInfo: Oracle Corporation 1.8 25.232-b09
jvmInfo: Linux amd64 3.10.0-957.27.2.el7.x86_64
cpu num: 4

totalPhysicalMemory: -0.00G
freePhysicalMemory: -0.00G
maxFileDescriptorCount: -1
currentOpenFileDescriptorCount: -1

GC Names [PS MarkSweep, PS Scavenge]

MEMORY_NAME                   | allocation_size               | init_size                      
PS Eden Space                 | 256.00MB                       | 256.00MB                      
Code Cache                     | 240.00MB                       | 2.44MB                        
Compressed Class Space         | 1,024.00MB                     | 0.00MB                        
PS Survivor Space             | 42.50MB                       | 42.50MB                        
PS Old Gen                     | 683.00MB                       | 683.00MB                      
Metaspace                     | -0.00MB                       | 0.00MB                        

2019-12-12 17:04:18.143 [main] INFO Engine -
{
"content":[
{
"reader":{
"name":"mysqlreader",
"parameter":{
"column":[
"id",
"name"
],
"connection":[
{
"jdbcUrl":[
"jdbc:mysql://127.0.0.1:3306/test"
],
"table":[
"user"
]
}
],
"password":"****",
"username":"root"
}
},
"writer":{
"name":"mysqlwriter",
"parameter":{
"column":[
"id",
"name"
],
"connection":[
{
"jdbcUrl":"jdbc:mysql://192.168.255.134:3306/test",
"table":[
"user"
]
}
],
"password":"****",
"username":"root"
}
}
}
],
"setting":{
"speed":{
"channel":"1"
}
}
}

2019-12-12 17:04:18.161 [main] WARN Engine - prioriy set to 0, because NumberFormatException, the value is: null
2019-12-12 17:04:18.163 [main] INFO PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2019-12-12 17:04:18.164 [main] INFO JobContainer - DataX jobContainer starts job.
2019-12-12 17:04:18.165 [main] INFO JobContainer - Set jobId = 0
2019-12-12 17:04:18.467 [job-0] INFO OriginalConfPretreatmentUtil - Available jdbcUrl:jdbc:mysql://127.0.0.1:3306/test?yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true.
2019-12-12 17:04:18.484 [job-0] INFO OriginalConfPretreatmentUtil - table:[user] has columns:[id,name].
2019-12-12 17:04:18.930 [job-0] INFO OriginalConfPretreatmentUtil - table:[user] all columns:[
id,name
].
2019-12-12 17:04:18.967 [job-0] INFO OriginalConfPretreatmentUtil - Write data [
INSERT INTO %s (id,name) VALUES(?,?)
], which jdbcUrl like:[jdbc:mysql://192.168.255.134:3306/test?yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true]
2019-12-12 17:04:18.967 [job-0] INFO JobContainer - jobContainer starts to do prepare ...
2019-12-12 17:04:18.968 [job-0] INFO JobContainer - DataX Reader.Job [mysqlreader] do prepare work .
2019-12-12 17:04:18.968 [job-0] INFO JobContainer - DataX Writer.Job [mysqlwriter] do prepare work .
2019-12-12 17:04:18.968 [job-0] INFO JobContainer - jobContainer starts to do split ...
2019-12-12 17:04:18.969 [job-0] INFO JobContainer - Job set Channel-Number to 1 channels.
2019-12-12 17:04:18.973 [job-0] INFO JobContainer - DataX Reader.Job [mysqlreader] splits to [1] tasks.
2019-12-12 17:04:18.974 [job-0] INFO JobContainer - DataX Writer.Job [mysqlwriter] splits to [1] tasks.
2019-12-12 17:04:18.990 [job-0] INFO JobContainer - jobContainer starts to do schedule ...
2019-12-12 17:04:18.993 [job-0] INFO JobContainer - Scheduler starts [1] taskGroups.
2019-12-12 17:04:18.996 [job-0] INFO JobContainer - Running by standalone Mode.
2019-12-12 17:04:19.007 [taskGroup-0] INFO TaskGroupContainer - taskGroupId=[0] start [1] channels for [1] tasks.
2019-12-12 17:04:19.010 [taskGroup-0] INFO Channel - Channel set byte_speed_limit to -1, No bps activated.
2019-12-12 17:04:19.010 [taskGroup-0] INFO Channel - Channel set record_speed_limit to -1, No tps activated.
2019-12-12 17:04:19.019 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[0] attemptCount[1] is started
2019-12-12 17:04:19.024 [0-0-0-reader] INFO CommonRdbmsReader$Task - Begin to read record by Sql: [select id,name from user
] jdbcUrl:[jdbc:mysql://127.0.0.1:3306/test?yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true].
2019-12-12 17:04:19.043 [0-0-0-reader] INFO CommonRdbmsReader$Task - Finished read record by Sql: [select id,name from user
] jdbcUrl:[jdbc:mysql://127.0.0.1:3306/test?yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true].
2019-12-12 17:04:19.226 [0-0-0-writer] WARN CommonRdbmsWriter$Task - 回滾此次寫入, 采用每次寫入一行方式提交. 因為:Duplicate entry '1' for key 'PRIMARY'
2019-12-12 17:04:19.243 [0-0-0-writer] ERROR StdoutPluginCollector -
com.mysql.jdbc.exceptions.jdbc4.MySQLIntegrityConstraintViolationException: Duplicate entry '1' for key 'PRIMARY'
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:1.8.0_232]
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[na:1.8.0_232]
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:1.8.0_232]
at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[na:1.8.0_232]
at com.mysql.jdbc.Util.handleNewInstance(Util.java:377) ~[mysql-connector-java-5.1.34.jar:5.1.34]
at com.mysql.jdbc.Util.getInstance(Util.java:360) ~[mysql-connector-java-5.1.34.jar:5.1.34]
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:971) ~[mysql-connector-java-5.1.34.jar:5.1.34]
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3887) ~[mysql-connector-java-5.1.34.jar:5.1.34]
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3823) ~[mysql-connector-java-5.1.34.jar:5.1.34]
at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2435) ~[mysql-connector-java-5.1.34.jar:5.1.34]
at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2582) ~[mysql-connector-java-5.1.34.jar:5.1.34]
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2530) ~[mysql-connector-java-5.1.34.jar:5.1.34]
at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1907) ~[mysql-connector-java-5.1.34.jar:5.1.34]
at com.mysql.jdbc.PreparedStatement.execute(PreparedStatement.java:1199) ~[mysql-connector-java-5.1.34.jar:5.1.34]
at com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter$Task.doOneInsert(CommonRdbmsWriter.java:382) [plugin-rdbms-util-0.0.1-SNAPSHOT.jar:na]
at com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter$Task.doBatchInsert(CommonRdbmsWriter.java:362) [plugin-rdbms-util-0.0.1-SNAPSHOT.jar:na]
at com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter$Task.startWriteWithConnection(CommonRdbmsWriter.java:297) [plugin-rdbms-util-0.0.1-SNAPSHOT.jar:na]
at com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter$Task.startWrite(CommonRdbmsWriter.java:319) [plugin-rdbms-util-0.0.1-SNAPSHOT.jar:na]
at com.alibaba.datax.plugin.writer.mysqlwriter.MysqlWriter$Task.startWrite(MysqlWriter.java:78) [mysqlwriter-0.0.1-SNAPSHOT.jar:na]
at com.alibaba.datax.core.taskgroup.runner.WriterRunner.run(WriterRunner.java:56) [datax-core-0.0.1-SNAPSHOT.jar:na]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_232]
2019-12-12 17:04:19.246 [0-0-0-writer] ERROR StdoutPluginCollector - 臟數據:
{"exception":"Duplicate entry '1' for key 'PRIMARY'","record":[{"byteSize":1,"index":0,"rawData":1,"type":"LONG"},{"byteSize":3,"index":1,"rawData":"cad","type":"STRING"}],"type":"writer"}
2019-12-12 17:04:19.257 [0-0-0-writer] ERROR StdoutPluginCollector - 臟數據:
{"exception":"Duplicate entry '8' for key 'PRIMARY'","record":[{"byteSize":1,"index":0,"rawData":8,"type":"LONG"},{"byteSize":3,"index":1,"rawData":"半同步","type":"STRING"}],"type":"writer"}
2019-12-12 17:04:19.263 [0-0-0-writer] ERROR StdoutPluginCollector - 臟數據:
{"exception":"Duplicate entry '9' for key 'PRIMARY'","record":[{"byteSize":1,"index":0,"rawData":9,"type":"LONG"},{"byteSize":4,"index":1,"rawData":"影響性能","type":"STRING"}],"type":"writer"}
2019-12-12 17:04:19.267 [0-0-0-writer] ERROR StdoutPluginCollector - 臟數據:
{"exception":"Duplicate entry '10' for key 'PRIMARY'","record":[{"byteSize":2,"index":0,"rawData":10,"type":"LONG"},{"byteSize":8,"index":1,"rawData":"數據量小沒啥區別","type":"STRING"}],"type":"writer"}
2019-12-12 17:04:19.270 [0-0-0-writer] ERROR StdoutPluginCollector - 臟數據:
{"exception":"Duplicate entry '11' for key 'PRIMARY'","record":[{"byteSize":2,"index":0,"rawData":11,"type":"LONG"},{"byteSize":4,"index":1,"rawData":"雨女無瓜","type":"STRING"}],"type":"writer"}
2019-12-12 17:04:19.285 [0-0-0-writer] ERROR StdoutPluginCollector - 臟數據:
{"exception":"Duplicate entry '12' for key 'PRIMARY'","record":[{"byteSize":2,"index":0,"rawData":12,"type":"LONG"},{"byteSize":3,"index":1,"rawData":"馬冬梅","type":"STRING"}],"type":"writer"}
2019-12-12 17:04:19.289 [0-0-0-writer] ERROR StdoutPluginCollector - 臟數據:
{"exception":"Duplicate entry '13' for key 'PRIMARY'","record":[{"byteSize":2,"index":0,"rawData":13,"type":"LONG"},{"byteSize":4,"index":1,"rawData":"馬什么梅","type":"STRING"}],"type":"writer"}
2019-12-12 17:04:19.294 [0-0-0-writer] ERROR StdoutPluginCollector - 臟數據:
{"exception":"Duplicate entry '14' for key 'PRIMARY'","record":[{"byteSize":2,"index":0,"rawData":14,"type":"LONG"},{"byteSize":4,"index":1,"rawData":"馬冬什么","type":"STRING"}],"type":"writer"}
2019-12-12 17:04:19.297 [0-0-0-writer] ERROR StdoutPluginCollector - 臟數據:
{"exception":"Duplicate entry '15' for key 'PRIMARY'","record":[{"byteSize":2,"index":0,"rawData":15,"type":"LONG"},{"byteSize":4,"index":1,"rawData":"什么冬梅","type":"STRING"}],"type":"writer"}
2019-12-12 17:04:19.523 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[0] is successed, used[508]ms
2019-12-12 17:04:19.524 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] completed it's tasks.
2019-12-12 17:04:29.015 [job-0] INFO StandAloneJobContainerCommunicator - Total 11 records, 60 bytes | Speed 6B/s, 1 records/s | Error 10 records, 56 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00%
2019-12-12 17:04:29.016 [job-0] INFO AbstractScheduler - Scheduler accomplished all tasks.
2019-12-12 17:04:29.016 [job-0] INFO JobContainer - DataX Writer.Job [mysqlwriter] do post work.
2019-12-12 17:04:29.016 [job-0] INFO JobContainer - DataX Reader.Job [mysqlreader] do post work.
2019-12-12 17:04:29.016 [job-0] INFO JobContainer - DataX jobId [0] completed successfully.
2019-12-12 17:04:29.017 [job-0] INFO HookInvoker - No hook invoked, because base dir not exists or is a file: /opt/DataX/target/datax/datax/hook
2019-12-12 17:04:29.018 [job-0] INFO JobContainer -
[total cpu info] =>
averageCpu                     | maxDeltaCpu                   | minDeltaCpu                    
-1.00%                         | -1.00%                         | -1.00%
                       

[total gc info] =>
NAME                 | totalGCCount       | maxDeltaGCCount   | minDeltaGCCount   | totalGCTime       | maxDeltaGCTime     | minDeltaGCTime    
PS MarkSweep         | 0                 | 0                 | 0                 | 0.000s             | 0.000s             | 0.000s            
PS Scavenge         | 0                 | 0                 | 0                 | 0.000s             | 0.000s             | 0.000s            

2019-12-12 17:04:29.018 [job-0] INFO JobContainer - PerfTrace not enable!
2019-12-12 17:04:29.019 [job-0] INFO StandAloneJobContainerCommunicator - Total 11 records, 60 bytes | Speed 6B/s, 1 records/s | Error 10 records, 56 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00%
2019-12-12 17:04:29.020 [job-0] INFO JobContainer -
任務啟動時刻                   : 2019-12-12 17:04:18
任務結束時刻                   : 2019-12-12 17:04:29
任務總計耗時                   :                 10s
任務平均流量                   :               6B/s
記錄寫入速度                   :             1rec/s
讀出記錄總數                   :                  11
讀寫失敗總數                   :                  10

從庫結果:

在這里插入圖片描述

再次嘗試: 但是之前錯誤的沒有改正

在這里插入圖片描述

相較於mysql自帶的主從復制功能,這里阿里的DataX同步“主從庫”不會因為數據不一致而直接同步不成功,IO線程掛掉,而是選擇忽略,接着同步其他數據。蠻像Noteability軟件的。

5.結合crontabs,使DataX可以配置定時任務

1.確保安裝crontabs
yum install crontabs
# crontabs-1.11-6.20121102git.el7.noarch 已安裝並且是最新版本

常用命令:

service crond start  # 啟服務
service crond stop # 停服務
service crond restart # 重啟服務
service crond reload # 重載配置
service crond status # 查看服務狀態
crontab -l # 查看當前用戶的定時任務
2.如何創建crontab定時任務

我們在/opt/DataX/target/datax/datax/job下創建定時任務文件,因為是crontabs+datax,就叫做crondatax吧: vim crondatax

30,31,32,33  19 * * * python /opt/DataX/target/datax/datax/bin/datax.py /opt/DataX/target/datax/datax/job/mysql2mysql.json >>/opt/DataX/target/datax/datax/job/log.`date +\%Y\%m\%d\%H\%M\%S`  2>&1

格式說明:參照下圖,解釋下上面的測試例子:19時的30,31,32,33分分別做一次這個任務python /opt/DataX/target/datax/datax/bin/datax.py /opt/DataX/target/datax/datax/job/mysql2mysql.json,然后將每次執行的日志記錄下,下圖的log.xxxx就是,內容不贅述,都成功了。 填寫說明:再新建一個任務,就在crondatax文件換一行寫入即可。

在這里插入圖片描述

在這里插入圖片描述

跑任務,看下有沒有:

在這里插入圖片描述

跑起來了,這主庫加的一條:

在這里插入圖片描述

還未刷新navicat:

在這里插入圖片描述

刷新,數據已經同步進來了,【我手速太慢,截圖】:

在這里插入圖片描述

第三章、datax面試

一、DataX介紹

 DataX 是阿里巴巴集團內被廣泛使用的離線數據同步工具/平台,實現包括 MySQL、SQL Server、Oracle、PostgreSQL、HDFS、Hive、HBase、OTS、ODPS 等各種異構數據源之間高效的數據同步功能。 DataX本身作為數據同步框架,將不同數據源的同步抽象為從源頭數據源讀取數據的Reader插件,以及向目標端寫入數據的Writer插件,理論上DataX框架可以支持任意數據源類型的數據同步工作。同時DataX插件體系作為一套生態系統, 每接入一套新數據源該新加入的數據源即可實現和現有的數據源互通。

 

二、Job&Task概念

 在DataX的邏輯模型中包括job、task兩個維度,通過將job進行task拆分,然后將task合並到taskGroup進行運行。

  • job實例運行在jobContainer容器中,它是所有任務的master,負責初始化、拆分、調度、運行、回收、監控和匯報,但它並不做實際的數據同步操作。

  • Job: Job是DataX用以描述從一個源頭到一個目的端的同步作業,是DataX數據同步的最小業務單元。比如:從一張mysql的表同步到odps的一個表的特定分區。

  • Task: Task是為最大化而把Job拆分得到的最小執行單元。比如:讀一張有1024個分表的mysql分庫分表的Job,拆分成1024個讀Task,用若干個並發執行。

  • TaskGroup: 描述的是一組Task集合。在同一個TaskGroupContainer執行下的Task集合稱之為TaskGroup。

  • JobContainer: Job執行器,負責Job全局拆分、調度、前置語句和后置語句等工作的工作單元。類似Yarn中的JobTracker。

  • TaskGroupContainer: TaskGroup執行器,負責執行一組Task的工作單元,類似Yarn中的TaskTracker。

  • 簡而言之, Job拆分成Task,在分別在框架提供的容器中執行,插件只需要實現Job和Task兩部分邏輯。

 

三、啟動過程

img

說明:

  • 上圖中,黃色表示Job部分的執行階段,藍色表示Task部分的執行階段,綠色表示框架執行階段。

 

 

img

說明:

  • reader和writer的自定義插件內部需要實現job和task的接口即可

 

四、DataX開啟Debug

 閱讀源碼的最好方法是debug整個項目工程,在如何調試DataX項目的過程中還是花費了一些精力在里面的,現在一並共享出來供有興趣的程序員一並研究。 整個debug過程需要按照下列步驟進行:

  • 1、github上下載DataX的源碼並通過以下命令進行編譯,github官網有編譯命令,如果遇到依賴包無法下載可以省去部分writer或reader插件,不影響debug。

 

(1)、下載DataX源碼:

$ git clone git@github.com:alibaba/DataX.git
(2)、通過maven打包:

$ cd {DataX_source_code_home}
$ mvn -U clean package assembly:assembly -Dmaven.test.skip=true
打包成功,日志顯示如下:

[INFO] BUILD SUCCESS
[INFO] -----------------------------------------------------------------
[INFO] Total time: 08:12 min
[INFO] Finished at: 2015-12-13T16:26:48+08:00
[INFO] Final Memory: 133M/960M
[INFO] -----------------------------------------------------------------
打包成功后的DataX包位於 {DataX_source_code_home}/target/datax/datax/ ,結構如下:

$ cd {DataX_source_code_home}
$ ls ./target/datax/datax/
bin     conf        job     lib     log     log_perf    plugin      script      tmp
  • 2、由於DataX是通過python腳本進行啟動的,所以在python腳本中把啟動參數打印出來,核心在於print startCommand這句,繼而我們就能夠獲取啟動命令參數了。

 

if __name__ == "__main__":
   printCopyright()
   parser = getOptionParser()
   options, args = parser.parse_args(sys.argv[1:])
   if options.reader is not None and options.writer is not None:
       generateJobConfigTemplate(options.reader,options.writer)
       sys.exit(RET_STATE['OK'])
   if len(args) != 1:
       parser.print_help()
       sys.exit(RET_STATE['FAIL'])

   startCommand = buildStartCommand(options, args)
   print startCommand

   child_process = subprocess.Popen(startCommand, shell=True)
   register_signal()
  (stdout, stderr) = child_process.communicate()

   sys.exit(child_process.returncode)
  • 3、獲取啟動DataX的啟動命令

 

java -server  -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError 
-XX:HeapDumpPath=//Users/lebron374/Documents/github/DataX/target/datax/datax/log
-Dloglevel=info -Dfile.encoding=UTF-8
-Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener
-Djava.security.egd=file:///dev/urandom -Ddatax.home=//Users/lebron374/Documents/github/DataX/target/datax/datax
-Dlogback.configurationFile=//Users/lebron374/Documents/github/DataX/target/datax/datax/conf/logback.xml
-classpath //Users/lebron374/Documents/github/DataX/target/datax/datax/lib/*:.
-Dlog.file.name=s_datax_job_job_json
com.alibaba.datax.core.Engine

-mode standalone -jobid -1
-job //Users/lebron374/Documents/github/DataX/target/datax/datax/job/job.json
  • 4、配置Idea啟動腳本

img

 

以下配置在VM options當中
-server  -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=//Users/lebron374/Documents/github/DataX/target/datax/datax/log
-Dloglevel=info -Dfile.encoding=UTF-8
-Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener
-Djava.security.egd=file:///dev/urandom -Ddatax.home=//Users/lebron374/Documents/github/DataX/target/datax/datax
-Dlogback.configurationFile=//Users/lebron374/Documents/github/DataX/target/datax/datax/conf/logback.xml
-classpath //Users/lebron374/Documents/github/DataX/target/datax/datax/lib/*:.
-Dlog.file.name=s_datax_job_job_json
com.alibaba.datax.core.Engine

以下配置在Program arguments當中
-mode standalone -jobid -1
-job //Users/lebron374/Documents/github/DataX/target/datax/datax/job/job.json

 

五、啟動步驟解析

  • 1、解析配置,包括job.json、core.json、plugin.json三個配置

  • 2、設置jobId到configuration當中

  • 3、啟動Engine,通過Engine.start()進入啟動程序

  • 4、設置RUNTIME_MODE奧configuration當中

  • 5、通過JobContainer的start()方法啟動

  • 6、依次執行job的preHandler()、init()、prepare()、split()、schedule()、- post()、postHandle()等方法。

  • 7、init()方法涉及到根據configuration來初始化reader和writer插件,這里涉及到jar包熱加載以及調用插件init()操作方法,同時設置reader和writer的configuration信息

  • 8、prepare()方法涉及到初始化reader和writer插件的初始化,通過調用插件的prepare()方法實現,每個插件都有自己的jarLoader,通過集成URLClassloader實現而來

  • 9、split()方法通過adjustChannelNumber()方法調整channel個數,同時執行reader和writer最細粒度的切分,需要注意的是,writer的切分結果要參照reader的切分結果,達到切分后數目相等,才能滿足1:1的通道模型

  • 10、channel的計數主要是根據byte和record的限速來實現的,在split()的函數中第一步就是計算channel的大小

  • 11、split()方法reader插件會根據channel的值進行拆分,但是有些reader插件可能不會參考channel的值,writer插件會完全根據reader的插件1:1進行返回

  • 12、split()方法內部的mergeReaderAndWriterTaskConfigs()負責合並reader、writer、以及transformer三者關系,生成task的配置,並且重寫job.content的配置

  • 13、schedule()方法根據split()拆分生成的task配置分配生成taskGroup對象,根據task的數量和單個taskGroup支持的task數量進行配置,兩者相除就可以得出taskGroup的數量

  • 14、schdule()內部通過AbstractScheduler的schedule()執行,繼續執行startAllTaskGroup()方法創建所有的TaskGroupContainer組織相關的task,TaskGroupContainerRunner負責運行TaskGroupContainer執行分配的task。

  • 15、taskGroupContainerExecutorService啟動固定的線程池用以執行TaskGroupContainerRunner對象,TaskGroupContainerRunner的run()方法調用taskGroupContainer.start()方法,針對每個channel創建一個TaskExecutor,通過taskExecutor.doStart()啟動任務

 

六、啟動過程源碼分析

1、入口main函數

 

public class Engine {

   public static void main(String[] args) throws Exception {
       int exitCode = 0;
       try {
           Engine.entry(args);
      } catch (Throwable e) {
           System.exit(exitCode);
      }
  }

   public static void entry(final String[] args) throws Throwable {

       // 省略相關參數的解析代碼
       
       // 獲取job的配置路徑信息
       String jobPath = cl.getOptionValue("job");

       // 如果用戶沒有明確指定jobid, 則 datax.py 會指定 jobid 默認值為-1
       String jobIdString = cl.getOptionValue("jobid");
       RUNTIME_MODE = cl.getOptionValue("mode");

       // 解析配置信息
       Configuration configuration = ConfigParser.parse(jobPath);

       // 省略相關代碼
       boolean isStandAloneMode = "standalone".equalsIgnoreCase(RUNTIME_MODE);
       configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, jobId);

       // 根據配置啟動參數
       Engine engine = new Engine();
       engine.start(configuration);
  }
}

說明: main函數主要做兩件事情,分別是:

  • 1、解析job相關配置生成configuration。

  • 2、依據配置啟動Engine。

 

2、configuration解析過程

 

public final class ConfigParser {
   private static final Logger LOG = LoggerFactory.getLogger(ConfigParser.class);
   /**
    * 指定Job配置路徑,ConfigParser會解析Job、Plugin、Core全部信息,並以Configuration返回
    */
   public static Configuration parse(final String jobPath) {
       // 加載任務的指定的配置文件,這個配置是有固定的json的固定模板格式的
       Configuration configuration = ConfigParser.parseJobConfig(jobPath);

       // 合並conf/core.json的配置文件
       configuration.merge(
               ConfigParser.parseCoreConfig(CoreConstant.DATAX_CONF_PATH),
               false);

       // todo config優化,只捕獲需要的plugin
       // 固定的節點路徑 job.content[0].reader.name
       String readerPluginName = configuration.getString(
               CoreConstant.DATAX_JOB_CONTENT_READER_NAME);
       // 固定的節點路徑 job.content[0].writer.name
       String writerPluginName = configuration.getString(
               CoreConstant.DATAX_JOB_CONTENT_WRITER_NAME);

       // 固定的節點路徑 job.preHandler.pluginName
       String preHandlerName = configuration.getString(
               CoreConstant.DATAX_JOB_PREHANDLER_PLUGINNAME);

       // 固定的節點路徑 job.postHandler.pluginName
       String postHandlerName = configuration.getString(
               CoreConstant.DATAX_JOB_POSTHANDLER_PLUGINNAME);

       // 添加讀寫插件的列表待加載
       Set<String> pluginList = new HashSet<String>();
       pluginList.add(readerPluginName);
       pluginList.add(writerPluginName);

       if(StringUtils.isNotEmpty(preHandlerName)) {
           pluginList.add(preHandlerName);
      }
       if(StringUtils.isNotEmpty(postHandlerName)) {
           pluginList.add(postHandlerName);
      }
       try {
           // parsePluginConfig(new ArrayList<String>(pluginList))加載指定的插件的配置信息,並且和全局的配置文件進行合並
           configuration.merge(parsePluginConfig(new ArrayList<String>(pluginList)), false);
      }catch (Exception e){
      }

       // configuration整合了三方的配置,包括 任務配置、core核心配置、指定插件的配置。
       return configuration;
  }


   // 在指定的reader和writer目錄獲取指定的插件並解析其配置
   public static Configuration parsePluginConfig(List<String> wantPluginNames) {
       // 創建一個空的配置信息對象
       Configuration configuration = Configuration.newDefault();

       Set<String> replicaCheckPluginSet = new HashSet<String>();
       int complete = 0;
       // 所有的reader在/plugin/reader目錄,遍歷獲取所有reader的目錄
       // 獲取待加載插件的配資信息,並合並到上面創建的空配置對象
       // //Users/lebron374/Documents/github/DataX/target/datax/datax/plugin/reader
       for (final String each : ConfigParser
              .getDirAsList(CoreConstant.DATAX_PLUGIN_READER_HOME)) {

           // 解析單個reader目錄,eachReaderConfig保存的是key是plugin.reader.pluginname,value是對應的plugin.json內容
           Configuration eachReaderConfig = ConfigParser.parseOnePluginConfig(each, "reader", replicaCheckPluginSet, wantPluginNames);
           if(eachReaderConfig!=null) {
               // 采用覆蓋式的合並
               configuration.merge(eachReaderConfig, true);
               complete += 1;
          }
      }

       // //Users/lebron374/Documents/github/DataX/target/datax/datax/plugin/writer
       for (final String each : ConfigParser
              .getDirAsList(CoreConstant.DATAX_PLUGIN_WRITER_HOME)) {
           Configuration eachWriterConfig = ConfigParser.parseOnePluginConfig(each, "writer", replicaCheckPluginSet, wantPluginNames);
           if(eachWriterConfig!=null) {
               configuration.merge(eachWriterConfig, true);
               complete += 1;
          }
      }

       if (wantPluginNames != null && wantPluginNames.size() > 0 && wantPluginNames.size() != complete) {
           throw DataXException.asDataXException(FrameworkErrorCode.PLUGIN_INIT_ERROR, "插件加載失敗,未完成指定插件加載:" + wantPluginNames);
      }

       return configuration;
  }
}

說明: configuration解析包括三部分的配置解析合並解析結果並返回,分別是:

  • 1、解析job的配置信息,由啟動參數指定job.json文件。

  • 2、解析DataX自帶配置信息,由默認指定的core.json文件。

  • 3、解析讀寫插件配置信息,由job.json指定的reader和writer插件信息

 

3、configuration配置信息

job.json的configuration

 

{
   "job": {
       "setting": {
           "speed": {
               "byte":10485760,
               "record":1000
          },
           "errorLimit": {
               "record": 0,
               "percentage": 0.02
          }
      },
       "content": [
          {
               "reader": {
                   "name": "streamreader",
                   "parameter": {
                       "column" : [
                          {
                               "value": "DataX",
                               "type": "string"
                          },
                          {
                               "value": 19890604,
                               "type": "long"
                          },
                          {
                               "value": "1989-06-04 00:00:00",
                               "type": "date"
                          },
                          {
                               "value": true,
                               "type": "bool"
                          },
                          {
                               "value": "test",
                               "type": "bytes"
                          }
                      ],
                       "sliceRecordCount": 100000
                  }
              },
               "writer": {
                   "name": "streamwriter",
                   "parameter": {
                       "print": false,
                       "encoding": "UTF-8"
                  }
              }
          }
      ]
  }
}

 

core.json的configuration

 

{
   "entry": {
       "jvm": "-Xms1G -Xmx1G",
       "environment": {}
  },
   "common": {
       "column": {
           "datetimeFormat": "yyyy-MM-dd HH:mm:ss",
           "timeFormat": "HH:mm:ss",
           "dateFormat": "yyyy-MM-dd",
           "extraFormats":["yyyyMMdd"],
           "timeZone": "GMT+8",
           "encoding": "utf-8"
      }
  },
   "core": {
       "dataXServer": {
           "address": "http://localhost:7001/api",
           "timeout": 10000,
           "reportDataxLog": false,
           "reportPerfLog": false
      },
       "transport": {
           "channel": {
               "class": "com.alibaba.datax.core.transport.channel.memory.MemoryChannel",
               "speed": {
                   "byte": 100,
                   "record": 10
              },
               "flowControlInterval": 20,
               "capacity": 512,
               "byteCapacity": 67108864
          },
           "exchanger": {
               "class": "com.alibaba.datax.core.plugin.BufferedRecordExchanger",
               "bufferSize": 32
          }
      },
       "container": {
           "job": {
               "reportInterval": 10000
          },
           "taskGroup": {
               "channel": 5
          },
           "trace": {
               "enable": "false"
          }

      },
       "statistics": {
           "collector": {
               "plugin": {
                   "taskClass": "com.alibaba.datax.core.statistics.plugin.task.StdoutPluginCollector",
                   "maxDirtyNumber": 10
              }
          }
      }
  }
}

 

plugin.json的configuration

 

{
   "name": "streamreader",
   "class": "com.alibaba.datax.plugin.reader.streamreader.StreamReader",
   "description": {
       "useScene": "only for developer test.",
       "mechanism": "use datax framework to transport data from stream.",
       "warn": "Never use it in your real job."
  },
   "developer": "alibaba"
}


{
   "name": "streamwriter",
   "class": "com.alibaba.datax.plugin.writer.streamwriter.StreamWriter",
   "description": {
       "useScene": "only for developer test.",
       "mechanism": "use datax framework to transport data to stream.",
       "warn": "Never use it in your real job."
  },
   "developer": "alibaba"
}

 

合並后的configuration

 

{
   "common": {
       "column": {
           "dateFormat": "yyyy-MM-dd",
           "datetimeFormat": "yyyy-MM-dd HH:mm:ss",
           "encoding": "utf-8",
           "extraFormats": ["yyyyMMdd"],
           "timeFormat": "HH:mm:ss",
           "timeZone": "GMT+8"
      }
  },
   "core": {
       "container": {
           "job": {
               "id": -1,
               "reportInterval": 10000
          },
           "taskGroup": {
               "channel": 5
          },
           "trace": {
               "enable": "false"
          }
      },
       "dataXServer": {
           "address": "http://localhost:7001/api",
           "reportDataxLog": false,
           "reportPerfLog": false,
           "timeout": 10000
      },
       "statistics": {
           "collector": {
               "plugin": {
                   "maxDirtyNumber": 10,
                   "taskClass": "com.alibaba.datax.core.statistics.plugin.task.StdoutPluginCollector"
              }
          }
      },
       "transport": {
           "channel": {
               "byteCapacity": 67108864,
               "capacity": 512,
               "class": "com.alibaba.datax.core.transport.channel.memory.MemoryChannel",
               "flowControlInterval": 20,
               "speed": {
                   "byte": -1,
                   "record": -1
              }
          },
           "exchanger": {
               "bufferSize": 32,
               "class": "com.alibaba.datax.core.plugin.BufferedRecordExchanger"
          }
      }
  },
   "entry": {
       "jvm": "-Xms1G -Xmx1G"
  },
   "job": {
       "content": [{
           "reader": {
               "name": "streamreader",
               "parameter": {
                   "column": [{
                       "type": "string",
                       "value": "DataX"
                  }, {
                       "type": "long",
                       "value": 19890604
                  }, {
                       "type": "date",
                       "value": "1989-06-04 00:00:00"
                  }, {
                       "type": "bool",
                       "value": true
                  }, {
                       "type": "bytes",
                       "value": "test"
                  }],
                   "sliceRecordCount": 100000
              }
          },
           "writer": {
               "name": "streamwriter",
               "parameter": {
                   "encoding": "UTF-8",
                   "print": false
              }
          }
      }],
       "setting": {
           "errorLimit": {
               "percentage": 0.02,
               "record": 0
          },
           "speed": {
               "byte": 10485760
          }
      }
  },
   "plugin": {
       "reader": {
           "streamreader": {
               "class": "com.alibaba.datax.plugin.reader.streamreader.StreamReader",
               "description": {
                   "mechanism": "use datax framework to transport data from stream.",
                   "useScene": "only for developer test.",
                   "warn": "Never use it in your real job."
              },
               "developer": "alibaba",
               "name": "streamreader",
               "path": "//Users/lebron374/Documents/github/DataX/target/datax/datax/plugin/reader/streamreader"
          }
      },
       "writer": {
           "streamwriter": {
               "class": "com.alibaba.datax.plugin.writer.streamwriter.StreamWriter",
               "description": {
                   "mechanism": "use datax framework to transport data to stream.",
                   "useScene": "only for developer test.",
                   "warn": "Never use it in your real job."
              },
               "developer": "alibaba",
               "name": "streamwriter",
               "path": "//Users/lebron374/Documents/github/DataX/target/datax/datax/plugin/writer/streamwriter"
          }
      }
  }
}

 

4、Engine的start過程

 

public class Engine {
   private static final Logger LOG = LoggerFactory.getLogger(Engine.class);

   private static String RUNTIME_MODE;

   /* check job model (job/task) first */
   public void start(Configuration allConf) {

       // 省略相關代碼

       boolean isJob = !("taskGroup".equalsIgnoreCase(allConf
              .getString(CoreConstant.DATAX_CORE_CONTAINER_MODEL)));
       AbstractContainer container;
       if (isJob) {
           allConf.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_MODE, RUNTIME_MODE);
           // 核心點在於JobContainer的對象
           container = new JobContainer(allConf);
           instanceId = allConf.getLong(
                   CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, 0);

      }

       Configuration jobInfoConfig = allConf.getConfiguration(CoreConstant.DATAX_JOB_JOBINFO);

       // 核心容器的啟動
       container.start();

  }

說明: start過程中做了兩件事:

  • 1、創建JobContainer對象

  • 2、啟動JobContainer對象

 

5、JobContainer的啟動過程

 

public class JobContainer extends AbstractContainer {
   /**
    * jobContainer主要負責的工作全部在start()里面,包括init、prepare、split、scheduler、
    * post以及destroy和statistics
    */
   @Override
   public void start() {
       try {
           isDryRun = configuration.getBool(CoreConstant.DATAX_JOB_SETTING_DRYRUN, false);
           if(isDryRun) {
               // 省略相關代碼
          } else {

               //拷貝一份新的配置,保證線程安全
               userConf = configuration.clone();

               // 執行preHandle()操作
               LOG.debug("jobContainer starts to do preHandle ...");
               this.preHandle();

               // 執行reader、transform、writer等初始化
               LOG.debug("jobContainer starts to do init ...");
               this.init();

               // 執行plugin的prepare
               LOG.info("jobContainer starts to do prepare ...");
               this.prepare();

               // 執行任務切分
               LOG.info("jobContainer starts to do split ...");
               this.totalStage = this.split();

               // 執行任務調度
               LOG.info("jobContainer starts to do schedule ...");
               this.schedule();

               // 執行后置操作
               LOG.debug("jobContainer starts to do post ...");
               this.post();

               // 執行postHandle操作
               LOG.debug("jobContainer starts to do postHandle ...");
               this.postHandle();

               LOG.info("DataX jobId [{}] completed successfully.", this.jobId);

               this.invokeHooks();
          }
      } catch (Throwable e) {
           // 省略相關代碼
      } finally {
            // 省略相關代碼
      }
  }
}

說明: JobContainer的start方法會執行一系列job相關的操作,如下:

  • 1、執行job的preHandle()操作,暫時不關注。

  • 2、執行job的init()操作,需重點關注。

  • 3、執行job的prepare()操作,暫時不關注。

  • 4、執行job的split()操作,需重點關注。

  • 5、執行job的schedule()操作,需重點關注。

  • 6、執行job的post()和postHandle()操作,暫時不關注。

 

6、Job的init過程

 

public class JobContainer extends AbstractContainer {
   private void init() {
       this.jobId = this.configuration.getLong(
               CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, -1);

       if (this.jobId < 0) {
           LOG.info("Set jobId = 0");
           this.jobId = 0;
           this.configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID,
                   this.jobId);
      }

       Thread.currentThread().setName("job-" + this.jobId);

       // 初始化
       JobPluginCollector jobPluginCollector = new DefaultJobPluginCollector(
               this.getContainerCommunicator());

       //必須先Reader ,后Writer
       this.jobReader = this.initJobReader(jobPluginCollector);
       this.jobWriter = this.initJobWriter(jobPluginCollector);
  }


   private Reader.Job initJobReader(
           JobPluginCollector jobPluginCollector) {

       // 獲取插件名字
       this.readerPluginName = this.configuration.getString(
               CoreConstant.DATAX_JOB_CONTENT_READER_NAME);

       classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
               PluginType.READER, this.readerPluginName));

       Reader.Job jobReader = (Reader.Job) LoadUtil.loadJobPlugin(
               PluginType.READER, this.readerPluginName);

       // 設置reader的jobConfig
       jobReader.setPluginJobConf(this.configuration.getConfiguration(
               CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER));

       // 設置reader的readerConfig
       jobReader.setPeerPluginJobConf(this.configuration.getConfiguration(
               CoreConstant.DATAX_JOB_CONTENT_WRITER_PARAMETER));

       jobReader.setJobPluginCollector(jobPluginCollector);

       // 這里已經到每個插件具體的初始化操作
       jobReader.init();

       classLoaderSwapper.restoreCurrentThreadClassLoader();
       return jobReader;
  }


   private Writer.Job initJobWriter(
           JobPluginCollector jobPluginCollector) {
       this.writerPluginName = this.configuration.getString(
               CoreConstant.DATAX_JOB_CONTENT_WRITER_NAME);
       classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
               PluginType.WRITER, this.writerPluginName));

       Writer.Job jobWriter = (Writer.Job) LoadUtil.loadJobPlugin(
               PluginType.WRITER, this.writerPluginName);

       // 設置writer的jobConfig
       jobWriter.setPluginJobConf(this.configuration.getConfiguration(
               CoreConstant.DATAX_JOB_CONTENT_WRITER_PARAMETER));

       // 設置reader的readerConfig
       jobWriter.setPeerPluginJobConf(this.configuration.getConfiguration(
               CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER));

       jobWriter.setPeerPluginName(this.readerPluginName);
       jobWriter.setJobPluginCollector(jobPluginCollector);
       jobWriter.init();
       classLoaderSwapper.restoreCurrentThreadClassLoader();

       return jobWriter;
  }
}

說明: Job的init()過程主要做了兩個事情,分別是:

  • 1、創建reader的job對象,通過URLClassLoader實現類加載。

  • 2、創建writer的job對象,通過URLClassLoader實現類加載。

 

7、job的split過程

 

public class JobContainer extends AbstractContainer {
   private int split() {
       this.adjustChannelNumber();

       if (this.needChannelNumber <= 0) {
           this.needChannelNumber = 1;
      }

       List<Configuration> readerTaskConfigs = this
              .doReaderSplit(this.needChannelNumber);
       int taskNumber = readerTaskConfigs.size();
       List<Configuration> writerTaskConfigs = this
              .doWriterSplit(taskNumber);

       List<Configuration> transformerList = this.configuration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT_TRANSFORMER);

       LOG.debug("transformer configuration: "+ JSON.toJSONString(transformerList));
       /**
        * 輸入是reader和writer的parameter list,輸出是content下面元素的list
        */
       List<Configuration> contentConfig = mergeReaderAndWriterTaskConfigs(
               readerTaskConfigs, writerTaskConfigs, transformerList);


       LOG.debug("contentConfig configuration: "+ JSON.toJSONString(contentConfig));

       this.configuration.set(CoreConstant.DATAX_JOB_CONTENT, contentConfig);

       return contentConfig.size();
  }


   private void adjustChannelNumber() {
       int needChannelNumberByByte = Integer.MAX_VALUE;
       int needChannelNumberByRecord = Integer.MAX_VALUE;

       boolean isByteLimit = (this.configuration.getInt(
               CoreConstant.DATAX_JOB_SETTING_SPEED_BYTE, 0) > 0);
       if (isByteLimit) {
           long globalLimitedByteSpeed = this.configuration.getInt(
                   CoreConstant.DATAX_JOB_SETTING_SPEED_BYTE, 10 * 1024 * 1024);
           Long channelLimitedByteSpeed = this.configuration
                  .getLong(CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_BYTE);
           needChannelNumberByByte =
                  (int) (globalLimitedByteSpeed / channelLimitedByteSpeed);
           needChannelNumberByByte =
                   needChannelNumberByByte > 0 ? needChannelNumberByByte : 1;
           LOG.info("Job set Max-Byte-Speed to " + globalLimitedByteSpeed + " bytes.");
      }

       boolean isRecordLimit = (this.configuration.getInt(
               CoreConstant.DATAX_JOB_SETTING_SPEED_RECORD, 0)) > 0;
       if (isRecordLimit) {
           long globalLimitedRecordSpeed = this.configuration.getInt(
                   CoreConstant.DATAX_JOB_SETTING_SPEED_RECORD, 100000);

           Long channelLimitedRecordSpeed = this.configuration.getLong(
                   CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_RECORD);

           needChannelNumberByRecord =
                  (int) (globalLimitedRecordSpeed / channelLimitedRecordSpeed);
           needChannelNumberByRecord =
                   needChannelNumberByRecord > 0 ? needChannelNumberByRecord : 1;
      }

       // 取較小值
       this.needChannelNumber = needChannelNumberByByte < needChannelNumberByRecord ?
               needChannelNumberByByte : needChannelNumberByRecord;

       boolean isChannelLimit = (this.configuration.getInt(
               CoreConstant.DATAX_JOB_SETTING_SPEED_CHANNEL, 0) > 0);
       if (isChannelLimit) {
           this.needChannelNumber = this.configuration.getInt(
                   CoreConstant.DATAX_JOB_SETTING_SPEED_CHANNEL);

           LOG.info("Job set Channel-Number to " + this.needChannelNumber
                   + " channels.");

           return;
      }

       throw DataXException.asDataXException(
               FrameworkErrorCode.CONFIG_ERROR,
               "Job運行速度必須設置");
  }
}

說明: DataX的job的split過程主要是根據限流配置計算channel的個數,進而計算task的個數,主要過程如下:

  • 1、adjustChannelNumber的過程根據按照字節限流和record限流計算channel的個數。

  • 2、reader的個數根據channel的個數進行計算。

  • 3、writer的個數根據reader的個數進行計算,writer和reader實現1:1綁定。

  • 4、通過mergeReaderAndWriterTaskConfigs()方法生成reader+writer的task的configuration,至此我們生成了task的配置信息。

 

8、Job的schedule過程

 

public class JobContainer extends AbstractContainer {

   private void schedule() {
       /**
        * 通過獲取配置信息得到每個taskGroup需要運行哪些tasks任務
        */
       List<Configuration> taskGroupConfigs = JobAssignUtil.assignFairly(this.configuration,
               this.needChannelNumber, channelsPerTaskGroup);

       ExecuteMode executeMode = null;
       AbstractScheduler scheduler;
       try {
           executeMode = ExecuteMode.STANDALONE;
           scheduler = initStandaloneScheduler(this.configuration);

           //設置 executeMode
           for (Configuration taskGroupConfig : taskGroupConfigs) {
               taskGroupConfig.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_MODE, executeMode.getValue());
          }

           // 開始調度所有的taskGroup
           scheduler.schedule(taskGroupConfigs);

      } catch (Exception e) {
           // 省略相關代碼
      }
  }

}

說明: Job的schedule的過程主要做了兩件事,分別是:

  • 1、將task拆分成taskGroup,生成List<Configuration> taskGroupConfigs。

  • 2、啟動taskgroup的對象, scheduler.schedule(taskGroupConfigs)。

 

9、TaskGroup的schedule過程

 

public abstract class AbstractScheduler {
 
   public void schedule(List<Configuration> configurations) {
       
       int totalTasks = calculateTaskCount(configurations);
       
       // 啟動所有的TaskGroup
       startAllTaskGroup(configurations);
       try {
           while (true) {
               // 省略相關代碼
          }
      } catch (InterruptedException e) {
      }
  }
}



public abstract class ProcessInnerScheduler extends AbstractScheduler {

   private ExecutorService taskGroupContainerExecutorService;

   @Override
   public void startAllTaskGroup(List<Configuration> configurations) {

       //todo 根據taskGroup的數量啟動固定的線程數
       this.taskGroupContainerExecutorService = Executors
              .newFixedThreadPool(configurations.size());

       //todo 每個TaskGroup啟動一個TaskGroupContainerRunner
       for (Configuration taskGroupConfiguration : configurations) {
           //todo 創建TaskGroupContainerRunner並提交線程池運行
           TaskGroupContainerRunner taskGroupContainerRunner = newTaskGroupContainerRunner(taskGroupConfiguration);
           this.taskGroupContainerExecutorService.execute(taskGroupContainerRunner);
      }

       // 等待所有任務執行完后會關閉,執行該方法后不會再接收新任務
       this.taskGroupContainerExecutorService.shutdown();
  }
}



public class TaskGroupContainerRunner implements Runnable {

   private TaskGroupContainer taskGroupContainer;

   private State state;

   public TaskGroupContainerRunner(TaskGroupContainer taskGroup) {
       this.taskGroupContainer = taskGroup;
       this.state = State.SUCCEEDED;
  }

   @Override
   public void run() {
       try {
           Thread.currentThread().setName(
                   String.format("taskGroup-%d", this.taskGroupContainer.getTaskGroupId()));
           this.taskGroupContainer.start();
           this.state = State.SUCCEEDED;
      } catch (Throwable e) {
      }
  }
}

說明: TaskGroup的Schedule方法做的事情如下:

  • 1、為所有的TaskGroup創建TaskGroupContainerRunner。

  • 2、通過線程池提交TaskGroupContainerRunner任務,執行TaskGroupContainerRunner的run()方法。

  • 3、在run()方法內部執行this.taskGroupContainer.start()方法。

 

10、TaskGroupContainer的啟動

 

public class TaskGroupContainer extends AbstractContainer {

   @Override
   public void start() {
       try {
            // 省略相關代碼

           int taskCountInThisTaskGroup = taskConfigs.size();
           Map<Integer, Configuration> taskConfigMap = buildTaskConfigMap(taskConfigs); //taskId與task配置
           List<Configuration> taskQueue = buildRemainTasks(taskConfigs); //待運行task列表
           Map<Integer, TaskExecutor> taskFailedExecutorMap = new HashMap<Integer, TaskExecutor>(); //taskId與上次失敗實例
           List<TaskExecutor> runTasks = new ArrayList<TaskExecutor>(channelNumber); //正在運行task
           Map<Integer, Long> taskStartTimeMap = new HashMap<Integer, Long>(); //任務開始時間

           while (true) {
               // 省略相關代碼
               
               // 新增任務會在這里被啟動
               Iterator<Configuration> iterator = taskQueue.iterator();
               while(iterator.hasNext() && runTasks.size() < channelNumber){
                   Configuration taskConfig = iterator.next();
                   Integer taskId = taskConfig.getInt(CoreConstant.TASK_ID);
                   int attemptCount = 1;
                   TaskExecutor lastExecutor = taskFailedExecutorMap.get(taskId);

                   // todo 需要新建任務的配置信息
                   Configuration taskConfigForRun = taskMaxRetryTimes > 1 ? taskConfig.clone() : taskConfig;

                   // todo taskExecutor應該就需要新建的任務
                   TaskExecutor taskExecutor = new TaskExecutor(taskConfigForRun, attemptCount);
                   taskStartTimeMap.put(taskId, System.currentTimeMillis());
                   taskExecutor.doStart();

                   iterator.remove();
                   runTasks.add(taskExecutor);
                 
              }
      } catch (Throwable e) {
      }finally {
      }
  }
}

說明: TaskGroupContainer的內部主要做的事情如下:

  • 1、根據TaskGroupContainer分配的Task任務列表,創建TaskExecutor對象。

  • 2、創建TaskExecutor對象,用以啟動分配該TaskGroup的task。

  • 3、至此,已經成功的啟動了Job當中的Task任務。

11、Task的啟動

 

    class TaskExecutor {
       
       private Channel channel;
       private Thread readerThread;
       private Thread writerThread;
       private ReaderRunner readerRunner;
       private WriterRunner writerRunner;

       /**
        * 該處的taskCommunication在多處用到:
        * 1. channel
        * 2. readerRunner和writerRunner
        * 3. reader和writer的taskPluginCollector
        */
       public TaskExecutor(Configuration taskConf, int attemptCount) {
           // 獲取該taskExecutor的配置
           this.taskConfig = taskConf;

           // 得到taskId
           this.taskId = this.taskConfig.getInt(CoreConstant.TASK_ID);
           this.attemptCount = attemptCount;

           /**
            * 由taskId得到該taskExecutor的Communication
            * 要傳給readerRunner和writerRunner,同時要傳給channel作統計用
            */
           this.channel = ClassUtil.instantiate(channelClazz,
                   Channel.class, configuration);
           // channel在這里生成,每個taskGroup生成一個channel,在generateRunner方法當中生成writer或reader並注入channel
           this.channel.setCommunication(this.taskCommunication);

           /**
            * 獲取transformer的參數
            */

           List<TransformerExecution> transformerInfoExecs = TransformerUtil.buildTransformerInfo(taskConfig);

           /**
            * 生成writerThread
            */
           writerRunner = (WriterRunner) generateRunner(PluginType.WRITER);
           this.writerThread = new Thread(writerRunner,
                   String.format("%d-%d-%d-writer",
                           jobId, taskGroupId, this.taskId));
           //通過設置thread的contextClassLoader,即可實現同步和主程序不通的加載器
           this.writerThread.setContextClassLoader(LoadUtil.getJarLoader(
                   PluginType.WRITER, this.taskConfig.getString(
                           CoreConstant.JOB_WRITER_NAME)));

           /**
            * 生成readerThread
            */
           readerRunner = (ReaderRunner) generateRunner(PluginType.READER,transformerInfoExecs);
           this.readerThread = new Thread(readerRunner,
                   String.format("%d-%d-%d-reader",
                           jobId, taskGroupId, this.taskId));
           /**
            * 通過設置thread的contextClassLoader,即可實現同步和主程序不通的加載器
            */
           this.readerThread.setContextClassLoader(LoadUtil.getJarLoader(
                   PluginType.READER, this.taskConfig.getString(
                           CoreConstant.JOB_READER_NAME)));
      }

       public void doStart() {
           this.writerThread.start();
           this.readerThread.start();
      }
}

說明: TaskExecutor的啟動過程主要做了以下事情:

  • 1、創建了reader和writer的線程任務,reader和writer公用一個channel。

  • 2、先啟動writer線程后,再啟動reader線程。

  • 3、至此,同步數據的Task任務已經啟動了。

 

七、DataX的數據傳輸

 跟一般的生產者-消費者模式一樣,Reader插件和Writer插件之間也是通過channel來實現數據的傳輸的。channel可以是內存的,也可能是持久化的,插件不必關心。插件通過RecordSender往channel寫入數據,通過RecordReceiver從channel讀取數據。

 channel中的一條數據為一個Record的對象,Record中可以放多個Column對象,這可以簡單理解為數據庫中的記錄和列。

 

public class DefaultRecord implements Record {

   private static final int RECORD_AVERGAE_COLUMN_NUMBER = 16;

   private List<Column> columns;

   private int byteSize;

   // 首先是Record本身需要的內存
   private int memorySize = ClassSize.DefaultRecordHead;

   public DefaultRecord() {
       this.columns = new ArrayList<Column>(RECORD_AVERGAE_COLUMN_NUMBER);
  }

   @Override
   public void addColumn(Column column) {
       columns.add(column);
       incrByteSize(column);
  }

   @Override
   public Column getColumn(int i) {
       if (i < 0 || i >= columns.size()) {
           return null;
      }
       return columns.get(i);
  }

   @Override
   public void setColumn(int i, final Column column) {
       if (i < 0) {
           throw DataXException.asDataXException(FrameworkErrorCode.ARGUMENT_ERROR,
                   "不能給index小於0的column設置值");
      }

       if (i >= columns.size()) {
           expandCapacity(i + 1);
      }

       decrByteSize(getColumn(i));
       this.columns.set(i, column);
       incrByteSize(getColumn(i));
  }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM