前言
經過不斷的堅持和努力,ENode 2.0的第一個真實案例終於出來了。這個案例是一個簡易的論壇,開發這個論壇的初衷是為了驗證用ENode框架來開發一個真實項目的可行性。目前這個論壇在UI上是使用了最終一致性,也就是說當我們發帖或回帖后不會立馬顯示你的帖子或回復。當我們下一次刷新頁面時,會顯示出來。這點貌似很多人向我反饋不太習慣,接受不了,呵呵。我之所以這樣做也是想看看最終一致性大家的接受程度如何,看來UI層面上的最終一致性,大部分人接受不了。回頭我改進下效果,改為立即可以看到帖子或回復吧!另外,關於ENode是什么,本文就不多介紹了,可以參考這篇文章的介紹。本文重點介紹一下ENode是如何幫助我們開發一個基於DDD+CQRS+Event Sourcing架構的應用程序的。這個論壇使用到了ENode, EQueue兩個框架,EQueue是一個分布式的消息隊列組件,該組件的主體思想是參考阿里的RocketMQ。當我們使用EQueue時,面向的不是Queue,而是Topic。EQueue也是完全用C#實現的,關於EQueue詳細介紹,大家可以看一下這篇文章。
ENode, EQueue, Forum 開源項目地址
- ENode開源地址:https://github.com/tangxuehua/enode
- EQueue開源地址:https://github.com/tangxuehua/equeue
- ECommon開源地址:https://github.com/tangxuehua/ecommon
- Forum開源地址:https://github.com/tangxuehua/forum
- Forum論壇線上地址(臨時域名,以后會改為enode.me):http://www.enode.me
另外,項目中如果要開發引用程序集,可以通過Nuget來獲取,輸入關鍵字ENode就能看到所有相關的Package了,如下圖所示:
Forum總體架構分析
Forum采用DDD+CQRS+Event Sourcing的架構。借助於ENode,使得Forum本身無須再做技術架構方面的設計了,直接使用ENode就能完成這種架構。所以我們只要明白了ENode的架構,就知道這個Forum的架構是怎樣的了。以下是ENode的架構圖(已經理解了這個圖的朋友請直接跳過這一節):
上圖是一個CQRS架構的數據流向圖。UI請求會分為兩類:Command和Query。
Command用於寫數據,Query用於讀數據,寫數據和讀數據完全采用不同的架構實現。寫數據支持同步和異步的方式,讀數據完全走簡單高效思路來實現。當我們要對系統做寫操作時,如果你是用ASP.NET MVC來開發站點,那就可以在Controller中創建並發送一個Command即可。該Command會被發送到消息隊列(EQueue中),然后消息隊列的訂閱方,也就是處理Command的進程會拉取這些Command,然后調用Command Handler完成Command的處理;Command Handler處理Command時,是調用Domain的方法來完成相關的業務邏輯操作。Domain就是DDD中的領域層,負責實現整個系統的業務邏輯。 然后由於是Event Sourcing的架構,所以Domain中任何聚合根的修改都會產生相應的領域事件(Domain Event),領域事件會先被持久化到EventStore中,持久化如果沒有遇到並發沖突,成功后,則會被發布(Publish)到消息隊列(EQueue中),然后消息隊列的訂閱方,也就是處理Domain Event的進程會拉取這些Domain Event,然后調用相關的Event Handler做相關的更新,比如有些Event Handler是會更新讀庫(Read DB),有些是會產生新的Command,這種我把它叫做流程管理器(Process Manager,也有人叫做Saga)。當我們有時一個業務場景需要涉及到多個聚合根的修改時,我們會需要用到Process Manager。Process Manager負責對流程進行建模,它的原理是基於事件驅動的流程實現。Process Manager處理事件,然后產生響應的Command,從而完成聚合根之間的交互。一般一個流程,我們會設計一個流程聚合根以及其他的參與該流程的聚合根,Process Manager則是用於負責協調這些聚合根之間的交互。具體的例子可以看一下ENode源代碼中的BankTransferSample。
關於Query端,由於都是查詢,這些查詢都是用於UI展示數據或者為第三方接口提供數據為目的,查詢對系統無副作用。我們可以用我們自己任意喜歡的方式來實現Query端。查詢面向的是Read DB。上面提到,Read DB中的數據是通過Event Handler(老外叫Denormalizer)來更新的。
所以我們可以看到,整個架構中,Command端和Query端的數據源是完全分離的。Command端最后的結果就是Domain Event,Domain Event是持久化在Event Store中的;Query端的數據源就是Read DB,一般可以用關系型數據庫來作為存儲。CQ兩端的數據同步通過Domain Event來實現。
上圖的CQRS架構最大的好處是在架構級別以及數據存儲級別,把讀寫都分離了。這樣我們可以方便的對讀或寫單獨做優化。另外由於使用了Event Sourcing的架構,使得我們的Command端只要持久化了Domain Event,就意味着保存了這個Domain的所有狀態。這個特性,可以讓我們的框架有很多設計余地,比如不必考慮Domain Event和業務數據要強一致等問題,因為Domain Event本身就是業務數據本身了,我們通過Domain Event隨時可以還原出任意時刻的Domain的狀態。當我們要查詢Domain的當前最新數據時,就走Query端即可。當然,由於Query端是異步更新的,所以Query端的數據可能會有一點點延遲。這點也就是我們平時一直講到的最終一致性(CQ兩端的數據最終會一致)。
通過上面的架構圖,我們知道,一個Command發出后會經過兩個階段的處理:1)先被某個Command Service處理(調用Domain完成業務邏輯產生Domain Event);2)再被Event Service處理(響應Domain Event,完成Read DB的更新或者產生新的Command);理解這兩個階段對理解下面的Forum的項目結構很有用處。
Forum項目結構分析
以上是Forum的項目工程結構,項目中包含四個宿主工程,分別是:
Forum.BrokerService:
這個工程用於宿主EQueue的Broker,整個論壇中所有的Command,Domain Event的消息,都會被放在Broker上。比如Controller發送的Command會被發送到Broker,同樣Domain產生的Domain Event也會被發送到Broker;然后消費者消費消息則都是從BrokerService拉取消息。由於該宿主工程不需要和用戶交互,所以我部署為Windows Service。
Forum.CommandService:
這個工程就是用於處理Command的進程,同樣也部署為Windows Service。
Forum.EventService:
這個工程就是用於處理Domain Event的進程,同樣也部署為Windows Service。
Forum.Web:
這個就是論壇的Web站點了,不用多講了;這個Web站點做的事情就是發送Command或者調用Query端的查詢服務查詢數據;Web站點只需要依賴於Forum.Commands和Forum.QueryServices即可,因為它只需要發送Command和查詢數據即可。
Forum.CommandHandlers:
所有的Command Handler都在這個工程,Command Handler的職責是處理Command,調用Domain的方法完成業務邏輯;
Forum.Commands:
所有的Command都在這個工程中,每個Command都是一個DTO,會被封裝為消息發送到EQueue。
Forum.Domain:
就是論壇的領域層了,所有的聚合以、工廠、領域服務,以及領域事件等都在這個工程中。這個工程是整個Forum最有價值的地方,是業務邏輯所在的工程。
Forum.Domain.Dapper:
由於Domain中可能會定義一些接口,這些接口背后的持久化需要在外部實現;如果按照經典DDD的架構,比如倉儲接口是在Domain層定義,而實現則是在基礎層(Infrastructure)中。而從經典DDD的分層架構圖上來看,Domain層是依賴於Infrastructure層的,但是Infrastructure層中又有一些倉儲的實現類要依賴於Domain層;雖然我能理解這種雙向依賴,但很容易會給不少學DDD的人帶來困惑,所以我更加傾向於,把Domain看做是架構的核心,其他一切都是Domain的外圍。這個思想其實和六邊形架構是一個思路。就是從架構上來看,不是上層依賴於下層,而是外層依賴於內層;內層通過定義出接口,外層實現接口,內層只要面向自己定義的接口即可。所以基於這個思路,我會把Forum.Domain中定義的接口,如果用Dapper來實現,那我就定義一個Forum.Domain.Dapper這樣的工程,意思是實現Forum.Domain.Dapper依賴於內層的Forum.Domain。假如以后我們有一個基於EntityFramework的實現,則只要再創建一個Forum.Domain.EntityFramework這樣的工程即可。所以可以看出,Forum.Domain.Dapper這種工程實際上是Forum.Domain對外部的適配器,Forum.Domain里定義好適配接口,Forum.Domain.Dapper這種工程實現這些適配接口。基於這種思想,我們的架構就沒有了上層依賴下層的概念了,而是替換為內外層的關系,內層不依賴外層,外層依賴於內層,內層與外層直接通過適配器接口來交互,或者通過Domain Event也可以。這樣我們就不用再去糾結經典DDD中看似雙向依賴的問題了。
Forum.Domain.Tests:
這個工程就是對Forum.Domain的一個測試工程。每個測試用例會模擬Controller發起Command,然后最后檢查Domain中的狀態是否正確修改。
Forum.QueryServices:
這個工程定義了Query端的所有查詢接口,Forum.Web站點依賴於這個工程中的查詢服務接口;然后這些查詢接口的實現則是放在Forum.QueryServices.Dapper中。Forum.QueryServices與Forum.QueryServices.Dapper之間的關系和Forum.Domain與Forum.Domain.Dapper之間的關系類似,這里就不在重復了。
Forum.Denormalizers.Dapper:
這個工程中的就是所有的Denormalizer,Denormalizer就是負責處理Domain Event,然后更新讀庫。然后由於目前使用Dapper實現數據持久化,所以工程名以Dapper結尾。
Forum.Infrastructure:
這是一個基礎工程,存放所有基礎的公共的東西,比如一些業務無關的服務或配置信息或全局變量等東西;需要強調的是:這里的Forum.Infrastructure和經典DDD中的Infrastructure不是同一個概念。DDD中的Infrastructure是一個邏輯上的分層,領域層中所有的技術支撐實現都在Infrastructure中;而這里的Infrastructure,則僅僅只是一些Common的基礎的公用的東西,Infrastructure不是為了為其它哪一層服務的,它可以被其他任何項目使用;
好了,以上簡單介紹了每個工程的作用和設計目的。下面我們來看看Forum的領域模型的設計吧!
Forum的Domain Model的設計
核心功能需求分析
1. 提供用戶注冊、登錄、注銷三個功能;注冊用戶時需要驗證用戶名是否唯一;
2. 提供發帖、回帖、修改帖子、修改回復,以及回復的回復這些基本核心功能;
3. 系統管理員可以對論壇版塊進行維護;
聚合識別
識別出來的聚合有:論壇賬號、帖子、回復、版塊這四個。
賬號的最少信息應該有:賬號名稱+密碼;
版塊要有名稱即可;
帖子要有標題、內容、發帖人、發帖時間、所屬版塊;
回復要有回復內容、回復時間、回復人、所屬版塊,父回復(可以為空);
場景走查
注冊就是創建賬號(賬號唯一性的設計后面在詳細分析);
登錄本質就是調用Query端的查詢服務查找賬號是否存在,所以不需要Domain做什么處理,注銷也是;
發帖就是創建帖子;
回帖就是創建回復;
修改帖子就是對帖子聚合根做修改;
修改回復就是對回復聚合根做修改;
版塊添加就是創建一個版塊聚合根;
關鍵業務規則識別
1)賬號名稱不能重復;2)帖子必須要有所屬版塊和發帖人;3)回復必須要有一個對應的帖子和回復人;
下面我們分析一下每個業務規則的實現。
- 如何實現賬號名稱不能重復?首先它是一條業務規則,所以必須在Domain里實現,而不應該在Command Handler里。然后由於Event Sourcing的架構,天生有一個缺陷就是無法實現唯一性約束這種需求。所以我們需要在Domain中顯式設計出可以表達聚合根索引的東西,我把它們叫做IndexStore,表示是一種聚合根索引的存儲。這個思路非常類似於在經典DDD中,我們有倉儲(Repository)的概念,倉儲維護了所有的聚合根;而我這里的IndexStore則是維護了聚合根的索引信息。有了這個索引信息后,我們就能在注冊新賬號時,在Domain中設計一個RegisterAccountService這樣的領域服務,領域服務里通過AccountIndexStore來檢查賬號名稱是否重復,如果不重復,則將當前賬號名稱添加到AccountIndexStore中,如果重復,則報異常。另外一個非業務的點需要考慮,那就是如何實現並發注冊用戶的處理。我們可以在Command Handler中實現db級別的鎖(但不不需要鎖整個賬號表,而是鎖一個其他表中的某一條記錄),確保同一時刻,不會有兩個Account名稱添加到AccountIndexStore中;我們通過RegisterAccountService把“賬號名稱不能重復”的這個業務規則顯式的表達出來,從而在代碼級別體現領域內實現了這個業務規則。以前,如果沒有用Event Sourcing,我們可能會依賴db的唯一索引來實現這個唯一性,雖然功能上也可以實現,但實際上賬號名稱不能重復的這個業務規則沒有體現在領域內。這點也是我這次通過實現基於Event Sourcing而實現的唯一性驗證而想到的點。
- 帖子必須要有所屬版塊和發帖人,這條業務規則很容易保證,只要在帖子聚合根上,對版塊和發帖子判斷是否為空就行了;
- 回復必須要有一個對應的帖子和回復人,也是同理,只要在構造函數中判斷是否為空即可;
以注冊新用戶為例,展示代碼實現
客戶端JS通過angularJS提交注冊信息
$scope.submit = function () { if (isStringEmpty($scope.newAccount.accountName)) { $scope.errorMsg = '請輸入賬號。'; return false; } if (isStringEmpty($scope.newAccount.password)) { $scope.errorMsg = '請輸入密碼。'; return false; } if (isStringEmpty($scope.newAccount.confirmPassword)) { $scope.errorMsg = '請輸入密碼確認。'; return false; } if ($scope.newAccount.password != $scope.newAccount.confirmPassword) { $scope.errorMsg = '密碼輸入不一致。'; return false; } $http({ method: 'POST', url: '/account/register', data: $scope.newAccount }) .success(function (result, status, headers, config) { if (result.success) { $window.location.href = '/home/index'; } else { $scope.errorMsg = result.errorMsg; } }) .error(function (result, status, headers, config) { $scope.errorMsg = result.errorMsg; }); };
Controller處理請求
[HttpPost] [AjaxValidateAntiForgeryToken] [AsyncTimeout(5000)] public async Task<ActionResult> Register(RegisterModel model, CancellationToken token) { var command = new RegisterNewAccountCommand(model.AccountName, model.Password); var result = await _commandService.Execute(command, CommandReturnType.EventHandled); if (result.Status == CommandStatus.Failed) { if (result.ExceptionTypeName == typeof(DuplicateAccountException).Name) { return Json(new { success = false, errorMsg = "該賬號已被注冊,請用其他賬號注冊。" }); } return Json(new { success = false, errorMsg = result.ErrorMessage }); } _authenticationService.SignIn(result.AggregateRootId, model.AccountName, false); return Json(new { success = true }); }
CommandHandler處理Command
[Component(LifeStyle.Singleton)] public class AccountCommandHandler : ICommandHandler<RegisterNewAccountCommand> { private readonly ILockService _lockService; private readonly RegisterAccountService _registerAccountService; public AccountCommandHandler(ILockService lockService, RegisterAccountService registerAccountService) { _lockService = lockService; _registerAccountService = registerAccountService; } public void Handle(ICommandContext context, RegisterNewAccountCommand command) { _lockService.ExecuteInLock(typeof(Account).Name, () => { context.Add(_registerAccountService.RegisterNewAccount(command.Id, command.Name, command.Password)); }); } }
RegisterAccountService領域服務
/// <summary>提供賬號注冊的領域服務,封裝賬號注冊的業務規則,比如賬號唯一性檢查 /// </summary> [Component(LifeStyle.Singleton)] public class RegisterAccountService { private readonly IIdentityGenerator _identityGenerator; private readonly IAccountIndexStore _accountIndexStore; private readonly AggregateRootFactory _factory; public RegisterAccountService(IIdentityGenerator identityGenerator, AggregateRootFactory factory, IAccountIndexStore accountIndexStore) { _identityGenerator = identityGenerator; _factory = factory; _accountIndexStore = accountIndexStore; } /// <summary>注冊新賬號 /// </summary> /// <param name="accountIndexId"></param> /// <param name="accountName"></param> /// <param name="accountPassword"></param> /// <returns></returns> public Account RegisterNewAccount(string accountIndexId, string accountName, string accountPassword) { //首先創建一個新賬號 var account = _factory.CreateAccount(accountName, accountPassword); //先判斷該賬號是否存在 var accountIndex = _accountIndexStore.FindByAccountName(account.Name); if (accountIndex == null) { //如果不存在,則添加到賬號索引 _accountIndexStore.Add(new AccountIndex(accountIndexId, account.Id, account.Name)); } else if (accountIndex.IndexId != accountIndexId) { //如果存在但和當前的索引ID不同,則認為是賬號有重復 throw new DuplicateAccountException(accountName); } return account; } }
EventHandler處理Domain Event
[Component(LifeStyle.Singleton)] public class AccountEventHandler : BaseEventHandler, IEventHandler<NewAccountRegisteredEvent> { public void Handle(IEventContext context, NewAccountRegisteredEvent evnt) { using (var connection = GetConnection()) { connection.Insert( new { Id = evnt.AggregateRootId, Name = evnt.Name, Password = evnt.Password, CreatedOn = evnt.Timestamp, UpdatedOn = evnt.Timestamp, Version = evnt.Version }, Constants.AccountTable); } } }
結束語
好了,大概就這些吧。好久沒寫文章了,都不知道該怎么寫了,呵呵。接下來准備再好好分享下ENode,EQueue最近幾個月在不斷完善中我遇到的一些技術問題。對了,大家可以去體驗下這個論壇的功能哦,雖然很簡單,但基本的功能還是有的。http://www.enode.me
下一篇文章准備仔細介紹下ENode中各個環節的冪等的處理,比如Command的冪等處理,Domain Event持久化的冪等處理,Event Handler的冪等處理,Saga Process Manager的冪等處理。