原文鏈接:https://github.com/alibaba/DataX/blob/master/introduction.md
一、datax3.0介紹
1、DataX 是一個異構數據源離線同步工具,致力於實現包括關系型數據庫(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各種異構數據源之間穩定高效的數據同步功能。
2、DataX3.0框架設計
DataX本身作為離線數據同步框架,采用Framework + plugin架構構建。將數據源讀取和寫入抽象成為Reader/Writer插件,納入到整個同步框架中。
1. Reader:數據采集模塊,負責采集數據源的數據,將數據發送給Framework。
2. Writer:數據寫入模塊,負責不斷向Framework取數據,並將數據寫入到目的端。
3. Framework:用於連接reader和writer,作為兩者的數據傳輸通道,並處理緩沖,流控,並發,數據轉換等核心技術問題。
3、DataX3.0核心架構
1. 核心模塊介紹:
- DataX完成單個數據同步的作業,我們稱之為Job,DataX接受到一個Job之后,將啟動一個進程來完成整個作業同步過程。DataX Job模塊是單個作業的中樞管理節點,承擔了數據清理、子任務切分(將單一作業計算轉化為多個子Task)、TaskGroup管理等功能。
- DataXJob啟動后,會根據不同的源端切分策略,將Job切分成多個小的Task(子任務),以便於並發執行。Task便是DataX作業的最小單元,每一個Task都會負責一部分數據的同步工作。
- 切分多個Task之后,DataX Job會調用Scheduler模塊,根據配置的並發數據量,將拆分成的Task重新組合,組裝成TaskGroup(任務組)。每一個TaskGroup負責以一定的並發運行完畢分配好的所有Task,默認單個任務組的並發數量為5。
- 每一個Task都由TaskGroup負責啟動,Task啟動后,會固定啟動Reader—>Channel—>Writer的線程來完成任務同步工作。
- DataX作業運行起來之后, Job監控並等待多個TaskGroup模塊任務完成,等待所有TaskGroup任務完成后Job成功退出。否則,異常退出,進程退出值非0
2. DataX調度流程:
舉例來說,用戶提交了一個DataX作業,並且配置了20個並發,目的是將一個100張分表的mysql數據同步到odps里面。 DataX的調度決策思路是:
- DataXJob根據分庫分表切分成了100個Task。
- 根據20個並發,DataX計算共需要分配4個TaskGroup。
- 4個TaskGroup平分切分好的100個Task,每一個TaskGroup負責以5個並發共計運行25個Task。
二、Datax3.0安裝部署
1、環境准備
Linux
jdk 1.8
python 2.7.5(datax是由python2開發的)
2、datax下載地址
https://github.com/alibaba/DataX?spm=a2c4e.11153940.blogcont59373.11.7a684c4fvubOe1
查看安裝成功:在bin目錄下執行 python datax.py ../job/job.json
3、查看配置文件
在bin目錄下已經給出了樣例配置,但不同的數據源配置文件不一樣。通過命令查看配置模板
# python datax.py -r {YOUR_READER} -w {YOUR_WRITER}
示例:[xxx@xxxbin]$ python datax.py -r mysqlreader -w hdfswriter
4、Reader插件和Writer插件
DataX3.0版本提供的Reader插件和Writer插件,每種讀插件都有一種和多種切分策略
"reader": {
"name": "mysqlreader", #從mysql數據庫獲取數據(也支持sqlserverreader,oraclereader)
"name": "txtfilereader", #從本地獲取數據
"name": "hdfsreader", #從hdfs文件、hive表獲取數據
"name": "streamreader", #從stream流獲取數據(常用於測試)
"name": "httpreader", #從http URL獲取數據
}
"writer": {
"name":"hdfswriter", #向hdfs,hive表寫入數據
"name":"mysqlwriter ", #向mysql寫入數據(也支持sqlserverwriter,oraclewriter)
"name":"streamwriter ", #向stream流寫入數據。(常用於測試)
}
5、json配置文件模板
1. 整個配置文件是一個job的描述;
2. job下面有兩個配置項,content和setting,其中content用來描述該任務的源和目的端的信息,setting用來描述任務本身的信息;
3. content又分為兩部分,reader和writer,分別用來描述源端和目的端的信息;
4. setting中的speed項表示同時起幾個並發去跑該任務。
1. mysql_to_hive示例 { "job": { "content": [ { "reader": { "name": "mysqlreader", "parameter": { "querySql": "", #自定義sql,支持多表關聯,當用戶配置querySql時,直接忽略table、column、where條件的配置。 "fetchSize": "", #默認1024,該配置項定義了插件和數據庫服務器端每次批量數據獲取條數,該值決定了DataX和服務器端的網絡交互次數,能夠較大的提升數據抽取性能,注意,該值過大(>2048)可能造成DataX進程OOM "splitPk": "db_id", #僅支持整形型數據切分;如果指定splitPk,表示用戶希望使用splitPk代表的字段進行數據分片,如果該值為空,代表不切分,使用單通道進行抽取 "column": [], #"*"默認所有列,支持列裁剪,列換序 "connection": [ { "jdbcUrl": ["jdbc:mysql://IP:3306/database?useUnicode=true&characterEncoding=utf8"], "table": [] #支持多張表同時抽取 } ], "password": "", "username": "", "where": "" #指定的column、table、where條件拼接SQL,可以指定limit 10,也可以增量數據同步,如果該值為空,代表同步全表所有的信息 } }, "writer": { "name": "hdfswriter", "parameter": { "column": [], #必須指定字段名,字段類型,{"name":"","tpye":""} "compress": "", #hdfs文件壓縮類型,默認不填寫意味着沒有壓縮。其中:text類型文件支持壓縮類型有gzip、bzip2;orc類型文件支持的壓縮類型有NONE、SNAPPY(需要用戶安裝SnappyCodec)。 "defaultFS": "", #Hadoop hdfs文件系統namenode節點地址。 "fieldDelimiter": "", #需要用戶保證與創建的Hive表的字段分隔符一致 "fileName": "", #HdfsWriter寫入時的文件名,需要指定表中所有字段名和字段類型,其中:name指定字段名,type指定字段類型。 "fileType": "", #目前只支持用戶配置為”text”或”orc” "path": "", #存儲到Hadoop hdfs文件系統的路徑信息,hive表在hdfs上的存儲路徑 "hadoopConfig": {} #hadoopConfig里可以配置與Hadoop相關的一些高級參數,比如HA的配置。 "writeMode": "" #append,寫入前不做任何處理,文件名不沖突;nonConflict,如果目錄下有fileName前綴的文件,直接報錯。 } } } ], "setting": { "speed": { #流量控制 "byte": 1048576, #控制傳輸速度,單位為byte/s,DataX運行會盡可能達到該速度但是不超過它 "channel": "" #控制同步時的並發數 } "errorLimit": { #臟數據控制 "record": 0 #對臟數據最大記錄數閾值(record值)或者臟數據占比閾值(percentage值,當數量或百分比,DataX Job報錯退出 } } } } 2. hive_to_mysql示例 { "job": { "content": [ { "reader": { "name": "hdfsreader", "parameter": { "column": [], #"*"默認所有列,指定Column信息時,type必須填寫,index/value必須選擇其一。 "defaultFS": "", #hdfs文件系統namenode節點地址 "encoding": "UTF-8", #默認UTF-8 "nullFormat": "", #文本文件中無法使用標准字符串定義null(空指針),例如:nullFormat:”\N”,那么如果源頭數據是”\N” "compress": "", #orc文件類型下無需填寫 "hadoopConfig": {}, #hadoopConfig里可以配置與Hadoop相關的一些高級參數,比如HA的配置。 "fieldDelimiter": ",", #默認",";讀取textfile數據時,需要指定字段分割符,HdfsReader在讀取orcfile時,用戶無需指定字段分割符 "fileType": "orc", #文件的類型,目前只支持用戶配置為”text”、”orc”、”rc”、”seq”、”csv”。 "path": "" #文件路徑,支持多文件讀取,可以使用"*",也可以指定通配符遍歷多文件,單文件只能單線程,多文件可以多線程,線程並發數通過通道數指定 } }, "writer": { "name": "mysqlwriter", "parameter": { "column": [], #必須指定,不能留空;如果要依次寫入全部列,使用表示, 例如: "column": [""],強烈不建議 "batchSize": "", #默認值1024 一次性批量提交的記錄數大小,該值可以極大減少DataX與Mysql的網絡交互次數,並提升整體吞吐量。但是該值設置過大可能會造成DataX運行進程OOM情況。 "connection": [ { "jdbcUrl": "jdbc:mysql://IP:3306/database?useUnicode=true&characterEncoding=utf8", "table": [] #支持寫入一個或者多個表。當配置為多張表時,必須確保所有表結構保持一致。 } ], "password": "", "preSql": [], #寫入數據到目的表前,會先執行這里的標准語句。例在導入表前先進行刪除操作:["delete from 表名"] "postSql":[], #寫入數據到目的表后,會執行這里的標准語句。(原理同 preSql ) "session": [], #DataX在獲取Mysql連接時,執行session指定的SQL語句,修改當前connection session屬性 "username": "", "writeMode": "" #默認insert ,可選insert/replace/update } } } ], "setting": { "speed": { "channel": "" } "errorLimit": { #臟數據控制 "record": 0 #對臟數據最大記錄數閾值(record值)或者臟數據占比閾值(percentage值,當數量或百分比,DataX Job報錯退出 } } } }
三、Datax3.0使用
# trail_pigeon導入hive
#hive里面建表
CREATE TABLE ods_db_bidata.trail_pigeon (
order_id int ,
order_apply_time string
)
stored as orc tblproperties ("orc.compress"="ZLIB");
#建shell腳本,執行python腳本前先清空目標表
#!/bin/bash
hive_db=ods_db_bidata
hive_table=trail_pigeon
hive -e "truncate table ${hive_db}.${hive_table}"
python /opt/app/datax/bin/datax.py /opt/app/datax/job/mysql2hive/trail_pigeon.json
#寫json配置文件
{
"job": {
"content": [{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": ["*"],
"splitPk": "order_id",
"connection": [{
"jdbcUrl": ["jdbc:mysql://ip:3306/bidata?useUnicode=true&characterEncoding=utf8"],
"table": ["trail_pigeon"]
}],
"password": "password",
"username": "username",
"where": ""
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [{"name": "order_id","type": "int"},
{"name": "order_apply_time","type": "string"}
],
"compress": "SNAPPY",
"defaultFS": "hdfs://192.168.0.127:8020",
"fieldDelimiter": "\u0001",
"fileName": "trail_pigeon",
"fileType": "orc",
"path": "/hive/warehouse/ods_db_bidata.db/trail_pigeon",
"writeMode": "nonConflict"
}
}
}],
"setting": {
"speed": {
"channel": "5"
}
}
}
}