使用 docker 運行 RocketMQ + Canal + ElasticSearch + Golang 示例


0 引言

在很多業務情況下,我們都會在系統中引入ElasticSearch搜索引擎作為做全文檢索的優化方案。

如果數據庫數據發生更新,這時候就需要在業務代碼中寫一段同步更新ElasticSearch的代碼。

下面我會以一個blog文章管理為例來演示canal+RocketMQGolang實現MySQLElasticSearch的數據同步。

示例地址:https://gitee.com/thepoy/RocketMQ_Canal_ElasticSearch_Golang

盡量不要在 macOS 中使用,創建的容器多多少少會有問題,出問題時很難找到症結所在,而在 linux 系統中使用則一切正常。

1 RocketMQ

RocketMQ是沒有官方鏡像的,所以需要在本地創建:

cd rocketMQ
docker build --no-cache -f Dockerfile -t rocketmq:4.8.0 --build-arg version=4.8.0 .

可根據自己的需求對 Dockerfile 進行修改

修改環境變量文件.env中的主機地址為自己的 ip 地址,然后使用 rocketMQ 目錄中的配置文件創建容器:

docker-compose --file compose.yml up

2 Canal

2.1 創建容器

使用項目根目錄中的配置文件創建mysqlcanal-admincanal-server容器:

cd ..
docker-compose --file compose.yml up

也有一個環境變量文件需要修改,另外,compos 文件中的信息也需要根據需要修改,如 mysql 的 root 密碼。

2.2 為 canal 賬號授權

創建 mysql 容器時也創建了 canal 賬號,需要為這個賬號授權。

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

2.3 打開 canal 管理后台

http://localhost:8089,打開后需要用admin賬號登錄,默認密碼為123456,管理后台的界面如下圖所示:

image-20210507142335569

因為 compose.yml 文件中已經配置了 canal-server,所以在后台中能看見已經啟動的一個 server。

2.4 配置實例 / Instance

點擊側邊欄的Instance管理,選擇新建 Instance,選擇那個唯一的主機,再點擊載入模板,修改下面的一些參數:

# 取消第 3 行中 mysql slaveId 的注釋,隨便修改為一個數字(不能是 1,因為 mysql 的 server_id=1)
canal.instance.mysql.slaveId=1234
# 修改 mysql 的地址,canal-admin 容器中也有一個 mysql 實例,我們不使用這個 mysql,而使用單獨的 mysql 容器
canal.instance.master.address=192.168.31.129:3306
# 改成自己的數據庫信息(需要監聽的數據庫,新建一個 database 就可以),這一行需要添加
canal.instance.defaultDatabaseName = blog
# table regex 需要過濾的表 這里數據庫的中所有表
canal.instance.filter.regex = .\*\\..\*
# MQ 配置 日志數據會發送到 blog_articles 這個 topic 上
canal.mq.topic=blog_articles

實例名稱隨便填一個就行。

創建好的新實例默認是停止狀態,將其啟動。

image-20210507145221376

創建 database 和 table:

CREATE DATABASE IF NOT EXISTS `blog`;
USE blog;
CREATE TABLE IF NOT EXISTS `blog_articles` (
	`id` INT AUTO_INCREMENT PRIMARY KEY NOT NULL,
    `title` VARCHAR(100) NOT NULL UNIQUE,
    `content` TEXT NOT NULL,
    `created_date` VARCHAR(10) NOT NULL
);

2.5 配置 canal-server

image-20210507145359290

修改下面的參數:

# 默信是 tcp, 修改為 rocketMQ
canal.serverMode = rocketMQ
##################################################
######### 		    RocketMQ	     #############
##################################################
rocketmq.producer.group = blog
rocketmq.namesrv.addr = 192.168.31.129:9876

保存后 server 會重啟,這時打開 rocketMQ 控制台,能夠看到新增加了一個主題blog_articles

image-20210507150313195

可以通過添加一行數據來測試是否成功:

INSERT INTO blog.blog_articles
(title, content, created_date)
VALUES('test1', '這是第 1 個測試文章', '2020-01-01');

添加后,在 rocketMQ 控制台查看消息:

image-20210507150908030

image-20210507151008624

可以看到,添加數據的消息已經產生等待消費。

3 Elasticsearch

elasticsearch 容器會在使用配置文件創建 Canal 時一同創建,需要注意的是,如果你想修改 elasticsearch 的 tag,可以在.env文件中修改ES_TAG的值。

我沒有創建 Kibana 容器,有需要的話可以自行創建。

4 代碼設計

當數據庫發生變化時,Canal 會將變化信息發送到 RocketMQ 中,所以我們只需要消費 RocketMQ 中的消息就可以做到即時或很快地將變化的數據同步到 Elasticsearch 中。

4.1 RocketMQ

常量
const (
    // topic 在 Canal 中已經配置了,這里一定不能寫錯
	topic              string = "blog_articles"
    // 消費者組可以自定義,但要與 2.5 節中設置的 rocketmq.producer.group 相同
	consumerGroup      string = "blog"
)

從環境變量中獲取host,並生成server

var (
	server string
	Host   string
)

func init() {
	Host = os.Getenv("HOST")
	if Host == "" {
		Host = "localhost"
	}
	server = Host + ":9876"
}
結構體的設計

雖然代碼中沒有用到這個結構體,但我覺得需要拿出來聊一聊:

type ChangedData struct {
	// 變化的文檔集合
	Data []es.Document `json:"data"`
	// 發生變化的數據庫
	Database string `json:"database"`
	// 數據庫內執行時間
	ES uint64 `json:"es"`
	// 就是 id
	ID uint `json:"id"`
	// 是否為 DDL 語句,create database、create table、alter table
	IsDDL bool `json:"isDdl"`
	// 表結構的字段類型
	MysqlType map[string]string `json:"mysqlType"`
	// 主鍵名稱
	PrimaryKeyNames []string `json:"pkNames"`
	// sql 語句
	SQL string `json:"sql"`
	// sql 語句類型
	SqlType map[string]uint `json:"sqlType"`
	// 表名稱
	Table string `json:"table"`
	// 操作類型,(新增)INSERT、(更新)UPDATE、(刪除)DELETE、(刪除表)ERASE等等
	Type string `json:"type"`
	// 數據庫內解析時間
	Timestamp uint `json:"ts"`
	// 舊數據
	Old []map[string]string `json:"old"`
}

其中es.Document結構如下:

type Document struct {
	ID          string `json:"id,omitempty"`
	Title       string `json:"title,omitempty"`
	Content     string `json:"content,omitempty"`
	CreatedDate string `json:"created_date,omitempty"`
}
使用第三方 json 庫

這也是為什么沒用到上面的結構體的原因。

使用 json 標准庫處理消息數據並同步到 es 中,完全是小題大做,會浪費很多的性能。

data := gjson.Get(string(msg.body), "data")

使用 gjson 庫,可以方便地從 json 字符串中獲取想要的數據,並進行后續處理,無需將整個 json 反序列化。

使用 context 阻塞或退出消費線程

啟動消費訂閱后,阻塞多久,就會消費多久,為了能夠控制何時結束消費,這里使用contextcancle()函數控制:

	err = c.Start()
	...

	select {
	case <-ctx.Done():
		fmt.Println(strings.Repeat("*", 60))
		fmt.Println("shutdown consumer")
		fmt.Println(strings.Repeat("*", 60))
	}

	err = c.Shutdown()
	...

4.2 Elasticsearch

es 的代碼是通用的,沒有特別說明的意義,直接看代碼即可。

4.3 二者結合

結合 RocketMQ 和 Elasticsearch 的代碼,就能完成消息的即時消費文檔的即時更新

需要從消息中取出的數據

上面的結構體對每個字段都有注釋,此示例只取dataoldtype三個字段:

// 將消息體解析成 gjson.Result
body := gjson.Parse(string(msg.Body))
// 從消息體中取 data
data := body.Get("data").Array()
// 從消息體中取 old
old := body.Get("old").Array()
// 從消息體中取 type
canalTypeStr := body.Get("type").String()
根據不同的操作以不同的方式更新數據

本示例中的僅包括非 DDL 操作,僅限於基本的增、刪、改,因為數據已同步到 es 中,所以 查 應該在 es 中進行。

switch canalType {
    case canal.DELETE:
    ...
    case canal.UPDATE:
    ...
    case canal.INSERT:
    ...
    default:
    log.Fatal("未知操作", canalType)
}

5 操作結果

設置環境變量(可選操作):

export HOST=192.168.31.129

運行示例,示例項目在core目錄中:

cd core
go run main.go

然后在數據庫中添加一篇文章:

INSERT INTO blog.blog_articles
(title, content, created_date)
VALUES('test9', '這是第 9 篇測試文章', '2020-01-01');

在終端中就能看見日志:

...
2021/05/08 15:05:41 已創建新的文檔: map[content:這是第 9 篇測試文章 created_date:2020-01-01 id:10 title:test9]
...

在 es 中查詢一下id=10的文檔:

curl -X GET "http://localhost:9200/canal_es/_doc/10?pretty"

查詢結果:

{
  "_index" : "canal_es",
  "_type" : "_doc",
  "_id" : "10",
  "_version" : 1,
  "_seq_no" : 14,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "content" : "這是第 9 篇測試文章",
    "created_date" : "2020-01-01",
    "id" : "10",
    "title" : "test9"
  }
}

在數據庫中更新一下這篇文章的創建日期:

UPDATE blog.blog_articles
SET created_date='2009-04-15'
WHERE id=10;

終端日志:

2021/05/08 15:15:08 文檔已存在,即將更新...
[200 OK] {"_index":"canal_es","_type":"_doc","_id":"10","_version":2,"result":"updated","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":15,"_primary_term":1}
2021/05/08 15:15:08 已更新文檔:id=10, new-data=map[created_date:2009-04-15]

再查詢一下這篇文章信息,結果為:

{
  "_index" : "canal_es",
  "_type" : "_doc",
  "_id" : "10",
  "_version" : 2,
  "_seq_no" : 15,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "content" : "這是第 9 篇測試文章",
    "created_date" : "2009-04-15",
    "id" : "10",
    "title" : "test9"
  }
}

可見,創建日期已經更新。

下面刪除這篇文章:

DELETE FROM blog.blog_articles
WHERE id=10;

終端日志:

2021/05/08 15:18:03 即將刪除文檔  10
2021/05/08 15:18:03 已刪除: {"id":"10","title":"test9","content":"這是第 9 篇測試文章","created_date":"2009-04-15"}

再查詢一下這篇文檔:

{
  "_index" : "canal_es",
  "_type" : "_doc",
  "_id" : "10",
  "found" : false
}

es 中也已刪除此文章。


示例結束。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM