常用的數據同步方案
Q:大家知道的數據庫同步方案或者工具有哪些?
數據庫遷移場景
以Mysql數據庫遷移為例,數據庫常用遷移方案有停機遷移和平滑遷移。
平滑遷移又分為雙寫和CDC(數據變更抓取)。
雙寫:即所有寫入操作同時寫入舊表和新表,這種方式可以完全控制應用代碼如何寫數據庫,聽上去簡單明了。但它會引入復雜的分布式一致性問題:要保證新舊庫中兩張表數據一致,雙寫操作就必須在一個分布式事務中完成,而分布式事務的代價太高了。
CDC:通過數據源的事務日志抓取數據源變更解決數據同步問題
數據同步場景
微服務開發環境下,為了提高搜索效率,以及搜索的精准度,會大量使用Redis、MongoBD等NoSQL數據庫,也會使用大量的Solr、Elasticsearch等全文檢索服務。那么,這個時候,就會有一個問題需要我們來思考和解決:那就是數據同步的問題!如何將實時變化的數據庫中的數據同步到Redis/MongoBD或者Solr/Elasticsearch中呢?

應用代碼中同步
在增加、修改、刪除之后,執行操作ES的邏輯代碼。例如下面的代碼片段。
public ResponseResult updateStatus(Long[] ids, String status){
try{
taskService.updateStatus(ids, status);
if("status_success".equals(status)){
List<Task> itemList = taskService.getTaskList(ids, status);
//數據寫入es
esClient.importList(itemList);
//數據寫入redis
// redisTemplate.save(itemList);
return new ResponseResult(true, "修改狀態成功")
}
}catch(Exception e){
return new ResponseResult(false, "修改狀態失敗");
}
}
優點:
實施起來比較簡單,簡單服務里面常用的方式。
缺點:
代碼耦合度高。
和業務邏輯同步執行,效率變低。
Q:這里有一個問題想和大家討論一下,對於一個方法里既有數據庫的操作又有同步調用http/rpc接口的方法,如何保證一致性?
比如下面這個場景:
一個售后工單的處理,首先需要經過【客訴系統】,然后需要轉到【工單系統】生成一個工單,方法邏輯大概如下:
@Transactional
public void handleKeSU(Integer orderId) {
//調用http接口插入工單
httpClient.saveGongDan(orderId);
//修改客訴單狀態為【已轉工單】
updateKeSuStatus(orderId);
}
因為流程問題,客訴單狀態修改和工單系統生成工單需要一致,即工單生成成功,則客訴單狀態修改成功,工單生成失敗,則客訴單修改失敗。
解決方案:將http調用放到本地數據庫修改后面,依據事物回滾。
這樣還有什么問題?當http調用響應時間超時,其實調用方工單已經生成成功,但是本地調用響應超時拋出異常導致回滾。
定時任務同步
在數據庫中執行完增加、修改、刪除操作后,通過定時任務定時的將數據庫的數據同步到ES索引庫中。
定時任務技術有:SpringTask,Quartz,XXLJOB。
這里執行定時任務時,需要注意的一個技巧是:第一次執行定時任務時,從MySQL數據庫中以時間字段進行倒序排列查詢相應的數據,並記錄當前查詢數據的時間字段的最大值,以后每次執行定時任務查詢數據的時候,只要按時間字段倒序查詢數據表中的時間字段大於上次記錄的時間值的數據,並且記錄本次任務查詢出的時間字段的最大值即可,從而不需要再次查詢數據表中的所有數據。
注意:這里所說的時間字段指的是標識數據更新的時間字段,也就是說,使用定時任務同步數據時,為了避免每次執行任務都會進行全表掃描,最好是在數據表中增加一個更新記錄的時間字段。
優點:
同步ES索引庫的操作與業務代碼完全解耦。
缺點:
數據的實時性並不高。
通過MQ實現同步
在數據庫中執行完增加、修改、刪除操作后,向MQ中發送一條消息,此時,同步程序作為MQ中的消費者,從消息隊列中獲取消息,然后執行同步Solr索引庫的邏輯。
我們可以使用下圖來簡單的標識通過MQ實現數據同步的過程。
我們可以使用如下代碼實現這個過程。
public ResponseResult updateStatus(Long[] ids, String status){
try{
goodsService.updateStatus(ids, status);
if("status_success".equals(status)){
List<TbItem> itemList = goodsService.getItemList(ids, status);
final String jsonString = JSON.toJSONString(itemList);
//發送消息
jmsTemplate.send(queueSolr, new MessageCreator(){
@Override
public Message createMessage(Session session) throws JMSException{
return session.createTextMessage(jsonString);
}
});
}
return new ResponseResult(true, "修改狀態成功");
}catch(Exception e){
return new ResponseResult(false, "修改狀態失敗");
}
}
優點:
業務代碼解耦,並且能夠做到准實時。目前tk的ES同步用的就是這中方式吧
缺點:
需要在業務代碼中加入發送消息到MQ的代碼,數據調用接口耦合。
通過CDC實現實時同步
通過CDC來解析數據庫的日志信息,來檢測數據庫中表結構和數據的變化,從而更新ES索引庫。
使用CDC可以做到業務代碼完全解耦,API完全解耦,可以做到准實時。
CDC(change data capture,數據變更抓取)
通過數據源的事務日志抓取數據源變更,這能解決一致性問題(只要下游能保證變更應用到新庫上)。它的問題在於各種數據源的變更抓取沒有統一的協議,如
MySQL 用 Binlog,PostgreSQL 用 Logical decoding 機制,MongoDB 里則是 oplog。
-
Canal,阿里開源的基於數據庫增量日志解析,提供增量數據訂閱&消費,目前主要支持了mysql。
-
Databus,Linkedin 的分布式數據變更抓取系統。
它的 MySQL 變更抓取模塊很不成熟,官方支持的是 Oracle,MySQL 只是使用另一個開源組件 OpenReplicator 做了一個 demo。另一個不利因素 databus 使用了自己實現的一個 Relay 作為變更分發平台,相比於使用開源消息隊列的方案,這對維護和外部集成都不友好。 -
Mysql-Streamer,Yelp 的基於python的數據管道。
-
Debezium,Redhat 開源的數據變更抓取組件。
支持 MySQL、MongoDB、PostgreSQL 三種數據源的變更抓取。Snapshot Mode 可以將表中的現有數據全部導入 Kafka,並且全量數據與增量數據形式一致,可以統一處理,很適合數據庫遷移;
Canal
canal [kə'næl],譯意為水道/管道/溝渠,純Java開發,主要用途是基於 MySQL 數據庫增量日志解析,提供增量數據訂閱和消費,
目前canal只能支持row模式的增量訂閱(statement只有sql,沒有數據,所以無法獲取原始的變更日志)
基於日志增量訂閱&消費支持的業務
數據庫實時備份
多級索引 (賣家和買家各自分庫索引)
業務cache刷新
價格變化等重要業務消息
工作原理

- canal 模擬 MySQL slave 的交互協議,偽裝自己為 MySQL slave ,向 MySQL master 發送 dump 協議
- MySQL master 收到 dump 請求,開始推送 binary log 給 slave (即 canal )
- canal 解析 binary log 對象(原始為 byte 流)
Mysql主備復制實現

從上層來看,復制分成三步:
- master將改變記錄到二進制日志(binary log)中(這些記錄叫做二進制日志事件,binary log events,可以通過show binlog events進行查看);
- slave將master的binary log events拷貝到它的中繼日志(relay log);
- slave重做中繼日志中的事件,將改變反映它自己的數據;
Canal架構

通過deployer模塊,啟動一個canal-server,一個cannal-server內部包含多個instance,每個instance都會偽裝成一個mysql實例的slave。client與server之間的通信協議由protocol模塊定義。client在訂閱binlog信息時,需要傳遞一個destination參數,server會根據這個destination確定由哪一個instance為其提供服務。

說明:
- server代表一個canal運行實例,對應於一個jvm
- instance對應於一個數據隊列(1個server對應1..n個instance)
instance模塊:
- eventParser (數據源接入,模擬slave協議和master進行交互,協議解析)
- eventSink (Parser和Store鏈接器,進行數據過濾,加工,分發的工作)
- eventStore (數據存儲,目前只存在內存里)
- metaManager (增量訂閱&消費信息管理器)
Canal是怎么假裝成是Mysql Slave的?
- 與Mysql Master服務器建立Socket鏈接;
- 根據Mysql協議規范發送身份認證數據包進行身份認證;
- 根據Mysql協議規范發送slave注冊數據包將自己偽裝成Mysql Slave;
- 根據Mysql協議規范發送Dump請求,讓Master給自己推送Binlog日志;
Canal是怎么解析binlog的?
Mysql Binlog介紹:http://dev.mysql.com/doc/refman/5.5/en/binary-log.html
一個binlog包含一個四字節的模數和一系列描述數據變更的Event,每一個Event又包含header和data兩部分,大致結構如下:

基於Row模式的binlog主要包括以下幾個Event:
目前canal只能支持row模式的增量訂閱(statement只有sql,沒有數據,所以無法獲取原始的變更日志)
TABLE_MAP_EVENT:描述變更的數據庫表
WRITE_ROWS_EVENT:描述插入數據變更
UPDATE_ROWS_EVENT:描述修改數據變更
DELETE_ROWS_EVENT:描述刪除數據變更
根據Event的固定結構就可以解析出來相應的數據變更信息。
演示查看binlog:mysqlbinlog --no-defaults --base64-output=decode-rows -v ../data/binlog.000034 | more
Quick Start
准備
對於自建 MySQL , 需要先開啟 Binlog 寫入功能,配置 binlog-format 為 ROW 模式,my.cnf 中配置如下
[mysqld]
log-bin=mysql-bin # 開啟 binlog
binlog-format=ROW # 選擇 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定義,不要和 canal 的 slaveId 重復
授權 canal 鏈接 MySQL 賬號具有作為 MySQL slave 的權限, 如果已有賬戶可直接 grant
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON . TO 'canal'@'%'; -- GRANT ALL PRIVILEGES ON . TO 'canal'@'%' ; FLUSH PRIVILEGES;
啟動
下載 canal, 訪問 release 頁面 , 選擇需要的包下載, 如以 1.1.4 版本為例
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
解壓縮
mkdir /tmp/canal
tar zxvf canal.deployer-$version.tar.gz -C /tmp/canal
解壓完成后,進入 /tmp/canal 目錄,可以看到如下結構
drwxr-xr-x 2 jianghang jianghang 136 2013-02-05 21:51 bin
drwxr-xr-x 4 jianghang jianghang 160 2013-02-05 21:51 conf
drwxr-xr-x 2 jianghang jianghang 1.3K 2013-02-05 21:51 lib
drwxr-xr-x 2 jianghang jianghang 48 2013-02-05 21:29 logs
配置修改
vi conf/example/instance.properties
##mysql serverId
canal.instance.mysql.slaveId = 1234
#position info,需要改成自己的數據庫信息
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#username/password,需要改成自己的數據庫信息
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .\*\\\\..\*
- canal.instance.connectionCharset 代表數據庫的編碼方式對應到 java 中的編碼類型,比如 UTF-8,GBK , ISO-8859-1
- 如果系統是1個 cpu,需要將 canal.instance.parser.parallel 設置為 false
啟動
sh bin/startup.sh
查看 server 日志
vi logs/canal/canal.log
2013-02-05 22:45:27.967 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2013-02-05 22:45:28.113 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111]
2013-02-05 22:45:28.210 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
查看 instance 的日志
vi logs/example/example.log
2013-02-05 22:50:45.636 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2013-02-05 22:50:45.641 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2013-02-05 22:50:45.803 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2013-02-05 22:50:45.810 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....
修改表數據
關閉 sh bin/stop.sh
變更分發
抓取到數據變更之后,需要考慮怎么將這些變更分發出去。
再來回顧一下Canal基本架構。

從上圖可以看到Canal是Server-Client模式。
Server,主要是解析、分發、存儲binlog;
Client,通過ClientAPI你可以從Server獲取變更數據;
ClientAdapter,擴展Client的功能,包括將數據同步到RDB,ES,HBASE;
但其實這種Client模式並沒有達到真正的解耦,更關鍵的是目前只有Java語言的Client,為了解決這個問題,大家自然而然想到消息中間件。
事實上,Canal 1.1.1版本以后也是支持在Canal Server解析binlog以后直接將數據投遞到Kafka/RocketMQ。
於是就有了下面的同步方案:

總結
本文主要討論數據同步方案,並對canal做了簡單介紹。同時也對binlog的解析和mysql協議簡單介紹希望能了解這種CDC的基本原理。
