Mysql數據實時增量同步工具之go-mysql-transfer


@

數據實時增量同步工具之go-mysql-transfer:https://blog.csdn.net/weixin_42526326/article/details/121302961

Elasticsearch筆記之安裝、配置、Kibana基礎:https://blog.csdn.net/weixin_42526326/article/details/121302809

go-mysql-transfer官方手冊:https://www.kancloud.cn/wj596/go-mysql-transfer/2064425

GO筆記之環境安裝:https://blog.csdn.net/weixin_42526326/article/details/121302777

技術選型:Mysql8 + go-mysql-transfer + ElasticSearch7.13

簡介

go-mysql-transfer是一款MySQL數據庫實時增量同步工具。需要GO環境

能夠監聽MySQL二進制日志(Binlog)的變動,將變更內容形成指定格式的消息,實時發送到接收端。從而在數據庫和接收端之間形成一個高性能、低延遲的增量數據同步更新管道。

工作需要研究了下阿里開源的MySQL Binlog增量訂閱消費組件canal,其功能強大、運行穩定,但是有些方面不是太符合需求,主要有如下三點:

1、需要自己編寫客戶端來消費canal解析到的數據

2、server-client模式,需要同時部署server和client兩個組件,我們的項目中有6個業務數據庫要實時同步到redis,意味着要多部署12個組件,硬件和運維成本都會增加。

3、從server端到client端需要經過一次網絡傳輸和序列化反序列化操作,然后再同步到接收端,感覺沒有直接懟到接收端更高效。

前提條件

  • MySQL 服務器需要開啟 row 模式的 binlog。

  • 因為要使用 mysqldump 命令,因此該進程的所在的服務器需要部署這一工具。

  • 這一工具使用 GoLang 開發,需要 Go 1.9+ 的環境進行構建。

  • 新版(7.13+)的本地es必須關閉安全模式才可以

    yml配置文件添加

    xpack.security.enabled: false
    
  • 可用的 MySQL、Elasticsearch 以及 Kibana 實例。權限需要大一些。

    • mysql binlog必須是ROW模式
    • 要同步的mysql數據表必須包含主鍵,否則直接忽略,這是因為如果數據表沒有主鍵,UPDATE和DELETE操作就會因為在ES中找不到對應的document而無法進行同步
    • 不支持程序運行過程中修改表結構
    • 要賦予用於連接mysql的賬戶RELOAD權限以及REPLICATION權限, SUPER權限
    GRANT REPLICATION SLAVE ON *.* TO 'elastic'@'IP';
    GRANT RELOAD ON *.* TO 'elastic'@'IP';
    UPDATE mysql.user SET Super_Priv='Y' WHERE user='elastic' AND host='IP';
    

特性

1、簡單,不依賴其它組件,一鍵部署
2、集成多種接收端,如:Redis、MongoDB、Elasticsearch、RocketMQ、Kafka、RabbitMQ、HTTP API等,無需編寫客戶端,開箱即用
3、內置豐富的數據解析、消息生成規則,支持模板語法
4、支持Lua腳本擴展,可處理復雜邏輯,如:數據的轉換、清洗、打寬
5、集成Prometheus客戶端,支持監控、告警
6、集成Web Admin監控頁面
7、支持高可用集群部署
8、數據同步失敗重試
9、支持全量數據初始化

與同類工具比較

特色 Canal mysql_stream go-mysql-transfer Maxwell
開發語言 Java Python Golang Java
高可用 支持 支持 支持 支持
接收端 編碼定制 Kafka等(MQ) Redis、MongoDB、Elasticsearch、RabbitMQ、Kafka、RocketMQ、HTTP API 等 Kafka,Kinesis、RabbitMQ、Redis、Google Cloud Pub/Sub、文件等
全量數據初始化 不支持 支持 支持 支持
數據格式 編碼定制 Json(固定格式) Json(規則配置) 模板語法 Lua腳本 JSON
性能(4-8TPS)

實現原理

1、go-mysql-transfer將自己偽裝成MySQL的Slave,

2、向Master發送dump協議獲取binlog,解析binlog並生成消息

3、將生成的消息實時、批量發送給接收端

如下圖所示:

img

go-mysql部署運行

開啟MySQL的binlog

修改app.yml

命令行運行
Windows直接運行 go-mysql-transfer.exe
Linux執行 nohup go-mysql-transfer &

監控

go-mysql-transfer支持兩種監控模式,Prometheus和內置的Web Admin

相關配置:

# web admin相關配置
enable_web_admin: true #是否啟用web admin,默認false
web_admin_port: 8060 #web監控端口,默認8060

直接訪問127.0.0.1:8060 可以看到監控界面同步數據到Elasticsearch
在這里插入圖片描述
在這里插入圖片描述

同步數據到Elasticsearch

配置文件——相關配置如下:

# app.yml
#目標類型
target: elasticsearch 

#elasticsearch連接配置
es_addrs: 127.0.0.1:9200 #連接地址,多個用逗號分隔
es_version: 7 # Elasticsearch版本,支持6和7、默認為7
#es_password:  # 用戶名
#es_version:  # 密碼

目前支持Elasticsearch6、Elasticsearch7兩個版本

基於規則同步

相關配置如下:

rule:
  -
    schema: eseap #數據庫名稱
    table: t_user #表名稱
    #order_by_column: id #排序字段,存量數據同步時不能為空
    #column_lower_case: true #列名稱轉為小寫,默認為false
    #column_upper_case:false#列名稱轉為大寫,默認為false
    column_underscore_to_camel: true #列名稱下划線轉駝峰,默認為false
    # 包含的列,多值逗號分隔,如:id,name,age,area_id  為空時表示包含全部列
    #include_columns: ID,USER_NAME,PASSWORD
    #exclude_columns: BIRTHDAY,MOBIE # 排除掉的列,多值逗號分隔,如:id,name,age,area_id  默認為空
    #default_column_values: area_name=合肥  #默認的列-值,多個用逗號分隔,如:source=binlog,area_name=合肥
    #date_formatter: yyyy-MM-dd #date類型格式化, 不填寫默認yyyy-MM-dd
    #datetime_formatter: yyyy-MM-dd HH:mm:ss #datetime、timestamp類型格式化,不填寫默認yyyy-MM-dd HH:mm:ss

    #Elasticsearch相關
    es_index: user_index #Index名稱,可以為空,默認使用表(Table)名稱
    #es_mappings: #索引映射,可以為空,為空時根據數據類型自行推導ES推導
    #  -  
    #  	column: REMARK #數據庫列名稱
    #    field: remark #映射后的ES字段名稱
    #    type: text #ES字段類型
    #    analyzer: ik_smart #ES分詞器,type為text此項有意義
    #    #format: #日期格式,type為date此項有意義
    #   -  
    #    column: USER_NAME #數據庫列名稱
    #    field: account #映射后的ES字段名稱
    #    type: keyword #ES字段類型

規則示例

t_user表,數據如下:
img

示例一

使用上述配置
自動創建的Mapping,如下:

img

同步到Elasticsearch的數據如下:

img

示例二

配置如下:

rule:
  -
    schema: eseap #數據庫名稱
    table: t_user #表名稱
    order_by_column: id #排序字段,存量數據同步時不能為空
    column_lower_case: true #列名稱轉為小寫,默認為false
    #column_upper_case:false#列名稱轉為大寫,默認為false
    #column_underscore_to_camel: true #列名稱下划線轉駝峰,默認為false
    # 包含的列,多值逗號分隔,如:id,name,age,area_id  為空時表示包含全部列
    #include_columns: ID,USER_NAME,PASSWORD
    #exclude_columns: BIRTHDAY,MOBIE # 排除掉的列,多值逗號分隔,如:id,name,age,area_id  默認為空
    default_column_values: area_name=合肥  #默認的列-值,多個用逗號分隔,如:source=binlog,area_name=合肥
    #date_formatter: yyyy-MM-dd #date類型格式化, 不填寫默認yyyy-MM-dd
    #datetime_formatter: yyyy-MM-dd HH:mm:ss #datetime、timestamp類型格式化,不填寫默認yyyy-MM-dd HH:mm:ss

    #Elasticsearch相關
    es_index: user_index #Index名稱,可以為空,默認使用表(Table)名稱
    es_mappings: #索引映射,可以為空,為空時根據數據類型自行推導ES推導
      -  
        column: REMARK #數據庫列名稱
        field: remark #映射后的ES字段名稱
        type: text #ES字段類型
        analyzer: ik_smart #ES分詞器,type為text此項有意義
        #format: #日期格式,type為date此項有意義
      -  
        column: USER_NAME #數據庫列名稱
        field: account #映射后的ES字段名稱
        type: keyword #ES字段類型

es_mappings配置項表示定義索引的mappings(映射關系),不定義es_mappings則使用列類型自動創建索引的mappings(映射關系)。

創建的Mapping,如下:

img

同步到Elasticsearch的數據如下:
img

基於Lua腳本同步

使用Lua腳本可以實現更復雜的數據處理邏輯,go-mysql-transfer支持Lua5.1語法

Lua示例

t_user表,數據如下:

img

示例一

引入Lua腳本:

rule:
  -
    schema: eseap #數據庫名稱
    table: t_user #表名稱
    order_by_column: id #排序字段,存量數據同步時不能為空
    lua_file_path: lua/t_user_es.lua   #lua腳本文件
    es_index: user_index #Elasticsearch Index名稱,可以為空,默認使用表(Table)名稱
    es_mappings: #索引映射,可以為空,為空時根據數據類型自行推導ES推導
      -  
        field: id #映射后的ES字段名稱
        type: keyword #ES字段類型
      -  
        field: userName #映射后的ES字段名稱
        type: keyword #ES字段類型
      -  
        field: password #映射后的ES字段名稱
        type: keyword #ES字段類型
      -  
        field: createTime #映射后的ES字段名稱
        type: date #ES字段類型
        format: yyyy-MM-dd HH:mm:ss #日期格式,type為date此項有意義
      -  
        field: remark #映射后的ES字段名稱
        type: text #ES字段類型
        analyzer: ik_smart #ES分詞器,type為text此項有意義
      -  
        field: source #映射后的ES字段名稱
        type: keyword #ES字段類型

其中,
es_mappings 表示索引的mappings(映射關系),不定義es_mappings則根據字段的值自動創建mappings(映射關系)。根據es_mappings 生成的mappings如下:

img

Lua腳本:

local ops = require("esOps") --加載elasticsearch操作模塊

local row = ops.rawRow()  --當前數據庫的一行數據,table類型,key為列名稱
local action = ops.rawAction()  --當前數據庫事件,包括:insert、update、delete

local id = row["ID"] --獲取ID列的值
local userName = row["USER_NAME"] --獲取USER_NAME列的值
local password = row["PASSWORD"] --獲取USER_NAME列的值
local createTime = row["CREATE_TIME"] --獲取CREATE_TIME列的值
local remark = row["REMARK"] --獲取REMARK列的值

local result = {}  -- 定義一個table,作為結果集
result["id"] = id
result["userName"] = userName
result["password"] = password
result["createTime"] = createTime
result["remark"] = remark
result["source"] = "binlog" -- 數據來源

if action == "insert" then -- 只監聽新增事件
    ops.INSERT("t_user",id,result) -- 新增,參數1為index名稱,string類型;參數2為要插入的數據主鍵;參數3為要插入的數據,tablele類型或者json字符串
end 

同步到Elasticsearch的數據如下:

img

示例二

