DotNetCore.CAP是一個在分布式系統中(SOA,MicroService)實現事件總線及最終一致性(分布式事務)的一個開源的 C# 庫,具有輕量級,高性能,易使用等特點。開源地址
Cap(Consistency(一致性)、Availability(可用性)、Partition tolerance(分區容錯性))是分布式系統中的一個重要理念,根據CAP定理,存在網絡分區(微服務即時網絡分區架構)時,Web應用不可能同時滿足可用性和一致性,DotNetCore.CAP使用“異步確保”方案,利用消息隊列和本地消息列表實現最終數據一致性。異步確保模式是補償模式的一個典型案例,通過異步的方式進行處理,處理后把結果通過通知系統通知給使用方。
一,准備內容:
運輸器:過運輸將數據從一個地方移動到另一個地方-在采集程序和管道之間,管道與實體數據庫之間,甚至在管道與外部系統之間。DotNetCore.CAP 支持以下幾種運輸方式:RabbitMQ,Kafka,Azure Service bus,Amazon SQS,In-memory queue。我使用的是RabbitMq。
二,DotnetCore webapi項目集成DotNetCore.CAP
public void ConfigureServices(IServiceCollection services)
{
//獲取數據庫連接字符串
var connection = Configuration.GetConnectionString("MySql");
//Cap
services.AddCap(conf => {
//配置數據庫上下文
conf.UseEntityFramework<Entity.MicoDatacontext>();
//配置數據庫連接
conf.UseMySql(connection);
//使用RabbitMQ運輸器
conf.UseRabbitMQ(rab => {
rab.HostName = "192.168.137.2";
rab.Password = "xxxxxx";
rab.Port = 5672;
rab.UserName = "xxxx";
});
});
services.AddDbContext<Entity.MicoDatacontext>(options =>
{
options.UseMySql(connection);
});
services.AddControllers();
}
三,在Contoller中使用CAP發送消息
比如下面是一個模擬創建訂單的接口,創建完訂單后發送一個主題為“order.create”的消息到消息總線。這個接口開啟了一個需要手動提交的本地事務,插入本地消息和處理業務邏輯都在這個事務內,確保業務完成的同時消息能發送出去。
readonly Entity.MicoDatacontext dbcontext;
readonly ICapPublisher capPublisher;
public TestController(Entity.MicoDatacontext dbcontext, ICapPublisher capPublisher) {
this.dbcontext = dbcontext;
this.capPublisher = capPublisher;
}
[HttpGet("CreateOrder")]
public async Task<bool> CreateOrderAsync(string name,string goodsSid)
{
Order order = new Order()
{
Sid = Guid.NewGuid().ToString(),
CreateTime = DateTime.Now,
GoodsSid = goodsSid,
Name = name,
Status = 0,
UpdateTime = DateTime.Now
};
//開啟一個需要手動提交的本地事務,插入本地消息和處理業務邏輯都在這個事務內。
using (var tran = dbcontext.Database.BeginTransaction(capPublisher, false))
{
dbcontext.Order.Add(order);
await capPublisher.PublishAsync("order.create", new OrderCreate() { GoodsSid = goodsSid, OrderName = name });
dbcontext.SaveChanges();
tran.Commit();
return true;
};
}
四,在Contoller中使用CAP接收消息
下面是模擬倉庫服務在接收到訂單創建消息后減庫存的操作。如果倉庫服務沒有成功消費這條消息,DotnetCore.CAP將會啟用重發機制。
[Route("api/[controller]")]
[ApiController]
public class TestController : ControllerBase
{
readonly Entity.MicoDatacontext dbcontext;
readonly ICapPublisher capPublisher;
public TestController(ICapPublisher capPublisher, Entity.MicoDatacontext dbcontext)
{
this.dbcontext = dbcontext;
this.capPublisher = capPublisher;
}
[NonAction]
[CapSubscribe("order.create")]
public Task OrderCreate(Common.Publish.OrderCreate order)
{
var storeage = dbcontext.Storeage.Where(r => r.Sid.Equals(order.GoodsSid)).SingleOrDefault();
if (storeage != null) throw new Exception("庫存不足");
storeage.Count -= 1;
storeage.UpdateTime = DateTime.Now;
dbcontext.Storeage.Update(storeage);
dbcontext.SaveChanges();
return Task.CompletedTask;
}
}
五,消息重試配置
1、 發送重試
在消息發送過程中,當出現 Broker 宕機或者連接失敗的情況亦或者出現異常的情況下,這個時候 CAP 會對發送的重試,第一次重試次數為 3,4分鍾后以后每分鍾重試一次,進行次數 +1,當總次數達到50次后,CAP將不對其進行重試。
你可以在 CapOptions 中設置FailedRetryCount來調整默認重試的總次數。
當失敗總次數達到默認失敗總次數后,就不會進行重試了,你可以在 Dashboard 中查看消息失敗的原因,然后進行人工重試處理。
2、 消費重試
當 Consumer 接收到消息時,會執行消費者方法,在執行消費者方法出現異常時,會進行重試。這個重試策略和上面的 發送重試 是相同的
//獲取數據庫連接字符串
var connection = Configuration.GetConnectionString("MySql");
//Cap
services.AddCap(conf => {
//配置數據庫上下文
conf.UseEntityFramework<Entity.MicoDatacontext>();
//配置數據庫連接
conf.UseMySql(connection);
//使用RabbitMQ運輸器
conf.UseRabbitMQ(rab => {
rab.HostName = "192.168.137.2";
rab.Password = "xxxx114";
rab.Port = 5672;
rab.UserName = "xxxx";
});
//消息重試的最大次數
conf.FailedRetryCount = 50;
//消息重試間隔時間,4min后該值設置生效(默認快速重試3次)
conf.FailedRetryInterval = 60;
//發送成功的消息的過期時間(過期則刪除)
conf.SucceedMessageExpiredAfter = 24 * 3600;
//發送消息失敗后的回調
conf.FailedThresholdCallback = (context) =>
{
//通知管理人員或其它邏輯
};
六,事務補償
某些情況下,消費者需要返回值以告訴發布者執行結果,以便於發布者實施一些動作,通常情況下這屬於補償范圍。可以在消費者執行的代碼中通過重新發布一個新消息來通知上游,CAP 提供了一種簡單的方式來做到這一點。 你可以在發送的時候指定 callbackName 來得到消費者的執行結果。
比如上面的示例,倉庫消費后需要告訴訂單服務處理結果。
訂單服務:處理創建訂單業務后發送一條帶有補償回調的消息並通過CapSubscribe接收該回調消息,處理訂單狀態
[Route("api/[controller]")]
[ApiController]
public class TestController : ControllerBase
{
readonly Entity.MicoDatacontext dbcontext;
readonly ICapPublisher capPublisher;
public TestController(Entity.MicoDatacontext dbcontext, ICapPublisher capPublisher) {
this.dbcontext = dbcontext;
this.capPublisher = capPublisher;
}
[HttpGet("CreateOrder")]
public async Task<bool> CreateOrderAsync(string name,string goodsSid)
{
Order order = new Order()
{
Sid = Guid.NewGuid().ToString(),
CreateTime = DateTime.Now,
GoodsSid = goodsSid,
Name = name,
Status = 0,
UpdateTime = DateTime.Now
};
//開啟一個需要手動提交的本地事務,插入本地消息和處理業務邏輯都在這個事務內。
using (var tran = dbcontext.Database.BeginTransaction(capPublisher, false))
{
dbcontext.Order.Add(order);
//發送一個帶有補償回調的消息
await capPublisher.PublishAsync("order.create", new OrderCreate() { GoodsSid = goodsSid, OrderName = name },"Soreage.reduced");
dbcontext.SaveChanges();
tran.Commit();
return true;
};
}
/// <summary>
/// 補償消息處理
/// </summary>
/// <param name="order"></param>
/// <returns></returns>
[NonAction]
[CapSubscribe("Soreage.reduced")]
public Task OrderCreate(Common.Publish.StoreageReduced msg)
{
var order = this.dbcontext.Order.Find(msg.OrderSid);
if (order != null)
{
order.Status = 1;
dbcontext.SaveChanges();
}
return Task.CompletedTask;
}
}
倉庫服務:接收order.create主題消息並返回正確的返回值
[NonAction]
[CapSubscribe("order.create")]
public Common.Publish.StoreageReduced OrderCreate(Common.Publish.OrderCreate order)
{
var storeage = dbcontext.Storeage.Where(r => r.Sid.Equals(order.GoodsSid)).SingleOrDefault();
if (storeage != null) throw new Exception("庫存不足");
storeage.Count -= 1;
storeage.UpdateTime = DateTime.Now;
dbcontext.Storeage.Update(storeage);
dbcontext.SaveChanges();
return new Common.Publish.StoreageReduced() {OrderSid=order.OrderSid,IsSuccess=true };
}
七,並發沖突處理
使用EF的RowVersion做樂觀鎖解決並發沖突的問題。
Do實體添加Timestamp列
[Timestamp]
public byte[] Timespan { get; set; }
DbContext類可以做如下配置
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
modelBuilder.Entity<Model.Order>().Property(r => r.Timespan).IsRowVersion();
base.OnModelCreating(modelBuilder);
}
