DataX入門使用
一、簡介
DataX 是阿里巴巴集團內被廣泛使用的離線數據同步工具/平台,實現包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各種異構數據源之間高效的數據同步功能。Datax將不同數據源的同步抽象為從源頭數據源讀取數據的Reader插件,以及向目標端寫入數據的Writer插件,理論上DataX框架可以支持任意數據源類型的數據同步工作。同時DataX插件體系作為一套生態系統, 每接入一套新數據源該新加入的數據源即可實現和現有的數據源互通。

為了解決異構數據源同步問題,DataX將復雜的網狀的同步鏈路變成了星型數據鏈路,DataX作為中間傳輸載體負責連接各種數據源。當需要接入一個新的數據源的時候,只需要將此數據源對接到DataX,便能跟已有的數據源做到無縫數據同步。

Reader:Reader為數據采集模塊,負責采集數據源的數據,將數據發送給Framework。Writer: Writer為數據寫入模塊,負責不斷向Framework取數據,並將數據寫入到目的端。Framework:Framework用於連接reader和writer,作為兩者的數據傳輸通道,並處理緩沖,流控,並發,數據轉換等核心技術問題。
DataX目前已經有了比較全面的插件體系,主流的RDBMS數據庫、NOSQL、大數據計算系統都已經接入,目前支持數據如下圖,詳情請點擊:DataX數據源參考指南
| 類型 | 數據源 | Reader(讀) | Writer(寫) | 文檔 |
|---|---|---|---|---|
| RDBMS 關系型數據庫 | MySQL | √ | √ | 讀 、寫 |
| Oracle | √ | √ | 讀 、寫 | |
| SQLServer | √ | √ | 讀 、寫 | |
| PostgreSQL | √ | √ | 讀 、寫 | |
| DRDS | √ | √ | 讀 、寫 | |
| 通用RDBMS(支持所有關系型數據庫) | √ | √ | 讀 、寫 | |
| 阿里雲數倉數據存儲 | ODPS | √ | √ | 讀 、寫 |
| ADS | √ | 寫 | ||
| OSS | √ | √ | 讀 、寫 | |
| OCS | √ | √ | 讀 、寫 | |
| NoSQL數據存儲 | OTS | √ | √ | 讀 、寫 |
| Hbase0.94 | √ | √ | 讀 、寫 | |
| Hbase1.1 | √ | √ | 讀 、寫 | |
| Phoenix4.x | √ | √ | 讀 、寫 | |
| Phoenix5.x | √ | √ | 讀 、寫 | |
| MongoDB | √ | √ | 讀 、寫 | |
| Hive | √ | √ | 讀 、寫 | |
| Cassandra | √ | √ | 讀 、寫 | |
| 無結構化數據存儲 | TxtFile | √ | √ | 讀 、寫 |
| FTP | √ | √ | 讀 、寫 | |
| HDFS | √ | √ | 讀 、寫 | |
| Elasticsearch | √ | 寫 | ||
| 時間序列數據庫 | OpenTSDB | √ | 讀 | |
| TSDB | √ | √ | 讀 、寫 |
二、DataX核心架構
DataX 3.0 開源版本支持單機多線程模式完成同步作業運行,本小節按一個DataX作業生命周期的時序圖,從整體架構設計非常簡要說明DataX各個模塊相互關系。

核心模塊介紹:
- DataX完成單個數據同步的作業,我們稱之為Job,DataX接受到一個Job之后,將啟動一個進程來完成整個作業同步過程。DataX Job模塊是單個作業的中樞管理節點,承擔了數據清理、子任務切分(將單一作業計算轉化為多個子Task)、TaskGroup管理等功能。
- DataXJob啟動后,會根據不同的源端切分策略,將Job切分成多個小的Task(子任務),以便於並發執行。Task便是DataX作業的最小單元,每一個Task都會負責一部分數據的同步工作。
- 切分多個Task之后,DataX Job會調用Scheduler模塊,根據配置的並發數據量,將拆分成的Task重新組合,組裝成TaskGroup(任務組)。每一個TaskGroup負責以一定的並發運行完畢分配好的所有Task,默認單個任務組的並發數量為5。
- 每一個Task都由TaskGroup負責啟動,Task啟動后,會固定啟動Reader—>Channel—>Writer的線程來完成任務同步工作。
- DataX作業運行起來之后, Job監控並等待多個TaskGroup模塊任務完成,等待所有TaskGroup任務完成后Job成功退出。否則,異常退出,進程退出值非0
DataX調度流程:
舉例來說,用戶提交了一個DataX作業,並且配置了20個並發,目的是將一個100張分表的mysql數據同步到odps里面。 DataX的調度決策思路是:
- DataXJob根據分庫分表切分成了100個Task。
- 根據20個並發,DataX計算共需要分配4個TaskGroup。
- 4個TaskGroup平分切分好的100個Task,每一個TaskGroup負責以5個並發共計運行25個Task。
三、DataX的使用
3.1、下載datax
-
方法一、直接下載DataX工具包:DataX下載地址
下載后解壓至本地某個目錄,進入bin目錄,即可運行同步作業:
$ cd {YOUR_DATAX_HOME}/bin $ python datax.py {YOUR_JOB.json} -
方法二、下載DataX源碼,自己編譯:DataX源碼
3.2、datax的目錄結構

-
bin目錄下是pytho腳本文件,主要用來執行job文件(默認需要依賴Python2的環境,也可以修改為Python3)
-
conf目錄存放一些配置文件
-
job目錄下存放了一個job測試文件(我們通過datax-web生成的臨時job文件不會放在這里,而是在data-web里邊自己配置存放目錄)
-
lib是依賴的一些jar包
-
log目錄存放job文件的執行日志
-
plugin目錄存放的是對不同數據源讀取(Reader)和寫入(Writer)的插件支持
如果沒有在plugin目錄下發現自己需要的Reader或者Writer則需要自己手動安裝(比如ES的Reader和Writer)。
四、使用Datax執行job文件
job文件是一個JSON格式的文件,每一個job文件都代表一個同步任務,執行job文件需要使用bin目錄datax.py腳本。執行命令如下命令執行job任務:
python datax.py job文件

如果出現控制台亂碼在控制台輸入:CHCP 65001,再次執行就不會亂碼


一個Json格式的Job文件模板如下(Mysql為例):
{
"job": {
"setting": {
"speed": {
"channel": 3,
"byte": 1048576
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "yRjwDFuoPKlqya9h9H2Amg==",
"password": "6YrK4y3NaUxccEgnoAz8yA==",
"column": [
"`id`",
"`course_id`",
"`chapter_id`",
"`title`",
"`video_source_id`",
"`video_original_name`",
"`sort`",
"`play_count`",
"`is_free`",
"`duration`",
"`status`",
"`size`",
"`version`",
"`gmt_create`",
"`gmt_modified`"
],
"where": "gmt_modified >= ${lastTime} and gmt_modified < ${currentTime}",
"splitPk": "",
"connection": [
{
"table": [
"edu_video"
],
"jdbcUrl": [
"jdbc:mysql://127.0.0.1:3306/education_edu?serverTimezone=Asia/Shanghai&useLegacyDatetimeCode=false&useSSL=false&nullNamePatternMatchesAll=true&useUnicode=true"
]
}
]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"username": "yRjwDFuoPKlqya9h9H2Amg==",
"password": "6YrK4y3NaUxccEgnoAz8yA==",
"writeMode": "update",
"column": [
"`id`",
"`course_id`",
"`chapter_id`",
"`title`",
"`video_source_id`",
"`video_original_name`",
"`sort`",
"`play_count`",
"`is_free`",
"`duration`",
"`status`",
"`size`",
"`version`",
"`gmt_create`",
"`gmt_modified`"
],
"connection": [
{
"table": [
"edu_video"
],
"jdbcUrl": "jdbc:mysql://127.0.0.1:3306/datax_web?serverTimezone=Asia/Shanghai&useLegacyDatetimeCode=false&useSSL=false&nullNamePatternMatchesAll=true&useUnicode=true"
}
]
}
}
}
]
}
}
主要包括三部分
settingreaderwriter
如要查看不同數據源的reader和writer的寫法可以參照官網或者執行以下命令
# 查看mysql的reader和writer的寫法
python datax.py -r mysqlreader -w mysqlwriter

五、DataX-Web
DataX Web是在DataX之上開發的分布式數據同步工具,提供簡單易用的操作界面,降低用戶使用DataX的學習成本,縮短任務配置時間,避免配置過程中出錯。用戶可通過頁面選擇數據源即可創建數據同步任務,RDBMS數據源可批量創建數據同步任務,支持實時查看數據同步進度及日志並提供終止同步功能,集成並二次開發xxl-job可根據時間、自增主鍵增量同步數據。
任務"執行器"支持集群部署,支持執行器多節點路由策略選擇,支持超時控制、失敗重試、失敗告警、任務依賴,執行器CPU.內存.負載的監控等等。后續還將提供更多的數據源支持、數據轉換UDF、表結構同步、數據同步血緣等更為復雜的業務場景。
System Requirements
- Language: Java 8(jdk版本建議1.8.201以上)
Python2.7(支持Python3需要修改替換datax/bin下面的三個python文件,替換文件在doc/datax-web/datax-python3下) - Environment: MacOS, Windows,Linux
- Database: Mysql5.7
六、Datax-web結構

- bin目錄下是Linux上的一些執行安裝腳本和datax-web需要的sql腳本
- doc目錄下是官方文檔
- datax-admin項目和datax-executor項目使我們需要啟動的項目。
七、啟動Datax-web項目
7.1、執行sql腳本
在mysql數據庫創建項目運行所必需要的數據庫文件,sql腳本在bin/db目錄下
7.2、修改datax_admin下resources/application.yml文件
# 端口號
server:
port: 8080
spring:
#數據源,目前僅僅支持Mysql
datasource:
username: root
password: 'root'
url: jdbc:mysql://127.0.0.1:3306/datax_web?serverTimezone=Asia/Shanghai&useLegacyDatetimeCode=false&useSSL=false&nullNamePatternMatchesAll=true&useUnicode=true&characterEncoding=UTF-8
driver-class-name: com.mysql.jdbc.Driver
# 數據庫連接池配置
hikari:
## 最小空閑連接數量
minimum-idle: 5
## 空閑連接存活最大時間,默認600000(10分鍾)
idle-timeout: 180000
## 連接池最大連接數,默認是10
maximum-pool-size: 10
## 數據庫連接超時時間,默認30秒,即30000
connection-timeout: 30000
connection-test-query: SELECT 1
##此屬性控制池中連接的最長生命周期,值0表示無限生命周期,默認1800000即30分鍾
max-lifetime: 1800000
# datax-web email 不需要可以不用配置
mail:
host: smtp.qq.com
port: 25
username: 1769959702@qq.com
password: opfmjdcwbnlhefee
properties:
mail:
smtp:
auth: true
starttls:
enable: true
required: true
socketFactory:
class: javax.net.ssl.SSLSocketFactory
management:
health:
mail:
enabled: false
server:
servlet:
context-path: /actuator
mybatis-plus:
# mapper.xml文件掃描
mapper-locations: classpath*:/mybatis-mapper/*Mapper.xml
# 實體掃描,多個package用逗號或者分號分隔
#typeAliasesPackage: com.yibo.essyncclient.*.entity
global-config:
# 數據庫相關配置
db-config:
# 主鍵類型 AUTO:"數據庫ID自增", INPUT:"用戶輸入ID", ID_WORKER:"全局唯一ID (數字類型唯一ID)", UUID:"全局唯一ID UUID";
id-type: AUTO
# 字段策略 IGNORED:"忽略判斷",NOT_NULL:"非 NULL 判斷"),NOT_EMPTY:"非空判斷"
field-strategy: NOT_NULL
# 駝峰下划線轉換
column-underline: true
# 邏輯刪除
logic-delete-value: 0
logic-not-delete-value: 1
# 數據庫類型
db-type: mysql
banner: false
# mybatis原生配置
configuration:
map-underscore-to-camel-case: true
cache-enabled: false
call-setters-on-nulls: true
jdbc-type-for-null: 'null'
type-handlers-package: com.wugui.datax.admin.core.handler
# 配置mybatis-plus打印sql日志
logging:
level:
com.wugui.datax.admin.mapper: error
path: ./data/applogs/admin
#datax-job, access token
datax:
job:
accessToken:
#i18n (default empty as chinese, "en" as english)
i18n:
## triggerpool max size
triggerpool:
fast:
max: 200
slow:
max: 100
### log retention days
logretentiondays: 30
datasource:
aes:
key: AD42F6697B035B75
主要修改數據源的配置
7.3修改datax_executor下resources/application.yml文件
# web port
server:
#port: ${server.port}
port: 8081
# 日志路徑
logging:
config: classpath:logback.xml
path: ./data/applogs/executor/jobhandler
datax:
job:
admin:
### datax admin address list, http://address01,http://address02",data-admin的地址
addresses: http://127.0.0.1:8080
executor:
appname: datax-executor # 創建執行器時的AppName需要和這里保持一致
ip:
port: 9999 # 執行器端口號
### job log path job文件的執行日志
logpath: ./data/applogs/executor/jobhandler
### job log retention days
logretentiondays: 30
### job, access token
accessToken:
executor:
# datax json臨時文件保存路徑
jsonpath: F:\javaProject\datax-web-v-2.1.2\temp\executor
#jsonpath: ${json.path}
# Datax執行文件datax.py的地址
pypath: F:\javaProject\datax\bin\datax.py
#pypath: ${python.path}
datax.job配置
- admin.addresses datax_admin部署地址,如調度中心集群部署存在多個地址則用逗號分隔,執行器將會使用該地址進行"執行器心跳注冊"和"任務結果回調";
- executor.appname 執行器AppName,每個執行器機器集群的唯一標示,執行器心跳注冊分組依據;
- executor.ip 默認為空表示自動獲取IP,多網卡時可手動設置指定IP,該IP不會綁定Host僅作為通訊實用;地址信息用於 "執行器注冊" 和 "調度中心請求並觸發任務";
- executor.port 執行器Server端口號,默認端口為9999,單機部署多個執行器時,注意要配置不同執行器端口;
- executor.logpath 執行器運行日志文件存儲磁盤路徑,需要對該路徑擁有讀寫權限;
- executor.logretentiondays 執行器日志文件保存天數,過期日志自動清理, 限制值大於等於3時生效; 否則, 如-1, 關閉自動清理功能;
- executor.jsonpath datax json臨時文件保存路徑
- pypath DataX啟動腳本地址,例如:xxx/datax/bin/datax.py 如果系統配置DataX環境變量(DATAX_HOME),logpath、jsonpath、pypath可不配,log文件和臨時json存放在環境變量路徑下。
上述准備工作做好后,運行datax_admin下 的DataXAdminApplication和運行datax_executor下 的DataXExecutorApplication

admin啟動成功后日志會輸出三個地址,兩個接口文檔地址,一個前端頁面地址
注意:windows下如果控制台報錯需要hadoop啥的則需要先安裝hadoop的環境變量,下載地址:https://github.com/srccodes/hadoop-common-2.2.0-bin,下載成功后解壓配置環境變量
如果不配置hadoop環境變量,不能生成.json格式的job任務文件

八、在web頁面創建同步任務
1.執行器配置(使用開源項目xxl-job)

- 1、"調度中心OnLine:"右側顯示在線的"調度中心"列表, 任務執行結束后, 將會以failover的模式進行回調調度中心通知執行結果, 避免回調的單點風險;
- 2、"執行器列表" 中顯示在線的執行器列表, 可通過"OnLine 機器"查看對應執行器的集群機器;
執行器屬性說明

1、AppName: (與datax-executor中application.yml的datax.job.executor.appname保持一致)
每個執行器集群的唯一標示AppName, 執行器會周期性以AppName為對象進行自動注冊。可通過該配置自動發現注冊成功的執行器, 供任務調度時使用;
2、名稱: 執行器的名稱, 因為AppName限制字母數字等組成,可讀性不強, 名稱為了提高執行器的可讀性;
3、排序: 執行器的排序, 系統中需要執行器的地方,如任務新增, 將會按照該排序讀取可用的執行器列表;
4、注冊方式:調度中心獲取執行器地址的方式;
自動注冊:執行器自動進行執行器注冊,調度中心通過底層注冊表可以動態發現執行器機器地址;
手動錄入:人工手動錄入執行器的地址信息,多地址逗號分隔,供調度中心使用;
5、機器地址:"注冊方式"為"手動錄入"時有效,支持人工維護執行器的地址信息;
2.創建數據源

數據源就是數據庫的一些連接信息
3.創建項目

4.創建任務模版

我們稍后創建的同步任務都是依據任務模板創建的,模板主要配置任務的執行器,路由策略、定時任務等,同一個模板下的任務屬於同一個項目。
5. 構建JSON腳本
- 1.步驟一,步驟二,選擇第二步中創建的數據源,JSON構建目前支持的數據源有hive,mysql,oracle,postgresql,sqlserver,hbase,mongodb,clickhouse 其它數據源的JSON構建正在開發中,暫時需要手動編寫。

切分字段splitPk
-
描述:MysqlReader進行數據抽取時,如果指定splitPk,表示用戶希望使用splitPk代表的字段進行數據分片,DataX因此會啟動並發任務進行數據同步,這樣可以大大提供數據同步的效能。
推薦splitPk用戶使用表主鍵,因為表主鍵通常情況下比較均勻,因此切分出來的分片也不容易出現數據熱點。
目前splitPk僅支持整形數據切分,不支持浮點、字符串、日期等其他類型。如果用戶指定其他非支持類型,MysqlReader將報錯!
如果splitPk不填寫,包括不提供splitPk或者splitPk值為空,DataX視作使用單通道同步該表數據。
- 2.字段映射

注意:在選擇字段映射時,源端表字段的選擇順序要和目的端表字段的選擇順序一致
- 3.點擊構建,生成json,此時可以選擇復制json然后創建任務,選擇datax任務,將json粘貼到文本框。也可以點擊選擇模版,直接生成任務。

6.批量創建任務


注意:批量構建任務時,源端表和目的端表的數量要一致,需要同步的表勾選的順序也要一致。
7.任務創建介紹(關聯模版創建任務不再介紹,具體參考5構建JSON腳本)
DataX任務

Shell任務

Python任務

PowerShell任務

- 任務類型:目前支持DataX任務、Shell任務、Python任務、PowerShell任務;
- 阻塞處理策略:調度過於密集執行器來不及處理時的處理策略;
- 單機串行:調度請求進入單機執行器后,調度請求進入FIFO隊列並以串行方式運行;
- 丟棄后續調度:調度請求進入單機執行器后,發現執行器存在運行的調度任務,本次請求將會被丟棄並標記為失敗;
- 覆蓋之前調度:調度請求進入單機執行器后,發現執行器存在運行的調度任務,將會終止運行中的調度任務並清空隊列,然后運行本地調度任務;
- 增量增新建議將阻塞策略設置為丟棄后續調度或者單機串行
- 設置單機串行時應該注意合理設置重試次數(失敗重試的次數*每次執行時間<任務的調度周期),重試的次數如果設置的過多會導致數據重復,例如任務30秒執行一次,每次執行時間需要20秒,設置重試三次,如果任務失敗了,第一個重試的時間段為1577755680-1577756680,重試任務沒結束,新任務又開啟,那新任務的時間段會是1577755680-1577758680
8.增量更新JSON腳本任務
一、根據日期進行增量數據抽取
按下圖中5個步驟進行配置

- 1.任務類型選DataX任務
- 2.輔助參數選擇時間自增
- 3.增量開始時間選擇,即sql中查詢時間的開始時間,用戶使用此選項方便第一次的全量同步。第一次同步完成后,該時間被更新為上一次的任務觸發時間,任務失敗不更新。增量的區間為兩個時間之間的所有數據
- 4.增量時間字段,-DlastTime='%s' -DcurrentTime='%s' 先來解析下這段字符串
1.-D是DataX參數的標識符,必配
2.-D后面的lastTime和currentTime是DataX json中where條件的時間字段標識符,必須和json中的變量名稱保持一致
3.='%s'是項目用來去替換時間的占位符,比配並且格式要完全一致
4.注意-DlastTime='%s'和-DcurrentTime='%s'中間有一個空格,空格必須保留並且是一個空格
- 5.時間格式,可以選擇自己數據庫中時間的格式,也可以通過json中配置sql時間轉換函數來處理
一個時間增量的Json例子如下
{
"job": {
"setting": {
"speed": {
"channel": 3,
"byte": 1048576
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "yRjwDFuoPKlqya9h9H2Amg==",
"password": "6YrK4y3NaUxccEgnoAz8yA==",
"splitPk": "",
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://127.0.0.1:3306/education_edu?serverTimezone=Asia/Shanghai&useLegacyDatetimeCode=false&useSSL=false&nullNamePatternMatchesAll=true&useUnicode=true"
],
"querySql": [
"select * from edu_teacher_copy1 where gmt_modified >= ${lastTime} and gmt_modified < ${currentTime}"
]
}
]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"username": "yRjwDFuoPKlqya9h9H2Amg==",
"password": "6YrK4y3NaUxccEgnoAz8yA==",
"column": [
"`id`",
"`name`",
"`intro`",
"`career`",
"`level`",
"`avatar`",
"`sort`",
"`join_date`",
"`is_deleted`",
"`gmt_create`",
"`gmt_modified`"
],
"connection": [
{
"table": [
"edu_teacher_copy1"
],
"jdbcUrl": "jdbc:mysql://127.0.0.1:3306/datax_web?serverTimezone=Asia/Shanghai&useLegacyDatetimeCode=false&useSSL=false&nullNamePatternMatchesAll=true&useUnicode=true"
}
]
}
}
}
]
}
}
querySql解析
select * from edu_teacher_copy1 where gmt_modified >= ${lastTime} and gmt_modified < ${currentTime}"
- 1.此處的關鍵點在${lastTime},${currentTime},${}是DataX動態參數的固定格式,lastTime,currentTime就是我們頁面配置中 -DlastTime='%s' -DcurrentTime='%s'中的lastTime,currentTime,注意字段一定要一致。
- 2.如果任務配置頁面,時間類型選擇為時間戳但是數據庫時間格式不是時間戳,例如是:2019-11-26 11:40:57 此時可以用FROM_UNIXTIME(${lastTime})進行轉換。
select * from test_list where operationDate >= FROM_UNIXTIME(${lastTime}) and operationDate < FROM_UNIXTIME(${currentTime})
此處還有兩點需要注意:
-
配置增量需要我們創建任務后,在任務管理編輯任務設置
輔助參數為時間自增,並且手動的給json文件Reader-->parameter加上querySql,也就是reader增量查詢的sql,當然,簡單一點可以不寫querySql而直接在reader的parameter加一個where條件"where": "gmt_modified >= ${lastTime} and gmt_modified < ${currentTime}",如果使用querySql的方式需要刪除掉Reader的column屬性和table屬性,因為我們的sql中已經指明了我們需要的字段和表
-
對於Writer我們需要設置parameter-->"writeMode": "update",寫入模式修改為update過后,datax會對增量的數據依照主鍵進行判斷如果目標表中已經有這條數據就是更新操作,目標表沒有這條數據就是新增。默認情況下寫入模式是新增和不是update,我們可以手動改Json文件也可以對源碼進行修改。
writeModel控制寫入數據到目標表采用
insert into或者replace into或者ON DUPLICATE KEY UPDATE語句。- insert:將數據源表的數據直接寫的到目的表,主要用於全量的導入。實現原理是直接采用
insert into; - replace和update:如果目標表中包含待寫入的數據則更新該行數據,主要用於增量導入。實現原理:在mysql中用
ON DUPLICATE KEY UPDATE語句,其他數據庫中用replace into.

- insert:將數據源表的數據直接寫的到目的表,主要用於全量的導入。實現原理是直接采用
關於ID自增增量參照官網
9. 任務列表

10. 可以點擊查看日志,實時獲取日志信息,終止正在執行的datax進程



11.任務資源監控

12. admin可以創建用戶,編輯用戶信息

關於更詳細文檔可以參照官網或者doc目錄下的文檔
項目規划


