Installing and Using DataX
Introduction to DataX
DataX is Alibaba's open-source offline synchronization tool for heterogeneous data sources.
It aims to provide stable and efficient data synchronization between a wide range of heterogeneous sources, including relational databases (MySQL, Oracle, etc.), HDFS, Hive, ODPS, HBase, and FTP.
Ease of use: jobs are run by executing scripts, which demands a fairly high level of technical skill from users.
Performance: high data-extraction throughput.
Deployment: can be deployed standalone.
Typical scenario: high-speed data exchange between heterogeneous databases/file systems.
Installing DataX
DataX does not depend on any other services: simply upload the package, extract it, and configure the environment variables.
It can also be extracted directly on Windows.
# Extract
tar -xvf datax.tar.gz
# Configure environment variables
cd /usr/local/soft/datax
vim /etc/profile
# Add the following
export DATAX_HOME=/usr/local/soft/datax
export PATH=$PATH:$DATAX_HOME/bin
# Reload
source /etc/profile
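To verify the installation, you can run the sample job that ships with the DataX tarball (job/job.json under the DataX home directory):
# Self-check job bundled with the DataX distribution
datax.py $DATAX_HOME/job/job.json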
Using DataX
1. stream2stream
Create the configuration file stream2stream.json.
This is equivalent to creating a new file:
vim stream2stream.json
# stream2stream.json
{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"sliceRecordCount": 10,
"column": [
{
"type": "long",
"value": "10"
},
{
"type": "string",
"value": "hello,你好,世界-DataX"
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": 5
}
}
}
}
Run the synchronization job
datax.py stream2stream.json
Execution result:
2. mysql2mysql: import data from MySQL into MySQL
Create a new student database and a student table in it.
The new student table must have the same structure as the table being read; a minimal sketch follows below.
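A rough sketch of creating the target database and table (the column types here are assumptions; match them to your actual source table):
# Create the target database/table on the destination MySQL instance (assumed column types)
mysql -h master -uroot -p123456 -e "
CREATE DATABASE IF NOT EXISTS student;
CREATE TABLE IF NOT EXISTS student.student (
  id     INT,
  name   VARCHAR(100),
  age    INT,
  gender VARCHAR(10),
  clazz  VARCHAR(100)
) DEFAULT CHARSET=utf8;"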
Create the configuration file mysql2mysql.json.
This is equivalent to creating a new file:
vim mysql2mysql.json
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"column": [
"id",
"name",
"age",
"gender",
"clazz",
],
"splitPk": "age",
"connection": [
{
"table": [
"student"
],
"jdbcUrl": [
"jdbc:mysql://master:3306/lyw11"
]
}
]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert",
"username": "root",
"password": "123456",
"column": [
"id",
"name",
"age",
"gender",
"clazz",
],
"preSql": [
"truncate student"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://master:3306/student?useUnicode=true&characterEncoding=utf8",
"table": [
"student"
]
}
]
}
}
}
],
"setting": {
"speed": {
"channel": 6
}
}
}
}
Run the synchronization job
datax.py mysql2mysql.json
Execution result:
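To spot-check the result, count the rows in the target table (a minimal check; host and credentials are taken from the job configuration):
# Row count in the destination table
mysql -h master -uroot -p123456 -e "SELECT COUNT(*) FROM student.student;"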
3. mysql2hdfs: import data from MySQL into HDFS
Reading and writing Hive works the same way as reading and writing HDFS, since Hive table data is stored as files on HDFS.
Hadoop needs to be running:
start-all.sh
Create the configuration file mysql2hdfs.json.
This is equivalent to creating a new file:
vim mysql2hdfs.json
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"column": [
"id",
"name",
"age",
"gender",
"clazz"
],
"splitPk": "age",
"connection": [
{
"table": [
"student"
],
"jdbcUrl": [
"jdbc:mysql://master:3306/student"
]
}
]
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://master:9000",
"fileType": "text",
"path": "/user/data/student",
"fileName": "student",
"column": [
{
"name": "id",
"type": "string"
},
{
"name": "name",
"type": "string"
},
{
"name": "age",
"type": "INT"
},
{
"name": "gender",
"type": "string"
},
{
"name": "clazz",
"type": "string"
}
],
"writeMode": "append",
"fieldDelimiter": ","
}
}
}
],
"setting": {
"speed": {
"channel": 6
}
}
}
}
First create the corresponding directory in HDFS:
hdfs dfs -mkdir -p /user/data/student
Run the job:
datax.py mysql2hdfs.json
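To confirm the export, list the output directory and sample a few lines (hdfswriter appends a random suffix to the configured fileName, so a wildcard is used here):
# Inspect the files DataX wrote to HDFS
hdfs dfs -ls /user/data/student
hdfs dfs -cat /user/data/student/student* | head -n 5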
4. hbase2mysql: import data from HBase into MySQL
First start ZooKeeper:
zkServer.sh start
Then start HBase:
start-hbase.sh
Enter the HBase shell:
hbase shell
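The reader below expects an HBase table named student with column family info, and the writer expects a table student2 in the lyw11 MySQL database. If they do not exist yet, here is a rough sketch (the sample row values are made up, and the MySQL column types are assumptions):
# Create and populate the source HBase table (sample row is hypothetical)
hbase shell <<'EOF'
create 'student','info'
put 'student','1500100001','info:name','zhangsan'
put 'student','1500100001','info:age','22'
put 'student','1500100001','info:gender','male'
put 'student','1500100001','info:clazz','Class 1'
EOF
# Create the target MySQL table (assumed column types)
mysql -h master -uroot -p123456 lyw11 -e "
CREATE TABLE IF NOT EXISTS student2 (
  id     VARCHAR(20),
  name   VARCHAR(100),
  age    INT,
  gender VARCHAR(10),
  clazz  VARCHAR(100)
);"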
Create the configuration file hbase2mysql.json.
This is equivalent to creating a new file:
vim hbase2mysql.json
{
"job": {
"content": [
{
"reader": {
"name": "hbase11xreader",
"parameter": {
"hbaseConfig": {
"hbase.zookeeper.quorum": "master:2181,node1:2181.node2:2181"
},
"table": "student",
"encoding": "utf-8",
"mode": "normal",
"column": [
{
"name": "rowkey",
"type": "string"
},
{
"name": "info:name",
"type": "string"
},
{
"name": "info:age",
"type": "string"
},
{
"name": "info:gender",
"type": "string"
},
{
"name": "info:clazz",
"type": "string"
}
],
"range": {
"startRowkey": "",
"endRowkey": "",
"isBinaryRowkey": false
}
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert",
"username": "root",
"password": "123456",
"column": [
"id",
"name",
"age",
"gender",
"clazz"
],
"preSql": [
"truncate student11"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://master:3306/lyw11?useUnicode=true&characterEncoding=utf8",
"table": [
"student2"
]
}
]
}
}
}
],
"setting": {
"speed": {
"channel": 6
}
}
}
}
Run the job:
datax.py hbase2mysql.json
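To spot-check the result, query the target table (host and credentials are taken from the job configuration):
# Sample a few rows from the destination MySQL table
mysql -h master -uroot -p123456 lyw11 -e "SELECT * FROM student2 LIMIT 5;"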
5. mysql2hbase: import data from MySQL into HBase
In the MySQL score table, rename the cource column to course_id, make (student_id, course_id) the primary key, and change all columns to type int; a sketch is shown after the create statement below.
In HBase, first create the score table:
create 'score','cf1'
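A rough sketch of the MySQL side described above (this assumes you are creating the table from scratch in the student database; if an existing table still has the cource column, rename it instead):
# Create the score table with all-int columns and a composite primary key
mysql -h master -uroot -p123456 student -e "
CREATE TABLE IF NOT EXISTS score (
  student_id INT,
  course_id  INT,
  score      INT,
  PRIMARY KEY (student_id, course_id)
);"
# If reusing an existing table, rename the column instead:
# mysql -h master -uroot -p123456 student -e "ALTER TABLE score CHANGE cource course_id INT;"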
Create the configuration file mysql2hbase.json.
This is equivalent to creating a new file:
vim mysql2hbase.json
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"column": [
"student_id",
"course_id",
"score"
],
"splitPk": "student_id",
"connection": [
{
"table": [
"score"
],
"jdbcUrl": [
"jdbc:mysql://master:3306/student"
]
}
]
}
},
"writer": {
"name": "hbase11xwriter",
"parameter": {
"hbaseConfig": {
"hbase.zookeeper.quorum": "master:2181,note01:2181,note2:2181"
},
"table": "score",
"mode": "normal",
"rowkeyColumn": [
{
"index":0,
"type":"string"
},
{
"index":-1,
"type":"string",
"value":"_"
},
{
"index":1,
"type":"string"
}
],
"column": [
{
"index":2,
"name": "cf1:score",
"type": "int"
}
],
"encoding": "utf-8"
}
}
}
],
"setting": {
"speed": {
"channel": 6
}
}
}
}
Run the job:
datax.py mysql2hbase.json
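A quick way to check the write is to scan a few rows from the HBase shell:
# Sample a few rows from the target table
echo "scan 'score', {LIMIT => 5}" | hbase shell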
6. HDFSToHBase: import data from HDFS into HBase
Upload the students.txt data to the HDFS directory /data/student1/ (the reader's path below points there); a sketch of the upload follows the create statement. Note that the writer maps a sixth field (index 5) to the cf1:ts column and to the cell version (versionColumn), so each line of the file is expected to end with a timestamp field.
In HBase, create the datax_student table:
create 'datax_student','cf1'
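A minimal sketch of the upload (where students.txt sits on the local disk is up to you):
# Create the directory and upload the source file
hdfs dfs -mkdir -p /data/student1
hdfs dfs -put students.txt /data/student1/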
Create the configuration file HDFSToHBase.json.
This is equivalent to creating a new file:
vim HDFSToHBase.json
{
"job": {
"setting": {
"speed": {
"channel": 3
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "hdfsreader",
"parameter": {
"path": "user/data/student/",
"defaultFS": "hdfs://master:9000",
"column": [
{
"index": 0,
"type": "string"
},
{
"index": 1,
"type": "string"
},
{
"index": 2,
"type": "string"
},
{
"index": 3,
"type": "string"
},
{
"index": 4,
"type": "string"
},
{
"index": 5,
"type": "string"
}
],
"fileType": "text",
"encoding": "UTF-8",
"fieldDelimiter": ","
}
},
"writer": {
"name": "hbase11xwriter",
"parameter": {
"hbaseConfig": {
"hbase.zookeeper.quorum": "master:2181,node1:2181,node2:2181"
},
"table": "datax_student",
"mode": "normal",
"rowkeyColumn": [
{
"index": 0,
"type": "string"
},
{
"index": -1,
"type": "string",
"value": "_"
},
{
"index": 1,
"type": "string"
}
],
"column": [
{
"index": 1,
"name": "cf1:name",
"type": "string"
},
{
"index": 2,
"name": "cf1:age",
"type": "string"
},
{
"index": 3,
"name": "cf1:gender",
"type": "string"
},
{
"index": 4,
"name": "cf1:clazz",
"type": "string"
},
{
"index": 5,
"name": "cf1:ts",
"type": "string"
}
],
"versionColumn": {
"index": 5
},
"encoding": "utf-8"
}
}
}
]
}
}
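Run the job:
datax.py HDFSToHBase.json
To spot-check the result, scan a few rows of the target table from the HBase shell:
echo "scan 'datax_student', {LIMIT => 5}" | hbase shell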