本文將介紹canal項目中client-adapter的使用,以及落地生產中需要考慮的可靠性、高可用與監控報警。(基於canal 1.1.4版本)
canal作為mysql的實時數據訂閱組件,實現了對mysql binlog數據的抓取。
雖然阿里也開源了一個純粹從mysql同步數據到mysql的項目otter(github.com/alibaba/otter,基於canal的),實現了mysql的單向同步、雙向同步等能力。但是我們經常有從mysql同步數據到es、hbase等存儲的需求,就需要用戶自己用canal-client獲取數據進行消費,比較麻煩。
從1.1.1版本開始,canal實現了一個配套的落地模塊,實現對canal訂閱的消息進行消費,就是client-adapter(github.com/alibaba/canal/wiki/ClientAdapter)。
目前的最新穩定版1.1.4版本中,client-adapter已經實現了同步數據到RDS、ES、HBase的能力。
1. Client-Adapter基本能力
目前Adapter具備以下基本能力:
- 對接上游消息,包括kafka、rocketmq、canal-server
- 實現mysql數據的增量同步
- 實現mysql數據的全量同步
- 下游寫入支持mysql、es、hbase
2.Client-Adapter架構
Adapter本質上是為了將canal-server訂閱到的實時增量數據進行消費,所以必須有上游canal-server產生數據。
整體架構如下:

3. 遷移與同步配置(以Mysql為例)
官方文檔地址:github.com/alibaba/canal/wiki/Sync-RDB
下面給出實踐過程中的注意事項。
3.1 參數配置
1)總配置文件application.yml

說明:
- 一份數據可以被多個group同時消費, 多個group之間會是一個並行執行, 一個group內部是一個串行執行多個outerAdapters, 比如例子中logger和hbase
- 目前client adapter數據訂閱的方式支持兩種,直連canal server 或者 訂閱kafka/RocketMQ的消息
- zookeeperHosts填了以后,可以支持分布式鎖;如果對接Canal-Server為集群模式,那么還是需要填寫的,具體原因見下面高可用部分。
2)對應任務的Adapter配置
同步到mysql去的任務配置在conf/rdb路徑下,本文使用的任務配置文件名叫 mysql1.yml

注意!targetPk下面填的是源主鍵和目標主鍵的映射關系, srcPk:targetPk。
3)日志格式修改
logback.xml中默認日志等級為debug,線上使用時,記得改到info,否則日志會打爆
3.2 增量同步能力
1) DML 增量同步
完成上面的配置,啟動后就能正常訂閱增量數據了。Adapter能夠接收到mq到信息,並在目標庫投遞成功。
具體會打出如下日志。

2)DDL同步
如果需要使用DDL同步能力,必須在rdb中配置mirroDb為true才可以。

3.3 全量同步能力
Adapter提供了全量同步的能力,具體操作可以參考官網 github.com/alibaba/canal/wiki/ClientAdapter中的3.2節。
這里我們使用命令
curl http://127.0.0.1:8081/etl/rdb/mysql1/mysql1.yml -X POST
輸出結果如下

4. 動態配置
4.1 任務開關
curl http://127.0.0.1:8081/syncSwitch/dts-dbvtest-insertdata/on -X PUT
如果在application.yml里面配置了zk地址,那么會使用分布式開關,這個任務開關會注冊到zk上,對任意機器執行開關,會把所有同樣任務的機器進行啟停。
相關源碼實現如下:
- 獲取zk上的任務開關狀態信息
- 如果是false,就斷開連接

4.2 配置變更
1)本地配置文件
adapter默認是讀取本地的配置文件進行配置的。
有個比較意外的地方,就是修改配置文件,任務會自動刷新配置,實現了動態配置。
我們看下實現原理。
- 繼承了FileAlterationListenerAdaptor
- 發現文件變更后
- 銷毀目前的canalAdapterService
- 刷新contextRefresher
- sleep 2秒
- 重新初始化canalAdapterService

最終日志會打印

2)基於mysql的遠程配置
如果配置了多個adapter,可以采用mysql存儲配置信息,實現全局統一的配置。
這個的實現原理也比較簡單:
- 本地異步線程輪訓mysql
- 如果有更新就將更新的配置寫入本地配置文件
- 動態更新
5. 數據可靠性分析
5.1 ack機制
Adapter的一個任務采用一個多線程模型。
- 主線程抓取mq的message寫入隊列queue,CountDownLatch等待
- 異步線程poll隊列queue,投遞下游
- 投遞成功后,主線程釋放latch,向mq返回ack
這里需要注意,這里同步的行為是重新執行一次,比如update一行數據,如果目標庫由於某種原因沒有這條數據的主鍵id,導致update返回0,也是認為消費成功了
5.2 重試機制
application.yml里面的retries參數用於從queue隊列poll后,投遞下游的重試。
需要仔細權衡一下,重試間隔0.5s,可以設置個x次,避免網絡抖動丟失數據。
重試次數到了,會自動ack。所以這里在使用過程中需要注意采集失敗的日志,及時報警提醒。
6. 性能問題分析
具體性能要求還是需要通過壓測來得到結論。
這里給出兩個從源碼中看到的性能優化相關的點。
6.1 全量同步多線程
全量同步的時候,同步效率是一個值得考慮的問題。
adapter對全量數據同步效率做了一些設計,當全量同步數量大於1W會開多線程,代碼如下所示:

但是這里有個mysql的深分頁的問題,可以注意一下,會對源數據庫造成比較大的性能壓力。
6.2 全量同步select *
全量同步的另一個效率問題,就在於select * ,避免客戶端內存被打爆。
看了下源碼,果然也已經考慮了這個問題,開啟了JDBC的流式查詢。

7. 監控告警
如果要在生產使用,少不了監控告警的輔助。
雖然Adapter不像canal-server那樣提供了監控指標的相關api,但是我們還是可以做一些輔助的監控告警。
1) mq的消息堆積告警
利用mq已有的topic下的堆積告警,如果Adapter出現故障,造成了mq的消息堆積,可以及時發現。
2) 日志異常告警
Adapter有自己的日志格式,可以跟已有監控系統確認下日志收集的配置方式與日志解析格式。
然后通過修改 conf/logback.xml的pattern來修改日志的打印格式,進行配置采集。
8. 高可用
通過源碼閱讀發現
tcp模式支持通過zk做HA(非自身高可用),mq模式不支持zk做HA

TCP模式需要HA跟我們的HA理解又不太一樣。
因為需要直接對接上游的Canal-Server,而Canal-Server的HA會導致ip變化,所以adapter的tcp模式的HA是為了支持這個,可以監聽IP變化,對接不同的上游server,並不是自身的高可用架構。
而MQ模式本身是不支持HA的。
但是,我們如果我們對接上游MQ的模式,就可以做一個取巧的高可用。
目前從binlog抓取mq以后,只會投遞到指定topic的一個隊列中(即使哈希做了多隊列,道理也一樣),因此,mq消費時采用集群模式,就會只有一個client能夠順序消費對應隊列中的消息。
這樣,我們部署兩台adapter,有兩個mq的消費者同時運行,正常情況下,只會有一台機器在消費任務,一旦一台機器掛了,mq會自動用另一台機器的任務進行繼續消費。做了一個簡易的高可用。

缺點也比較明顯,任務無法負載均衡,只能跑在一台機器上
因此,需要考慮分多個消費者組進行任務處理。
推薦閱讀:
都看到最后了,原創不易,點個關注,點個贊吧~
文章持續更新,可以微信搜索「阿丸筆記 」第一時間閱讀,回復關鍵字【學習】有我准備的一線大廠面試資料。
知識碎片重新梳理,構建Java知識圖譜: github.com/saigu/JavaK…(歷史文章查閱非常方便)