鏈接:https://pan.baidu.com/s/1yF5B3gepf_TFboiht2O-3g
提取碼:lycc
一、DataX
DataX 是阿里巴巴集團內被廣泛使用的離線數據同步工具/平台,實現包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各種異構數據源之間高效的數據同步功能。
DataX本身作為數據同步框架,將不同數據源的同步抽象為從源頭數據源讀取數據的Reader插件,以及向目標端寫入數據的Writer插件,理論上DataX框架可以支持任意數據源類型的數據同步工作。同時DataX插件體系作為一套生態系統, 每接入一套新數據源該新加入的數據源即可實現和現有的數據源互通。
1、DataX的安裝
DataX不需要依賴其他服務,直接上傳、解壓、安裝、配置環境變量即可
也可以直接在windows上解壓
上傳解壓到soft目錄下
配置環境變量
2、DataX使用
1、git倉庫導入GitHub倉庫,沒有外網建議使用這種方式查看模板
有外網直接訪問GitHub倉庫
2、streamTostream
到datax/job目錄下編寫json文件然后執行
{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"sliceRecordCount": 10,
"column": [
{
"type": "long",
"value": "10"
},
{
"type": "string",
"value": "hello,你好,世界-DataX"
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": 5
}
}
}
}
執行同步任務
datax.py stream2stream.json
3、mysqltomysql
需要新建student2數據庫,並創建student表
json文件
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"column": [
"id",
"name",
"age",
"gender",
"clazz",
"last_mod"
],
"splitPk": "age",
"connection": [
{
"table": [
"student"
],
"jdbcUrl": [
"jdbc:mysql://master:3306/student"
]
}
]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert",
"username": "root",
"password": "123456",
"column": [
"id",
"name",
"age",
"gender",
"clazz",
"last_mod"
],
"preSql": [
"truncate student2"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://master:3306/student2?useUnicode=true&characterEncoding=utf8",
"table": [
"student2"
]
}
]
}
}
}
],
"setting": {
"speed": {
"channel": 6
}
}
}
}
新建表
use student2;
CREATE TABLE `student2` (
`id` int(10) NOT NULL AUTO_INCREMENT,
`name` char(5) DEFAULT NULL,
`age` int(11) DEFAULT NULL,
`gender` char(2) DEFAULT NULL,
`clazz` char(4) DEFAULT NULL,
`last_mod` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=MyISAM AUTO_INCREMENT=1500101002 DEFAULT CHARSET=utf8
執行json文件
datax.py mysqltomysql.json
4、mysqltohive
寫hive跟hdfs時一樣的
編寫配置json文件
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"column": [
"id",
"name",
"age",
"gender",
"clazz",
"last_mod"
],
"splitPk": "age",
"connection": [
{
"table": [
"student"
],
"jdbcUrl": [
"jdbc:mysql://master:3306/student"
]
}
]
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://master:9000",
"fileType": "text",
"path": "/user/hive/warehouse/datax.db/students",
"fileName": "student",
"column": [
{
"name": "id",
"type": "bigint"
},
{
"name": "name",
"type": "string"
},
{
"name": "age",
"type": "INT"
},
{
"name": "gender",
"type": "string"
},
{
"name": "clazz",
"type": "string"
},
{
"name": "last_mod",
"type": "string"
}
],
"writeMode": "append",
"fieldDelimiter": ","
}
}
}
],
"setting": {
"speed": {
"channel": 6
}
}
}
}
hive建庫建表
create table students
(
id bigint,
name string,
age int,
gender string,
clazz string,
last_mod string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
執行
datax.py mysqltohdfs.json
5、mysqltohbase
mysql中的score表需將cource_id改為course_id,並將student_id、course_id設為主鍵,並將所有字段的類型改為int
hbase需先創建score表:create 'score','cf1'
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"column": [
"student_id",
"course_id",
"score"
],
"splitPk": "course_id",
"connection": [
{
"table": [
"score"
],
"jdbcUrl": [
"jdbc:mysql://master:3306/student"
]
}
]
}
},
"writer": {
"name": "hbase11xwriter",
"parameter": {
"hbaseConfig": {
"hbase.zookeeper.quorum": "master:2181"
},
"table": "score",
"mode": "normal",
"rowkeyColumn": [
{
"index":0,
"type":"string"
},
{
"index":-1,
"type":"string",
"value":"_"
},
{
"index":1,
"type":"string"
}
],
"column": [
{
"index":2,
"name": "cf1:score",
"type": "int"
}
],
"encoding": "utf-8"
}
}
}
],
"setting": {
"speed": {
"channel": 6
}
}
}
}
datax.py mysqltohbase.json
6、hdfstohbase
將students.txt數據上傳至HDFS的/data/student1/目錄
在HBase中創建datax表:create 'datax','cf1'
{
"job": {
"setting": {
"speed": {
"channel": 3
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "hdfsreader",
"parameter": {
"path": "/data/student1/",
"defaultFS": "hdfs://master:9000",
"column": [
{
"index": 0,
"type": "string"
},
{
"index": 1,
"type": "string"
},
{
"index": 2,
"type": "string"
},
{
"index": 3,
"type": "string"
},
{
"index": 4,
"type": "string"
},
{
"index": 5,
"type": "string"
}
],
"fileType": "text",
"encoding": "UTF-8",
"fieldDelimiter": ","
}
},
"writer": {
"name": "hbase11xwriter",
"parameter": {
"hbaseConfig": {
"hbase.zookeeper.quorum": "master,node1,node2"
},
"table": "datax",
"mode": "normal",
"rowkeyColumn": [
{
"index": 0,
"type": "string"
},
{
"index": -1,
"type": "string",
"value": "_"
},
{
"index": 1,
"type": "string"
}
],
"column": [
{
"index": 2,
"name": "cf1:age",
"type": "string"
},
{
"index": 3,
"name": "cf1:gender",
"type": "string"
},
{
"index": 4,
"name": "cf1:clazz",
"type": "string"
},
{
"index": 5,
"name": "cf1:ts",
"type": "string"
}
],
"versionColumn": {
"index": 5
},
"encoding": "utf-8"
}
}
}
]
}
}
7、mysqltophoenix
在Phoenix中創建STUDENT表
CREATE TABLE IF NOT EXISTS STUDENT (
ID VARCHAR NOT NULL PRIMARY KEY,
NAME VARCHAR,
AGE BIGINT,
GENDER VARCHAR ,
CLAZZ VARCHAR
);
編寫配置文件MySQLToPhoenix.json
{
"job": {
"setting": {
"speed": {
"channel": 3
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"column": [
"id",
"name",
"age",
"gender",
"clazz"
],
"splitPk": "id",
"connection": [
{
"table": [
"student"
],
"jdbcUrl": [
"jdbc:mysql://master:3306/student?useSSL=false"
]
}
]
}
},
"writer": {
"name": "hbase11xsqlwriter",
"parameter": {
"batchSize": "256",
"column": [
"ID",
"NAME",
"AGE",
"GENDER",
"CLAZZ"
],
"hbaseConfig": {
"hbase.zookeeper.quorum": "master,node1,node2",
"zookeeper.znode.parent": "/hbase"
},
"nullMode": "skip",
"table": "STUDENT"
}
}
}
]
}
}
執行
datax.py xxxxx
3、datax自定義參數
Linux給文件替換字符串/替換內容/替換某行 (shell,sed)
參考:https://blog.csdn.net/Olivia_Vang/article/details/104091358
在文件里面替換
命令修改
sed 's/$$$/007/g' test.json
json文件(加入where篩選並加上參數)
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"column": [
"id",
"name",
"age",
"gender",
"clazz",
"last_mod"
],
"splitPk": "age",
"where":"last_mod>'$param$'"
"connection": [
{
"table": [
"student"
],
"jdbcUrl": [
"jdbc:mysql://master:3306/student"
]
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": 5
}
}
}
}
寫定義參數腳本
#!/bin/bash
sed 's/\$param\$/20211207/g' test.json
執行腳本
獲取每天時間,實現增量
#!/bin/bash
param1=$(date "+%Y-%m-%d")
sed -i "s/'\$param\$'/$param1/g" /usr/local/soft/datax/job/test.json
datax.py /usr/local/soft/datax/job/test.json
sed -i "s/$param1/'\$param\$'/g" /usr/local/soft/datax/job/test.json
二、FlinkX
1、安裝
1、上傳解壓
直接Windows桌面拖進Linux系統上傳,如果不行可以下載依賴包
yum -y install lrzsz
安裝unzip:yum install unzip,並解壓壓縮包
unzip flinkx-1.10.zip -d /usr/local/soft/
2、配置環境變量,修改配置文件
web服務端口,不指定的話會隨機生成一個
vim flinkconf/flink-conf.yaml
rest.bind-port: 8888
給bin/flinkx這個文件加上執行權限
chmod a+x flinkx
配置環境變量
2、flinkx簡單使用
訪問GitHub搜素flinkx有快速入門
1、MySQLToHDFS
json文件
{
"job": {
"content": [
{
"reader": {
"parameter": {
"username": "root",
"password": "123456",
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://master:3306/student?characterEncoding=utf8"
],
"table": [
"student"
]
}
],
"column": [
"*"
],
"customSql": "",
"where": "clazz = '理科二班'",
"splitPk": "",
"queryTimeOut": 1000,
"requestAccumulatorInterval": 2
},
"name": "mysqlreader"
},
"writer": {
"name": "hdfswriter",
"parameter": {
"path": "hdfs://master:9000/data/flinkx/student",
"defaultFS": "hdfs://master:9000",
"column": [
{
"name": "col1",
"index": 0,
"type": "string"
},
{
"name": "col2",
"index": 1,
"type": "string"
},
{
"name": "col3",
"index": 2,
"type": "string"
},
{
"name": "col4",
"index": 3,
"type": "string"
},
{
"name": "col5",
"index": 4,
"type": "string"
},
{
"name": "col6",
"index": 5,
"type": "string"
}
],
"fieldDelimiter": ",",
"fileType": "text",
"writeMode": "overwrite"
}
}
}
],
"setting": {
"restore": {
"isRestore": false,
"isStream": false
},
"errorLimit": {},
"speed": {
"channel": 1
}
}
}
}
啟動任務
flinkx -mode local -job /usr/local/soft/flinkx-1.10/job/mysqlToHDFS.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/
監聽日志
flinkx 任務啟動后,會在執行命令的目錄下生成一個nohup.out文件
tail -f nohup.out//實時查看
tail -n 200 nohup.out //看后200行
通過客戶端查看,任務啟動客戶端可查看
http://master:8888
2、MySQLToHive
json文件
{
"job": {
"content": [
{
"reader": {
"parameter": {
"username": "root",
"password": "123456",
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://master:3306/student?characterEncoding=utf8"
],
"table": [
"student"
]
}
],
"column": [
"*"
],
"customSql": "",
"where": "clazz = '文科二班'",
"splitPk": "id",
"queryTimeOut": 1000,
"requestAccumulatorInterval": 2
},
"name": "mysqlreader"
},
"writer": {
"name": "hivewriter",
"parameter": {
"jdbcUrl": "jdbc:hive2://master:10000/testflinkx",
"username": "",
"password": "",
"fileType": "text",
"fieldDelimiter": ",",
"writeMode": "overwrite",
"compress": "",
"charsetName": "UTF-8",
"maxFileSize": 1073741824,
"tablesColumn": "{\"student\":[{\"key\":\"id\",\"type\":\"string\"},{\"key\":\"name\",\"type\":\"string\"},{\"key\":\"age\",\"type\":\"string\"}]}",
"defaultFS": "hdfs://master:9000"
}
}
}
],
"setting": {
"restore": {
"isRestore": false,
"isStream": false
},
"errorLimit": {},
"speed": {
"channel": 3
}
}
}
}
在hive中創建testflinkx數據庫,並創建student分區表
create database testflinkx;
use testflinkx;
CREATE TABLE `student`(
`id` string,
`name` string,
`age` string)
PARTITIONED BY (
`pt` string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
啟動hiveserver2
# 第一種方式:
hiveserver2
# 第二種方式:
hive --service hiveserver2
啟動任務
flinkx -mode local -job /usr/local/soft/flinkx-1.10/jsonConf/mysqlToHive.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/
3、MySQLToHBase
json文件
{
"job": {
"content": [
{
"reader": {
"parameter": {
"username": "root",
"password": "123456",
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://master:3306/student?characterEncoding=utf8"
],
"table": [
"score"
]
}
],
"column": [
"*"
],
"customSql": "",
"splitPk": "student_id",
"queryTimeOut": 1000,
"requestAccumulatorInterval": 2
},
"name": "mysqlreader"
},
"writer": {
"name": "hbasewriter",
"parameter": {
"hbaseConfig": {
"hbase.zookeeper.property.clientPort": "2181",
"hbase.rootdir": "hdfs://master:9000/hbase",
"hbase.cluster.distributed": "true",
"hbase.zookeeper.quorum": "master,node1,node2",
"zookeeper.znode.parent": "/hbase"
},
"table": "testFlinkx",
"rowkeyColumn": "$(cf1:student_id)_$(cf1:course_id)",
"column": [
{
"name": "cf1:student_id",
"type": "string"
},
{
"name": "cf1:course_id",
"type": "string"
},
{
"name": "cf1:score",
"type": "string"
}
]
}
}
}
],
"setting": {
"restore": {
"isRestore": false,
"isStream": false
},
"errorLimit": {},
"speed": {
"channel": 3
}
}
}
}
啟動hbase 並創建testflinkx表
create 'testFlinkx','cf1'
啟動任務
flinkx -mode local -job /usr/local/soft/flinkx-1.10/jsonConf/mysqlToHBase.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/