上篇中說到了面臨的問題(傳送門:DDD設計中的Unitwork與DomainEvent如何相容?),和當時實現的一個解決方案。在實際使用了幾天后,有了新的思路,和@trunks 兄提出的觀點類似。下面且聽我娓娓道來。
一、回顧
先回顧一下,代碼中的核心類。
DomainEventConsistentQueue : 用於把多個領域事件放到一個集合中,批量進行實際的發布操作。
SqlServerUnitOfWork : 基於SQL SERVER的工作單元實現。
上篇最終的編碼效果。
var aggregateA = new AggregateRootA(); //從倉儲中獲取 var aggregateB = new AggregateRootB(); //從倉儲中獲取 using (var queue = DomainEventConsistentQueue.Current()) { using (var unitwork = new SqlServerUnitOfWork(GlobalConfig.DBConnectString)) { aggregateA.Event(); unitwork.RegisterModfied(aggregateA); aggregateB.Event(); unitwork.RegisterModfied(aggregateA); var isSuccess = unitwork.Commit(); if (isSuccess) queue.PublishEvents(); } } public class AggregateRootA : AggregateRoot { public void Event() { DomainEventConsistentQueue.Current().RegisterEvent(new DomainEventA()); } } public class AggregateRootB : AggregateRoot { public void Event() { DomainEventConsistentQueue.Current().RegisterEvent(new DomainEventB()); } } public class DomainEventA : IDomainEvent { } public class DomainEventB : IDomainEvent { }
二、問題
1.其中紅色標識出來的代碼顯得與整個上下文格格不入,此處是應用層中的一個跨多個聚合根的業務處理操作。對於編碼業務邏輯的人來說,其實沒有必要去管理整個領域事件如何發布,因為領域事件本身表達的就是已經發生的事情,所以概念上是在數據已經完成修改后給我成功發布出去就行。那么此處標記出的代碼顯得有點多余,因為這里需要編碼人員去管理領域事件的發布。
2.其中橙色標識出來的代碼的副作用很大,導致所有調用此方法發布的領域事件都得通過一致性隊列進行批量發布。哪怕是單個聚合根的操作,也都得在外層加個 using (var queue = DomainEventConsistentQueue.Current())。這樣的方式與常規的DomainEventBus.Instance().Publish方式產生了差異,讓編碼業務代碼的人多了一份職責,去決定此處加不加using (var queue = DomainEventConsistentQueue.Current())。
三、解決方案
此時我想到的方案是,把工作單元的生命周期提煉出來作為執行上下文中的一個概念。這樣可以使用類似Thread.CurrentThread這樣的方式來在任何地方獲取到當前的工作單元。有了這個可以做2件事:
①根據當前是否處於工作單元的環境中來處理領域事件的發布方式。這樣可以隱藏起直接發布還是通過DomainEventConsistentQueue來發布的邏輯。
②在工作單元中拋出必要的事件,如(提交事件、回滾事件),通過注冊其事件來關聯DomainEventConsistentQueue的發布操作。
四、進行改造
1.先定義一個執行上下文。
public class ExcutingContext { private static readonly ThreadLocal<IUnitOfWork> _unitWork = new ThreadLocal<IUnitOfWork>(); public static UnitOfWork UseSqlServerUnitOfWork(string dbConnectString) { if (_unitWork.Value != null) throw new ApplicationException("當前線程已經啟動了一個工作單元"); var unitWork = new SqlServerUnitOfWork(dbConnectString); _unitWork.Value = unitWork; unitWork.CommittedEvent += CommittedEventHandle; unitWork.RollBackEvent += RollBackEventHandle; return unitWork; } public static UnitOfWork GetCurrentUnitOfWork() { return _unitWork.Value as UnitOfWork; } private static void CommittedEventHandle(bool isSuccess) { _unitWork.Value = null; } private static void RollBackEventHandle() { _unitWork.Value = null; } }
2.改造DomainEventBus的發布方法
public void Publish<T>(T aDomainEvent) where T : IDomainEvent { if (aDomainEvent.IsRead) return; var unitOfWork = ExcutingContext.GetCurrentUnitOfWork(); if (unitOfWork != null) //工作單元環境 { var domainEventConsistentQueue = DomainEventConsistentQueue.Current(); if (domainEventConsistentQueue.IsEmpty()) { unitOfWork.CommittedEvent += AutoPublishDomainEventConsistentQueue; unitOfWork.RollBackEvent += domainEventConsistentQueue.Dispose; } domainEventConsistentQueue.RegisterEvent(aDomainEvent); return; } var registeredSubscribers = _subscribers; if (registeredSubscribers != null) { var domainEventType = aDomainEvent.GetType(); List<IDomainEventSubscriber> subscribers; if (!registeredSubscribers.TryGetValue(domainEventType, out subscribers)) { aDomainEvent.Read(); //未找到訂閱者,但是消息還是消費掉。 return; } foreach (var domainEventSubscriber in subscribers) { var subscribedTo = domainEventSubscriber.SubscribedToEventType(); if (subscribedTo == domainEventType || subscribedTo is IDomainEvent) { Distribute(domainEventSubscriber, aDomainEvent); } } aDomainEvent.Read(); } } private void AutoPublishDomainEventConsistentQueue(bool isSuccess) { if (isSuccess) DomainEventConsistentQueue.Current().PublishEvents(); }
這里有一點要說明一下,因為這里的2個注冊CommittedEvent的事件,AutoPublishDomainEventConsistentQueue的注冊在CommittedEventHandle之后,所以當DomainEventConsistentQueue中調用Publish方法時ExcutingContext.GetCurrentUnitOfWork()已經獲取到null了,就會進入到實際的發布操作。
五、使用方式
var aggregateA = new AggregateRootA(); //從倉儲中獲取 var aggregateB = new AggregateRootB(); //從倉儲中獲取 using (var unitwork = new SqlServerUnitOfWork(GlobalConfig.DBConnectString)) { aggregateA.Event(); unitwork.RegisterModfied(aggregateA); aggregateB.Event(); unitwork.RegisterModfied(aggregateA); var isSuccess = unitwork.Commit(); } public class AggregateRootA : AggregateRoot { public void Event() { DomainEventBus.Instance().Publish(new DomainEventA()); } } public class AggregateRootB : AggregateRoot { public void Event() { DomainEventBus.Instance().Publish(new DomainEventB()); } } public class DomainEventA : IDomainEvent { } public class DomainEventB : IDomainEvent { }
這樣代碼又精簡了些,並且隱藏了領域事件的實際發布過程,業務編碼時無需關注領域事件是如何發布的。
歡迎大家繼續探討~
作者: Zachary
出處:https://zacharyfan.com/archives/81.html
▶關於作者:張帆(Zachary,個人微信號:Zachary-ZF)。堅持用心打磨每一篇高質量原創。歡迎掃描右側的二維碼~。
定期發表原創內容:架構設計丨分布式系統丨產品丨運營丨一些思考。
如果你是初級程序員,想提升但不知道如何下手。又或者做程序員多年,陷入了一些瓶頸想拓寬一下視野。歡迎關注我的公眾號「跨界架構師」,回復「技術」,送你一份我長期收集和整理的思維導圖。
如果你是運營,面對不斷變化的市場束手無策。又或者想了解主流的運營策略,以豐富自己的“倉庫”。歡迎關注我的公眾號「跨界架構師」,回復「運營」,送你一份我長期收集和整理的思維導圖。