Dotnet微服務:使用cap實現分布式服務的數據一致性


DotNetCore.CAP是一個在分布式系統中(SOA,MicroService)實現事件總線及最終一致性(分布式事務)的一個開源的 C# 庫,具有輕量級,高性能,易使用等特點。開源地址

Cap(Consistency(一致性)、Availability(可用性)、Partition tolerance(分區容錯性))是分布式系統中的一個重要理念,根據CAP定理,存在網絡分區(微服務即時網絡分區架構)時,Web應用不可能同時滿足可用性和一致性,DotNetCore.CAP使用“異步確保”方案,利用消息隊列和本地消息列表實現最終數據一致性。異步確保模式是補償模式的一個典型案例,通過異步的方式進行處理,處理后把結果通過通知系統通知給使用方。

一,准備內容:

運輸器:過運輸將數據從一個地方移動到另一個地方-在采集程序和管道之間,管道與實體數據庫之間,甚至在管道與外部系統之間。DotNetCore.CAP 支持以下幾種運輸方式:RabbitMQ,Kafka,Azure Service bus,Amazon SQS,In-memory queue。我使用的是RabbitMq。

二,DotnetCore webapi項目集成DotNetCore.CAP

 public void ConfigureServices(IServiceCollection services)
        {
            //獲取數據庫連接字符串
            var connection = Configuration.GetConnectionString("MySql");
            //Cap
            services.AddCap(conf => {
                //配置數據庫上下文
                conf.UseEntityFramework<Entity.MicoDatacontext>();
                //配置數據庫連接
                conf.UseMySql(connection);
                //使用RabbitMQ運輸器
                conf.UseRabbitMQ(rab => {
                    rab.HostName = "192.168.137.2";
                    rab.Password = "xxxxxx";
                    rab.Port = 5672;
                    rab.UserName = "xxxx";
                });
            });
            services.AddDbContext<Entity.MicoDatacontext>(options =>
            {
                options.UseMySql(connection);
            });
            services.AddControllers();
        }

三,在Contoller中使用CAP發送消息

比如下面是一個模擬創建訂單的接口,創建完訂單后發送一個主題為“order.create”的消息到消息總線。這個接口開啟了一個需要手動提交的本地事務,插入本地消息和處理業務邏輯都在這個事務內,確保業務完成的同時消息能發送出去。

 readonly Entity.MicoDatacontext dbcontext;
        readonly ICapPublisher capPublisher;
        public TestController(Entity.MicoDatacontext dbcontext, ICapPublisher capPublisher) {
            this.dbcontext = dbcontext;
            this.capPublisher = capPublisher;
        }
        [HttpGet("CreateOrder")]
        public async Task<bool> CreateOrderAsync(string name,string goodsSid)
        {
            Order order = new Order()
            {
                Sid = Guid.NewGuid().ToString(),
                CreateTime = DateTime.Now,
                GoodsSid = goodsSid,
                Name = name,
                Status = 0,
                UpdateTime = DateTime.Now
            };
            //開啟一個需要手動提交的本地事務,插入本地消息和處理業務邏輯都在這個事務內。
            using (var tran = dbcontext.Database.BeginTransaction(capPublisher, false))
            {
                dbcontext.Order.Add(order);
                await capPublisher.PublishAsync("order.create", new OrderCreate() { GoodsSid = goodsSid, OrderName = name });
                dbcontext.SaveChanges();
                tran.Commit();
                return true;
            };
        } 

四,在Contoller中使用CAP接收消息 

 下面是模擬倉庫服務在接收到訂單創建消息后減庫存的操作。如果倉庫服務沒有成功消費這條消息,DotnetCore.CAP將會啟用重發機制。

[Route("api/[controller]")]
    [ApiController]
    public class TestController : ControllerBase
    {
        readonly Entity.MicoDatacontext dbcontext;
        readonly ICapPublisher capPublisher;
        public TestController(ICapPublisher capPublisher, Entity.MicoDatacontext dbcontext)
        {
            this.dbcontext = dbcontext;
            this.capPublisher = capPublisher;
        }
        [NonAction]
        [CapSubscribe("order.create")]
        public Task OrderCreate(Common.Publish.OrderCreate order)
        {
            var storeage = dbcontext.Storeage.Where(r => r.Sid.Equals(order.GoodsSid)).SingleOrDefault();
            if (storeage != null) throw new Exception("庫存不足");
            storeage.Count -= 1;
            storeage.UpdateTime = DateTime.Now;
            dbcontext.Storeage.Update(storeage);
            dbcontext.SaveChanges();
            return Task.CompletedTask;
        }
    }

五,消息重試配置

1、 發送重試

在消息發送過程中,當出現 Broker 宕機或者連接失敗的情況亦或者出現異常的情況下,這個時候 CAP 會對發送的重試,第一次重試次數為 3,4分鍾后以后每分鍾重試一次,進行次數 +1,當總次數達到50次后,CAP將不對其進行重試。

你可以在 CapOptions 中設置FailedRetryCount來調整默認重試的總次數。

當失敗總次數達到默認失敗總次數后,就不會進行重試了,你可以在 Dashboard 中查看消息失敗的原因,然后進行人工重試處理。

2、 消費重試

當 Consumer 接收到消息時,會執行消費者方法,在執行消費者方法出現異常時,會進行重試。這個重試策略和上面的 發送重試 是相同的

    //獲取數據庫連接字符串
            var connection = Configuration.GetConnectionString("MySql");
            //Cap
            services.AddCap(conf => {
                //配置數據庫上下文
                conf.UseEntityFramework<Entity.MicoDatacontext>();
                //配置數據庫連接
                conf.UseMySql(connection);
                //使用RabbitMQ運輸器
                conf.UseRabbitMQ(rab => {
                    rab.HostName = "192.168.137.2";
                    rab.Password = "xxxx114";
                    rab.Port = 5672;
                    rab.UserName = "xxxx";
                });
                //消息重試的最大次數
                conf.FailedRetryCount = 50;
                //消息重試間隔時間,4min后該值設置生效(默認快速重試3次)
                conf.FailedRetryInterval = 60;
                //發送成功的消息的過期時間(過期則刪除)
                conf.SucceedMessageExpiredAfter = 24 * 3600;
                //發送消息失敗后的回調
                conf.FailedThresholdCallback = (context) =>
                {
                    //通知管理人員或其它邏輯
                };

 六,事務補償

某些情況下,消費者需要返回值以告訴發布者執行結果,以便於發布者實施一些動作,通常情況下這屬於補償范圍。可以在消費者執行的代碼中通過重新發布一個新消息來通知上游,CAP 提供了一種簡單的方式來做到這一點。 你可以在發送的時候指定 callbackName 來得到消費者的執行結果。

比如上面的示例,倉庫消費后需要告訴訂單服務處理結果。

訂單服務:處理創建訂單業務后發送一條帶有補償回調的消息並通過CapSubscribe接收該回調消息,處理訂單狀態

[Route("api/[controller]")]
    [ApiController]
    public class TestController : ControllerBase
    {
        readonly Entity.MicoDatacontext dbcontext;
        readonly ICapPublisher capPublisher;
        public TestController(Entity.MicoDatacontext dbcontext, ICapPublisher capPublisher) {
            this.dbcontext = dbcontext;
            this.capPublisher = capPublisher;
        }
        [HttpGet("CreateOrder")]
        public async Task<bool> CreateOrderAsync(string name,string goodsSid)
        {
            Order order = new Order()
            {
                Sid = Guid.NewGuid().ToString(),
                CreateTime = DateTime.Now,
                GoodsSid = goodsSid,
                Name = name,
                Status = 0,
                UpdateTime = DateTime.Now
            };
            //開啟一個需要手動提交的本地事務,插入本地消息和處理業務邏輯都在這個事務內。
            using (var tran = dbcontext.Database.BeginTransaction(capPublisher, false))
            {
                dbcontext.Order.Add(order);
                //發送一個帶有補償回調的消息
                await capPublisher.PublishAsync("order.create", new OrderCreate() { GoodsSid = goodsSid, OrderName = name },"Soreage.reduced");
                dbcontext.SaveChanges();
                tran.Commit();
                return true;
            };
        }
        /// <summary>
        /// 補償消息處理
        /// </summary>
        /// <param name="order"></param>
        /// <returns></returns>
        [NonAction]
        [CapSubscribe("Soreage.reduced")]
        public Task OrderCreate(Common.Publish.StoreageReduced msg)
        {
            var order = this.dbcontext.Order.Find(msg.OrderSid);
            if (order != null)
            {
                order.Status = 1;
                dbcontext.SaveChanges();
            }
            return Task.CompletedTask;
        }
    }

  倉庫服務:接收order.create主題消息並返回正確的返回值

 [NonAction]
        [CapSubscribe("order.create")]
        public Common.Publish.StoreageReduced OrderCreate(Common.Publish.OrderCreate order)
        {
            var storeage = dbcontext.Storeage.Where(r => r.Sid.Equals(order.GoodsSid)).SingleOrDefault();
            if (storeage != null) throw new Exception("庫存不足");
            storeage.Count -= 1;
            storeage.UpdateTime = DateTime.Now;
            dbcontext.Storeage.Update(storeage);
            dbcontext.SaveChanges();
            return new Common.Publish.StoreageReduced() {OrderSid=order.OrderSid,IsSuccess=true };
        }

,並發沖突處理

使用EF的RowVersion做樂觀鎖解決並發沖突的問題。

Do實體添加Timestamp列

        [Timestamp]
        public byte[] Timespan { get; set; }

DbContext類可以做如下配置

protected override void OnModelCreating(ModelBuilder modelBuilder)
        {
            modelBuilder.Entity<Model.Order>().Property(r => r.Timespan).IsRowVersion();
            base.OnModelCreating(modelBuilder);
        }

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM