Installing and Using DataX
Introduction to DataX
DataX is Alibaba's open-source offline data synchronization tool for heterogeneous data sources.
It provides stable and efficient data synchronization between a wide range of heterogeneous sources, including relational databases (MySQL, Oracle, etc.), HDFS, Hive, ODPS, HBase, and FTP.
Ease of use: jobs are run as scripts, which demands a fairly high level of technical skill from users.
Performance: high data-extraction throughput.
Deployment: can be deployed standalone.
Typical scenario: high-speed data exchange between heterogeneous databases and file systems.
Installing DataX
DataX does not depend on any other services: simply upload the tarball, extract it, and configure the environment variables.
The archive can also be extracted directly on Windows.
# Extract
tar -xvf datax.tar.gz
# Configure environment variables
cd /usr/local/soft/datax
vim /etc/profile
# Append the following
export DATAX_HOME=/usr/local/soft/datax
export PATH=$PATH:$DATAX_HOME/bin
# Reload the profile
source /etc/profile
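To verify the installation, run the self-check job that ships with DataX (a small stream-to-stream job bundled at $DATAX_HOME/job/job.json):
# Run the bundled self-check job; it should finish with a short
# statistics report and zero errors
datax.py $DATAX_HOME/job/job.json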
Using DataX
1. stream2stream
Create the job configuration file stream2stream.json (this simply creates a new file):
vim stream2stream.json
# stream2stream.json
{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"sliceRecordCount": 10,
"column": [
{
"type": "long",
"value": "10"
},
{
"type": "string",
"value": "hello,你好,世界-DataX"
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": 5
}
}
}
}
Run the synchronization job:
datax.py stream2stream.json
Execution result: each channel emits sliceRecordCount records, so with channel set to 5 the job should print the configured row (10 and hello,你好,世界-DataX) 50 times, followed by the job statistics.
2. mysql2mysql: importing MySQL data into MySQL
A student database with a student table inside it must exist on the target side.
The new student table must have the same structure as the source table being read.
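A minimal sketch of that target schema, assuming the five columns listed in the job below (adjust the types to match the actual source table):
# Create the target database and table; the column types here are
# assumptions based on the field names
mysql -uroot -p123456 -e "
CREATE DATABASE IF NOT EXISTS student;
CREATE TABLE IF NOT EXISTS student.student (
  id     INT PRIMARY KEY,
  name   VARCHAR(64),
  age    INT,
  gender VARCHAR(8),
  clazz  VARCHAR(32)
) DEFAULT CHARSET = utf8;
"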
Create the job configuration file mysql2mysql.json:
vim mysql2mysql.json
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"column": [
"id",
"name",
"age",
"gender",
"clazz",
],
"splitPk": "age",
"connection": [
{
"table": [
"student"
],
"jdbcUrl": [
"jdbc:mysql://master:3306/lyw11"
]
}
]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert",
"username": "root",
"password": "123456",
"column": [
"id",
"name",
"age",
"gender",
"clazz",
],
"preSql": [
"truncate student"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://master:3306/student?useUnicode=true&characterEncoding=utf8",
"table": [
"student"
]
}
]
}
}
}
],
"setting": {
"speed": {
"channel": 6
}
}
}
}
Run the synchronization job:
datax.py mysql2mysql.json
Execution result: the target student.student table should now contain the same rows as the source table.
3. mysql2hdfs: importing MySQL data into HDFS
Reading and writing Hive works the same way as reading and writing HDFS, since a Hive table is just files under an HDFS directory.
Hadoop needs to be running:
start-all.sh
Create the job configuration file mysql2hdfs.json:
vim mysql2hdfs.json
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"column": [
"id",
"name",
"age",
"gender",
"clazz"
],
"splitPk": "age",
"connection": [
{
"table": [
"student"
],
"jdbcUrl": [
"jdbc:mysql://master:3306/student"
]
}
]
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://master:9000",
"fileType": "text",
"path": "/user/data/student",
"fileName": "student",
"column": [
{
"name": "id",
"type": "string"
},
{
"name": "name",
"type": "string"
},
{
"name": "age",
"type": "INT"
},
{
"name": "gender",
"type": "string"
},
{
"name": "clazz",
"type": "string"
}
],
"writeMode": "append",
"fieldDelimiter": ","
}
}
}
],
"setting": {
"speed": {
"channel": 6
}
}
}
}
First create the target directory in HDFS:
hdfs dfs -mkdir -p /user/data/student
Run the job:
datax.py mysql2hdfs.json
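After the job finishes, the output can be checked directly in HDFS (hdfswriter appends a random suffix to the configured fileName):
# List and inspect the files the job wrote
hdfs dfs -ls /user/data/student
hdfs dfs -cat /user/data/student/student__*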
4. hbase2mysql: importing HBase data into MySQL
Start ZooKeeper first:
zkServer.sh start
Then start HBase:
start-hbase.sh
Enter the HBase shell:
hbase shell
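The reader below expects a student table with an info column family, and the writer needs a student2 table in the lyw11 MySQL database (the job's preSql truncates it before loading). A minimal sketch of both, using made-up sample data:
# In the HBase shell: create and populate the source table
create 'student','info'
put 'student','1500100001','info:name','zhangsan'
put 'student','1500100001','info:age','22'
put 'student','1500100001','info:gender','male'
put 'student','1500100001','info:clazz','class01'
# Back in the Linux shell: create the MySQL target table (column
# types are assumptions matching the writer's five columns)
mysql -uroot -p123456 -e "
CREATE TABLE IF NOT EXISTS lyw11.student2 (
  id     VARCHAR(32) PRIMARY KEY,
  name   VARCHAR(64),
  age    INT,
  gender VARCHAR(8),
  clazz  VARCHAR(32)
);
"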
Create the job configuration file hbase2mysql.json:
vim hbase2mysql.json
{
"job": {
"content": [
{
"reader": {
"name": "hbase11xreader",
"parameter": {
"hbaseConfig": {
"hbase.zookeeper.quorum": "master:2181,node1:2181.node2:2181"
},
"table": "student",
"encoding": "utf-8",
"mode": "normal",
"column": [
{
"name": "rowkey",
"type": "string"
},
{
"name": "info:name",
"type": "string"
},
{
"name": "info:age",
"type": "string"
},
{
"name": "info:gender",
"type": "string"
},
{
"name": "info:clazz",
"type": "string"
}
],
"range": {
"startRowkey": "",
"endRowkey": "",
"isBinaryRowkey": false
}
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert",
"username": "root",
"password": "123456",
"column": [
"id",
"name",
"age",
"gender",
"clazz"
],
"preSql": [
"truncate student11"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://master:3306/lyw11?useUnicode=true&characterEncoding=utf8",
"table": [
"student2"
]
}
]
}
}
}
],
"setting": {
"speed": {
"channel": 6
}
}
}
}
Run the job:
datax.py hbase2mysql.json
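The import can then be checked on the MySQL side:
# Confirm the HBase rows landed in the target table
mysql -uroot -p123456 -e "SELECT * FROM lyw11.student2 LIMIT 10;"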
5. mysql2hbase: importing MySQL data into HBase
In MySQL, the score table needs its cource column renamed to course_id, (student_id, course_id) set as the composite primary key, and all columns changed to type int.
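A sketch of those schema changes, assuming the table currently has the columns student_id, cource, and score:
# Rename cource to course_id, switch all columns to int, and add the
# composite primary key
mysql -uroot -p123456 -e "
ALTER TABLE student.score CHANGE cource course_id INT;
ALTER TABLE student.score MODIFY student_id INT, MODIFY score INT;
ALTER TABLE student.score ADD PRIMARY KEY (student_id, course_id);
"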
In HBase, create the score table first:
create 'score','cf1'
Create the job configuration file mysql2hbase.json. The rowkeyColumn section below builds each HBase row key by concatenating student_id, a literal "_", and course_id:
vim mysql2hbase.json
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"column": [
"student_id",
"course_id",
"score"
],
"splitPk": "student_id",
"connection": [
{
"table": [
"score"
],
"jdbcUrl": [
"jdbc:mysql://master:3306/student"
]
}
]
}
},
"writer": {
"name": "hbase11xwriter",
"parameter": {
"hbaseConfig": {
"hbase.zookeeper.quorum": "master:2181,note01:2181,note2:2181"
},
"table": "score",
"mode": "normal",
"rowkeyColumn": [
{
"index":0,
"type":"string"
},
{
"index":-1,
"type":"string",
"value":"_"
},
{
"index":1,
"type":"string"
}
],
"column": [
{
"index":2,
"name": "cf1:score",
"type": "int"
}
],
"encoding": "utf-8"
}
}
}
],
"setting": {
"speed": {
"channel": 6
}
}
}
}
Run the job:
datax.py mysql2hbase.json
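A quick scan in the HBase shell confirms the import; the scores land in cf1:score under row keys of the form <student_id>_<course_id>:
# Inspect a few imported rows
scan 'score', {LIMIT => 10}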
6. HDFSToHBase: importing HDFS data into HBase
Upload the students.txt data to the /data/student1/ directory in HDFS; the file is assumed here to carry six comma-separated fields (id, name, age, gender, clazz, and a timestamp, which the writer stores in cf1:ts and uses as the cell version). Then create the target table in HBase:
create 'datax_student','cf1'
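A sketch of the upload, assuming students.txt sits in the current directory:
# Create the input directory and upload the data file
hdfs dfs -mkdir -p /data/student1
hdfs dfs -put students.txt /data/student1/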
Create the job configuration file HDFSToHBase.json:
vim HDFSToHBase.json
{
"job": {
"setting": {
"speed": {
"channel": 3
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "hdfsreader",
"parameter": {
"path": "user/data/student/",
"defaultFS": "hdfs://master:9000",
"column": [
{
"index": 0,
"type": "string"
},
{
"index": 1,
"type": "string"
},
{
"index": 2,
"type": "string"
},
{
"index": 3,
"type": "string"
},
{
"index": 4,
"type": "string"
},
{
"index": 5,
"type": "string"
}
],
"fileType": "text",
"encoding": "UTF-8",
"fieldDelimiter": ","
}
},
"writer": {
"name": "hbase11xwriter",
"parameter": {
"hbaseConfig": {
"hbase.zookeeper.quorum": "master:2181,node1:2181,node2:2181"
},
"table": "datax_student",
"mode": "normal",
"rowkeyColumn": [
{
"index": 0,
"type": "string"
},
{
"index": -1,
"type": "string",
"value": "_"
},
{
"index": 1,
"type": "string"
}
],
"column": [
{
"index": 1,
"name": "cf1:name",
"type": "string"
},
{
"index": 2,
"name": "cf1:age",
"type": "string"
},
{
"index": 3,
"name": "cf1:gender",
"type": "string"
},
{
"index": 4,
"name": "cf1:clazz",
"type": "string"
},
{
"index": 5,
"name": "cf1:ts",
"type": "string"
}
],
"versionColumn": {
"index": 5
},
"encoding": "utf-8"
}
}
}
]
}
}
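Run the synchronization job:
datax.py HDFSToHBase.json
Afterwards the load can be checked in the HBase shell (row keys have the form <id>_<name>, per the rowkeyColumn config above):
# Inspect a few imported rows
scan 'datax_student', {LIMIT => 10}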