引入Lua腳本:

    schema: eseap #數據庫名稱
    table: t_user #表名稱
    lua_file_path: lua/t_user_es2.lua   #lua腳本文件

未明確定義index名稱、mappings,es會根據值自動創建一個名為t_user的index。

使用如下腳本:

local ops = require("esOps") --加載elasticsearch操作模塊

local row = ops.rawRow()  --當前數據庫的一行數據,table類型,key為列名稱
local action = ops.rawAction()  --當前數據庫事件,包括:insert、update、delete

local id = row["ID"] --獲取ID列的值
local userName = row["USER_NAME"] --獲取USER_NAME列的值
local password = row["PASSWORD"] --獲取USER_NAME列的值
local createTime = row["CREATE_TIME"] --獲取CREATE_TIME列的值
local result = {}  -- 定義一個table,作為結果集
result["id"] = id
result["userName"] = userName
result["password"] = password
result["createTime"] = createTime
result["remark"] = remark
result["source"] = "binlog" -- 數據來源

if action == "insert" then -- 只監聽新增事件
	ops.INSERT("t_user",id,result) -- 新增,參數1為index名稱,string類型;參數2為要插入的數據主鍵;參數3為要插入的數據,tablele類型或者json字符串
end 

同步到Elasticsearch的數據如下:

img

esOps模塊

提供的方法如下:

  1. INSERT: 插入操作,如:ops.INSERT(index,id,result)。參數index為索引名稱,字符串類型;參數index為要插入數據的主鍵;參數result為要插入的數據,可以為table類型或者json字符串
  2. UPDATE: 修改操作,如:ops.UPDATE(index,id,result)。參數index為索引名稱,字符串類型;參數index為要修改數據的主鍵;參數result為要修改的數據,可以為table類型或者json字符串
  3. DELETE: 刪除操作,如:ops.DELETE(index,id)。參數index為索引名稱,字符串類型;參數id為要刪除的數據主鍵,類型不限;

同步數據到RocketMQ

RocketMQ配置

相關配置如下:

# app.yml

target: rocketmq #目標類型
#rocketmq連接配置
rocketmq_name_servers: 127.0.0.1:9876 #rocketmq命名服務地址,多個用逗號分隔
#rocketmq_group_name: transfer_test_group #rocketmq group name,默認為空
#rocketmq_instance_name: transfer_test_group_ins #rocketmq instance name,默認為空
#rocketmq_access_key: RocketMQ #訪問控制 accessKey,默認為空
#rocketmq_secret_key: 12345678 #訪問控制 secretKey,默認為空

基於規則同步

相關配置如下:

rule:
  -
    schema: eseap #數據庫名稱
    table: t_user #表名稱
    #order_by_column: id #排序字段,存量數據同步時不能為空
    #column_lower_case:false #列名稱轉為小寫,默認為false
    #column_upper_case:false#列名稱轉為大寫,默認為false
    column_underscore_to_camel: true #列名稱下划線轉駝峰,默認為false
    # 包含的列,多值逗號分隔,如:id,name,age,area_id  為空時表示包含全部列
    #include_columns: ID,USER_NAME,PASSWORD
    #exclude_columns: BIRTHDAY,MOBIE # 排除掉的列,多值逗號分隔,如:id,name,age,area_id  默認為空
    #column_mappings: CARD_NO=sfz    #列名稱映射,多個映射關系用逗號分隔,如:USER_NAME=account 表示將字段名USER_NAME映射為account
    #default_column_values: source=binlog,area_name=合肥  #默認的列-值,多個用逗號分隔,如:source=binlog,area_name=合肥
    #date_formatter: yyyy-MM-dd #date類型格式化, 不填寫默認yyyy-MM-dd
    #datetime_formatter: yyyy-MM-dd HH:mm:ss #datetime、timestamp類型格式化,不填寫默認yyyy-MM-dd HH:mm:ss
    value_encoder: json  #值編碼,支持json、kv-commas、v-commas;默認為json
    #value_formatter: '{{.ID}}|{{.USER_NAME}}|{{.REAL_NAME}}|{{if eq .STATUS 0}}停用{{else}}啟用{{end}}'
    
    #rocketmq相關
    rocketmq_topic: transfer_test_topic #rocketmq topic,可以為空,默認使用表名稱
    #reserve_raw_data: false #保留update之前的數據,針對rocketmq、kafka、rabbitmq有用;默認為false

其中,
value_encoder表示值編碼格式,支持json、kv-commas、v-commas三種格式,不填寫默認為json,具體如下表:

格式 說明 舉例
json json
kv-commas key-value逗號分隔 id=1001,userName=admin,password=123456,createTime=2020-07-20 14:29:19
v-commas value逗號分隔 1001,admin,123456,2020-07-20 14:29:19

value_formatter表示值的格式化表達式,具體模板語法參見"表達式模板"章節,當value_formatter不為空時value_encoder無效。

reserve_raw_data表示是否保留update之前的數據,即保留修改之前的老數據,默認不保留

示例

t_user表,數據如下:

img

在RocketMQ中創建名稱為transfer_test_topic的topic,注意topic名稱一定要和rule規則中rocketmq_topic配置項的值一致

示例一

使用上述配置

insert事件,同步到RocketMQ的數據如下:
img
update事件,同步到RocketMQ的數據如下:
img
reserve_raw_data設置為true,update事件,同步到RocketMQ的數據如下:
img
其中,raw屬性為update之前的舊數據
delete事件,同步到RocketMQ的數據如下:
img

示例二

配置如下:

rule:
  -
    schema: eseap #數據庫名稱
    table: t_user #表名稱
    #order_by_column: id #排序字段,存量數據同步時不能為空
    column_lower_case: true #列名稱轉為小寫,默認為false
    #column_upper_case:false#列名稱轉為大寫,默認為false
    #column_underscore_to_camel: true #列名稱下划線轉駝峰,默認為false
    # 包含的列,多值逗號分隔,如:id,name,age,area_id  為空時表示包含全部列
    #include_columns: ID,USER_NAME,PASSWORD
    #exclude_columns: BIRTHDAY,MOBIE # 排除掉的列,多值逗號分隔,如:id,name,age,area_id  默認為空
    column_mappings: USER_NAME=account    #列名稱映射,多個映射關系用逗號分隔,如:USER_NAME=account 表示將字段名USER_NAME映射為account
    default_column_values: area_name=合肥  #默認的列-值,多個用逗號分隔,如:source=binlog,area_name=合肥
    #date_formatter: yyyy-MM-dd #date類型格式化, 不填寫默認yyyy-MM-dd
    #datetime_formatter: yyyy-MM-dd HH:mm:ss #datetime、timestamp類型格式化,不填寫默認yyyy-MM-dd HH:mm:ss
    value_encoder: json  #值編碼,支持json、kv-commas、v-commas;默認為json
    #value_formatter: '{{.ID}}|{{.USER_NAME}}|{{.REAL_NAME}}|{{if eq .STATUS 0}}停用{{else}}啟用{{end}}'

    #rocketmq相關
    rocketmq_topic: transfer_test_topic #rocketmq topic,可以為空,默認使用表名稱
    #reserve_raw_data: false #保留update之前的數據,針對rocketmq、kafka、rabbitmq有用;默認為false

其中,
column_mappings表示對列名稱進行重新映射

insert事件,同步到RocketMQ的數據如下:
img
其中,屬性名稱USER_NAME變為了account

示例三

配置如下:

rule:
  -
    schema: eseap #數據庫名稱
    table: t_user #表名稱
    #order_by_column: id #排序字段,存量數據同步時不能為空
    column_lower_case: true #列名稱轉為小寫,默認為false
    #column_upper_case:false#列名稱轉為大寫,默認為false
    #column_underscore_to_camel: true #列名稱下划線轉駝峰,默認為false
    # 包含的列,多值逗號分隔,如:id,name,age,area_id  為空時表示包含全部列
    #include_columns: ID,USER_NAME,PASSWORD
    #exclude_columns: BIRTHDAY,MOBIE # 排除掉的列,多值逗號分隔,如:id,name,age,area_id  默認為空
    #column_mappings: USER_NAME=account    #列名稱映射,多個映射關系用逗號分隔,如:USER_NAME=account 表示將字段名USER_NAME映射為account
    default_column_values: area_name=合肥  #默認的列-值,多個用逗號分隔,如:source=binlog,area_name=合肥
    #date_formatter: yyyy-MM-dd #date類型格式化, 不填寫默認yyyy-MM-dd
    #datetime_formatter: yyyy-MM-dd HH:mm:ss #datetime、timestamp類型格式化,不填寫默認yyyy-MM-dd HH:mm:ss
    value_encoder: v-commas  #值編碼,支持json、kv-commas、v-commas;默認為json
    #value_formatter: '{{.ID}}|{{.USER_NAME}}' # 值格式化表達式,如:{{.ID}}|{{.USER_NAME}},{{.ID}}表示ID字段的值、{{.USER_NAME}}表示USER_NAME字段的值

    #rocketmq相關
    rocketmq_topic: transfer_test_topic #rocketmq topic,可以為空,默認使用表名稱

其中,
value_encoder表示消息編碼方式

insert事件,同步到RocketMQ的數據如下:

img

同步數據到Redis

同步數據到MongoDB

同步數據到RocketMQ

同步數據到Kafka

同步數據到RabbitMQ

全量數據導入

全量數據導入

Lua腳本

Lua 是一種輕量小巧的腳本語言, 其設計目的是為了嵌入應用程序中,從而為應用程序提供靈活的擴展和定制功能。開發者只需要花費少量時間就能大致掌握Lua的語法,照虎畫貓寫出可用的腳本。

基於Lua的高擴展性,可以實現更為復雜的數據解析、消息生成邏輯,定制需要的數據格式。

性能

性能測試地址

總結

  • go-mysql-elasticsearch 實現增量|全量 數據同步
  • go-mysql-elasticsearch可以實現同步insert、update、delete操作
  • go-mysql-elasticsearch 穩定性差點,出現過無法同步成功的情況,沒有詳細日志,不便於排查

常見問題

如何重置同步位置(Position)

1、停掉go-mysql-transfer應用
2、在數據庫執行 show master status語句,會看到結果如下:

File Position Binlog-Do-DB Binlog-Ignore-DB
mysql-bin.000025 993779648

3、使用File和Position列的值
執行命令: ./go-mysql-transfer -config app.yml -position mysql-bin.000025 993779648

4、重啟應用: ./go-mysql-transfer -config app.yml

如何同步多張表

使用yml的數組語法:

#一組連詞線開頭的行,構成一個數組
animal:
  - Cat
  - Dog
  - Goldfish

go-mysql-transfer支持單庫多表,也支持多庫多表,配置如下:

rule:
  -
    schema: eseap #數據庫名稱
    table: t_user #表名稱
    column_underscore_to_camel: true 
    value_encoder: json  
    redis_structure: string 
    redis_key_prefix: USER_
  -
    schema: eseap #數據庫名稱
    table: t_sign #表名稱
    column_underscore_to_camel: true 
    value_encoder: json  
    redis_structure: string 
    redis_key_prefix: SIGN_
  -
    schema: gojob #數據庫名稱
    table: t_triggered #表名稱
    column_underscore_to_camel: true 
    value_encoder: json  
    redis_structure: string 
    redis_key_prefix: TRIGGERED_

t_user表和t_sign表屬於eseap數據庫,t_triggered表屬於gojob數據庫

參考:

官方手冊:https://www.kancloud.cn/wj596/go-mysql-transfer/2064425

https://blog.csdn.net/weixin_30884743/article/details/114171282


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM