Installing and Using DataX
DataX Overview
Functionally similar to Sqoop: both are used for offline (batch) data collection.
Sqoop is based on MapReduce and runs distributed.
DataX is based on Java and runs single-node, multi-threaded.
DataX is an offline data synchronization tool/platform widely used within Alibaba Group. It provides efficient data synchronization between heterogeneous data sources, including MySQL, Oracle, SqlServer, PostgreSQL, HDFS, Hive, ADS, HBase, TableStore (OTS), MaxCompute (ODPS), DRDS, and more.
As a data synchronization framework, DataX abstracts the synchronization of different data sources into Reader plugins, which read data from a source, and Writer plugins, which write data to a target, so in principle the framework can synchronize data between arbitrary data source types. The plugin system also forms an ecosystem: every newly added data source immediately becomes interoperable with all existing ones.
Open-source repository
https://github.com/alibaba/DataX
Installing DataX
DataX does not depend on any other services: simply upload the package, extract it, and configure the environment variables.
It can also be extracted and used directly on Windows.
Any node will do, as long as it has a Python environment (Linux ships with Python by default).
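A minimal installation sketch on Linux (the package name datax.tar.gz and the install path /usr/local/soft are assumptions; adjust to your environment):
tar -zxvf datax.tar.gz -C /usr/local/soft/                # extract the DataX package (name and path assumed)
export PATH=$PATH:/usr/local/soft/datax/bin               # add this line to /etc/profile or ~/.bashrc to make it permanent
After this, datax.py can be invoked directly, as in the examples below.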
Using DataX
stream2stream
A local test used to verify that the installation works.
Create the configuration file stream2stream.json
# stream2stream.json
{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"sliceRecordCount": 10,
"column": [
{
"type": "long",
"value": "10"
},
{
"type": "string",
"value": "hello,你好,世界-DataX"
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": 5
}
}
}
}
Run the synchronization job
datax.py stream2stream.json
Execution result: with "print": true, the generated records are printed to the console.
mysql2mysql
You first need to create a student2 database and a student2 table inside it.
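A sketch of the required DDL, assuming student2 mirrors the schema of the source student table (all column types below are assumptions):
create database if not exists student2;
use student2;
create table if not exists student2 (
    id       bigint primary key,   -- types assumed; match your actual source student table
    name     varchar(255),
    age      int,
    gender   varchar(16),
    clazz    varchar(255),
    last_mod timestamp
);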
Create the configuration file mysql2mysql.json. Note that the # annotations in the examples below are explanatory only and must be removed from the actual file, since JSON does not support comments.
{
"job": {
"content": [
{
"reader": { # 讀數據
"name": "mysqlreader", # 這個名字是固定的,不是自定義的
"parameter": {
"username": "root",
"password": "123456",
"column": [
"id",
"name",
"age",
"gender",
"clazz",
"last_mod"
],
"splitPk": "age", # 多並行的時候可以指定一個分割的 key ,一般使用主鍵
"connection": [ # 連接
{
"table": [
"student"
],
"jdbcUrl": [
"jdbc:mysql://master:3306/student"
]
}
]
}
},
"writer": { # 寫數據
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert",
"username": "root",
"password": "123456",
"column": [
"id",
"name",
"age",
"gender",
"clazz",
"last_mod"
],
"preSql": [ # 執行寫操作之前
"truncate student2"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://master:3306/student2?useUnicode=true&characterEncoding=utf8",
"table": [
"student2"
]
}
]
}
}
}
],
"setting": {
"speed": {
"channel": 6 # 並行度的意思,就像是啟動幾個管道的意思,多線程
}
}
}
}
Run the synchronization job
datax.py mysql2mysql.json
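To check the result, a quick row count against the source and target tables (host and credentials as configured above):
mysql -uroot -p123456 -e "select count(*) from student.student; select count(*) from student2.student2;"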
mysql2hdfs
Reading and writing Hive works the same way as reading and writing HDFS, since Hive table data is just files on HDFS (see the Hive sketch at the end of this section).
Create the configuration file mysql2hdfs.json
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"column": [
"id",
"name",
"age",
"gender",
"clazz"
],
"splitPk": "id",
"where":"age=23", # 指定篩選條件,使用中文要注意編碼格式的問題
"connection": [
{
"table": [
"student"
],
"jdbcUrl": [
"jdbc:mysql://master:3306/student"
]
}
]
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://master:9000",
"fileType": "text",
"path": "/user/data/student",
"fileName": "student",
"column": [
{
"name": "id",
"type": "string"
},
{
"name": "name",
"type": "string"
},
{
"name": "age",
"type": "INT"
},
{
"name": "gender",
"type": "string"
},
{
"name": "clazz",
"type": "string"
}
],
"writeMode": "append",
"fieldDelimiter": ","
}
}
}
],
"setting": {
"speed": {
"channel": 6
}
}
}
}
Create the corresponding directory in HDFS first:
hadoop dfs -mkdir -p /user/data/student
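As noted above, Hive data is just files on HDFS, so the files written by this job can be exposed to Hive with an external table over the same path. A sketch (the table name and column types are assumptions):
create external table if not exists student_datax (
    id string,
    name string,
    age int,
    gender string,
    clazz string
)
row format delimited fields terminated by ','   -- matches the fieldDelimiter in the writer config
location '/user/data/student';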
hbase2mysql
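This job assumes an HBase table named student with an info column family already exists and contains the data. If not, it could be created in the HBase shell first (sketch):
create 'student','info'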
{
"job": {
"content": [
{
"reader": {
"name": "hbase11xreader", # 11x表示版本
"parameter": {
"hbaseConfig": {
"hbase.zookeeper.quorum": "master:2181,node1:2181,node2:2181"
},
"table": "student",
"encoding": "utf-8",
"mode": "normal",
"column": [
{
"name": "rowkey",
"type": "string"
},
{
"name": "info:name",
"type": "string"
},
{
"name": "info:age",
"type": "int"
},
{
"name": "info:gender",
"type": "string"
},
{
"name": "info:clazz",
"type": "string"
}
],
"range": {
"startRowkey": "",
"endRowkey": "",
"isBinaryRowkey": false
}
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert",
"username": "root",
"password": "123456",
"column": [
"id",
"name",
"age",
"gender",
"clazz"
],
"preSql": [
"truncate student2"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://master:3306/student2?useUnicode=true&characterEncoding=utf8",
"table": [
"student2"
]
}
]
}
}
}
],
"setting": {
"speed": {
"channel": 6
}
}
}
}
mysql2hbase
In MySQL, the score table's cource_id column needs to be renamed to course_id, and (student_id, course_id) should be made the primary key.
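A sketch of those changes in MySQL (the column type int is an assumption):
alter table score change cource_id course_id int;   -- type assumed; keep the column's original type
alter table score add primary key (student_id, course_id);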
Create the score table in HBase first:
create 'score','cf1'
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"column": [
"student_id",
"course_id",
"score"
],
"splitPk": "student_id",
"connection": [
{
"table": [
"score"
],
"jdbcUrl": [
"jdbc:mysql://master:3306/student"
]
}
]
}
},
"writer": {
"name": "hbase11xwriter",
"parameter": {
"hbaseConfig": {
"hbase.zookeeper.quorum": "master:2181,node1:2181,node2:2181"
},
"table": "score",
"mode": "normal",
"rowkeyColumn": [ # 將 student_id 和 course_id 作為 rowkey
{
"index":0, # student_id
"type":"string"
},
{
"index":-1, # 分割方式
"type":"string",
"value":"_"
},
{
"index":1, # course_id
"type":"string"
}
],
"column": [
{
"index":2,
"name": "cf1:score",
"type": "int"
}
],
"encoding": "utf-8"
}
}
}
],
"setting": {
"speed": {
"channel": 6
}
}
}
}
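After running the job with datax.py and this configuration file, the result can be spot-checked in the HBase shell:
scan 'score', {LIMIT => 5}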
HDFSToHBase
Upload the students.txt data to the /data/student1/ directory in HDFS (see the upload sketch below), then create a datax table in HBase:
create 'datax','cf1'
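A sketch of the HDFS upload step, assuming students.txt is in the local working directory:
hadoop dfs -mkdir -p /data/student1/
hadoop dfs -put students.txt /data/student1/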
{
"job": {
"setting": {
"speed": {
"channel": 3
},
"errorLimit": { # 指定錯誤限制,超過這個限制任務就報錯結束了,出錯的條數不得超過總條數的0.02
"record": 0, # 出錯的行
"percentage": 0.02 # 錯誤數據的百分比
}
},
"content": [
{
"reader": {
"name": "hdfsreader",
"parameter": {
"path": "/data/student1/",
"defaultFS": "hdfs://master:9000",
"column": [
{
"index": 0,
"type": "string"
},
{
"index": 1,
"type": "string"
},
{
"index": 2,
"type": "string"
},
{
"index": 3,
"type": "string"
},
{
"index": 4,
"type": "string"
},
{
"index": 5,
"type": "string"
}
],
"fileType": "text",
"encoding": "UTF-8",
"fieldDelimiter": ","
}
},
"writer": {
"name": "hbase11xwriter",
"parameter": {
"hbaseConfig": {
"hbase.zookeeper.quorum": "master:2181,node1:2181,node2:2181"
},
"table": "datax",
"mode": "normal",
"rowkeyColumn": [
{
"index": 0,
"type": "string"
},
{
"index": -1,
"type": "string",
"value": "_"
},
{
"index": 1,
"type": "string"
}
],
"column": [
{
"index": 2,
"name": "cf1:age",
"type": "string"
},
{
"index": 3,
"name": "cf1:gender",
"type": "string"
},
{
"index": 4,
"name": "cf1:clazz",
"type": "string"
},
{
"index": 5,
"name": "cf1:ts",
"type": "string"
}
],
"versionColumn": { # 指定數據版本字段
"index": 5
},
"encoding": "utf-8" # 指定編碼
}
}
}
]
}
}
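Save the configuration (the file name hdfs2hbase.json below is an assumption) and run it the same way as the earlier jobs:
datax.py hdfs2hbase.json
Then verify in the HBase shell:
scan 'datax', {LIMIT => 5}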
