@
數據實時增量同步工具之go-mysql-transfer:https://blog.csdn.net/weixin_42526326/article/details/121302961
Elasticsearch筆記之安裝、配置、Kibana基礎:https://blog.csdn.net/weixin_42526326/article/details/121302809
go-mysql-transfer官方手冊:https://www.kancloud.cn/wj596/go-mysql-transfer/2064425
GO筆記之環境安裝:https://blog.csdn.net/weixin_42526326/article/details/121302777
技術選型:Mysql8 + go-mysql-transfer + ElasticSearch7.13
簡介
go-mysql-transfer是一款MySQL數據庫實時增量同步工具。需要GO環境
能夠監聽MySQL二進制日志(Binlog)的變動,將變更內容形成指定格式的消息,實時發送到接收端。從而在數據庫和接收端之間形成一個高性能、低延遲的增量數據同步更新管道。
工作需要研究了下阿里開源的MySQL Binlog增量訂閱消費組件canal,其功能強大、運行穩定,但是有些方面不是太符合需求,主要有如下三點:
1、需要自己編寫客戶端來消費canal解析到的數據
2、server-client模式,需要同時部署server和client兩個組件,我們的項目中有6個業務數據庫要實時同步到redis,意味着要多部署12個組件,硬件和運維成本都會增加。
3、從server端到client端需要經過一次網絡傳輸和序列化反序列化操作,然后再同步到接收端,感覺沒有直接懟到接收端更高效。
前提條件
-
MySQL 服務器需要開啟 row 模式的 binlog。
-
因為要使用 mysqldump 命令,因此該進程的所在的服務器需要部署這一工具。
-
這一工具使用 GoLang 開發,需要 Go 1.9+ 的環境進行構建。
-
新版(7.13+)的本地es必須關閉安全模式才可以
yml配置文件添加
xpack.security.enabled: false
-
可用的 MySQL、Elasticsearch 以及 Kibana 實例。權限需要大一些。
- mysql binlog必須是ROW模式
- 要同步的mysql數據表必須包含主鍵,否則直接忽略,這是因為如果數據表沒有主鍵,UPDATE和DELETE操作就會因為在ES中找不到對應的document而無法進行同步
- 不支持程序運行過程中修改表結構
- 要賦予用於連接mysql的賬戶RELOAD權限以及REPLICATION權限, SUPER權限
GRANT REPLICATION SLAVE ON *.* TO 'elastic'@'IP'; GRANT RELOAD ON *.* TO 'elastic'@'IP'; UPDATE mysql.user SET Super_Priv='Y' WHERE user='elastic' AND host='IP';
特性
1、簡單,不依賴其它組件,一鍵部署
2、集成多種接收端,如:Redis、MongoDB、Elasticsearch、RocketMQ、Kafka、RabbitMQ、HTTP API等,無需編寫客戶端,開箱即用
3、內置豐富的數據解析、消息生成規則,支持模板語法
4、支持Lua腳本擴展,可處理復雜邏輯,如:數據的轉換、清洗、打寬
5、集成Prometheus客戶端,支持監控、告警
6、集成Web Admin監控頁面
7、支持高可用集群部署
8、數據同步失敗重試
9、支持全量數據初始化
與同類工具比較
特色 | Canal | mysql_stream | go-mysql-transfer | Maxwell |
---|---|---|---|---|
開發語言 | Java | Python | Golang | Java |
高可用 | 支持 | 支持 | 支持 | 支持 |
接收端 | 編碼定制 | Kafka等(MQ) | Redis、MongoDB、Elasticsearch、RabbitMQ、Kafka、RocketMQ、HTTP API 等 | Kafka,Kinesis、RabbitMQ、Redis、Google Cloud Pub/Sub、文件等 |
全量數據初始化 | 不支持 | 支持 | 支持 | 支持 |
數據格式 | 編碼定制 | Json(固定格式) | Json(規則配置) 模板語法 Lua腳本 | JSON |
性能(4-8TPS) |
實現原理
1、go-mysql-transfer將自己偽裝成MySQL的Slave,
2、向Master發送dump協議獲取binlog,解析binlog並生成消息
3、將生成的消息實時、批量發送給接收端
如下圖所示:
go-mysql部署運行
開啟MySQL的binlog
修改app.yml
命令行運行
Windows直接運行 go-mysql-transfer.exe
Linux執行 nohup go-mysql-transfer &
監控
go-mysql-transfer支持兩種監控模式,Prometheus和內置的Web Admin
相關配置:
# web admin相關配置
enable_web_admin: true #是否啟用web admin,默認false
web_admin_port: 8060 #web監控端口,默認8060
直接訪問127.0.0.1:8060 可以看到監控界面同步數據到Elasticsearch
同步數據到Elasticsearch
配置文件——相關配置如下:
# app.yml
#目標類型
target: elasticsearch
#elasticsearch連接配置
es_addrs: 127.0.0.1:9200 #連接地址,多個用逗號分隔
es_version: 7 # Elasticsearch版本,支持6和7、默認為7
#es_password: # 用戶名
#es_version: # 密碼
目前支持Elasticsearch6、Elasticsearch7兩個版本
基於規則同步
相關配置如下:
rule:
-
schema: eseap #數據庫名稱
table: t_user #表名稱
#order_by_column: id #排序字段,存量數據同步時不能為空
#column_lower_case: true #列名稱轉為小寫,默認為false
#column_upper_case:false#列名稱轉為大寫,默認為false
column_underscore_to_camel: true #列名稱下划線轉駝峰,默認為false
# 包含的列,多值逗號分隔,如:id,name,age,area_id 為空時表示包含全部列
#include_columns: ID,USER_NAME,PASSWORD
#exclude_columns: BIRTHDAY,MOBIE # 排除掉的列,多值逗號分隔,如:id,name,age,area_id 默認為空
#default_column_values: area_name=合肥 #默認的列-值,多個用逗號分隔,如:source=binlog,area_name=合肥
#date_formatter: yyyy-MM-dd #date類型格式化, 不填寫默認yyyy-MM-dd
#datetime_formatter: yyyy-MM-dd HH:mm:ss #datetime、timestamp類型格式化,不填寫默認yyyy-MM-dd HH:mm:ss
#Elasticsearch相關
es_index: user_index #Index名稱,可以為空,默認使用表(Table)名稱
#es_mappings: #索引映射,可以為空,為空時根據數據類型自行推導ES推導
# -
# column: REMARK #數據庫列名稱
# field: remark #映射后的ES字段名稱
# type: text #ES字段類型
# analyzer: ik_smart #ES分詞器,type為text此項有意義
# #format: #日期格式,type為date此項有意義
# -
# column: USER_NAME #數據庫列名稱
# field: account #映射后的ES字段名稱
# type: keyword #ES字段類型
規則示例
t_user表,數據如下:
示例一
使用上述配置
自動創建的Mapping,如下:
同步到Elasticsearch的數據如下:
示例二
配置如下:
rule:
-
schema: eseap #數據庫名稱
table: t_user #表名稱
order_by_column: id #排序字段,存量數據同步時不能為空
column_lower_case: true #列名稱轉為小寫,默認為false
#column_upper_case:false#列名稱轉為大寫,默認為false
#column_underscore_to_camel: true #列名稱下划線轉駝峰,默認為false
# 包含的列,多值逗號分隔,如:id,name,age,area_id 為空時表示包含全部列
#include_columns: ID,USER_NAME,PASSWORD
#exclude_columns: BIRTHDAY,MOBIE # 排除掉的列,多值逗號分隔,如:id,name,age,area_id 默認為空
default_column_values: area_name=合肥 #默認的列-值,多個用逗號分隔,如:source=binlog,area_name=合肥
#date_formatter: yyyy-MM-dd #date類型格式化, 不填寫默認yyyy-MM-dd
#datetime_formatter: yyyy-MM-dd HH:mm:ss #datetime、timestamp類型格式化,不填寫默認yyyy-MM-dd HH:mm:ss
#Elasticsearch相關
es_index: user_index #Index名稱,可以為空,默認使用表(Table)名稱
es_mappings: #索引映射,可以為空,為空時根據數據類型自行推導ES推導
-
column: REMARK #數據庫列名稱
field: remark #映射后的ES字段名稱
type: text #ES字段類型
analyzer: ik_smart #ES分詞器,type為text此項有意義
#format: #日期格式,type為date此項有意義
-
column: USER_NAME #數據庫列名稱
field: account #映射后的ES字段名稱
type: keyword #ES字段類型
es_mappings配置項表示定義索引的mappings(映射關系),不定義es_mappings則使用列類型自動創建索引的mappings(映射關系)。
創建的Mapping,如下:
同步到Elasticsearch的數據如下:
基於Lua腳本同步
使用Lua腳本可以實現更復雜的數據處理邏輯,go-mysql-transfer支持Lua5.1語法
Lua示例
t_user表,數據如下:
示例一
引入Lua腳本:
rule:
-
schema: eseap #數據庫名稱
table: t_user #表名稱
order_by_column: id #排序字段,存量數據同步時不能為空
lua_file_path: lua/t_user_es.lua #lua腳本文件
es_index: user_index #Elasticsearch Index名稱,可以為空,默認使用表(Table)名稱
es_mappings: #索引映射,可以為空,為空時根據數據類型自行推導ES推導
-
field: id #映射后的ES字段名稱
type: keyword #ES字段類型
-
field: userName #映射后的ES字段名稱
type: keyword #ES字段類型
-
field: password #映射后的ES字段名稱
type: keyword #ES字段類型
-
field: createTime #映射后的ES字段名稱
type: date #ES字段類型
format: yyyy-MM-dd HH:mm:ss #日期格式,type為date此項有意義
-
field: remark #映射后的ES字段名稱
type: text #ES字段類型
analyzer: ik_smart #ES分詞器,type為text此項有意義
-
field: source #映射后的ES字段名稱
type: keyword #ES字段類型
其中,
es_mappings 表示索引的mappings(映射關系),不定義es_mappings則根據字段的值自動創建mappings(映射關系)。根據es_mappings 生成的mappings如下:
Lua腳本:
local ops = require("esOps") --加載elasticsearch操作模塊
local row = ops.rawRow() --當前數據庫的一行數據,table類型,key為列名稱
local action = ops.rawAction() --當前數據庫事件,包括:insert、update、delete
local id = row["ID"] --獲取ID列的值
local userName = row["USER_NAME"] --獲取USER_NAME列的值
local password = row["PASSWORD"] --獲取USER_NAME列的值
local createTime = row["CREATE_TIME"] --獲取CREATE_TIME列的值
local remark = row["REMARK"] --獲取REMARK列的值
local result = {} -- 定義一個table,作為結果集
result["id"] = id
result["userName"] = userName
result["password"] = password
result["createTime"] = createTime
result["remark"] = remark
result["source"] = "binlog" -- 數據來源
if action == "insert" then -- 只監聽新增事件
ops.INSERT("t_user",id,result) -- 新增,參數1為index名稱,string類型;參數2為要插入的數據主鍵;參數3為要插入的數據,tablele類型或者json字符串
end
同步到Elasticsearch的數據如下:
示例二
引入Lua腳本:
schema: eseap #數據庫名稱
table: t_user #表名稱
lua_file_path: lua/t_user_es2.lua #lua腳本文件
未明確定義index名稱、mappings,es會根據值自動創建一個名為t_user的index。
使用如下腳本:
local ops = require("esOps") --加載elasticsearch操作模塊
local row = ops.rawRow() --當前數據庫的一行數據,table類型,key為列名稱
local action = ops.rawAction() --當前數據庫事件,包括:insert、update、delete
local id = row["ID"] --獲取ID列的值
local userName = row["USER_NAME"] --獲取USER_NAME列的值
local password = row["PASSWORD"] --獲取USER_NAME列的值
local createTime = row["CREATE_TIME"] --獲取CREATE_TIME列的值
local result = {} -- 定義一個table,作為結果集
result["id"] = id
result["userName"] = userName
result["password"] = password
result["createTime"] = createTime
result["remark"] = remark
result["source"] = "binlog" -- 數據來源
if action == "insert" then -- 只監聽新增事件
ops.INSERT("t_user",id,result) -- 新增,參數1為index名稱,string類型;參數2為要插入的數據主鍵;參數3為要插入的數據,tablele類型或者json字符串
end
同步到Elasticsearch的數據如下:
esOps模塊
提供的方法如下:
- INSERT: 插入操作,如:ops.INSERT(index,id,result)。參數index為索引名稱,字符串類型;參數index為要插入數據的主鍵;參數result為要插入的數據,可以為table類型或者json字符串
- UPDATE: 修改操作,如:ops.UPDATE(index,id,result)。參數index為索引名稱,字符串類型;參數index為要修改數據的主鍵;參數result為要修改的數據,可以為table類型或者json字符串
- DELETE: 刪除操作,如:ops.DELETE(index,id)。參數index為索引名稱,字符串類型;參數id為要刪除的數據主鍵,類型不限;
同步數據到RocketMQ
RocketMQ配置
相關配置如下:
# app.yml
target: rocketmq #目標類型
#rocketmq連接配置
rocketmq_name_servers: 127.0.0.1:9876 #rocketmq命名服務地址,多個用逗號分隔
#rocketmq_group_name: transfer_test_group #rocketmq group name,默認為空
#rocketmq_instance_name: transfer_test_group_ins #rocketmq instance name,默認為空
#rocketmq_access_key: RocketMQ #訪問控制 accessKey,默認為空
#rocketmq_secret_key: 12345678 #訪問控制 secretKey,默認為空
基於規則同步
相關配置如下:
rule:
-
schema: eseap #數據庫名稱
table: t_user #表名稱
#order_by_column: id #排序字段,存量數據同步時不能為空
#column_lower_case:false #列名稱轉為小寫,默認為false
#column_upper_case:false#列名稱轉為大寫,默認為false
column_underscore_to_camel: true #列名稱下划線轉駝峰,默認為false
# 包含的列,多值逗號分隔,如:id,name,age,area_id 為空時表示包含全部列
#include_columns: ID,USER_NAME,PASSWORD
#exclude_columns: BIRTHDAY,MOBIE # 排除掉的列,多值逗號分隔,如:id,name,age,area_id 默認為空
#column_mappings: CARD_NO=sfz #列名稱映射,多個映射關系用逗號分隔,如:USER_NAME=account 表示將字段名USER_NAME映射為account
#default_column_values: source=binlog,area_name=合肥 #默認的列-值,多個用逗號分隔,如:source=binlog,area_name=合肥
#date_formatter: yyyy-MM-dd #date類型格式化, 不填寫默認yyyy-MM-dd
#datetime_formatter: yyyy-MM-dd HH:mm:ss #datetime、timestamp類型格式化,不填寫默認yyyy-MM-dd HH:mm:ss
value_encoder: json #值編碼,支持json、kv-commas、v-commas;默認為json
#value_formatter: '{{.ID}}|{{.USER_NAME}}|{{.REAL_NAME}}|{{if eq .STATUS 0}}停用{{else}}啟用{{end}}'
#rocketmq相關
rocketmq_topic: transfer_test_topic #rocketmq topic,可以為空,默認使用表名稱
#reserve_raw_data: false #保留update之前的數據,針對rocketmq、kafka、rabbitmq有用;默認為false
其中,
value_encoder表示值編碼格式,支持json、kv-commas、v-commas三種格式,不填寫默認為json,具體如下表:
格式 | 說明 | 舉例 |
---|---|---|
json | json | |
kv-commas | key-value逗號分隔 | id=1001,userName=admin,password=123456,createTime=2020-07-20 14:29:19 |
v-commas | value逗號分隔 | 1001,admin,123456,2020-07-20 14:29:19 |
value_formatter表示值的格式化表達式,具體模板語法參見"表達式模板"章節,當value_formatter不為空時value_encoder無效。
reserve_raw_data表示是否保留update之前的數據,即保留修改之前的老數據,默認不保留
示例
t_user表,數據如下:
在RocketMQ中創建名稱為transfer_test_topic的topic,注意topic名稱一定要和rule規則中rocketmq_topic配置項的值一致
示例一
使用上述配置
insert事件,同步到RocketMQ的數據如下:
update事件,同步到RocketMQ的數據如下:
reserve_raw_data設置為true,update事件,同步到RocketMQ的數據如下:
其中,raw屬性為update之前的舊數據
delete事件,同步到RocketMQ的數據如下:
示例二
配置如下:
rule:
-
schema: eseap #數據庫名稱
table: t_user #表名稱
#order_by_column: id #排序字段,存量數據同步時不能為空
column_lower_case: true #列名稱轉為小寫,默認為false
#column_upper_case:false#列名稱轉為大寫,默認為false
#column_underscore_to_camel: true #列名稱下划線轉駝峰,默認為false
# 包含的列,多值逗號分隔,如:id,name,age,area_id 為空時表示包含全部列
#include_columns: ID,USER_NAME,PASSWORD
#exclude_columns: BIRTHDAY,MOBIE # 排除掉的列,多值逗號分隔,如:id,name,age,area_id 默認為空
column_mappings: USER_NAME=account #列名稱映射,多個映射關系用逗號分隔,如:USER_NAME=account 表示將字段名USER_NAME映射為account
default_column_values: area_name=合肥 #默認的列-值,多個用逗號分隔,如:source=binlog,area_name=合肥
#date_formatter: yyyy-MM-dd #date類型格式化, 不填寫默認yyyy-MM-dd
#datetime_formatter: yyyy-MM-dd HH:mm:ss #datetime、timestamp類型格式化,不填寫默認yyyy-MM-dd HH:mm:ss
value_encoder: json #值編碼,支持json、kv-commas、v-commas;默認為json
#value_formatter: '{{.ID}}|{{.USER_NAME}}|{{.REAL_NAME}}|{{if eq .STATUS 0}}停用{{else}}啟用{{end}}'
#rocketmq相關
rocketmq_topic: transfer_test_topic #rocketmq topic,可以為空,默認使用表名稱
#reserve_raw_data: false #保留update之前的數據,針對rocketmq、kafka、rabbitmq有用;默認為false
其中,
column_mappings表示對列名稱進行重新映射
insert事件,同步到RocketMQ的數據如下:
其中,屬性名稱USER_NAME變為了account
示例三
配置如下:
rule:
-
schema: eseap #數據庫名稱
table: t_user #表名稱
#order_by_column: id #排序字段,存量數據同步時不能為空
column_lower_case: true #列名稱轉為小寫,默認為false
#column_upper_case:false#列名稱轉為大寫,默認為false
#column_underscore_to_camel: true #列名稱下划線轉駝峰,默認為false
# 包含的列,多值逗號分隔,如:id,name,age,area_id 為空時表示包含全部列
#include_columns: ID,USER_NAME,PASSWORD
#exclude_columns: BIRTHDAY,MOBIE # 排除掉的列,多值逗號分隔,如:id,name,age,area_id 默認為空
#column_mappings: USER_NAME=account #列名稱映射,多個映射關系用逗號分隔,如:USER_NAME=account 表示將字段名USER_NAME映射為account
default_column_values: area_name=合肥 #默認的列-值,多個用逗號分隔,如:source=binlog,area_name=合肥
#date_formatter: yyyy-MM-dd #date類型格式化, 不填寫默認yyyy-MM-dd
#datetime_formatter: yyyy-MM-dd HH:mm:ss #datetime、timestamp類型格式化,不填寫默認yyyy-MM-dd HH:mm:ss
value_encoder: v-commas #值編碼,支持json、kv-commas、v-commas;默認為json
#value_formatter: '{{.ID}}|{{.USER_NAME}}' # 值格式化表達式,如:{{.ID}}|{{.USER_NAME}},{{.ID}}表示ID字段的值、{{.USER_NAME}}表示USER_NAME字段的值
#rocketmq相關
rocketmq_topic: transfer_test_topic #rocketmq topic,可以為空,默認使用表名稱
其中,
value_encoder表示消息編碼方式
insert事件,同步到RocketMQ的數據如下:
同步數據到Redis
同步數據到MongoDB
同步數據到RocketMQ
同步數據到Kafka
同步數據到RabbitMQ
全量數據導入
Lua腳本
Lua 是一種輕量小巧的腳本語言, 其設計目的是為了嵌入應用程序中,從而為應用程序提供靈活的擴展和定制功能。開發者只需要花費少量時間就能大致掌握Lua的語法,照虎畫貓寫出可用的腳本。
基於Lua的高擴展性,可以實現更為復雜的數據解析、消息生成邏輯,定制需要的數據格式。
性能
總結
- go-mysql-elasticsearch 實現增量|全量 數據同步
- go-mysql-elasticsearch可以實現同步insert、update、delete操作
- go-mysql-elasticsearch 穩定性差點,出現過無法同步成功的情況,沒有詳細日志,不便於排查
常見問題
如何重置同步位置(Position)
1、停掉go-mysql-transfer應用
2、在數據庫執行 show master status語句,會看到結果如下:
File | Position | Binlog-Do-DB | Binlog-Ignore-DB |
---|---|---|---|
mysql-bin.000025 | 993779648 |
3、使用File和Position列的值
執行命令: ./go-mysql-transfer -config app.yml -position mysql-bin.000025 993779648
4、重啟應用: ./go-mysql-transfer -config app.yml
如何同步多張表
使用yml的數組語法:
#一組連詞線開頭的行,構成一個數組
animal:
- Cat
- Dog
- Goldfish
go-mysql-transfer支持單庫多表,也支持多庫多表,配置如下:
rule:
-
schema: eseap #數據庫名稱
table: t_user #表名稱
column_underscore_to_camel: true
value_encoder: json
redis_structure: string
redis_key_prefix: USER_
-
schema: eseap #數據庫名稱
table: t_sign #表名稱
column_underscore_to_camel: true
value_encoder: json
redis_structure: string
redis_key_prefix: SIGN_
-
schema: gojob #數據庫名稱
table: t_triggered #表名稱
column_underscore_to_camel: true
value_encoder: json
redis_structure: string
redis_key_prefix: TRIGGERED_
t_user表和t_sign表屬於eseap數據庫,t_triggered表屬於gojob數據庫
參考:
官方手冊:https://www.kancloud.cn/wj596/go-mysql-transfer/2064425
https://blog.csdn.net/weixin_30884743/article/details/114171282