分布式事務框架.NetCore CAP總結


來自CAP原作者yang-xiaodong的原理圖:

本文撰寫者:cmliu,部分內容引用自官方文檔,部分內容待更新
# .NetCore CAP #
1,簡介 CAP 是一個遵循 .NET Standard 標准庫的C#庫,用來處理分布式事務以及提供EventBus的功能,她具有輕量級,高性能,易使用等特點。 目前 CAP 使用的是 .NET Standard 1.6 的標准進行開發,目前最新預覽版本已經支持 .NET Standard 2.0 ## CAP 的應用場景主要有以下兩個: ### 分布式事務中的最終一致性(異步確保)的方案。 >分布式事務是在分布式系統中不可避免的一個硬性需求,CAP 沒有采用兩階段提交(2PC)這種事務機制 >而是采用的 本地消息表+MQ 這種經典的實現方式,這種方式又叫做 異步確保。 ### 具有高可用性的 EventBus(事件總線)。 >CAP 實現了 EventBus 中的發布/訂閱,它具有 EventBus 的所有功能。 >也就是說你可以像使用 EventBus 一樣來使用 CAP,另外 CAP 的 EventBus 是具有高可用性的, CAP 借助於本地消息表來對 EventBus 中的消息進行了持久化 >這樣可以保證 EventBus 發出的消息是可靠的,當消息隊列出現宕機或者連接失敗的情況時,消息也不會丟失 ##### 注意本文最底部內容 # 2,入門 1),引用基本包 DotNetCore.CAP 2),引用消息層包(用於服務端【生產者】與客戶端【訂閱者】之間的通信) RabbitMQ 引用DotNetCore.CAP.RabbitMQ Kafka 引用DotNetCore.CAP.Kafka 3)引用數據庫包(用於保存本地的收發消息記錄表) SqlServer 引用DotNetCore.CAP.SqlServer MySql 引用DotNetCore.CAP.MySql MongODB 引用DotNetCore.CAP.MongoDB 4)啟動配置:Startup.cs的ConfigureServices()方法中配置Cap服務 ```c# //此處用於注冊繼承自:ICapSubscribe接口的訂閱服務,以下CapUserService類繼承了ICapSubscribe接口 //繼承自ICapSubscribe接口的訂閱,需要在:AddCap方法之前注冊服務,否則將不會被掃描到 //services.AddTransient<ICapUserService, CapUserService>(); //下面語句用於開啟支持使用EntityFramework,使用此方案時,無需配置UseSqlServer或者Mysql //services.AddDbContext<AppDbContext>(); //配置Cap services.AddCap(x => { //配置Cap的本地消息記錄庫,用於服務端保存Published消息記錄表;客戶端保存Received消息記錄表 // 此方法默認使用的數據庫Schema為Cap;2,要求最低sql server2012(因為使用了Dashboard的sql查詢語句使用了Format新函數) //x.UseSqlServer("Integrated Security=False;server=服務器;database=cap;User ID=sa;Password=密碼;Connect Timeout=30"); // 配置Cap的本地消息記錄庫,用於服務端保存Published消息記錄表;客戶端保存Received消息記錄表 // 此方法可以指定是否使用sql server2008,數據庫Schema,鏈接字符串 x.UseSqlServer((options) => { //數據庫連接字符串 options.ConnectionString="Integrated Security=False;server=192.168.1.109;database=cap;User ID=sa;Password=密碼;Connect Timeout=30"; //標記使用的是SqlServer2008引擎(此處設置的是2008,因為192.168.1.109數據庫是2008) options.UseSqlServer2008(); //Cap默認使用的數據庫Schema為Cap;此處可以指定使用自己的數據庫Schema //options.Schema = "dbo"; }); //使用Kafka作為底層之間的消息發送 x.UseKafka("192.168.1.230:9092,192.168.1.231:9092,192.168.1.232:9092"); //x.UseKafka(options => //{ // options.Servers = "192.168.1.230:9092,192.168.1.231:9092,192.168.1.232:9092"; //}); //使用Dashboard,這是一個Cap的可視化管理界面;默認地址:http://localhost:端口/cap x.UseDashboard(); //默認分組名,此值不配置時,默認值為當前程序集的名稱 //x.DefaultGroup = "m"; //失敗后的重試次數,默認50次;在FailedRetryInterval默認60秒的情況下,即默認重試50*60秒(50分鍾)之后放棄失敗重試 //x.FailedRetryCount = 10; //失敗后的重拾間隔,默認60秒 //x.FailedRetryInterval = 30; //設置成功信息的刪除時間默認24*3600秒 //x.SucceedMessageExpiredAfter = 60 * 60; }); ``` 5)消息推送者(生產者) >5.1)以下代碼是在一個Controler中進行推送 ```c# //省略其他代碼 public class CapDemoController : Controller { //注入一個ICapPublisher private readonly ICapPublisher _capBus; public CapDemoController(ICapPublisher capPublisher) { _capBus = capPublisher; } //簡單的推送使用 public async Task<IActionResult> GetDemo() { //發送消息給客戶端,第一個參值數"kjframe.test"為消息隊列的topic await _capBus.PublishAsync("kjframe.test", DateTime.Now); return Ok(); } } ``` >5.2)以下是添加了一個手動提交事務推送的Api接口 ```c# public async Task<IActionResult> GetTransaction() { using (var connectionn = new SqlConnection("鏈接字符串")) { //創建手動提交的事務,false,表示手動提交 using (var transaction = connectionn.BeginTransaction(_capBus, false)) { //sqlserver,執行自定義業務 connectionn.Execute("update TableA set Name='嘻嘻' where id=1", null, transaction); //mysql //connection.Execute("sql語句",null,(IDbTransaction)transaction.DbTransaction); //執行異步的分布式事務,推送必須在transaction.Commit()事務提交語句之前執行 _capBus.PublishAsync("m.test", DateTime.Now); //事務提交:如果connectionn.BeginTransaction(_capBus, false)的autoCommit參數為false,則需要手動提交事務 transaction.Commit(); } } return Ok(); } ``` >5.3)以下是添加了一個自動提交事務推送的Api接口 ```c# public async Task<IActionResult> GetAutoTransaction() { using (var connectionn = new SqlConnection("鏈接字符串")) { //創建自動提交的事務 using (var transaction = connectionn.BeginTransaction(_capBus, true)) { //sqlserver,執行自定義業務 connectionn.Execute("update TableA set Name='嘻嘻' where id=1", null, transaction); //mysql //connection.Execute("sql語句",null,(IDbTransaction)transaction.DbTransaction); //此處connectionn.BeginTransaction的autoCommit參數true,所以cap在PublishAsync方法中會自動提交事務 //當使用的EntityFramework(EF)操作數據庫時,此處也會保存EF的上下文(SaveChanges) //自動提交事務時,PublishAsync需要在最后面 _capBus.PublishAsync("m.test", DateTime.Now); } } return Ok(); } ``` 6)訂閱(在Controller中的訂閱) > 訂閱的Controller無需繼承ICapSubscribe接口,也無需像繼承自ICapSubscribe接口的訂閱那樣要在Startup中的AddCap方法之前注冊服務 ```c# [Route("api/[controller]")] [ApiController] public class CapSubscribeController : Controller { //使用指定訂閱組 //無需返回值,void或Task即可 [CapSubscribe("m.test", Group = "group4")] public void TestSubscribe(string date) { Console.WriteLine($"接收到訂閱:{date}"); } //使用默認訂閱組(當前程序集名,或者是Startup中配置的DefaultGroup參數) //無需返回值,void或Task即可 //此處訂閱了兩個topic, [CapSubscribe("m.test")] [CapSubscribe("xxx.services.bar")] public void TestSubscribe(string date) { Console.WriteLine($"接收到訂閱:{date}"); } } ``` 7)訂閱(在服務層或者非Controller中訂閱的實現方式:繼承ICapSubscribe接口,並在Startup.cs配置文件中的AddCap方法之前注冊該服務) > 訂閱處:ICapUserService.cs;CapUserService.cs ```c# public interface ICapUserService { void SubscribeWithnoController(string date); } public class CapUserService : ICapSubscribe, ICapUserService { [CapSubscribe("m.test")] public void SubscribeWithnoController(string date) { Console.WriteLine($"SubscribeWithnoController接收到訂閱:{date}"); } } ``` > 配置處:Startup.cs的ConfigureServices方法 ```c# public void ConfigureServices(IServiceCollection services) { //注冊繼承了繼承ICapSubscribe接口的訂閱 services.AddTransient<ICapUserService, CapUserService>(); //省略代碼 //注冊CAP服務 services.AddCap(x => { //省略代碼 }); //省略代碼 } ``` 8)同一topic,被多個不同group的訂閱者訂閱 >此時,每一個訂閱者都會收到消息,且Received表中會給每一個訂閱者插入一條【Content字段】相同的訂閱記錄,有3個訂閱者,就有3條Received數據記錄 >示例 ```c# //生產者 [HttpGet] public async Task<IActionResult> GetDemo() { //省略其他代碼 await _capBus.PublishAsync("m.test", DateTime.Now); //省略其他代碼 } //訂閱者1 [CapSubscribe("m.test")] public void TestSubscribe(DateTime date) { Console.WriteLine($"接收到訂閱:{date.ToString("yyyy-MM-dd hh:mm:ss")}"); } //訂閱者2(group1) [CapSubscribe("m.test", Group = "group1")] public void SubscribeGroup2(string date) { Console.WriteLine($"group1接收到消息:{date}"); } //訂閱者3(WDB) [CapSubscribe("m.test", Group = "group4")] public void TestSubscribe(string date) { Console.WriteLine($"group4接收到消息:{date}"); } ``` >Received表消費記錄,如下圖,產生了3條記錄,其中“cap.queue.kjframe.core.capdemo.v1”是默認topic組名,注意截圖中的Group的組名與本處的代碼有出入
9)失敗回調FailedThresholdCallback(失敗達到重試上線時,觸發此回調)

>配置

```c#
        services.AddCap(x=>{
        //其他代碼
        x.FailedThresholdCallback = FailCallBack
        //其他代碼
        });
        
       //失敗時的回調通知函數
       public void FailCallBack(DotNetCore.CAP.Models.MessageType messageType, string messageName, string messageContent)
        {
            Console.WriteLine($"失敗回調:messageType:{messageType};messageName:{messageName};
                messageContent:{messageContent}");
        }
```

>失敗回調返回的樣本(此處做了格式化顯示):

        失敗回調:
            messageType:Subscribe;
            messageName:m.test;
            messageContent:
            {
                "Id": "5cfdf02ded40720ed4e98de9",
                "Timestamp": "2019-06-10T13:52:45.4107162+08:00",
                "Content": "2019-06-10 13:52:45",
                "CallbackName": null,
                "ExceptionMessage": {
                    "Source": "DotNetCore.CAP",
                    "Message": "我要扔出異常",
                    "InnerMessage": "我要扔出異常"
                }
            }

10)回調callbackName(此處的回調與失敗回調不一樣)

        這里的callbackName指的是PublishAsync/Publish推送方法中的callbackName參數,這個參數是個string ,實際上是一個topic
        
        注意:具有callbackName回調值的訂閱方法必須有返回值,否則回調將會失敗
        
        當服務端PublishAsync/Publish消息時,會將callbackName放入message的content字段中
        客戶端的訂閱方法(此訂閱方法必須有返回值,否者content會為null,為null則會回調失敗)消費成功后,
        客戶端訂閱方法將會把客戶端訂閱方法的返回值(設為A)PublishAsync/Publish一條topic為callbackName,content包含返回值A的消息到隊列中,
        服務器端只需要在回調方法中訂閱callbackName這個topic即可觸發回調


> *服務端示例:*
    
```c#
    //服務端的生產者
    [HttpGet]
    public async Task<IActionResult> TestCallback()
    {
     await _capBus.PublishAsync("m.test", DateTime.Now, "FailCallBack");
     return Content("發起一個帶 callbackName參數的消費");
    }
```
    
```c#
    //服務端處理來自客戶端的訂閱,即訂閱回調topic: FailCallBack
    [CapSubscribe("FailCallBack", Group = "CallbackServer")]
    public void FailedCallback(string message)
    {
        Console.WriteLine($"接收到回調:{message}");
    }
```

>*客戶端示例:*
```c#
    //客戶端的訂閱方法,此方法必須要有返回值,否則回調的content將會為null,如果content為null,
    //那么服務端的訂閱將無法消費回調消息
    //服務端的【回調訂閱方法】所接收到的參數值就是這個【客戶端的訂閱方法】的返回值
    [CapSubscribe("m.test",Group = "CallbackClient")]
    public DateTime SubscribeCallback(DateTime date)
    {
         Console.WriteLine($"已處理,請回調:{date.ToString("yyyy-MM-dd hh:mm:ss")}");
         return DateTime.Now.AddDays(10);
    }
```

#####

        TestCallback()
        ----> PublishAsync("m.test", DateTime.Now, "FailCallBack") 【服務端推送】
        ----> var response=DateTime SubscribeCallback(DateTime date)     【客戶端訂閱】
        ----> 底層方法(PublishAsync("FailCallBack", response)             【客戶端推送回調消息】
        ----> FailedCallback(string message)                             【服務端訂閱回調消息】

# 注意事項:

    1),自動提交事務時,PublishAsync應放在最后面
    
    2),PublishAsync<T>(string name,T object, string callBackName)中的callBackName是一個回調,當失敗重試超過重試限制次數(默認50次:FailedRetryCount)時,
    會觸發此回調函數
    回調函數委托簽名:FailCallBack(DotNetCore.CAP.Models.MessageType messageType,string messageName,string messageContent)
    
    3),框架無法做到100%確保消息只執行一次,所以在一些關鍵場景消息端在方法實現的過程中自己注意業務去重
    
    4),一個訂閱方法可以訂閱多個Topic,但多個方法訂閱了相同的topic+group時,只會有一個訂閱方法消費到同一條消息
    
    5),支持內存消息隊列(2.5版本),需要引入DotNetCore.CAP.InMemoryStorage,並UseInMemoryStorage,此模式用於開發環境下沒有Kafka或者RabbitMQ時,可以使用內存隊列來模擬 
    
    6),開啟.AddDbContext<AppDbContext>()用於支持EF時,無需再配置UseSqlServer或者UseMySql
    
    7),支持Cap版本隔離(2.4版本),通過本地數據表的Version字段進行版本隔離
    
    8),Cap會自動創建"Published", "Received"兩個本地數據庫表
    
    9),如果多個微服務使用同一個數據庫實例,可以通過指定Schema(SqlServer)或者TableNamePrefix(MySql)來隔離不同的微服務之間的本地消息記錄
    
    10),StatusName為Failed會不斷進行重試,直到達到重試上線
    
    11),Successed的消息會在根據該消息的ExpiresAt時間進行清理(默認24小時),每1小時執行一次清理任務;Failed失敗的信息會在15天后過期並進行清理
    
    12),SqlServer2008版本的數據庫需要在UseSqlServer()的配置方法中調用UseSqlServer2008(),因為Cap的UseDashboard在SqlServer2012+版本上使用了新的語法Format內置函數
    
    13),Cap中Kafka訂閱者是IConsumer<Null, string>,如果你要使用.NET Framework向Cap的訂閱者推消息,需要注意是<Null,string>
    
    14),回調函數,PublishAsync/Publish中的callbackName參數是一個回調,這是一個topic的值,你可以在服務端訂閱這個topic用於處理客戶端消費完信息后的回調,
    注意,如果callbackName不為空,那么這個客戶端的訂閱方法必須有返回值,返回值將傳參回調回去,詳細參考【回調函數】
    
    15),Cap文檔http://cap.dotnetcore.xyz/user-guide-cn/getting-started/
    
    16),Cap GitHub https://github.com/dotnetcore/CAP
    
    17),Cap作者博文https://www.cnblogs.com/savorboard/
    

CAP框架待探討的問題

1,重試機制;當服務器重啟時,對於一些已處理的消息,或者已改變狀態的消費;需要手動清理已處理消息,避免重試;重試的頻率和次數可以不用那么高

2,如果消費端系統,生產子系統的本地化消息如果放在同一個庫里面是否會出現異常

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM