(C#版)使用TCC分布式事務改造現有下單流程(二)


引言

  上篇贅述了好多,無非就是想把tcc分布式事務的流程給講清楚,並介紹了它與另外兩種常用的分布式事務“可靠消息隊列”,“saga”的區別和適用場景。

  那接下來就引出我們的主角“dtm”吧,它類似於阿里的分布式事務框架seata,可惜由於seata的語言局限性,讓我們.neter們“望而卻步”,但更可憐的是我們.net自己的生態圈中竟然找不到能使用的類似的框架或中間件。眼看着自己設計好的新的下單流程因為沒有能夠支撐業務的中間件而流產的時候,突然收到群友的消息,扔給我一個dtm項目的鏈接,說可以看看。在這里我很感謝群友發給我的這個信息,讓我找到了能把訂單改造繼續做下去的信心。下面的篇幅針對我在dtm試用過程中遇到的問題,和一些細節做個記錄。

dtm介紹

  對於dtm這個跨語言的tcc分布式事務中間件的介紹,官網很多篇文章,循序漸進的看下來已經很清楚了,官網地址:https://dtm.pub/

  dtm官方推薦的c# sdk客戶端,是由張善友大佬提供的,地址是:https://github.com/yedf/dtmcli-csharp

,對應的demo地址:https://github.com/yedf/dtmcli-csharp-sample。張隊長“人狠話不多”,sdk和例子都提供了,但就是沒有使用的細節文檔,潛台詞就是結合着例子自己看源碼吧。sdk源碼還是比較簡單的,運行示例調試,並結合dtm服務端的數據庫,最終掌握dtm的使用還是不難的。只不過初次使用還是會遇到各種各樣的問題的。

dtm的使用

dtm服務端部署

關於docker-compose.yml的問題

我的部署環境是centos7.6和docker20.10,原始的docker-compose.yml內容如下:

 

 

其中,紅線框起來的部分原版是沒有的,因為使用中遇到了問題,通過跟作者的溝通請教,找到問題后加上去的。

1、 紅框一的作用:讓容器內的時間和時區與宿主機使用的一致,如果不加,會導致mysql容器使用了北京時間,而dtm服務程序使用的確是utc時間,相差8個小時,這樣會造成dtm服務端在管理事務時,因為時間混亂而判斷和查詢失誤,無法完成事務的正確調度。

2、紅框二的作用:很明顯是為了dtm服務端和mysql之間互聯的需要。

備注:作者說他在ubuntu下不需要加紅框部分也可以的,但centos7.6是需要的。

關於dtm服務程序和mysql容器的啟動順序問題

實際上,使用上述改過后的docker-compose.yml啟動容器后,還是有問題,dtm服務程序會閃退,報錯如下:

  看報錯應該是dtm沒有連上數據庫,在這里說一下作者本人真的是很負責,幫我看了下原因說應該是mysql還未啟動就緒,dtm就開始創建庫導致的,讓我等mysql容器就緒后再啟動dtm容器試試。經過作者的指點,我把退出的dtm容器,用docker start重新手動啟動后,終於mysql上相應的庫被dtm服務程序創建,而dtm再也不會報錯退出了,環境至此搭建完畢。

sdk的使用

dtm服務端環境搭建完畢后,接下來我們就可以運行c#示例看效果啦。

dtm服務的注冊:

public void ConfigureServices(IServiceCollection services)
{
    services.AddDtmcli(dtm => dtm.DtmUrl = "http://192.168.1.244:8080");

    services.AddControllers();

    services.AddSwaggerGen(c =>
    {
        c.SwaggerDoc("v1", new OpenApiInfo { Title = "DtmTccSample", Version = "v1" });
    });
}

很簡單,只需要一句話:services.AddDtmcli(dtm => dtm.DtmUrl = "http://192.168.1.244:8080");,其中DtmUrl就是dtm服務端的地址。

sdk的介紹文檔和示例中只有如下一段代碼:

首先要說的是,示例代碼能夠運行沒有問題,但只是try階段執行成功的情況下,子事務順利提交的正常場景。

如果要測試try階段不成功或異常,子事務應該回滾,示例代碼要這樣寫:

 1 [HttpPost]
 2 public async Task<RestResult> Demo()
 3 {
 4     var svc = "http://172.16.2.14:5000/api";
 5     TransRequest request = new TransRequest() { Amount = 30 };
 6     var cts = new CancellationTokenSource();
 7     try
 8     {
 9         var res1 = string.Empty;
10         var res2 = string.Empty;
11         var isTryOk = true;
12         await globalTransaction.Excecute(async (tcc) =>
13         {
14             res1 = await tcc.CallBranch(request, svc + "/TransOut/Try", svc + "/TransOut/Confirm", svc + "/TransOut/Cancel", cts.Token);
15             res2 = await tcc.CallBranch(request, svc + "/TransIn/Try", svc + "/TransIn/Confirm", svc + "/TransIn/Cancel",cts.Token);
16             
17             if (!res1.Contains("SUCCESS") || !res2.Contains("SUCCESS"))
18             {
19                 //異常日志
20                 logger.LogError($"{res1}{res2}");
21                 //拋出異常:目的是讓sdk客戶端捕獲異常,通知dtm服務端,try階段遇到了異常,所有子事務回滾
22                 throw new AccessViolationException($"{res1}{res2}");
23             }
24             logger.LogInformation($"tcc returns: {res1}-{res2}");
25         }, cts.Token);
26         if (res1.Contains("SUCCESS") || res2.Contains("SUCCESS"))
27         {
28             logger.LogError($"{res1}{res2}");
29             return new RestResult() { Result = "FAILURE" };
30         }
31     }
32     catch(Exception ex)
33     {
34 
35     }
36     return new RestResult() { Result = "SUCCESS" };
37 }

實際上,核心代碼是17-23行,要想將try階段的失敗告知dtm,我們要做的就是捕獲所有子事務try階段的返回值,並判斷是否有任何一個返回值不是我們想要的(例子中子事務的返回值是res1和res2),如果返回值不正確,就在 globalTransaction.Excecute方法內拋出一個異常。拋出去的異常會被sdk客戶端捕獲,並告知dtm應該對子事務進行回滾了,具體源碼如下:

 1 public async Task<string> Excecute(Func<Tcc,Task> tcc_cb, CancellationToken cancellationToken =default)
 2 {
 3     var tcc = new Tcc(this.dtmClient, await this.GenGid());
 4     
 5     var tbody = new TccBody
 6     { 
 7         Gid = tcc.Gid,
 8         Trans_Type ="tcc"
 9     };
10  
11 
12     try
13     {
14         await dtmClient.TccPrepare(tbody, cancellationToken);
15 
16         await tcc_cb(tcc);
17 
18         await dtmClient.TccSubmit(tbody, cancellationToken);
19     }
20     catch(Exception ex)
21     {
22         logger.LogError(ex,"submitting or abort global transaction error");
23         await this.dtmClient.TccAbort(tbody, cancellationToken);
24         return string.Empty;
25     }
26     return tcc.Gid;
27 }

catch塊中,this.dtmClient.TccAbort就是去告知dtm,應該回滾子事務。

關於Confirm和Cancel階段失敗后的重試

  try階段執行成功,所有子事務會執行Confirm的接口,如果try階段失敗,則所有子事務會執行Cancel階段的接口。如果Confirm或Cancel階段有任何子事務執行失敗,dtm則會不斷地去重試這個子事務,直至成功為止,所以Confirm或Cancel階段,進行“最大努力交付”,沒有回頭路可走。

  這里要說明一點的是,Confirm或Cancel子事務的執行是按順序的,如果有哪個子事務因為執行失敗而不斷的重試,那么其余未執行的子事務也會被它阻塞而無法執行。

  重試第一次會間隔10s,后續的重試時間都會翻倍,20 40 80 160這樣子的間隔。

  如果子事務在try階段是按照ab的順序執行,那么cancel階段則是按照ba的順序執行。

dtm的日志

dtm的日志還是很詳細的,包括對事務接口的調用還有數據庫的執行以及錯誤異常,容器內查看dtm日志的命令:

docker container logs dtm-api-1 --follow

dtm相關表

tcc事務設計的表如下圖:

其中trans_global是全局事務表,trans_branch是分支子事務表。注釋也都一目了然,sql如下:

 1 CREATE TABLE `trans_global` (
 2   `id` int(11) NOT NULL AUTO_INCREMENT,
 3   `gid` varchar(128) NOT NULL COMMENT '事務全局id',
 4   `trans_type` varchar(45) NOT NULL COMMENT '事務類型: saga | xa | tcc | msg',
 5   `status` varchar(12) NOT NULL COMMENT '全局事務的狀態 prepared | submitted | aborting | finished | rollbacked',
 6   `query_prepared` varchar(128) NOT NULL COMMENT 'prepared狀態事務的查詢api',
 7   `protocol` varchar(45) NOT NULL COMMENT '通信協議 http | grpc',
 8   `create_time` datetime DEFAULT NULL,
 9   `update_time` datetime DEFAULT NULL,
10   `commit_time` datetime DEFAULT NULL,
11   `finish_time` datetime DEFAULT NULL,
12   `rollback_time` datetime DEFAULT NULL,
13   `next_cron_interval` int(11) DEFAULT NULL COMMENT '下次定時處理的間隔',
14   `next_cron_time` datetime DEFAULT NULL COMMENT '下次定時處理的時間',
15   `owner` varchar(128) NOT NULL DEFAULT '' COMMENT '正在處理全局事務的鎖定者',
16   PRIMARY KEY (`id`),
17   UNIQUE KEY `gid` (`gid`),
18   KEY `owner` (`owner`),
19   KEY `create_time` (`create_time`),
20   KEY `update_time` (`update_time`),
21   KEY `status_next_cron_time` (`status`,`next_cron_time`) COMMENT '這個索引用於查詢超時的全局事務,能夠合理的走索引'
22 ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4;
23 
24 CREATE TABLE `trans_branch` (
25   `id` int(11) NOT NULL AUTO_INCREMENT,
26   `gid` varchar(128) NOT NULL COMMENT '事務全局id',
27   `url` varchar(128) NOT NULL COMMENT '動作關聯的url',
28   `data` text COMMENT '請求所攜帶的數據',
29   `branch_id` varchar(128) NOT NULL COMMENT '事務分支名稱',
30   `branch_type` varchar(45) NOT NULL COMMENT '事務分支類型 saga_action | saga_compensate | xa',
31   `status` varchar(45) NOT NULL COMMENT '步驟的狀態 submitted | finished | rollbacked',
32   `finish_time` datetime DEFAULT NULL,
33   `rollback_time` datetime DEFAULT NULL,
34   `create_time` datetime DEFAULT NULL,
35   `update_time` datetime DEFAULT NULL,
36   PRIMARY KEY (`id`),
37   UNIQUE KEY `gid_uniq` (`gid`,`branch_id`,`branch_type`),
38   KEY `create_time` (`create_time`),
39   KEY `update_time` (`update_time`)
40 ) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8mb4;

 最后附上dtm c#溝通微信群,有問題可以一起來討論:

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM