實時數據同步服務(canal+kafka)是如何保證消息的順序性?


 可以訪問 這里  查看更多關於 消息中間件 的原創文章。 

上一篇 介紹了移山(數據遷移平台)實時數據同步的整體架構; 
本文主要介紹移山(數據遷移平台)實時數據同步是如何保證消息的順序性。

可以訪問 這里 查看更多關於大數據平台建設的原創文章。

一. 什么是消息的順序性?

  1. 消息生產端將消息發送給同一個MQ服務器的同一個分區,並且按順序發送;

  2. 消費消費端按照消息發送的順序進行消費。

二. 為什么要保證消息的順序性?

在某些業務功能場景下需要保證消息的發送和接收順序是一致的,否則會影響數據的使用。

需要保證消息有序的場景

移山的實時數據同步使用 canal 組件訂閱MySQL數據庫的日志,並將其投遞至 kafka 中(想了解移山實時同步服務架構設計的可以點 這里); 
kafka 消費端再根據具體的數據使用場景去處理數據(存入 HBase、MySQL 或直接做實時分析); 
由於binlog 本身是有序的,因此寫入到mq之后也需要保障順序。

  1. 假如現在移山創建了一個實時同步任務,然后訂閱了一個業務數據庫的訂單表;

  2. 上游業務,向訂單表里插入了一個訂單,然后對該訂單又做了一個更新操作,則 binlog 里會自動寫入插入操作和更新操作的數據,這些數據會被 canal server 投遞至 kafka broker 里面;

  3. 如果 kafka 消費端先消費到了更新日志,后消費到插入日志,則在往目標表里做操作時就會因為數據缺失導致發生異常。

三. 移山實時同步服務是怎么保證消息的順序性

實時同步服務消息處理整體流程如下:

我們主要通過以下兩個方面去保障保證消息的順序性。

1. 將需要保證順序的消息發送到同一個partition

1.1 kafka的同一個partition內的消息是有序的
  • kafka 的同一個 partition 用一個write ahead log組織, 是一個有序的隊列,所以可以保證FIFO的順序;

  • 因此生產者按照一定的順序發送消息,broker 就會按照這個順序把消息寫入 partition,消費者也會按照相同的順序去讀取消息;

  • kafka 的每一個 partition 不會同時被兩個消費者實例消費,由此可以保證消息消費的順序性。

1.2 控制同一key分發到同一partition

要保證同一個訂單的多次修改到達 kafka 里的順序不能亂,可以在Producer 往 kafka 插入數據時,控制同一個key (可以采用訂單主鍵key-hash算法來實現)發送到同一 partition,這樣就能保證同一筆訂單都會落到同一個 partition 內。

1.3 canal 需要做的配置

canal 目前支持的mq有kafka/rocketmq,本質上都是基於本地文件的方式來支持了分區級的順序消息的能力。我們只需在配置 instance 的時候開啟如下配置即可:

1> canal.properties

# leader節點會等待所有同步中的副本確認之后再確認這條記錄是否發送完成
canal.mq.acks = all

備注:

  • 這樣只要至少有一個同步副本存在,記錄就不會丟失。

2> instance.properties

1 # 散列模式的分區數
2 canal.mq.partitionsNum=2
3 # 散列規則定義 庫名.表名: 唯一主鍵,多個表之間用逗號分隔
4 canal.mq.partitionHash=test.lyf_canal_test:id

備注:

  • 同一條數據的增刪改操作 產生的 binlog 數據都會寫到同一個分區內;

  • 查看指定topic的指定分區的消息,可以使用如下命令:

    bin/kafka-console-consumer.sh --bootstrap-server serverlist --topic topicname --from-beginning --partition 0

2. 通過日志時間戳和日志偏移量進行亂序處理

將同一個訂單數據通過指定key的方式發送到同一個 partition 可以解決大部分情況下的數據亂序問題。

2.1 特殊場景

對於一個有着先后順序的消息A、B,正常情況下應該是A先發送完成后再發送B。但是在異常情況下:

  • A發送失敗了,B發送成功,而A由於重試機制在B發送完成之后重試發送成功了;

  • 這時對於本身順序為AB的消息順序變成了BA。

移山的實時同步服務會在將訂閱到的數據存入HBase之前再加一層亂序處理 。

2.2 binlog里的兩個重要信息

使用 mysqlbinlog 查看 binlog:

/usr/bin/mysqlbinlog --base64-output=decode-rows -v /var/lib/mysql/mysql-bin.000001

執行時間和偏移量:

備注:

  1. 每條數據都會有執行時間和偏移量這兩個重要信息,下邊的校驗邏輯核心正是借助了這兩個值

  2. 執行的sql 語句在 binlog 中是以base64編碼格式存儲的,如果想查看sql 語句,需要加上:--base64-output=decode-rows -v 參數來解碼;

  3. 偏移量:

    • Position 就代表 binlog 寫到這個偏移量的地方,也就是寫了這么多字節,即當前 binlog 文件的大小;

    • 也就是說后寫入數據的 Position 肯定比先寫入數據的 Position 大,因此可以根據 Position 大小來判斷消息的順序。

3.消息亂序處理演示

3.1 在訂閱表里插入一條數據,然后再做兩次更新操作:
MariaDB [test]> insert into lyf_canal_test (name,status,content) values('demo1',1,'demo1 test');
Query OK, 1 row affected (0.00 sec)
 
MariaDB [test]> update lyf_canal_test set name = 'demo update' where id = 13;
Query OK, 1 row affected (0.00 sec)
Rows matched: 1 Changed: 1 Warnings: 0
 
MariaDB [test]> update lyf_canal_test set name = 'demo update2',content='second update',status=2 where id = 13;
Query OK, 1 row affected (0.00 sec)
3.2 產生三條需要保證順序的消息

插入,第一次更新,第二次更新這三次操作產生的 binlog 被 canal server 推送至 kafka 中的消息分別稱為:消息A,消息B,消息C

  • 消息A: 

  • 消息B: 

  • 消息C: 

3.3 網絡原因造成消息亂序

假設由於不可知的網絡原因:

  • kafka broker收到的三條消息分別為:消息A,消息C,消息B

  • 則 kafka 消費端消費到的這三條消息先后順序就是:消息A,消息C,消息B

  • 這樣就造成了消息的亂序,因此訂閱到的數據在存入目標表前必須得加亂序校驗處理

3.4 消息亂序處理邏輯

我們利用HBase的特性,將數據主鍵做為目標表的 rowkey。當 kafka 消費端消費到數據時,亂序處理主要流程(摘自禧雲數芯大數據平台技術白皮書)如下:

demo的三條消息處理流程如下: 
1> 判斷消息A 的主鍵id做為rowkey在hbase的目標表中不存在,則將消息A的數據直接插入HBase: 

2> 消息C 的主鍵id做為rowkey,已經在目標表中存在,則這時需要拿消息C 的執行時間和表中存儲的執行時間去判斷:

  • 如果消息C 中的執行時間小於表中存儲的執行時間,則證明消息C 是重復消息或亂序的消息,直接丟棄;

  • 消息C 中的執行時間大於表中存儲的執行時間,則直接更新表數據(本demo即符合該種場景): 

  • 消息C 中的執行時間等於表中存儲的執行時間,則這時需要拿消息C 的偏移量和表中存儲的偏移量去判斷:

    • 消息C 中的偏移量小於表中存儲的偏移量,則證明消息C 是重復消息,直接丟棄;

    • 消息C 中的偏移量大於等於表中存儲的偏移量,則直接更新表數據。

3> 消息B 的主鍵id做為rowkey,已經在目標表中存在,則這時需要拿消息B 的執行時間和表中存儲的執行時間去判斷:

  • 由於消息B中的執行時間小於表中存儲的執行時間(即消息C 的執行時間),因此消息B 直接丟棄。

3.5 主要代碼

kafka 消費端將消費到的消息進行格式化處理和組裝,並借助 HBase-client API 來完成對 HBase 表的操作。

1> 使用Put組裝單行數據

/**
* 包名: org.apache.hadoop.hbase.client.Put
* hbaseData 為從binlog訂閱到的數據,通過循環,為目標HBase表
* 添加rowkey、列簇、列數據。
* 作用:用來對單個行執行加入操作。
*/
Put put = new Put(Bytes.toBytes(hbaseData.get("id")));
// hbaseData 為從binlog訂閱到的數據,通過循環,為目標HBase表添加列簇和列
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes(mapKey), Bytes.toBytes(hbaseData.get(mapKey)));

 

2> 使用 checkAndMutate,更新HBase表的數據

只有服務端對應rowkey的列數據與預期的值符合期望條件(大於、小於、等於)時,才會將put操作提交至服務端。

// 如果 update_info(列族) execute_time(列) 不存在值就插入數據,如果存在則返回false
boolean res1 = table.checkAndMutate(Bytes.toBytes(hbaseData.get("id")), Bytes.toBytes("update_info")) .qualifier(Bytes.toBytes("execute_time")).ifNotExists().thenPut(put);
 
// 如果存在,則去比較執行時間
if (!res1) {
// 如果本次傳遞的執行時間大於HBase中的執行時間,則插入put
boolean res2 =table.checkAndPut(Bytes.toBytes(hbaseData.get("id")), Bytes.toBytes("update_info"),
Bytes.toBytes("execute_time"), CompareFilter.CompareOp.GREATER, Bytes.toBytes(hbaseData.get("execute_time")),put);
 
// 執行時間相等時,則去比較偏移量,本次傳遞的值大於HBase中的值則插入put
if (!res2) {
boolean res3 = table.checkAndPut(Bytes.toBytes(hbaseData.get("id")),
Bytes.toBytes("update_info"), Bytes.toBytes("execute_position"), CompareFilter.CompareOp.GREATER, Bytes.toBytes(hbaseData.get("execute_position")),put);
}
}

 

四.總結

  1. 目前移山的實時同步服務,kafka 消費端是使用一個線程去消費數據;

  2. 如果將來有版本升級需求,將消費端改為多個線程去消費數據時,要考慮到多線程消費時有序的消息會被打亂這種情況的解決辦法。

更多文章

歡迎訪問更多關於消息中間件的原創文章:

關注微信公眾號

歡迎大家關注我的微信公眾號閱讀更多關於 消息隊列 的原創文章:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM