DataX Installation and Usage
DataX Overview
Functionally similar to Sqoop: both are used for offline data collection.
Sqoop is based on MapReduce and runs distributed.
DataX is based on Java and runs as a single-node, multi-threaded process.
DataX is an offline data synchronization tool/platform widely used within Alibaba Group. It provides efficient data synchronization between heterogeneous data sources, including MySQL, Oracle, SqlServer, PostgreSQL, HDFS, Hive, ADS, HBase, TableStore (OTS), MaxCompute (ODPS), DRDS, and others.
As a data synchronization framework, DataX abstracts synchronization between different data sources into Reader plugins, which read data from the source, and Writer plugins, which write data to the target, so in principle the framework can synchronize between arbitrary data source types. The plugin system also works as an ecosystem: once a new data source is plugged in, it can immediately exchange data with every data source already supported.
Open-source repository
https://github.com/alibaba/DataX
Installing DataX
DataX does not depend on any other services: just upload the package, extract it, and configure the environment variables (see the sketch below).
It can also be extracted directly on Windows.
Any node will do, as long as Python is available; Linux ships with Python by default.
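A minimal sketch of the steps, assuming the release tarball is named datax.tar.gz and /usr/local/soft is the install directory (both names are assumptions, adjust to your environment):
tar -zxvf datax.tar.gz -C /usr/local/soft/
# put datax/bin on the PATH so datax.py can be invoked directly, as in the examples below
echo 'export DATAX_HOME=/usr/local/soft/datax' >> ~/.bashrc
echo 'export PATH=$PATH:$DATAX_HOME/bin' >> ~/.bashrc
source ~/.bashrc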
Using DataX
stream2stream
Used as a local test to verify that the installation works.
Create the configuration file stream2stream.json
{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"sliceRecordCount": 10,
"column": [
{
"type": "long",
"value": "10"
},
{
"type": "string",
"value": "hello,你好,世界-DataX"
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": 5
}
}
}
}
Run the sync job
datax.py stream2stream.json
Execution result: with print set to true, the streamwriter prints the generated records to the console, followed by a job summary.
mysql2mysql
First create the student2 database and a student2 table in MySQL (a sketch follows).
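A minimal sketch of the SQL, run through the MySQL client; the column types are assumptions and should mirror the source student table:
mysql -uroot -p123456 -e "
CREATE DATABASE IF NOT EXISTS student2;
CREATE TABLE IF NOT EXISTS student2.student2 (
    id       VARCHAR(20) PRIMARY KEY,  -- column types are assumptions; mirror the source student table
    name     VARCHAR(50),
    age      INT,
    gender   VARCHAR(10),
    clazz    VARCHAR(50),
    last_mod TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);"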
Create the configuration file mysql2mysql.json. The # annotations in this and the following examples are explanatory notes only; JSON itself does not support comments, so remove them before running.
{
"job": {
"content": [
{
"reader": { # 读数据
"name": "mysqlreader", # 这个名字是固定的,不是自定义的
"parameter": {
"username": "root",
"password": "123456",
"column": [
"id",
"name",
"age",
"gender",
"clazz",
"last_mod"
],
"splitPk": "age", # 多并行的时候可以指定一个分割的 key ,一般使用主键
"connection": [ # 连接
{
"table": [
"student"
],
"jdbcUrl": [
"jdbc:mysql://master:3306/student"
]
}
]
}
},
"writer": { # 写数据
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert",
"username": "root",
"password": "123456",
"column": [
"id",
"name",
"age",
"gender",
"clazz",
"last_mod"
],
"preSql": [ # 执行写操作之前
"truncate student2"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://master:3306/student2?useUnicode=true&characterEncoding=utf8",
"table": [
"student2"
]
}
]
}
}
}
],
"setting": {
"speed": {
"channel": 6 # 并行度的意思,就像是启动几个管道的意思,多线程
}
}
}
}
Run the sync job
datax.py mysql2mysql.json
mysql2hdfs
Reading and writing Hive works the same way as reading and writing HDFS, since a Hive table is just a directory of files on HDFS (see the note after the configuration below).
Create the configuration file mysql2hdfs.json
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"column": [
"id",
"name",
"age",
"gender",
"clazz"
],
"splitPk": "id",
"where":"age=23", # 指定筛选条件,使用中文要注意编码格式的问题
"connection": [
{
"table": [
"student"
],
"jdbcUrl": [
"jdbc:mysql://master:3306/student"
]
}
]
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://master:9000",
"fileType": "text",
"path": "/user/data/student",
"fileName": "student",
"column": [
{
"name": "id",
"type": "string"
},
{
"name": "name",
"type": "string"
},
{
"name": "age",
"type": "INT"
},
{
"name": "gender",
"type": "string"
},
{
"name": "clazz",
"type": "string"
}
],
"writeMode": "append",
"fieldDelimiter": ","
}
}
}
],
"setting": {
"speed": {
"channel": 6
}
}
}
}
The target directory must be created in HDFS first:
hadoop dfs -mkdir -p /user/data/student
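After creating the directory, run the sync job:
datax.py mysql2hdfs.json
Since the exported data is just text files on HDFS, one way to query it from Hive is an external table over the same path. A minimal sketch, where the table name and column types are assumptions:
hive -e "
CREATE EXTERNAL TABLE IF NOT EXISTS student_datax (
    id     STRING,
    name   STRING,
    age    INT,
    gender STRING,
    clazz  STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','  -- matches fieldDelimiter in hdfswriter
LOCATION '/user/data/student';                 -- the path written by hdfswriter
"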
hbase2mysql
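Write the configuration file (e.g. hbase2mysql.json). This assumes an existing HBase table student with column family info that already contains data, and reuses the MySQL student2 table created above: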
{
"job": {
"content": [
{
"reader": {
"name": "hbase11xreader", # 11x表示版本
"parameter": {
"hbaseConfig": {
"hbase.zookeeper.quorum": "master:2181,node1:2181,node2:2181"
},
"table": "student",
"encoding": "utf-8",
"mode": "normal",
"column": [
{
"name": "rowkey",
"type": "string"
},
{
"name": "info:name",
"type": "string"
},
{
"name": "info:age",
"type": "int"
},
{
"name": "info:gender",
"type": "string"
},
{
"name": "info:clazz",
"type": "string"
}
],
"range": {
"startRowkey": "",
"endRowkey": "",
"isBinaryRowkey": false
}
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert",
"username": "root",
"password": "123456",
"column": [
"id",
"name",
"age",
"gender",
"clazz"
],
"preSql": [
"truncate student2"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://master:3306/student2?useUnicode=true&characterEncoding=utf8",
"table": [
"student2"
]
}
]
}
}
}
],
"setting": {
"speed": {
"channel": 6
}
}
}
}
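Run the sync job:
datax.py hbase2mysql.json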
mysql2hbase
In MySQL, rename the cource_id column of the score table to course_id and make (student_id, course_id) the primary key (a sketch of the SQL follows the HBase step below).
In HBase, create the score table first:
create 'score','cf1'
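A minimal sketch of the MySQL changes; the column type in the CHANGE statement is an assumption and must match the existing definition:
mysql -uroot -p123456 -e "
ALTER TABLE student.score CHANGE cource_id course_id INT;          -- re-specify the real column type here
ALTER TABLE student.score ADD PRIMARY KEY (student_id, course_id);
"
Write the configuration file mysql2hbase.json: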
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"column": [
"student_id",
"course_id",
"score"
],
"splitPk": "student_id",
"connection": [
{
"table": [
"score"
],
"jdbcUrl": [
"jdbc:mysql://master:3306/student"
]
}
]
}
},
"writer": {
"name": "hbase11xwriter",
"parameter": {
"hbaseConfig": {
"hbase.zookeeper.quorum": "master:2181,node1:2181,node2:2181"
},
"table": "score",
"mode": "normal",
"rowkeyColumn": [ # 将 student_id 和 course_id 作为 rowkey
{
"index":0, # student_id
"type":"string"
},
{
"index":-1, # 分割方式
"type":"string",
"value":"_"
},
{
"index":1, # course_id
"type":"string"
}
],
"column": [
{
"index":2,
"name": "cf1:score",
"type": "int"
}
],
"encoding": "utf-8"
}
}
}
],
"setting": {
"speed": {
"channel": 6
}
}
}
}
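Run the sync job:
datax.py mysql2hbase.json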
HDFSToHBase
Upload students.txt to the /data/student1/ directory on HDFS (a sketch of the commands follows the HBase step below), then create the datax table in HBase:
create 'datax','cf1'
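A sketch of the upload, assuming students.txt is in the current directory on the node:
hadoop dfs -mkdir -p /data/student1/
hadoop dfs -put students.txt /data/student1/
Write the configuration file (e.g. hdfs2hbase.json):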
{
"job": {
"setting": {
"speed": {
"channel": 3
},
"errorLimit": { # 指定错误限制,超过这个限制任务就报错结束了,出错的条数不得超过总条数的0.02
"record": 0, # 出错的行
"percentage": 0.02 # 错误数据的百分比
}
},
"content": [
{
"reader": {
"name": "hdfsreader",
"parameter": {
"path": "/data/student1/",
"defaultFS": "hdfs://master:9000",
"column": [
{
"index": 0,
"type": "string"
},
{
"index": 1,
"type": "string"
},
{
"index": 2,
"type": "string"
},
{
"index": 3,
"type": "string"
},
{
"index": 4,
"type": "string"
},
{
"index": 5,
"type": "string"
}
],
"fileType": "text",
"encoding": "UTF-8",
"fieldDelimiter": ","
}
},
"writer": {
"name": "hbase11xwriter",
"parameter": {
"hbaseConfig": {
"hbase.zookeeper.quorum": "master:2181,node1:2181,node2:2181"
},
"table": "datax",
"mode": "normal",
"rowkeyColumn": [
{
"index": 0,
"type": "string"
},
{
"index": -1,
"type": "string",
"value": "_"
},
{
"index": 1,
"type": "string"
}
],
"column": [
{
"index": 2,
"name": "cf1:age",
"type": "string"
},
{
"index": 3,
"name": "cf1:gender",
"type": "string"
},
{
"index": 4,
"name": "cf1:clazz",
"type": "string"
},
{
"index": 5,
"name": "cf1:ts",
"type": "string"
}
],
"versionColumn": { # 指定数据版本字段
"index": 5
},
"encoding": "utf-8" # 指定编码
}
}
}
]
}
}
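Run the sync job:
datax.py hdfs2hbase.json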