Canal 源碼走讀


前言

canal 是什么? 引用一下官方回答:

阿里巴巴mysql數據庫binlog的增量訂閱&消費組件

canal 能做什么?

基於日志增量訂閱&消費支持的業務:

  1. 數據庫鏡像
  2. 數據庫實時備份
  3. 多級索引 (賣家和買家各自分庫索引)
  4. search build
  5. 業務cache刷新
  6. 價格變化等重要業務消息

比如 LZ 目前就使用 canal 實現數據實時復制,搜索引擎數據構建等功能。既然要使用,就好好的研究一下。

時間有限,一起來簡單看看。

軟件架構

關於 canla 的工作原理,我就不展開了,有興趣的可以看看官方文檔,或者這個 ppt : https://docs.google.com/presentation/d/1MkszUPYRDkfVPz9IqOT1LLT5d9tuwde_WC8GZvjaDRg/edit#slide=id.p16

說白了, canal 就是偽裝成 mysql 的 slave,dump binlog,解析 binlog,然后傳遞給應用程序,總體還是蠻簡單的。

好,我們來看看 canal 的代碼架構。

image

我們看到,canal server 內部由幾個模塊組成, 最外部的是 Server,該 Server 接收 Canal Client 請求,並返回 Client 數據。一個 Server 就是一個 JVM。每個 Server 內部由多個 CanalInstance,每個 CanalInstance 其實就是我們設置的 destination,通常是一個數據庫。

每個 CanalInstance 內部由 5 個模塊,分別是 parser 解析,sink 過濾,store 存儲,metaManager 元數據管理,Alarm 報警。

這 5 個模塊是干嘛的呢?

簡單說一下:

當 Canal Server 啟動后,會根據配置啟動 N 個 CanalInstance, 每個 CanalInstance 都會使用 socket 連接 mysql,dump binlog,然后將數據交給 parser 解析,sink 過濾,store 存儲,當 client 連接時,會從 zk 上讀取該 client 的信息,而 metaManager 元數據管理就是管理 zk(當然有多種實現,比如存儲在文件中) 信息的,如果發生錯誤了,就調用 Alarm 發送報警信息(你可以接入自己公司的監控系統),目前是打印日志。

Canal 啟動流程

canal 代碼量目前有 6 萬多行,去除 2 個 ProtocolBuffer 生成類大概 1.7 萬行,也還有 4.3 萬行,代碼還是不少的。

啟動過程也比較繞。這里我簡單畫了一個流程圖:

image

解釋一下這個圖:

canal 腳本從 CanalLauncher main 方法啟動,然后調用 CanalController 的 start 方法,CanalController 調用 InstanceConfigMonitor 的 start 方法,最后調用 canal 關鍵組件 CanalServerWithEmbedded 的 start 方法。

在 Canal 內部, 有 CanalServerWithEmbedded 和 CanalServerWithNetty,前者是沒有 Server 端口的,是一個無端口的代理。后者是基於 Netty 實現的服務器,在 channelRead 方法中,會調用 CanalServerWithEmbedded 的相關方法。

CanalServerWithEmbedded 是單例的, 內部會有多個 CanalInstance, 他有多個實現,獨立版本中使用的是 CanalInstanceWithSpring 版本,基於 Spring 管理組件的生命周期。

每個 CanalInstance 內部有 5 個組件,也就是上面說的幾個組件,他們會分別啟動。

其中,比較關鍵的是 parser,sink,store。

CanalEventParser 啟動后,會啟動一個叫做 parseThread 線程,不停的循環。主要是:構造與 mysql 的連接,然后啟動心跳線程,然后開始 dump binlog。

dump 出來的 binlog 通過 disruptor 無鎖隊列發布,內部由 3 個消費者按照順序消費 binlog,處理完之后,交給了 sink 模塊。

然后是 sink,這個比較簡單,就不說了。sink 處理完之后,交給了 store 模塊。

store 模式是一個類似 RingBuffer 的循環數組,存儲着從 mysql dump 出來的數據,client 也是從這里獲取數據的。該數組維護着 3 個指針,get,put, ack。

這里比較奇怪的是,為什么不使用責任鏈模式夠組裝組件?

Canal 數據流向

看了啟動流程,再來看看 canal 內部運行的數據流向是什么樣子的。我這里簡單畫了一個圖。

image

獨立版本的 Canal 使用 Netty 暴露端口,使用自己構造的 SessionHandler 處理 TCP 請求,SessionHandler 將請求交給 CanalServerWithEmbedded 來處理。

我們看 CanalServerWithEmbedded 的一些方法,例如 subscribe,get,ack 等,都是和 client 對應的方法,也就是說,CanalServerWithEmbedded 是和 client 打交道的一個類。

CanalServerWithEmbedded 內部管理所有的 CanalInstance,通過 Client 的信息,找到 Client 訂閱的 CanalInstance,然后調用 CanalInstance 內部的 Store 模塊,也就是那個 RingBuffer 的 get 方法,獲取 RingBuffer 的數據。

從 Myslq 的角度看,MysqlConnection 從 Myslq dump 數據,交給 parser 解析,parser 解析完,交給 sink,sink 處理完,交給 store 保存,等待 client 前來獲取。

看完了數據流向,如果對哪里有什么疑問,就可以看看哪個模塊對應的代碼是什么,直接看是看就好了。

總結

花了點時間看了看 Canal 的代碼,總體上還是非常好的,只是有些地方有點疑問,例如 parser,sink,store 為什么不使用過濾器模式。

Client 和 CanalServerWithEmbedded 為什么不使用 RPC 的方式交互,這樣更簡單明了。

代碼里回調方法太多太長,影響閱讀。

但總體瑕不掩瑜,值得一讀。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM