canal本質就是"冒充"從庫,通過訂閱mysql bin-log來獲取數據庫的更改信息。
mysql配置(my.cnf)
mysql需要配置my.cnf開啟bin-log日志並且將bin-log日志格式設置為row, 同時為了防止bin-log日志占用過多磁盤,可以設置一下過期時間,
[mysqld]
log-bin=mysql-bin # 打開binlog
binlog-format=ROW # ROW格式
server_id=1 # mysql Replication 需要設置 在mysql集群里唯一
expire_logs_days=7 # binlog文件保存7天
max_binlog_size = 500m # 每個binlog日志文件大小
canal配置
除了kafka之外,canal還支持將數據庫修改的消息投遞到rocketMQ, 或者不經過消息隊列直接投遞到canal的客戶端,然后再在客戶端實現自己的代碼(如寫入其他存儲/其他消息隊列) ,但是只能選其一。而如果選擇canal客戶端的方式, 一個canal server也只能將消息投遞到一個canal client。
但是可以開啟多個canal服務端和客戶端(同一個實例,即對mysql來說只算是一個從庫), 他們通過zookeeper保證只有一個服務端和客戶端是有效的,其他只是作為HA的冗余。
然后需要修改canal目錄下(以下為近最小配置)
conf/example/instance.properties
## mysql serverId
canal.instance.mysql.slaveId = 1234
# 數據庫address
canal.instance.master.address = 127.0.0.1:3306
# 數據庫賬號密碼
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
# 需要訂閱的數據庫.表名 默認全部
canal.instance.filter.regex = .\*\\\\..\* # 去掉轉義符其實就是 .*\..*
# topic名 固定
canal.mq.topic=canal
# 動態topic
# canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*
# 庫名.表名: 唯一主鍵,多個表之間用逗號分隔
# canal.mq.partitionHash=mytest.person:id,mytest.role:id
其中動態topic 和 主鍵hash看上去有點難理解,去看其他人的博客找到的解釋和例子如下
動態topic
canal 1.1.3版本之后, 支持配置格式:schema 或 schema.table,多個配置之間使用逗號分隔
例子1:test.test 指定匹配的單表,發送到以 test_test為名字的topic上
例子2:.\…* 匹配所有表,每個表都會發送到各自表名的topic上
例子3:test 指定匹配對應的庫,一個庫的所有表都會發送到庫名的topic上
例子4:test.* 指定匹配的表達式,針對匹配的表會發送到各自表名的topic上
例子5:test,test1.test1,指定多個表達式,會將test庫的表都發送到test的topic上,test1.test1的表發送到對應的test1_test1 topic上,其余的表發送到默認的canal.mq.topic值
支持指定topic名稱匹配, 配置格式:topicName:schema 或 schema.table,多個配置之間使用逗號分隔, 多組之間使用 ; 分隔
例子:test:test,test1.test1;test2:test2,test3.test1 針對匹配的表會發送到指定的topic上
————————————————
版權聲明:本文為CSDN博主「BillowX_」的原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/weixin_35852328/article/details/87600871
主鍵
canal 1.1.3版本之后, 支持配置格式:schema.table:pk1^pk2,多個配置之間使用逗號分隔
例子1:test.test:pk1^pk2 指定匹配的單表,對應的hash字段為pk1 + pk2
例子2:.\…:id 正則匹配,指定所有正則匹配的表對應的hash字段為id
例子3:.\…:pkpkpk 正則匹配,指定所有正則匹配的表對應的hash字段為表主鍵(自動查找)
例子4: 匹配規則啥都不寫,則默認發到0這個partition上
例子5:.\…* ,不指定pk信息的正則匹配,將所有正則匹配的表,對應的hash字段為表名
按表hash: 一張表的所有數據可以發到同一個分區,不同表之間會做散列 (會有熱點表分區過大問題)
例子6: test.test:id,.\…* , 針對test的表按照id散列,其余的表按照table散列
注意:大家可以結合自己的業務需求,設置匹配規則,多條匹配規則之間是按照順序進行匹配(命中一條規則就返回)
————————————————
版權聲明:本文為CSDN博主「BillowX_」的原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/weixin_35852328/article/details/87600871
最后實現消費kafka上canal topic上消息的代碼
這里以go為例,可以寫入到elasticsearch/redis/其他
package main
import (
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/Shopify/sarama"
"github.com/elastic/go-elasticsearch/esapi"
"github.com/elastic/go-elasticsearch/v6"
"os"
)
var esClient *elasticsearch.Client
func init() {
var err error
config := elasticsearch.Config{}
config.Addresses = []string{"http://127.0.0.1:9200"}
esClient, err = elasticsearch.NewClient(config)
checkErr(err)
}
type Msg struct {
Data []struct {
Id string `json:"id"`
A string `json:"a"`
} `json:"data"`
Type string `json:"type"`
DataBase string `json:"database"`
Table string `json:"table"`
}
func checkErr(err error) {
if err != nil {
fmt.Println(err)
os.Exit(-1)
}
}
type Consumer struct{}
func (c *Consumer) Setup(sarama.ConsumerGroupSession) error {
return nil
}
func (c *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for message := range claim.Messages() {
// fmt.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
msg := &Msg{}
err := json.Unmarshal(message.Value, msg)
checkErr(err)
if msg.DataBase == "test" && msg.Table == "tbltest" {
if msg.Type == "INSERT" {
for k, _ := range msg.Data {
// 寫elasticsearch 邏輯
body := map[string]interface{}{
"id": msg.Data[k].Id,
"a": msg.Data[k].A,
}
jsonBody, _ := json.Marshal(body)
req := esapi.IndexRequest{
Index: msg.DataBase,
DocumentID: msg.Table + "_" + msg.Data[k].Id,
Body: bytes.NewReader(jsonBody),
}
res, err := req.Do(context.Background(), esClient)
checkErr(err)
fmt.Println(res.String())
res.Body.Close()
session.MarkMessage(message, "")
}
}
}
}
return nil
}
func main() {
consumer := &Consumer{}
config := sarama.NewConfig()
config.Version = sarama.MaxVersion
client, err := sarama.NewConsumerGroup([]string{"127.0.0.1:9092"}, "tg", config)
checkErr(err)
ctx := context.Background()
client.Consume(ctx, []string{"canal"}, consumer)
}