通過Dapr實現一個簡單的基於.net的微服務電商系統(十九)——分布式事務之Saga模式


在之前的系列文章中聊過分布式事務的一種實現方案,即通過在集群中暴露actor服務來實現分布式事務的本地原子化。但是actor服務本身有其特殊性,場景上並不通用。所以今天來講講分布式事務實現方案之saga模式,並在文后附上代碼供各位讀者參考,評論。

目錄:
一、通過Dapr實現一個簡單的基於.net的微服務電商系統

二、通過Dapr實現一個簡單的基於.net的微服務電商系統(二)——通訊框架講解

三、通過Dapr實現一個簡單的基於.net的微服務電商系統(三)——一步一步教你如何擼Dapr

四、通過Dapr實現一個簡單的基於.net的微服務電商系統(四)——一步一步教你如何擼Dapr之訂閱發布

五、通過Dapr實現一個簡單的基於.net的微服務電商系統(五)——一步一步教你如何擼Dapr之狀態管理

六、通過Dapr實現一個簡單的基於.net的微服務電商系統(六)——一步一步教你如何擼Dapr之Actor服務

七、通過Dapr實現一個簡單的基於.net的微服務電商系統(七)——一步一步教你如何擼Dapr之服務限流

八、通過Dapr實現一個簡單的基於.net的微服務電商系統(八)——一步一步教你如何擼Dapr之鏈路追蹤

九、通過Dapr實現一個簡單的基於.net的微服務電商系統(九)——一步一步教你如何擼Dapr之OAuth2授權 && 百度版Oauth2

十、通過Dapr實現一個簡單的基於.net的微服務電商系統(十)——一步一步教你如何擼Dapr之綁定

十一、通過Dapr實現一個簡單的基於.net的微服務電商系統(十一)——一步一步教你如何擼Dapr之自動擴/縮容

十二、通過Dapr實現一個簡單的基於.net的微服務電商系統(十二)——istio+dapr構建多運行時服務網格

十三、通過Dapr實現一個簡單的基於.net的微服務電商系統(十三)——istio+dapr構建多運行時服務網格之生產環境部署

十四、通過Dapr實現一個簡單的基於.net的微服務電商系統(十四)——開發環境容器調試小技巧

十五、通過Dapr實現一個簡單的基於.net的微服務電商系統(十五)——集中式接口文檔實現

十六、通過Dapr實現一個簡單的基於.net的微服務電商系統(十六)——dapr+sentinel中間件實現服務保護

十七、通過Dapr實現一個簡單的基於.net的微服務電商系統(十七)——服務保護之動態配置與熱重載

十八、通過Dapr實現一個簡單的基於.net的微服務電商系統(十八)——服務保護之多級緩存

十九、通過Dapr實現一個簡單的基於.net的微服務電商系統(十九)——分布式事務之Saga模式

二十、通過Dapr實現一個簡單的基於.net的微服務電商系統(二十)——Saga框架實現思路分享

附錄:(如果你覺得對你有用,請給個star)
一、電商Demo地址

二、通訊框架地址

三、Saga框架地址

一、什么是Saga

        在領域驅動設計中,由於領域邊界的存在,以往的分層設計中業務會按照其固有的領域知識被切分到不同的限界中,並且引入了領域事件這一概念來降低單個業務的復雜度,通過非耦合的事件驅動來完成復雜的業務。但是事件驅動帶來了一些新的問題,由於以往一個原子性極強的邏輯被拆散到了一個一個小的領域中,原子性事務數據的強一致性無法被保證。為了解決這個問題,一般會采用事務補償的方式來確保最終一致。

        當我們采用多個本地事務組合去進行業務處理時,由於業務其本身的復雜性,往往需要在多個事務中協調。而事件協調器(saga)就是一個專門降低其復雜度的設計,開發人員原則上只需要將事件和補償按照一定順序注冊到協調處理器中,原則上協調器會按照注冊的事件依次執行,若出現事件執行失敗時,也會按照補償列表進行相應的回滾。

        去年曾經在一篇文章里聊過Saga,鏈接在此。相比去年的那個小DEMO。我在此基礎上基於Dapr的狀態管理完成了另外一個開源項目Oxygen-Saga,地址:https://github.com/sd797994/Oxygen-Saga。當然這個項目既可以引入到dapr的環境中完成Saga事務,本身也可以獨立的基於rabbitmq+redis來完成非Dapr環境下的Saga事務。今天主要講解一下如何在Dapr環境下引入Saga來實現一個分布式事務。

  首先我們還是看看目前的一個訂單下單邏輯,可以看到基於Actor服務,我們的請求是在訂單服務里直連商品服務Actor完成庫存減扣的,並通過Actor的周期性持久化機制完成事務落庫,所以實際上是把分布式事務的復雜性通過Actor屏蔽掉了,並沒有涉及到真正的分布式事務。

  從代碼層面也可以看到,整個事務其實是在同一個方法里以同步調用Actor的方式完成的。

        #region 私有遠程服務包裝器方法
        async Task<List<OrderGoodsSnapshot>> GetGoodsListByIds(IEnumerable<Guid> input)
        {
            return (await goodsQueryService.GetGoodsListByIds(new GetGoodsListByIdsDto(input))).GetData<List<OrderGoodsSnapshot>>();
        }
        async Task<bool> DeductionGoodsStock(CreateOrderDeductionGoodsStockDto input)
        {
            var data = input.CopyTo<CreateOrderDeductionGoodsStockDto, DeductionStockDto>();
            data.ActorId = input.GoodsId.ToString();
            return (await goodsActorService.DeductionGoodsStock(data)).GetData<bool>();
        }
        async Task<bool> UnDeductionGoodsStock(CreateOrderDeductionGoodsStockDto input)
        {
            var data = input.CopyTo<CreateOrderDeductionGoodsStockDto, DeductionStockDto>();
            data.ActorId = input.GoodsId.ToString();
            return (await goodsActorService.UnDeductionGoodsStock(data)).GetData<bool>();
        }
        #endregion
        public async Task<ApiResult> CreateOrder(OrderCreateDto input)
        {
            var mockUser = (await accountQueryService.GetMockAccount()).GetData<CurrentUser>();
            //申明一個創建訂單領域服務實例,將遠程rpc調用作為匿名函數傳遞進去
            var createOrderService = new CreateOrderService(GetGoodsListByIds, DeductionGoodsStock, UnDeductionGoodsStock);
            return await ApiResult.Ok("訂單創建成功!").RunAsync(async () =>
            {
                var order = await createOrderService.CreateOrder(mockUser.Id, mockUser.UserName, mockUser.Address, mockUser.Tel, input.Items.CopyTo<OrderCreateDto.OrderCreateItemDto, OrderItem>().ToList());//通過訂單服務創建訂單
                repository.Add(order);
                if (await new CheckOrderCanCreateSpecification(repository).IsSatisfiedBy(order))
                    await unitofWork.CommitAsync();
                await eventBus.SendEvent(EventTopicDictionary.Order.CreateOrderSucc, new OperateOrderSuccessEvent(order, mockUser.UserName));//發送訂單創建成功事件
            },
            //失敗回滾
            createOrderService.UnCreateOrder);
        }

  而在Saga方案里就變得不一樣了,我們需要借助Saga的流程管理器在多個實例中流轉我們的事務,通過事件訂閱和發布來最終完成一個分布式事務。具體的調用流程如下:

可以看到整個流程完全依賴於SagaManger來提供對應的調用策略,而作為開發人員只需要為每個策略提供對應的委托函數,對應的具體流程如下:

  一、首先是訂單服務和商品服務在系統初始化時需要注冊對應的配置文件到本地Saga管理器,並且需要提供對應的流程處理委托(包括業務事件委托+補償事件委托+異常處理委托)。

  二、在客戶發起一個訂單時,只需要創建一個Saga流程實例,剩下的就交給Saga,它會自動幫我們流轉整個業務邏輯而無需人工插手。

二、talk is cheap, show me the code

首先我們需要為整個訂單創建流程設計一組Topic:

接着我們創建一個Saga配置注冊這組topic進去,流程比較簡單,第一步是扣除庫存,下一步是創建訂單,補償事件只有一個就是庫存回滾:

再然后我們需要創建這些Topic對應的事件處理器:

訂單服務:

商品服務: 

接着我們在各自的服務里去實現它們:

商品服務:

訂單服務:

然后我們在訂單用例里創建一個Action用於啟動saga流程(注意是第二個方法):

 最后我們在商品和訂單服務中引入saga組件並注冊事件和異常回調委托(注冊代碼相似,此處僅展示其中一個服務的):

接着我們修改m站的創建訂單接口,修改為新的Saga流程接口,然后編譯整個項目,啟動並測試一下:

 

 

 可以看到整個流程被順利的通過Saga管理器流轉完畢。接着我們嘗試注入一個故障,看看能否正常的被異常訂閱器捕獲到:

 編譯后再測試一下,我們可以看到事件異常訂閱器里已經成功捕獲到了這個異常:

 接着我們在創建訂單注入一個異常,讓商品庫存進行回滾操作,這里由於下單后異常商品補償所以在界面上是體現不出來的(當然在真實的業務場景中一般是下單5秒等待后查詢訂單創建情況,或發送一條站內信告知下單失敗),這里我們通過打印控制台來演示:

接着我們運行並下單,追蹤商品服務的日志:

可以看到由於訂單創建失敗,saga觸發了補償事件並成功執行了補償。好了,關於saga的演示就到這里,可嘗試拉取最新的商城源碼執行即可看到效果。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM