Tip: 此篇已加入.NET Core微服務基礎系列文章索引
一、CAP簡介
下面的文字來自CAP的Wiki文檔:https://github.com/dotnetcore/CAP/wiki
CAP 是一個在分布式系統中(SOA,MicroService)實現事件總線及最終一致性(分布式事務)的一個開源的 C# 庫,她具有輕量級,高性能,易使用等特點。我們可以輕松的在基於 .NET Core 技術的分布式系統中引入CAP,包括但限於 ASP.NET Core 和 ASP.NET Core on .NET Framework。
CAP 的應用場景主要有以下兩個:
- 分布式事務中的最終一致性(異步確保)的方案
- 具有高可用性的 EventBus
CAP 同時支持使用 RabbitMQ 或 Kafka 進行底層之間的消息發送,我們不需要具備 RabbitMQ 或者 Kafka 的使用經驗,仍然可以輕松的將CAP集成到項目中。
CAP 目前支持使用 Sql Server,MySql,PostgreSql 數據庫的項目;
CAP 同時支持使用 EntityFrameworkCore 和 Dapper 的項目,可以根據需要選擇不同的配置方式;
CAP的作者為園友savorboard(楊曉東),成都地區的.NET社區領導者,棒棒噠!
二、案例結構
此次試驗仍然和上一篇基於MassTransit的案例一樣(其實是我懶得再改,直接拿來復用),共有四個MicroService應用程序,當用戶下訂單時會通過CAP作為事件總線發布消息,作為訂閱者的庫存和配送服務會接收到消息並消費消息。此次試驗會采用RabbitMQ作為消息隊列,采用MSSQL作為關系型數據庫(同時CAP也是支持MSSQL的)。
准備工作:為所有服務通過NuGet安裝CAP及其相關包
PM> Install-Package DotNetCore.CAP
下面是RabbitMQ的支持包
PM> Install-Package DotNetCore.CAP.RabbitMQ
下面是MSSQL的支持包
PM> Install-Package DotNetCore.CAP.SqlServer
三、具體實現
3.1 OrderService
(1)啟動配置:這里主要需要給CAP指定數據庫(它會在這個數據庫中創建本地消息表Published和Received)以及使用到的消息隊列(這里是RabbitMQ)
public void ConfigureServices(IServiceCollection services) { services.AddMvc(); // Repository services.AddScoped<IOrderRepository, OrderRepository>(); // EF DbContext services.AddDbContext<OrderDbContext>(); // Dapper-ConnString services.AddSingleton(Configuration["DB:OrderDB"]); // CAP services.AddCap(x => { x.UseEntityFramework<OrderDbContext>(); // EF x.UseSqlServer(Configuration["DB:OrderDB"]); // SQL Server x.UseRabbitMQ(cfg => { cfg.HostName = Configuration["MQ:Host"]; cfg.VirtualHost = Configuration["MQ:VirtualHost"]; cfg.Port = Convert.ToInt32(Configuration["MQ:Port"]); cfg.UserName = Configuration["MQ:UserName"]; cfg.Password = Configuration["MQ:Password"]; }); // RabbitMQ // Below settings is just for demo x.FailedRetryCount = 2; x.FailedRetryInterval = 5; }); ...... } // This method gets called by the runtime. Use this method to configure the HTTP request pipeline. public void Configure(IApplicationBuilder app, IHostingEnvironment env, IApplicationLifetime lifetime) { ...... app.UseMvc(); // CAP app.UseCap(); ...... }
(2)Controller:這里會調用Repository去實現業務邏輯和發送消息
[Route("api/Order")] public class OrderController : Controller { public IOrderRepository OrderRepository { get; } public OrderController(IOrderRepository OrderRepository) { this.OrderRepository = OrderRepository; } [HttpPost] public string Post([FromBody]OrderDTO orderDTO) { var result = OrderRepository.CreateOrderByDapper(orderDTO).GetAwaiter().GetResult(); return result ? "Post Order Success" : "Post Order Failed"; } }
(3)Repository:這里實現了兩種方式:EF和Dapper(基於ADO.NET),其中EF方式中不需要傳transaction(當CAP檢測到 Publish 是在EF事務區域內的時候,將使用當前的事務上下文進行消息的存儲),而基於ADO.NET方式中需要傳transaction(由於不能獲取到事務上下文,所以需要用戶手動的傳遞事務上下文到CAP中)。
public class OrderRepository : IOrderRepository { public OrderDbContext DbContext { get; } public ICapPublisher CapPublisher { get; } public string ConnStr { get; } // For Dapper use public OrderRepository(OrderDbContext DbContext, ICapPublisher CapPublisher, string ConnStr) { this.DbContext = DbContext; this.CapPublisher = CapPublisher; this.ConnStr = ConnStr; } public async Task<bool> CreateOrderByEF(IOrder order) { using (var trans = DbContext.Database.BeginTransaction()) { var orderEntity = new Order() { ID = GenerateOrderID(), OrderUserID = order.OrderUserID, OrderTime = order.OrderTime, OrderItems = null, ProductID = order.ProductID // For demo use }; DbContext.Orders.Add(orderEntity); await DbContext.SaveChangesAsync(); // When using EF, no need to pass transaction var orderMessage = new OrderMessage() { ID = orderEntity.ID, OrderUserID = orderEntity.OrderUserID, OrderTime = orderEntity.OrderTime, OrderItems = null, ProductID = orderEntity.ProductID // For demo use }; await CapPublisher.PublishAsync(EventConstants.EVENT_NAME_CREATE_ORDER, orderMessage); trans.Commit(); } return true; } public async Task<bool> CreateOrderByDapper(IOrder order) { using (var conn = new SqlConnection(ConnStr)) { conn.Open(); using (var trans = conn.BeginTransaction()) { // business code here string sqlCommand = @"INSERT INTO [dbo].[Orders](OrderID, OrderTime, OrderUserID, ProductID) VALUES(@OrderID, @OrderTime, @OrderUserID, @ProductID)"; order.ID = GenerateOrderID(); await conn.ExecuteAsync(sqlCommand, param: new { OrderID = order.ID, OrderTime = DateTime.Now, OrderUserID = order.OrderUserID, ProductID = order.ProductID }, transaction: trans); // For Dapper/ADO.NET, need to pass transaction var orderMessage = new OrderMessage() { ID = order.ID, OrderUserID = order.OrderUserID, OrderTime = order.OrderTime, OrderItems = null, MessageTime = DateTime.Now, ProductID = order.ProductID // For demo use }; await CapPublisher.PublishAsync(EventConstants.EVENT_NAME_CREATE_ORDER, orderMessage, trans); trans.Commit(); } } return true; } private string GenerateOrderID() { // TODO: Some business logic to generate Order ID return Guid.NewGuid().ToString(); } private string GenerateEventID() { // TODO: Some business logic to generate Order ID return Guid.NewGuid().ToString(); } }
這里摘抄一段CAP wiki中關於事務的一段介紹:
事務在 CAP 具有重要作用,它是保證消息可靠性的一個基石。 在發送一條消息到消息隊列的過程中,如果不使用事務,我們是沒有辦法保證我們的業務代碼在執行成功后消息已經成功的發送到了消息隊列,或者是消息成功的發送到了消息隊列,但是業務代碼確執行失敗。
這里的失敗原因可能是多種多樣的,比如連接異常,網絡故障等等。
只有業務代碼和CAP的Publish代碼必須在同一個事務中,才能夠保證業務代碼和消息代碼同時成功或者失敗。
換句話說,CAP會確保我們這段邏輯中業務代碼和消息代碼都成功了,才會真正讓事務commit。
3.2 StorageService
(1)啟動配置:這里主要是指定Subscriber
public void ConfigureServices(IServiceCollection services) { services.AddMvc(); // EF DbContext services.AddDbContext<StorageDbContext>(); // Dapper-ConnString services.AddSingleton(Configuration["DB:StorageDB"]); // Subscriber services.AddTransient<IOrderSubscriberService, OrderSubscriberService>(); // CAP services.AddCap(x => { x.UseEntityFramework<StorageDbContext>(); // EF x.UseSqlServer(Configuration["DB:StorageDB"]); // SQL Server x.UseRabbitMQ(cfg => { cfg.HostName = Configuration["MQ:Host"]; cfg.VirtualHost = Configuration["MQ:VirtualHost"]; cfg.Port = Convert.ToInt32(Configuration["MQ:Port"]); cfg.UserName = Configuration["MQ:UserName"]; cfg.Password = Configuration["MQ:Password"]; }); // RabbitMQ // Below settings is just for demo x.FailedRetryCount = 2; x.FailedRetryInterval = 5; }); ...... } // This method gets called by the runtime. Use this method to configure the HTTP request pipeline. public void Configure(IApplicationBuilder app, IServiceProvider serviceProvider, IHostingEnvironment env, IApplicationLifetime lifetime) { ...... app.UseMvc(); // CAP app.UseCap(); ...... }
(2)實現Subscriber
首先定義一個接口,建議放到公共類庫中
public interface IOrderSubscriberService { Task ConsumeOrderMessage(OrderMessage message); }
然后實現這個接口,記得讓其實現ICapSubscribe接口,然后我們就可以使用 CapSubscribeAttribute
來訂閱 CAP 發布出來的消息。
public class OrderSubscriberService : IOrderSubscriberService, ICapSubscribe { private readonly string _connStr; public OrderSubscriberService(string connStr) { _connStr = connStr; } [CapSubscribe(EventConstants.EVENT_NAME_CREATE_ORDER)] public async Task ConsumeOrderMessage(OrderMessage message) { await Console.Out.WriteLineAsync($"[StorageService] Received message : {JsonHelper.SerializeObject(message)}"); await UpdateStorageNumberAsync(message); } private async Task<bool> UpdateStorageNumberAsync(OrderMessage order) { //throw new Exception("test"); // just for demo use using (var conn = new SqlConnection(_connStr)) { string sqlCommand = @"UPDATE [dbo].[Storages] SET StorageNumber = StorageNumber - 1 WHERE StorageID = @ProductID"; int count = await conn.ExecuteAsync(sqlCommand, param: new { ProductID = order.ProductID }); return count > 0; } } }
*.CAP約定消息端在方法實現的過程中需要實現冪等性,所謂冪等性就是指用戶對於同一操作發起的一次請求或者多次請求的結果是一致的,不會因為多次點擊而產生了副作用。這里我沒有考慮,實際中需要首先進行驗證,避免二次更新。
3.3 DeliveryService
(1)啟動配置:與StorageService高度類似,只是使用的不是同一個數據庫
(2)實現Subscriber
public class OrderSubscriberService : IOrderSubscriberService, ICapSubscribe { private readonly string _connStr; public OrderSubscriberService(string connStr) { _connStr = connStr; } [CapSubscribe(EventConstants.EVENT_NAME_CREATE_ORDER)] public async Task ConsumeOrderMessage(OrderMessage message) { await Console.Out.WriteLineAsync($"[DeliveryService] Received message : {JsonHelper.SerializeObject(message)}"); await AddDeliveryRecordAsync(message); } private async Task<bool> AddDeliveryRecordAsync(OrderMessage order) { //throw new Exception("test"); // just for demo use using (var conn = new SqlConnection(_connStr)) { string sqlCommand = @"INSERT INTO [dbo].[Deliveries] (DeliveryID, OrderID, ProductID, OrderUserID, CreatedTime) VALUES (@DeliveryID, @OrderID, @ProductID, @OrderUserID, @CreatedTime)"; int count = await conn.ExecuteAsync(sqlCommand, param: new { DeliveryID = Guid.NewGuid().ToString(), OrderID = order.ID, OrderUserID = order.OrderUserID, ProductID = order.ProductID, CreatedTime = DateTime.Now }); return count > 0; } } }
3.4 快速測試
(1)啟動3個微服務,Check 數據庫表狀態
首先會看到在各個數據庫中均創建了本地消息表,這兩個表的含義如下:
Cap.Published:這個表主要是用來存儲 CAP 發送到MQ(Message Queue)的客戶端消息,也就是說你使用 ICapPublisher
接口 Publish 的消息內容。
Cap.Received:這個表主要是用來存儲 CAP 接收到 MQ(Message Queue) 的客戶端訂閱的消息,也就是使用 CapSubscribe[]
訂閱的那些消息。
然后看看各個表的數據,目前只有庫存表有數據,因為我們要做的只是更新。
(2)通過Postman發一個Post請求
(3)Check控制台輸出的日志信息
(4)Check數據庫中的業務表和消息表數據:可以看到發送者和接收者都執行成功了,如果其中任何一個參與者發生了異常或者連接不上,CAP會有默認的重試機制(默認是50次最大重試次數,每次重試間隔60s),當失敗總次數達到默認失敗總次數后,就不會進行重試了,我們可以在 Dashboard 中查看消息失敗的原因,然后進行人工重試處理。
另外,由於CAP會在數據庫中創建消息表,因此難免會考慮到其性能。CAP提供了一個數據清理的機制,默認情況下會每隔一個小時將消息表的數據進行清理刪除,避免數據量過多導致性能的降低。清理規則為 ExpiresAt (字段名)不為空並且小於當前時間的數據。
四、小結
本篇首先簡單介紹了一下CAP這個開源項目,然后基於上一篇中的下訂單的小案例來進行了基於CAP的改造,並通過一個實例的運行來看到了結果。當然,這個實例並不完美,很多點都沒有考慮(比如消息端消費時的冪等性)和失敗重試的場景實踐等等等等。由於時間和精力的關系,目前只使用到這兒,以后有機會能夠應用上會研究下CAP的源碼,最后感謝楊曉東為.NET社區帶來了一個優秀的開源項目!
示例代碼
Click Here => 點我點我
參考資料
CAP - GitHub : https://github.com/dotnetcore/CAP
CAP - Wiki : https://github.com/dotnetcore/CAP/wiki
楊曉東,《BASE:一種ACID的替代方案》