前幾篇隨筆中討論了CQRS中的Command,本篇隨筆中將討論CQRS中的領域事件(Domain Event)。
概念
先回顧下CQRS中一個UI操作的執行過程:
首先,用戶在UI中點擊一個按鈕,繼而UI層構造了一個相應的Command對象並放到CommandBus中執行,在Command的執行過程中,領域模型中的類和方法得到調用,而領域事件,正是在此時產生的,之所以稱之為“領域”事件,也正是因為它產生於領域模型。這可以用下面這張圖來說明(先忽略UnitOfWorkContext):
從上圖也可以看出,領域模型的調用被“包裹”在Command的執行上下文中,所以,UI層的所有操作都只是創建Command,再把Command丟給CommandBus,而不會直接調用領域模型中的類和方法。
基本實現
領域事件的實現和Command的實現多少有些類似,但需要注意的是,每個Command只能對應一個CommandExecutor,而一個領域事件卻可以綁定多個事件處理器(EventHandler)。下面是一個初始版本的實現代碼:

/// <summary> /// 標記接口(Marker Interface),所有領域事件都要實現該接口。 /// </summary> public interface IEvent { } /// <summary> /// 事件處理器接口,所有事件處理器都要實現該接口。 /// </summary> public interface IEventHandler<in TEvent> where TEvent : IEvent { /// <summary> /// 處理事件。 /// </summary> void Handle(TEvent evnt); } /// <summary> /// /// </summary> public static class EventBus { public static void Publish<TEvent>(TEvent evnt) where TEvent : IEvent { // 獲取所有綁定到傳入事件類型的事件處理器,遍歷執行 } }
假設現在有個需求:一個網上書店新進了一本書,我們希望在將這本書添加到數據庫中后,發送該新書的促銷郵件給注冊用戶。
那我們在實現時就會有一個AddBookCommand,這個Command執行時,會觸發BookAddedEvent,而系統中會有一個EventHandler綁定到這個事件,它會將新書的信息通過郵件發送給注冊用戶(可以有其它的Event Handler,比如用於更新網站的統計信息的Event Handler)。
AddBookCommand就不再贅述,詳情可參考《Command的實現》一文,BookAddedEvent的代碼如下所示,它實現了IEvent接口:

public class BookAddedEvent : IEvent { public string BookISBN { get; set; } public decimal Price { get; set; } }
對應的BookAddedEventHandler的代碼如下:

public class BookAddedEventHandler : IEventHandler<BookAddedEvent> { public void Handle(BookAddedEvent evnt) { var msg = "新書到!ISBN: " + evnt.BookISBN + ", 價格: " + evnt.Price; // 這里發送郵件到各用戶 } }
事件和命令不同,命令表達的是一個將要執行的操作,而事件表達的是一個發生過的事情,所以事件類的命名采用過去式。和命令一樣的是,領域事件的類名有很清晰的語義,所以《Command的實現》一文中的“千萬不要隨意復用"的原則同樣適用於領域事件。
改進實現
上面的實現忽略了一個很重要的問題:在BookAddedEventHandler中向用戶發送了郵件,而領域事件是在領域模型的調用過程中產生的,這也就意味着在發送郵件的時候數據庫事務還沒有提交。如果數據庫事務提交失敗了呢?新書沒有添加進來,但郵件卻發送出去了,這是無法接受的。所以,對於發送郵件之類的Event Handler,我們要保證它們在事務提交成功后才被執行。
但我們又不能把所有的Event Handler放到事件提交成功后執行,比如添加一本新書后,我們要將網站統計信息中的圖書總數加一(假設統計信息用一張表來存放,這個統計信息可以認為是CQRS中的ReadModel),這時候圖書總數的增加則要和書的添加處於同一個數據庫事務中,對於這種EventHandler又需要在領域事件觸發時馬上執行。
因此我們可以對上面的領域事件實現做一點改造:把EventHandler分為兩種,一種是普通的直接執行的EventHandler,一種是數據庫事務提交成功后才執行的PostCommitEventHandler。我們再引入UnitOfWorkContext,在Command開始執行時,我們創建一個新的UnitOfWorkContext對象,它在Command執行過程中會一直存在。在Command執行過程中,一旦有領域事件被觸發,我們就馬上執行所有綁定到該事件的普通EventHandler,再將該事件添加到當前的UnitOfWorkContext中,在UnitOfWork提交成功后,遍歷所有當前UnitOfWorkContext中所有添加進來的領域事件,逐一執行相應的PostCommitEventHandler,Command執行結束后關閉當前的UnitOfWorkContext。相關代碼如下(完整代碼見文末中提到的Taro項目):
首先我們要把EventHandler接口分為兩個:IEventHandler<TEvent>和IPostCommitEventHandler<TEvent>:

public interface IEventHandler<in TEvent> where TEvent : IEvent { /// <summary> /// 處理事件。 /// </summary> void Handle(TEvent evnt); } public interface IPostCommitEventHandler<in TEvent> where TEvent : IEvent { void Handle(TEvent evnt); }
然后是UnitOfWork(已去除了不重要的代碼):

public abstract class AbstractUnitOfWork : IUnitOfWork { // IEventHandlerFinder用於獲取所有綁定到事件的EventHandler private IEventHandlerFinder _eventHandlerFinder; public ICollection<IEvent> UncommittedEvents { get; private set; } public void Commit() { CommitChanges(); // 在提交數據庫事務后,執行所有IPostCommitEventHandler InvokePostCommitHandlers(); } protected abstract void CommitChanges(); protected virtual void InvokePostCommitHandlers() { // 遍歷領域事件,執行相應的IPostCommitEventHandler foreach (var evnt in UncommittedEvents) { foreach (var handler in _eventHandlerFinder.FindPostCommitHandlers(evnt)) { EventHandlerInvoker.Invoke(handler, evnt); } } UncommittedEvents.Clear(); } }
然后是UnitOfWorkContext,它很簡單,采用ThreadStatic實現:

public static class UnitOfWorkContext { [ThreadStatic] private static IUnitOfWork _current; public static IUnitOfWork Current { get { return _current; } } public static void Open(IUnitOfWork unitOfWork) { _current = unitOfWork; } public static void Close() { _current = null; } }
我們還需要一個AbstractCommandExecutor抽象基類來控制UnitOfWorkContext的開啟和關閉(所有CommandExecutor都要繼承AbstractCommandExecutor):

public abstract class AbstractCommandExecutor<TCommand> : ICommandExecutor<TCommand> where TCommand : ICommand { protected Func<IUnitOfWork> GetUnitOfWork { get; private set; } protected AbstractCommandExecutor(Func<IUnitOfWork> getUnitOfWork) { Require.NotNull(getUnitOfWork, "getUnitOfWork"); GetUnitOfWork = getUnitOfWork; } public void Execute(TCommand cmd) { using (var uow = GetUnitOfWork()) { UnitOfWorkContext.Open(uow); try { Execute(uow, cmd); } finally { UnitOfWorkContext.Close(); } } } protected abstract void Execute(IUnitOfWork unitOfWork, TCommand cmd); }
為了讓領域模型方便觸發領域事件,我們添加一個DomainEvent的靜態類,它的Apply方法中先執行普通的EventHandler,再將事件添加到當前的UnitOfWorkContext中:

public static class DomainEvent { public static void Apply<TEvent>(TEvent evnt) where TEvent : IEvent { var handlerFinder = EventHandlerFinders.Current; // 找到所有綁定到該事件的普通EventHandler並執行 foreach (var handler in handlerFinder.FindPreCommitHandlers(evnt)) { EventHandlerInvoker.Invoke(handler, evnt); } var unitOfWork = UnitOfWorkContext.Current; if (unitOfWork == null) throw new InvalidOperationException("Current unit of work context is null. Domain events can only be applied inside a unit of work context."); // 將領域事件添加到當前UnitOfWorkContext中 unitOfWork.UncommittedEvents.Add(evnt); } }
最后在領域模型中,我們可以如下調用:
// 圖書倉庫,偽代碼 public class BookWarehouse { public IList<Book> Books { get; private set; } public void AddBook(Book book) { Books.Add(book); // 觸發領域事件 DomainEvent.Apply(new BookAddedEvent(book)); } }
總結
本文討論了領域事件的基本概念及其一個基本實現,在其之上又做了進一步的改進以解決部分EventHandler需要在數據庫事務提交成功后才可以執行的問題。
領域事件的引入有着極其重要的意義,它使領域模型變得更加純凈,並將不同的邏輯進行了解耦,如果沒有領域事件,那新書入庫的代碼和發送促銷郵件以及更改網站統計信息的代碼都要耦合在一起。領域事件並非CQRS的私有品,它可以脫離CQRS,作為輕量級的組件而獨立存在。
到此為止,CQRS中的主要組成部分已討論完畢,至於其它的進階主題,如Event Sourcing、事件的異步分發等將不在本系列中討論。在接下來的隨筆中,我們將以一個迷你的CQRS框架(Taro)和一個可運行的網上書店(BookStore)示例項目來展示如何在實際項目中應用CQRS。
歡迎討論。