canal+kafka訂閱Mysql binlog將數據異構到elasticsearch(或其他存儲方式)


canal本質就是"冒充"從庫,通過訂閱mysql bin-log來獲取數據庫的更改信息。

mysql配置(my.cnf)

mysql需要配置my.cnf開啟bin-log日志並且將bin-log日志格式設置為row, 同時為了防止bin-log日志占用過多磁盤,可以設置一下過期時間,

[mysqld]
log-bin=mysql-bin # 打開binlog
binlog-format=ROW # ROW格式
server_id=1 # mysql Replication 需要設置 在mysql集群里唯一

expire_logs_days=7 # binlog文件保存7天
max_binlog_size = 500m # 每個binlog日志文件大小 

canal配置

除了kafka之外,canal還支持將數據庫修改的消息投遞到rocketMQ, 或者不經過消息隊列直接投遞到canal的客戶端,然后再在客戶端實現自己的代碼(如寫入其他存儲/其他消息隊列) ,但是只能選其一。而如果選擇canal客戶端的方式, 一個canal server也只能將消息投遞到一個canal client。

但是可以開啟多個canal服務端和客戶端(同一個實例,即對mysql來說只算是一個從庫), 他們通過zookeeper保證只有一個服務端和客戶端是有效的,其他只是作為HA的冗余。

然后需要修改canal目錄下(以下為近最小配置)

conf/example/instance.properties

## mysql serverId
canal.instance.mysql.slaveId = 1234

# 數據庫address
canal.instance.master.address = 127.0.0.1:3306

# 數據庫賬號密碼
canal.instance.dbUsername = canal  
canal.instance.dbPassword = canal

# 需要訂閱的數據庫.表名 默認全部
canal.instance.filter.regex = .\*\\\\..\*  # 去掉轉義符其實就是 .*\..*

# topic名 固定
canal.mq.topic=canal

# 動態topic
# canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*

# 庫名.表名: 唯一主鍵,多個表之間用逗號分隔
# canal.mq.partitionHash=mytest.person:id,mytest.role:id

其中動態topic 和 主鍵hash看上去有點難理解,去看其他人的博客找到的解釋和例子如下

動態topic

canal 1.1.3版本之后, 支持配置格式:schema 或 schema.table,多個配置之間使用逗號分隔

例子1:test.test 指定匹配的單表,發送到以 test_test為名字的topic上
例子2:.\…* 匹配所有表,每個表都會發送到各自表名的topic上
例子3:test 指定匹配對應的庫,一個庫的所有表都會發送到庫名的topic上
例子4:test.* 指定匹配的表達式,針對匹配的表會發送到各自表名的topic上
例子5:test,test1.test1,指定多個表達式,會將test庫的表都發送到test的topic上,test1.test1的表發送到對應的test1_test1 topic上,其余的表發送到默認的canal.mq.topic值
支持指定topic名稱匹配, 配置格式:topicName:schema 或 schema.table,多個配置之間使用逗號分隔, 多組之間使用 ; 分隔

例子:test:test,test1.test1;test2:test2,test3.test1 針對匹配的表會發送到指定的topic上
————————————————
版權聲明:本文為CSDN博主「BillowX_」的原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/weixin_35852328/article/details/87600871

主鍵

canal 1.1.3版本之后, 支持配置格式:schema.table:pk1^pk2,多個配置之間使用逗號分隔

例子1:test.test:pk1^pk2 指定匹配的單表,對應的hash字段為pk1 + pk2
例子2:.\…:id 正則匹配,指定所有正則匹配的表對應的hash字段為id
例子3:.\…
:pkpkpk 正則匹配,指定所有正則匹配的表對應的hash字段為表主鍵(自動查找)
例子4: 匹配規則啥都不寫,則默認發到0這個partition上
例子5:.\…* ,不指定pk信息的正則匹配,將所有正則匹配的表,對應的hash字段為表名
按表hash: 一張表的所有數據可以發到同一個分區,不同表之間會做散列 (會有熱點表分區過大問題)
例子6: test.test:id,.\…* , 針對test的表按照id散列,其余的表按照table散列
注意:大家可以結合自己的業務需求,設置匹配規則,多條匹配規則之間是按照順序進行匹配(命中一條規則就返回)
————————————————
版權聲明:本文為CSDN博主「BillowX_」的原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/weixin_35852328/article/details/87600871

最后實現消費kafka上canal topic上消息的代碼

這里以go為例,可以寫入到elasticsearch/redis/其他

package main

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"github.com/Shopify/sarama"
	"github.com/elastic/go-elasticsearch/esapi"
	"github.com/elastic/go-elasticsearch/v6"
	"os"
)

var esClient *elasticsearch.Client

func init() {
	var err error
	config := elasticsearch.Config{}
	config.Addresses = []string{"http://127.0.0.1:9200"}
	esClient, err = elasticsearch.NewClient(config)
	checkErr(err)
}

type Msg struct {
	Data []struct {
		Id string `json:"id"`
		A  string `json:"a"`
	} `json:"data"`
	Type     string `json:"type"`
	DataBase string `json:"database"`
	Table    string `json:"table"`
}

func checkErr(err error) {
	if err != nil {
		fmt.Println(err)
		os.Exit(-1)
	}
}

type Consumer struct{}

func (c *Consumer) Setup(sarama.ConsumerGroupSession) error {
	return nil
}

func (c *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}

func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for message := range claim.Messages() {
    // fmt.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
		msg := &Msg{}
		err := json.Unmarshal(message.Value, msg)
		checkErr(err)
		if msg.DataBase == "test" && msg.Table == "tbltest" {
			if msg.Type == "INSERT" {
				for k, _ := range msg.Data {
					// 寫elasticsearch 邏輯
					body := map[string]interface{}{
						"id": msg.Data[k].Id,
						"a":  msg.Data[k].A,
					}
					jsonBody, _ := json.Marshal(body)
					req := esapi.IndexRequest{
						Index:      msg.DataBase,
						DocumentID: msg.Table + "_" + msg.Data[k].Id,
						Body:       bytes.NewReader(jsonBody),
					}
					res, err := req.Do(context.Background(), esClient)
					checkErr(err)
					fmt.Println(res.String())
					res.Body.Close()
					session.MarkMessage(message, "")
				}
			}
		}
	}
	return nil
}

func main() {
	consumer := &Consumer{}

	config := sarama.NewConfig()
	config.Version = sarama.MaxVersion
	client, err := sarama.NewConsumerGroup([]string{"127.0.0.1:9092"}, "tg", config)
	checkErr(err)
	ctx := context.Background()

	client.Consume(ctx, []string{"canal"}, consumer)
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM