在領域驅動設計中,由於領域邊界的存在,以往的分層設計中業務會按照其固有的領域知識被切分到不同的限界中,並且引入了領域事件這一概念來降低單個業務的復雜度,通過非耦合的事件驅動來完成復雜的業務。但是事件驅動帶來了一些新的問題,由於以往一個原子性極強的邏輯被拆散到了一個一個小的領域中,原子性事務數據的強一致性無法被保證。為了解決這個問題,一般會采用事務補償的方式來確保最終一致。
事務補償機制有多種實現方式,有基於數據庫自帶的基於2PC的XA協議、也有在邏輯層通過TCC實現,抑或采用多個本地事務組合的方式來實現。
當我們采用多個本地事務組合去進行業務處理時,由於業務其本身的復雜性,往往需要在多個事務中協調。而事件協調器(saga)就是一個專門降低其復雜度的設計,開發人員原則上只需要將事件和補償按照一定順序注冊到協調處理器中,原則上協調器會按照注冊的事件依次執行,若出現事件執行失敗時,也會按照補償列表進行相應的回滾。在微軟其開源項目eshopcontainer中就提出了process manager概念。通過一個process manager來協調多個微服務之間的事務。
今天我們就通過一些簡單的代碼設計,來還原一個簡易的基於事件總線的協調器。
廢話不多說,先上代碼:https://github.com/sd797994/EventCoordinator
解決方案包含兩個項目(TargetFramework為.net5,如果你沒有安裝.net,可以改成netcoreapp3.1),一個是演示用的webapi demo。包含基本的控制器和一組事件及事件訂閱處理服務。演示項目流程如下:
客戶端訪問接口下訂單->發布訂單創建事件->訂單預創建訂閱處理器創建訂單編號->發布訂單預創建成功事件->商品扣除訂閱處理器訂閱訂單預創建成功事件並進行商品庫存扣除->發布庫存扣除成功事件->用戶余額扣扣除訂閱處理器訂閱庫存扣除成功事件並執行用戶余額扣除->發布用戶余額扣除成功事件->訂單創建訂閱處理器訂閱用戶余額扣除成功事件創建訂單。若其中每一步處理失敗,則依次進行回滾。
若一切正常,則流程結果執行如下:
模擬最后一步訂單創建失敗時,全部回滾的結果如下:
模擬訂單創建失敗用戶金額回滾成功商品庫存回滾異常時,結果如下(在真實的業務場景中出現此類情況應該進行系統預警人工處理,我這里采用logger.error模擬警戒級別):
下面我們來看看實現思路和代碼,打開EventCoordinator項目,我們可以看到事件總線/事件協調器/通用三個文件夾。其中通用類是一些幫助方法不再贅述,事件也是基於System.Threading.Channels的簡易異步事件總線實現,也不再贅述。主要說說協調器。
協調器的核心主要是EventProcessManager.cs以及EventProcessManagerPipline.cs以及ProcessConfigure.cs三個文件來實現的。我的設計邏輯如下:EventProcessManager管理所有的流程性事務。所以其包含長事務的注冊和啟動邏輯。而事務注冊實際上就是一個構造委托代理的過程,我把它命名為ProcessConfigure,通過創建一個流程配置實例,將向EventProcessManager注冊的委托作為“配置”的一部分創建其對應的方法委托,再在具體執行流程時從隊列中取出配置並執行其的excute方法來發布事件,並且通過托管委托的方式在代理訂閱器里執行真正的委托。而所有的入隊、入棧我創建了一個EventProcessManagerPipline實例來保存我們的ProcessConfigure,這個管道包含一個隊列和一個棧,由於事件是按照先進先出的方式執行,所以事件委托創建的配置會以隊列的方式保存。而補償則是按照先進后出的方式執行,所以補償委托創建的配置會以棧的方式保存。而整個管道事務流轉的核心均在EventProcessManagerPipline的Start方法中。
Start的核心邏輯如下:啟動一個流程,在一個while循環中 從當前的隊列中彈出一個事件配置。執行事件配置的send方法,並且通過AutoResetEvent的方式阻塞等待信號。當真正的業務委托執行完畢后會觸發代理的AutoResetEvent.set(),如果業務執行callnext,則會將當前事件處理的結果作為callbackevent的一部分存儲在緩存里,方便順序回滾。如果執行callback,則直接執行回滾。如果所有的事件/補償執行完畢,則流程執行完畢。
結語:整個代碼其實比較簡單,僅僅是我對長事務治理思想的一個粗淺理解,可能有誤,懇請評論區大佬指正。。。