DotNetCore.CAP是一個在分布式系統中(SOA,MicroService)實現事件總線及最終一致性(分布式事務)的一個開源的 C# 庫,具有輕量級,高性能,易使用等特點。開源地址
Cap(Consistency(一致性)、Availability(可用性)、Partition tolerance(分區容錯性))是分布式系統中的一個重要理念,根據CAP定理,存在網絡分區(微服務即時網絡分區架構)時,Web應用不可能同時滿足可用性和一致性,DotNetCore.CAP使用“異步確保”方案,利用消息隊列和本地消息列表實現最終數據一致性。異步確保模式是補償模式的一個典型案例,通過異步的方式進行處理,處理后把結果通過通知系統通知給使用方。
一,准備內容:
運輸器:過運輸將數據從一個地方移動到另一個地方-在采集程序和管道之間,管道與實體數據庫之間,甚至在管道與外部系統之間。DotNetCore.CAP 支持以下幾種運輸方式:RabbitMQ,Kafka,Azure Service bus,Amazon SQS,In-memory queue。我使用的是RabbitMq。
二,DotnetCore webapi項目集成DotNetCore.CAP
public void ConfigureServices(IServiceCollection services) { //獲取數據庫連接字符串 var connection = Configuration.GetConnectionString("MySql"); //Cap services.AddCap(conf => { //配置數據庫上下文 conf.UseEntityFramework<Entity.MicoDatacontext>(); //配置數據庫連接 conf.UseMySql(connection); //使用RabbitMQ運輸器 conf.UseRabbitMQ(rab => { rab.HostName = "192.168.137.2"; rab.Password = "xxxxxx"; rab.Port = 5672; rab.UserName = "xxxx"; }); }); services.AddDbContext<Entity.MicoDatacontext>(options => { options.UseMySql(connection); }); services.AddControllers(); }
三,在Contoller中使用CAP發送消息
比如下面是一個模擬創建訂單的接口,創建完訂單后發送一個主題為“order.create”的消息到消息總線。這個接口開啟了一個需要手動提交的本地事務,插入本地消息和處理業務邏輯都在這個事務內,確保業務完成的同時消息能發送出去。
readonly Entity.MicoDatacontext dbcontext; readonly ICapPublisher capPublisher; public TestController(Entity.MicoDatacontext dbcontext, ICapPublisher capPublisher) { this.dbcontext = dbcontext; this.capPublisher = capPublisher; } [HttpGet("CreateOrder")] public async Task<bool> CreateOrderAsync(string name,string goodsSid) { Order order = new Order() { Sid = Guid.NewGuid().ToString(), CreateTime = DateTime.Now, GoodsSid = goodsSid, Name = name, Status = 0, UpdateTime = DateTime.Now }; //開啟一個需要手動提交的本地事務,插入本地消息和處理業務邏輯都在這個事務內。 using (var tran = dbcontext.Database.BeginTransaction(capPublisher, false)) { dbcontext.Order.Add(order); await capPublisher.PublishAsync("order.create", new OrderCreate() { GoodsSid = goodsSid, OrderName = name }); dbcontext.SaveChanges(); tran.Commit(); return true; }; }
四,在Contoller中使用CAP接收消息
下面是模擬倉庫服務在接收到訂單創建消息后減庫存的操作。如果倉庫服務沒有成功消費這條消息,DotnetCore.CAP將會啟用重發機制。
[Route("api/[controller]")] [ApiController] public class TestController : ControllerBase { readonly Entity.MicoDatacontext dbcontext; readonly ICapPublisher capPublisher; public TestController(ICapPublisher capPublisher, Entity.MicoDatacontext dbcontext) { this.dbcontext = dbcontext; this.capPublisher = capPublisher; } [NonAction] [CapSubscribe("order.create")] public Task OrderCreate(Common.Publish.OrderCreate order) { var storeage = dbcontext.Storeage.Where(r => r.Sid.Equals(order.GoodsSid)).SingleOrDefault(); if (storeage != null) throw new Exception("庫存不足"); storeage.Count -= 1; storeage.UpdateTime = DateTime.Now; dbcontext.Storeage.Update(storeage); dbcontext.SaveChanges(); return Task.CompletedTask; } }
五,消息重試配置
1、 發送重試
在消息發送過程中,當出現 Broker 宕機或者連接失敗的情況亦或者出現異常的情況下,這個時候 CAP 會對發送的重試,第一次重試次數為 3,4分鍾后以后每分鍾重試一次,進行次數 +1,當總次數達到50次后,CAP將不對其進行重試。
你可以在 CapOptions 中設置FailedRetryCount來調整默認重試的總次數。
當失敗總次數達到默認失敗總次數后,就不會進行重試了,你可以在 Dashboard 中查看消息失敗的原因,然后進行人工重試處理。
2、 消費重試
當 Consumer 接收到消息時,會執行消費者方法,在執行消費者方法出現異常時,會進行重試。這個重試策略和上面的 發送重試 是相同的
//獲取數據庫連接字符串 var connection = Configuration.GetConnectionString("MySql"); //Cap services.AddCap(conf => { //配置數據庫上下文 conf.UseEntityFramework<Entity.MicoDatacontext>(); //配置數據庫連接 conf.UseMySql(connection); //使用RabbitMQ運輸器 conf.UseRabbitMQ(rab => { rab.HostName = "192.168.137.2"; rab.Password = "xxxx114"; rab.Port = 5672; rab.UserName = "xxxx"; }); //消息重試的最大次數 conf.FailedRetryCount = 50; //消息重試間隔時間,4min后該值設置生效(默認快速重試3次) conf.FailedRetryInterval = 60; //發送成功的消息的過期時間(過期則刪除) conf.SucceedMessageExpiredAfter = 24 * 3600; //發送消息失敗后的回調 conf.FailedThresholdCallback = (context) => { //通知管理人員或其它邏輯 };
六,事務補償
某些情況下,消費者需要返回值以告訴發布者執行結果,以便於發布者實施一些動作,通常情況下這屬於補償范圍。可以在消費者執行的代碼中通過重新發布一個新消息來通知上游,CAP 提供了一種簡單的方式來做到這一點。 你可以在發送的時候指定 callbackName
來得到消費者的執行結果。
比如上面的示例,倉庫消費后需要告訴訂單服務處理結果。
訂單服務:處理創建訂單業務后發送一條帶有補償回調的消息並通過CapSubscribe接收該回調消息,處理訂單狀態
[Route("api/[controller]")] [ApiController] public class TestController : ControllerBase { readonly Entity.MicoDatacontext dbcontext; readonly ICapPublisher capPublisher; public TestController(Entity.MicoDatacontext dbcontext, ICapPublisher capPublisher) { this.dbcontext = dbcontext; this.capPublisher = capPublisher; } [HttpGet("CreateOrder")] public async Task<bool> CreateOrderAsync(string name,string goodsSid) { Order order = new Order() { Sid = Guid.NewGuid().ToString(), CreateTime = DateTime.Now, GoodsSid = goodsSid, Name = name, Status = 0, UpdateTime = DateTime.Now }; //開啟一個需要手動提交的本地事務,插入本地消息和處理業務邏輯都在這個事務內。 using (var tran = dbcontext.Database.BeginTransaction(capPublisher, false)) { dbcontext.Order.Add(order); //發送一個帶有補償回調的消息 await capPublisher.PublishAsync("order.create", new OrderCreate() { GoodsSid = goodsSid, OrderName = name },"Soreage.reduced"); dbcontext.SaveChanges(); tran.Commit(); return true; }; } /// <summary> /// 補償消息處理 /// </summary> /// <param name="order"></param> /// <returns></returns> [NonAction] [CapSubscribe("Soreage.reduced")] public Task OrderCreate(Common.Publish.StoreageReduced msg) { var order = this.dbcontext.Order.Find(msg.OrderSid); if (order != null) { order.Status = 1; dbcontext.SaveChanges(); } return Task.CompletedTask; } }
倉庫服務:接收order.create主題消息並返回正確的返回值
[NonAction] [CapSubscribe("order.create")] public Common.Publish.StoreageReduced OrderCreate(Common.Publish.OrderCreate order) { var storeage = dbcontext.Storeage.Where(r => r.Sid.Equals(order.GoodsSid)).SingleOrDefault(); if (storeage != null) throw new Exception("庫存不足"); storeage.Count -= 1; storeage.UpdateTime = DateTime.Now; dbcontext.Storeage.Update(storeage); dbcontext.SaveChanges(); return new Common.Publish.StoreageReduced() {OrderSid=order.OrderSid,IsSuccess=true }; }
七,並發沖突處理
使用EF的RowVersion做樂觀鎖解決並發沖突的問題。
Do實體添加Timestamp列
[Timestamp] public byte[] Timespan { get; set; }
DbContext類可以做如下配置
protected override void OnModelCreating(ModelBuilder modelBuilder) { modelBuilder.Entity<Model.Order>().Property(r => r.Timespan).IsRowVersion(); base.OnModelCreating(modelBuilder); }