需要注意的是標題中的CAP不是指的CAP理論,而是園區大神楊曉東實現的框架,CAP框架基於本地消息表用最終一致性實現分布式事務。
本地消息表
首先我們考慮一個場景,在將用戶信息更改后,需要發送一條消息到消息隊列、緩存或是寫入到其他庫中。這個過程涉及到一個本地庫與MQ、本地庫與Cache或是本地庫與其他庫兩者之間的事務問題,不能用簡單的數據庫事務控制了。
這種分布式事務下,常用的解決方案有2PC、3PC等強一致性保證的,也有TCC、Sagas模型、本地消息表、內嵌本地消息表的MQ等最終一致性保證的。
而在很多異步場景下,允許系統存在短暫的不一致,只需達到最終一致,比起強一致性那種剛性事務,采用柔性事務,在很多場景下更有利於我們去實現。
執行過程
在使用CAP框架前,先熟悉下作為分布式事務解決方案之一的本地消息表工作過程。
- 消息發起方(如圖左側部分)和消息接收方(如圖右側部分),先額外建一套消息表,用來記錄及跟蹤消息內容及狀態。
- 當有請求到消息發起方時,處理完業務邏輯發布消息將業務數據和消息數據一同提交到本地表中,此時為本地事務。
- 本地事務沒有問題后,將消息發送到MQ傳遞給消息消費方。如果消息發送失敗,會進行重試發送。
- 消息消費方,接收並處理消息,完成自己的業務邏輯,此時為消息消費方本地事務,如果本地事務完成,則更改接收消息的狀態,更改本地,如果處理失敗,那么可再次重試執行。
- 最終,左側事務與右側事務達到最終一致。
CAP框架
CAP是一個在分布式系統中(SOA,MicroService)實現事件總線及最終一致性(分布式事務)的一個開源的 C# 庫,具有輕量級,高性能,易使用等特點。
- 具有 Event Bus 的所有功能,提供了更加簡化的方式來處理EventBus中的發布/訂閱。
- 具有消息持久化的功能,當服務進行重啟或者宕機時,可以保證消息的可靠性。
- 基於本地消息表實現了分布式事務中的最終一致性。
- 集成了可視化頁面方便觀察消息狀態。
- 提供了一系列Nuget包以選擇需要的工具接入。
- 第一個包DotNetCore.CAP為必須要安裝的。
- 可以依據消息隊列的不同選擇用RabbitMQ、Kafka或是AzureServiceBus等。
- 根據服務使用的數據庫情況選擇需要將本地消息表落庫,可以選擇SqlServer、MySql、PostgreSql、MongoDB等,或是直接使用內存存儲,方便快速實踐。
場景案例
依照EShopOnContainers中的一張圖來實現一個例子,用戶更新用戶信息,將更新的部分通過事件發送到消息隊列中,下游的購物車和訂單服務偵聽到消息,更改買家信息。
在此基礎上行,設計三個上下文,並分別集成CAP,借助RabbitMQ作為消息隊列,對於UserService、BasketService和OrderService,都直接使用了數據庫(當然可以不僅限於數據庫)。
服務建立
項目創建
開始建立幾個服務,新建空白解決方案,依次建立三個WebApi項目,並移除默認的控制器。
簡單設計下,在三個服務中創建三個DbContext,對應三個獨立的數據庫。
- UserService中創建UserInfo實體及UserDbContext
- BasketService中創建Basket實體及BasketDbContext
- OrderService中創建Order實體及OrderDbContext
安裝Nuget包
在三個服務中均安裝完如下選中的包,此次Demo中為方便快速實踐,選擇RabbitMQ作為消息隊列,MySql作為數據庫存儲。
對於EFCore及MySql包,安裝了如下幾個包
Microsoft.EntityFrameworkCore
Microsoft.EntityFrameworkCore.Design
Microsoft.EntityFrameworkCore.Tools
Pomelo.EntityFrameworkCore.MySql
注意此處EFCore中MySql版本和CAP中MySql版本兩者間依賴的MySqlConnector不一致會有點問題
配置服務
需要對CAP進行設置,比如使用的是什么數據庫、什么消息隊列及配置下消息隊列參數,這一系列初始化設置在Startup.cs中配置好。
- ConfigureService中配置DbContext和CAP服務
- Configure中CAP的引入中間件
- 利用EFCore的遷移命令生成下數據庫遷移腳本,將DbContext內實體生成到數據庫中
- 單個服務啟動后,CAP組件會將內置表創建到數據庫中。
- 服務全部啟動后,RabbitMQ Client會自動注冊到RabbitMQ Server中同時創建好給定的Exchange(不給定則使用默認值),存在訂閱的服務則注冊隊列綁定到給定的Exchange下。
發布事件
在 UserService中UserController 中注入ICapPublisher,使用Patch接口更新一個Address,然后使用ICapPublisher發布一條消息。
- 更新本地User表內信息。
- 借助_capPublisher發布事件,先將事件信息記錄到本地MqPublish表。
- 前兩步都是針對本地表操作,一個事務保證,寫入MqPublish成功后再由CAP將記錄發送到RabbitMQ中。
訂閱事件
在BasketService和OrderService中完成事件的訂閱。各自新建了一個Handler來處理消息。在Handler中對處理的方法加上CapSubscribe特性,其中監聽的是發布事件時發送的事件名或消息名。
- BasketService收到RabbitMQ中的消息,CAP將消息寫入到MqReceive中。
- 調用相應的Handler處理事件。
- 更新Basket本地表,本地事務完成被提交。
- CAP組件將本地的MqReceive相關記錄更改狀態到完成,如本地事務提交失敗,則再次重試。
總結
拋棄強一致性想法借助最終一致性完成,將分布式事務拆分成多個本地事務進行處理。采用最終一致性來使得所有本地事務完成,即使部分出現失敗,也可重試,如重試機制無效最終借助人力完成。
在異步場景下,CAP及其方便了我們去處理分布式事務的過程。
當前RabbitMQ場景下,當某個服務做多個部署時,同一個隊列仍能保證一個消費者消費。這也避免了有些場景下,需要對資源加鎖來防止同時消費場景。
參考
2021-04-28,望技術有成后能回來看見自己的腳步