DDD設計中的Unitwork與DomainEvent如何相容?


最近在開發過程中,遇到了一個場景,甚是棘手,在這里分享一下。希望大家腦洞大開一起來想一下解決思路。鄙人也想了一個方案拿出來和大家一起探討一下是否合理。

一、簡單介紹一下涉及的對象概念

  工作單元:維護變化的對象列表,在整塊業務邏輯處理完全之后一次性寫入到數據庫中。

  領域事件:領域對象本身發生某些變化時,發布的通知事件,告訴訂閱者處理相關流程。

 

二、問題來了

  我認為最合理的領域事件的觸發點應該設計在領域對象內部,那么問題來了。當這個領域對象發生變化的上下文是一個復雜的業務場景,整個流程中會涉及到多個領域對象,所以需要通過工作單元來保證數據寫入的一致性。此時其中各個產生變化的領域對象的領域事件如果實時被發布出去,那么當工作單元在最終提交到數據庫時,如果產生了回滾,那么會導致發布了錯誤的領域事件,產生未知的后果。

 

三、問題分析

  我能夠想到的方案是,這里領域事件的發布也通過一個類似於工作單元一樣的概念進行持續的管理,在領域對象中的發布只是做一個記錄,只有在工作單元提交成功之后,才實際發布其中所有的領域事件。

 

四、說干就干  

實現類:

 1    public class DomainEventConsistentQueue : IDisposable
 2     {
 3         private readonly List<IDomainEvent> _domainEvents = new List<IDomainEvent>();
 4         private bool _publishing = false;
 5 
 6         public void RegisterEvent(IDomainEvent domainEvent)
 7         {
 8             if (_publishing)
 9             {
10                 throw new ApplicationException("當前事件一致性隊列已被發布,無法添加新的事件!");
11             }
12 
13             if (_domainEvents.Any(ent => ent == domainEvent))  //防止相同事件被重復添加
14                 return;
15             
16             _domainEvents.Add(domainEvent);
17         }
18 
19         public void Clear()
20         {
21             _domainEvents.Clear();
22             _publishing = false;
23         }
24 
25         public void PublishEvents()
26         {
27             if (_publishing)
28             {
29                 return;
30             }
31 
32             if (_domainEvents == null)
33                 return;
34 
35             try
36             {
37                 _publishing = true;
38                 foreach (var domainEvent in _domainEvents)
39                 {
40                     DomainEventBus.Instance().Publish(domainEvent);
41                 }
42             }
43             finally
44             {
45                 Clear();
46             }
47         }
48 
49         public void Dispose()
50         {
51             Clear();
52         }
53     }

使用方式:

 1             var aggregateA = new AggregateRootA();
 2             var aggregateB = new AggregateRootB();
 3 
 4             using (var queue = new DomainEventConsistentQueue())
 5             {
 6                 using (var unitwork = new SqlServerUnitOfWork(GlobalConfig.DBConnectString))
 7                 {
 8                     aggregateA.Event(queue);
 9                     aggregateB.Event(queue);
10 
11                     var isSuccess = unitwork.Commit();
12                     if (isSuccess)
13                         queue.PublishEvents();
14                 }
15             }
16 
17 
18         public class AggregateRootA : AggregateRoot
19         {
20             public void Event(DomainEventConsistentQueue queue)
21             {
22                 queue.RegisterEvent(new DomainEventA());
23             }
24         }
25 
26         public class AggregateRootB : AggregateRoot
27         {
28             public void Event(DomainEventConsistentQueue queue)
29             {
30                 queue.RegisterEvent(new DomainEventB());
31             }
32         }
33 
34         public class DomainEventA : IDomainEvent
35         {
36             public DateTime OccurredOn()
37             {
38                 throw new NotImplementedException();
39             }
40 
41             public void Read()
42             {
43                 throw new NotImplementedException();
44             }
45 
46             public bool IsRead
47             {
48                 get { throw new NotImplementedException(); }
49             }
50         }
51 
52         public class DomainEventB : IDomainEvent
53         {
54             public DateTime OccurredOn()
55             {
56                 throw new NotImplementedException();
57             }
58 
59             public void Read()
60             {
61                 throw new NotImplementedException();
62             }
63 
64             public bool IsRead
65             {
66                 get { throw new NotImplementedException(); }
67             }
68         }

問題是解決了,但是標紅的這段代碼看着特別變扭,在產生領域事件的領域對象方法上需要增加一個與表達的業務無關的參數,這個大大破壞了DDD設計的初衷——統一語言(Ubiquitous Language),簡潔明了的表達出每個業務行為,業務交流應與代碼保持一致。像這2行表達起來如“AggregateRootA Event DomainEventConsistentQueue”這個 DomainEventConsistentQueue其實並不是領域對象,所以其並不是領域的一部分。

 

五、陷入思考

  這里突然想到,如果在運行中的每個線程的共享區域存儲待發布的領域事件集合,那么不就可以隨時隨地的管理當前操作上下文中的領域事件了嗎?這里需要引入ThreadLocal<T> 類。MSDN的解釋參見https://msdn.microsoft.com/zh-cn/library/dd642243(v=vs.110).aspx。該泛型類可以提供僅針對當前線程的全局存儲空間,正好能夠恰到好處的解決我們現在遇到的問題。

 

六、說改就改

實現類:

 1    public class DomainEventConsistentQueue : IDisposable
 2     {
 3         private static readonly ThreadLocal<List<IDomainEvent>> _domainEvents = new ThreadLocal<List<IDomainEvent>>();
 4         private static readonly ThreadLocal<bool> _publishing = new ThreadLocal<bool> { Value = false };
 5 
 6         private static DomainEventConsistentQueue _current;
 7         /// <summary>
 8         /// 獲取當前的領域事件一致性隊列。
 9         /// 由於使用了線程本地存儲變量,此處為單例模式。
10         /// </summary>
11         /// <returns></returns>
12         public static DomainEventConsistentQueue Current()
13         {
14             if (_current != null)
15                 return _current;
16             var temp = new DomainEventConsistentQueue();
17             Interlocked.CompareExchange(ref _current, temp, null);
18             return temp;
19         }
20 
21         public void RegisterEvent(IDomainEvent domainEvent)
22         {
23             if (_publishing.Value)
24             {
25                 throw new ApplicationException("當前事件一致性隊列已被發布,無法添加新的事件!");
26             }
27 
28             var domainEvents = _domainEvents.Value;
29             if (domainEvents == null)
30             {
31                 domainEvents = new List<IDomainEvent>();
32                 _domainEvents.Value = domainEvents;
33             }
34 
35             if (domainEvents.Any(ent => ent == domainEvent))  //防止相同事件被重復添加
36                 return;
37 
38             domainEvents.Add(domainEvent);
39         }
40 
41         public void Clear()
42         {
43             _domainEvents.Value = null;
44             _publishing.Value = false;
45         }
46 
47         public void PublishEvents()
48         {
49             if (_publishing.Value)
50             {
51                 return;
52             }
53 
54             if (_domainEvents.Value == null)
55                 return;
56 
57             try
58             {
59                 _publishing.Value = true;
60                 foreach (var domainEvent in _domainEvents.Value)
61                 {
62                     DomainEventBus.Instance().Publish(domainEvent);
63                 }
64             }
65             finally
66             {
67                 Clear();
68             }
69         }
70 
71         public void Dispose()
72         {
73             Clear();
74         }
75     }

使用方式:

 1             var aggregateA = new AggregateRootA();
 2             var aggregateB = new AggregateRootB();
 3 
 4             using (var queue = DomainEventConsistentQueue.Current())
 5             {
 6                 using (var unitwork = new SqlServerUnitOfWork(GlobalConfig.DBConnectString))
 7                 {
 8                     aggregateA.Event();
 9                     aggregateB.Event();
10 
11                     var isSuccess = unitwork.Commit();
12                     if (isSuccess)
13                         queue.PublishEvents();
14                 }
15             }
16 
17         public class AggregateRootA : AggregateRoot
18         {
19             public void Event()
20             {
21                 DomainEventConsistentQueue.Current().RegisterEvent(new DomainEventA());
22             }
23         }
24 
25         public class AggregateRootB : AggregateRoot
26         {
27             public void Event()
28             {
29                 DomainEventConsistentQueue.Current().RegisterEvent(new DomainEventB());
30             }
31         }
32 
33         public class DomainEventA : IDomainEvent
34         {
35             public DateTime OccurredOn()
36             {
37                 throw new NotImplementedException();
38             }
39 
40             public void Read()
41             {
42                 throw new NotImplementedException();
43             }
44 
45             public bool IsRead
46             {
47                 get { throw new NotImplementedException(); }
48             }
49         }
50 
51         public class DomainEventB : IDomainEvent
52         {
53             public DateTime OccurredOn()
54             {
55                 throw new NotImplementedException();
56             }
57 
58             public void Read()
59             {
60                 throw new NotImplementedException();
61             }
62 
63             public bool IsRead
64             {
65                 get { throw new NotImplementedException(); }
66             }
67         }

這樣代碼看起來比之前優雅多了。這里的 DomainEventConsistentQueue.Current() 中操作的變量針對同一個線程在哪都是共享的,所以我們只管往里丟數據就好了~

 

七、方案的局限性。

  對於執行上下文的要求較高,整個領域事件的發布必須要求在同一線程內操作。所以在使用的過程中盡量避免這種情況的發生。如果實在無法避免只能通過把DomainEventConsistentQueue 當作變量在多個線程之間傳遞了。

 

以上是個人的想法,可能有所考慮不周~ 不知道各位園子里的小伙伴們是否有處理過類似場景的經驗,歡迎留言探討,相互學習~

  

作者:  Zachary
出處:https://zacharyfan.com/archives/61.html

 

 

▶關於作者:張帆(Zachary,個人微信號:Zachary-ZF)。堅持用心打磨每一篇高質量原創。歡迎掃描右側的二維碼~。

定期發表原創內容:架構設計丨分布式系統丨產品丨運營丨一些思考。

 

如果你是初級程序員,想提升但不知道如何下手。又或者做程序員多年,陷入了一些瓶頸想拓寬一下視野。歡迎關注我的公眾號「跨界架構師」,回復「技術」,送你一份我長期收集和整理的思維導圖。

如果你是運營,面對不斷變化的市場束手無策。又或者想了解主流的運營策略,以豐富自己的“倉庫”。歡迎關注我的公眾號「跨界架構師」,回復「運營」,送你一份我長期收集和整理的思維導圖。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM