一. CAP框架異常處理
1. RabbitMQ宕機
(1).模擬場景
直接把RabbitMq服務關閉,然后發送5次請求,會發現Published表中多了5條數據!!!!Received表中沒有數據;然后打開RabbitMq服務,觀察現象,仔細觀察Published表,有3條記錄已經重試了3次,是Failed,另外兩條打開服務后,重新發送成功。
(2).實現原理
當將RabbitMQ啟動后,消息正常發送,CAP框架內部使用定時器輪詢機制實現讀取DB中的未成功的數據發送給MQ.
(3).問題
重試3次仍然失敗的數據后續怎么辦,后續怎么是否還重試呢?
答:大約4分鍾后,失敗的3條數據重新發送成功了.(這里可能有個bug,重試次數還是為3,沒有加1)
2. 發布者執行業務成功,發送消息時宕機
(1).模擬場景
這里指的是調用publish的時候失敗(不太好模擬),數據還是會存放在Published表中。
(2).實現原理
本地消息表、定時器輪詢、冪等性
3. 訂閱者接收消息失敗(或者接收消息成功,執行業務失敗)
(1).模擬場景1:接收消息失敗
關閉訂閱者服務,然后發送5次請求,發現Published表中多了5條數據,且都是發送成功的, 此時Received表中沒有數據!!打開訂閱者服務,發現很快多了5條數據,且為成功狀態,還有訂閱者的控制台收到了發送過來的5次請求。
(2).模擬場景2:接收消息成功,執行業務失敗
有3種寫法,只有直接異常不補獲 或者 進入catch顯式throw拋出異常, Cap框架才認為消費失敗,,Received表中有數據,但狀態是Failed!!
A. 直接異常不補獲,Cap認為是失敗的,會快速重試3次,然后4min中后,每隔FailedRetryInterval(自己配置),重試1次。
B. try-catch捕獲異常,Cap認為是成功的
C. catch顯式throw拋出異常, Cap認為是失敗的,會快速重試3次,然后4min中后,每隔FailedRetryInterval(自己配置),重試1次。
補充:這里即使Cap認為是失敗,MQ的中的消息也被刪除了,這里CAP框架就是這么設計的,個人不太傾向失敗也刪除消息(個人傾向消息發送成功且本地業務也執行成功,再反饋ACK,MQ刪除消息)。
代碼分享:

/// <summary> /// 訂閱者的方法 /// </summary> /// <param name="time"></param> [NonAction] [CapSubscribe("ypfkey1")] public void ReceiveMsg(DateTime time) { //1. 正常接收 { _log.LogInformation($"我是訂閱者,收到的內容為:{time}"); } //2. 異常接收(Cap認為是失敗的) //{ // int.Parse("dsfdsf"); //模擬業務報錯 // _log.LogInformation($"我是訂閱者,收到的內容為:{time}"); //} //3. 異常接收(Cap認為是成功的) //{ // try // { // int.Parse("dsfdsf");//模擬業務報錯 // _log.LogInformation($"我是訂閱者,收到的內容為:{time}"); // } // catch (Exception ex) // { // _log.LogInformation($"業務執行失敗了:{ex.Message}"); // } //} //4. 異常接收(catch中顯式拋異常) //{ // try // { // int.Parse("dsfdsf");//模擬業務報錯 // _log.LogInformation($"我是訂閱者,收到的內容為:{time}"); // } // catch (Exception ex) // { // _log.LogInformation($"業務執行失敗了:{ex.Message}"); // throw new Exception($"業務執行失敗了:{ex.Message}"); // } //} }
(2).實現原理:
A.使用本地消息表解決(思想:持久化操作);
B.定時器,消息重試;
C.冪等性 一個函數每次都是相同的結果,狀態只有一個;
4. 訂閱者消費重試后,還是消費失敗
(1). 准備:
通過‘人工干預’的方式解決,通過Nuget安裝給兩個項目安裝【DotNetCore.CAP.Dashboard 5.0.3】,然后在ConfigureService中的AddCap添加代碼:x.UseDashboard(); 通過下面兩個地址進行人工干預:
http://localhost:9001/cap 處理發布者-發送失敗的消息
http://localhost:9011/cap 處理訂閱者-接收失敗的消息
(2). 模擬場景:
先制造一個訂閱者接收消息成功,但執行業務失敗的情況,登錄后台cap,查看異常;此時把訂閱者接口改成正常接收,在cap后台手動點擊異常記錄,進行重新消費,消費成功,DB的記錄變為succeed。
如下圖:
二. 基於Cap實現分布式事務
1. 什么是分布式事務
分布式系統會把一個應用系統拆分為可獨立部署的多個服務,因此需要服務與服務之間遠程協作才能完成事務操作,這種分布式系統環境下由不同的服務之間通過網絡遠程協作完成事務稱之為分布式事務,例如用戶注冊送積分事務、創建訂單減庫存事務,銀行轉賬事務等都是分布式事務。
簡單的來說:接口A中除了要執行自身業務,還需要通過網絡通信調用另外一個服務中的接口,自身業務+調用另外一個服務接口 就組成一個分布式事務。
2. Cap框架是處理分布式事務的?
CAP 不直接提供開箱即用的基於 DTC 或者 2PC 的分布式事務,相反我們提供一種可以用於解決在分布式事務遇到的問題的一種解決方案。在分布式環境中,由於涉及通訊的開銷,使用基於2PC或DTC的分布式事務將非常昂貴,在性能方面也同樣如此。另外由於基於2PC或DTC的分布式事務同樣受**CAP定理**的約束,當發生網絡分區時它將不得不放棄可用性(CAP中的A)。針對於分布式事務的處理,CAP 采用的是“異步確保”這種方案(也叫本地消息表),實現最終一致性。
下面實操的結論:
多個publish 和 本地業務DB操作,每個publish可能對應一項其它DB的操作,最后統一提交,
(1). 本地業務DB操作失敗則整體失敗,publish不會執行,本地published表中也不會存儲消息記錄(無論先業務DB操作,還是先publish操作,結果都一樣)。
(2). publish向MQ中發送失敗,本地DB業務執行成功,這種情況下本地published表中會存儲消息發送記錄,可以進行發送重試,然后訂閱者那也有持久化功能,可以進行消費重試或者手動干預,最終保證都能成功。
3. 實操分析
詳見:PubController中的SendMsg2方法,調用地址:http://localhost:9001/api/Pub/SendMsg2 進行測試.
(1).模擬1:DB操作和消息publish統一提交成功。
(2).模擬2:先消息publish發送,后DB操作,統一提交,模擬DB操作失敗,則消息的publish也發送失敗(本地表中沒數據,根本不執行哦) 【本地業務執行失敗,是必須要回滾的,所以不往消息表中插入數據】
(3).模擬3:先DB操作,后消息publish,統一提交,關閉RabbitMQ,模擬消息發送失敗,但業務正常執行完,沒有進入catch,DB操作成功,publish在本地表中插入數據,只不過數據狀態是失敗的。
(4).模擬4:事務自動提交,不需要調用trans.Commit(),提交成功。
(5).模擬5:多個publish統一提交(假設每個publish對應一個不同的DB),但這里必須使用手動事務提交,手動trans.Commit(),最終提交成功。
PS:特別注意以上2和3的對比,5要手動提交
/// <summary> /// 發布者調用的方法2(有事務) /// </summary> /// <returns></returns> public IActionResult SendMsg2() { try { #region 01-業務都執行成功 //using (var trans = _dbContext.Database.BeginTransaction(_capBus, autoCommit: false)) //{ // //業務1 // _dbContext.OrderInfor.Add(new OrderInfor() // { // id = Guid.NewGuid().ToString("N"), // orderNum = new Random().Next(99999, 999999).ToString(), // addTime = DateTime.Now // }); // //業務2 // var nowTime = DateTime.Now; // _capBus.Publish("ypfkey1", nowTime); // _log.LogInformation($"我是發布者,發布的內容為:{nowTime}"); // _dbContext.SaveChanges(); // trans.Commit(); //} #endregion #region 02-模擬DB插入失敗 //using (var trans = _dbContext.Database.BeginTransaction(_capBus, autoCommit: false)) //{ // //業務1 // var nowTime = DateTime.Now; // _capBus.Publish("ypfkey1", nowTime); // _log.LogInformation($"我是發布者,發布的內容為:{nowTime}"); // //業務2 // _dbContext.OrderInfor.Add(new OrderInfor() // { // id = Guid.NewGuid().ToString("N"), // orderNum = new Random().Next(9999999, 999999999).ToString(), //模擬DB插入失敗 // addTime = DateTime.Now // }); // _dbContext.SaveChanges(); // trans.Commit(); //} #endregion #region 03-模擬消息發送失敗(關閉RabbitMq) //using (var trans = _dbContext.Database.BeginTransaction(_capBus, autoCommit: false)) //{ // //業務1 // _dbContext.OrderInfor.Add(new OrderInfor() // { // id = Guid.NewGuid().ToString("N"), // orderNum = new Random().Next(99999, 999999).ToString(), // addTime = DateTime.Now // }); // //業務2 // var nowTime = DateTime.Now; // _capBus.Publish("ypfkey1", nowTime); // _log.LogInformation($"我是發布者,發布的內容為:{nowTime}"); // _dbContext.SaveChanges(); // trans.Commit(); //} #endregion #region 04-業務都執行成功(事務自動提交) //using (var trans = _dbContext.Database.BeginTransaction(_capBus, autoCommit: true)) //{ // //業務1 // _dbContext.OrderInfor.Add(new OrderInfor() // { // id = Guid.NewGuid().ToString("N"), // orderNum = new Random().Next(99999, 999999).ToString(), // addTime = DateTime.Now // }); // //業務2 // var nowTime = DateTime.Now; // _capBus.Publish("ypfkey1", nowTime); // _log.LogInformation($"我是發布者,發布的內容為:{nowTime}"); // _dbContext.SaveChanges(); //} #endregion #region 05-多個publish //using (var trans = _dbContext.Database.BeginTransaction(_capBus, autoCommit: false)) //{ // var nowTime = DateTime.Now; // //業務1 // _capBus.Publish("ypfkey1", nowTime); // _log.LogInformation($"我是發布者,發布的內容為:{nowTime}"); // Thread.Sleep(5000); // //業務2 // _capBus.Publish("ypfkey1", nowTime.AddDays(1)); // _log.LogInformation($"我是發布者,發布的內容為:{nowTime.AddDays(1)}"); // Thread.Sleep(8000); // //業務3 // _capBus.Publish("ypfkey1", nowTime.AddDays(2)); // _log.LogInformation($"我是發布者,發布的內容為:{nowTime.AddDays(2)}"); // trans.Commit(); //} #endregion return Content("發送成功"); } catch (Exception ex) { return Content($"發送失敗:{ex.Message}"); } }
三. Cap其它用法
1. 其它用法
(1).配置
x.FailedRetryInterval = 60; //重試間隔,4min后該值設置生效(默認快速重試3次)--服務於發送重試和消費重試
x.FailedRetryCount = 50; //重試的最大次數
x.ConsumerThreadCount = 1; //消費者線程並行處理消息的線程數,提高消費速度,但這個值大於1時,將不能保證消息執行的順序。
x.SucceedMessageExpiredAfter = 24 * 3600; //成功消息的過期時間(過期則刪除)
x.FailedThresholdCallback //重試閾值的失敗回調
詳見:https://cap.dotnetcore.xyz/user-guide/zh/cap/configuration/
(2).消息
事務補償、header和body分離、消息重試(發送和消費)、消息數據清理
詳見:https://cap.dotnetcore.xyz/user-guide/zh/cap/messaging/
(3).冪等性
詳見:https://cap.dotnetcore.xyz/user-guide/zh/cap/idempotence/
2.事務補償實操
(1).含義
某些情況下,消費者需要返回值以告訴發布者執行結果,以便於發布者實施一些動作,通常情況下這屬於補償范圍。
(2).模擬場景
發布者中有兩個方法SendMsg3和MarkStatus方法,MarkStatus用於接收消費者回傳的信息,該方法需要標記 [CapSubscribe("mStatusKey")]特性,特性的值為publish方法的第三個參數聲明,eg:_capBus.Publish("pOrderkey", orderNum, "mStatusKey");
(3).測試
請求 http://localhost:9001/api/Pub/SendMsg3 ,發布者的控制台除了發送成功外,還收到消費者消費成功的信息,說明測試通過。
發布者代碼:
public IActionResult SendMsg3() { var orderNum = new Random().Next(99999, 999999); _capBus.Publish("pOrderkey", orderNum, "mStatusKey"); _log.LogInformation($"我是發布者,發布的內容為:{orderNum}"); return Content("發送成功"); } [NonAction] [CapSubscribe("mStatusKey")] public void MarkStatus(string msg) { _log.LogInformation($"我是發布者,接收到回傳信息為:{msg}"); if (msg=="ok") { _log.LogInformation($"消費者消費成功了"); } else { _log.LogInformation($"消費者消費失敗了"); } }
訂閱者代碼:
[NonAction] [CapSubscribe("pOrderkey")] public string ReceiveMsg2(string orderNum) { _log.LogInformation($"我是訂閱者,收到的內容為:{orderNum}"); return "ok"; }
運行截圖:
3.表頭發送實操
請求 http://localhost:9001/api/Pub/SendMsg4,代碼如下:
發布者:
public IActionResult SendMsg4() { var header = new Dictionary<string, string>() { ["my.header.first"] = "first", ["my.header.second"] = "second" }; var orderNum = new Random().Next(99999, 999999); _capBus.Publish("ypfkey2", orderNum, header); _log.LogInformation($"我是發布者,發布的內容為:{orderNum}"); return Content("發送成功"); }
訂閱者:
[CapSubscribe("ypfkey2")] public void ReceiveMessage(string orderNum, [FromCap] CapHeader header) { Console.WriteLine($"我是訂閱者,收到的內容為:{orderNum}"); Console.WriteLine("message firset header :" + header["my.header.first"]); Console.WriteLine("message second header :" + header["my.header.second"]); }
4.獲取發送表和接收表的名稱
未測試,基於關系型數據庫存儲的時候可以使用,比如基於:SQLServer存儲和MySQL存儲
參考:https://cap.dotnetcore.xyz/user-guide/zh/storage/sqlserver/
5.類中接收消息
未測試,參考:https://www.cnblogs.com/savorboard/p/cap.html
!
- 作 者 : Yaopengfei(姚鵬飛)
- 博客地址 : http://www.cnblogs.com/yaopengfei/
- 聲 明1 : 如有錯誤,歡迎討論,請勿謾罵^_^。
- 聲 明2 : 原創博客請在轉載時保留原文鏈接或在文章開頭加上本人博客地址,否則保留追究法律責任的權利。