ENode 2.0 - 第一個真實案例剖析-一個簡易論壇(Forum)


前言

經過不斷的堅持和努力,ENode 2.0的第一個真實案例終於出來了。這個案例是一個簡易的論壇,開發這個論壇的初衷是為了驗證用ENode框架來開發一個真實項目的可行性。目前這個論壇在UI上是使用了最終一致性,也就是說當我們發帖或回帖后不會立馬顯示你的帖子或回復。當我們下一次刷新頁面時,會顯示出來。這點貌似很多人向我反饋不太習慣,接受不了,呵呵。我之所以這樣做也是想看看最終一致性大家的接受程度如何,看來UI層面上的最終一致性,大部分人接受不了。回頭我改進下效果,改為立即可以看到帖子或回復吧!另外,關於ENode是什么,本文就不多介紹了,可以參考這篇文章的介紹。本文重點介紹一下ENode是如何幫助我們開發一個基於DDD+CQRS+Event Sourcing架構的應用程序的。這個論壇使用到了ENode, EQueue兩個框架,EQueue是一個分布式的消息隊列組件,該組件的主體思想是參考阿里的RocketMQ。當我們使用EQueue時,面向的不是Queue,而是Topic。EQueue也是完全用C#實現的,關於EQueue詳細介紹,大家可以看一下這篇文章

ENode, EQueue, Forum 開源項目地址

  1. ENode開源地址:https://github.com/tangxuehua/enode
  2. EQueue開源地址:https://github.com/tangxuehua/equeue
  3. ECommon開源地址:https://github.com/tangxuehua/ecommon
  4. Forum開源地址:https://github.com/tangxuehua/forum
  5. Forum論壇線上地址(臨時域名,以后會改為enode.me):http://www.enode.me

另外,項目中如果要開發引用程序集,可以通過Nuget來獲取,輸入關鍵字ENode就能看到所有相關的Package了,如下圖所示:

Forum總體架構分析

Forum采用DDD+CQRS+Event Sourcing的架構。借助於ENode,使得Forum本身無須再做技術架構方面的設計了,直接使用ENode就能完成這種架構。所以我們只要明白了ENode的架構,就知道這個Forum的架構是怎樣的了。以下是ENode的架構圖(已經理解了這個圖的朋友請直接跳過這一節):

上圖是一個CQRS架構的數據流向圖。UI請求會分為兩類:Command和Query。

Command用於寫數據,Query用於讀數據,寫數據和讀數據完全采用不同的架構實現。寫數據支持同步和異步的方式,讀數據完全走簡單高效思路來實現。當我們要對系統做寫操作時,如果你是用ASP.NET MVC來開發站點,那就可以在Controller中創建並發送一個Command即可。該Command會被發送到消息隊列(EQueue中),然后消息隊列的訂閱方,也就是處理Command的進程會拉取這些Command,然后調用Command Handler完成Command的處理;Command Handler處理Command時,是調用Domain的方法來完成相關的業務邏輯操作。Domain就是DDD中的領域層,負責實現整個系統的業務邏輯。 然后由於是Event Sourcing的架構,所以Domain中任何聚合根的修改都會產生相應的領域事件(Domain Event),領域事件會先被持久化到EventStore中,持久化如果沒有遇到並發沖突,成功后,則會被發布(Publish)到消息隊列(EQueue中),然后消息隊列的訂閱方,也就是處理Domain Event的進程會拉取這些Domain Event,然后調用相關的Event Handler做相關的更新,比如有些Event Handler是會更新讀庫(Read DB),有些是會產生新的Command,這種我把它叫做流程管理器(Process Manager,也有人叫做Saga)。當我們有時一個業務場景需要涉及到多個聚合根的修改時,我們會需要用到Process Manager。Process Manager負責對流程進行建模,它的原理是基於事件驅動的流程實現。Process Manager處理事件,然后產生響應的Command,從而完成聚合根之間的交互。一般一個流程,我們會設計一個流程聚合根以及其他的參與該流程的聚合根,Process Manager則是用於負責協調這些聚合根之間的交互。具體的例子可以看一下ENode源代碼中的BankTransferSample。

關於Query端,由於都是查詢,這些查詢都是用於UI展示數據或者為第三方接口提供數據為目的,查詢對系統無副作用。我們可以用我們自己任意喜歡的方式來實現Query端。查詢面向的是Read DB。上面提到,Read DB中的數據是通過Event Handler(老外叫Denormalizer)來更新的。

所以我們可以看到,整個架構中,Command端和Query端的數據源是完全分離的。Command端最后的結果就是Domain Event,Domain Event是持久化在Event Store中的;Query端的數據源就是Read DB,一般可以用關系型數據庫來作為存儲。CQ兩端的數據同步通過Domain Event來實現。

上圖的CQRS架構最大的好處是在架構級別以及數據存儲級別,把讀寫都分離了。這樣我們可以方便的對讀或寫單獨做優化。另外由於使用了Event Sourcing的架構,使得我們的Command端只要持久化了Domain Event,就意味着保存了這個Domain的所有狀態。這個特性,可以讓我們的框架有很多設計余地,比如不必考慮Domain Event和業務數據要強一致等問題,因為Domain Event本身就是業務數據本身了,我們通過Domain Event隨時可以還原出任意時刻的Domain的狀態。當我們要查詢Domain的當前最新數據時,就走Query端即可。當然,由於Query端是異步更新的,所以Query端的數據可能會有一點點延遲。這點也就是我們平時一直講到的最終一致性(CQ兩端的數據最終會一致)。

通過上面的架構圖,我們知道,一個Command發出后會經過兩個階段的處理:1)先被某個Command Service處理(調用Domain完成業務邏輯產生Domain Event);2)再被Event Service處理(響應Domain Event,完成Read DB的更新或者產生新的Command);理解這兩個階段對理解下面的Forum的項目結構很有用處。

Forum項目結構分析

 

以上是Forum的項目工程結構,項目中包含四個宿主工程,分別是:

Forum.BrokerService:

這個工程用於宿主EQueue的Broker,整個論壇中所有的Command,Domain Event的消息,都會被放在Broker上。比如Controller發送的Command會被發送到Broker,同樣Domain產生的Domain Event也會被發送到Broker;然后消費者消費消息則都是從BrokerService拉取消息。由於該宿主工程不需要和用戶交互,所以我部署為Windows Service。

Forum.CommandService:

這個工程就是用於處理Command的進程,同樣也部署為Windows Service。

Forum.EventService:

這個工程就是用於處理Domain Event的進程,同樣也部署為Windows Service。

Forum.Web:

這個就是論壇的Web站點了,不用多講了;這個Web站點做的事情就是發送Command或者調用Query端的查詢服務查詢數據;Web站點只需要依賴於Forum.Commands和Forum.QueryServices即可,因為它只需要發送Command和查詢數據即可。

Forum.CommandHandlers:

所有的Command Handler都在這個工程,Command Handler的職責是處理Command,調用Domain的方法完成業務邏輯;

Forum.Commands:

所有的Command都在這個工程中,每個Command都是一個DTO,會被封裝為消息發送到EQueue。

Forum.Domain:

就是論壇的領域層了,所有的聚合以、工廠、領域服務,以及領域事件等都在這個工程中。這個工程是整個Forum最有價值的地方,是業務邏輯所在的工程。

Forum.Domain.Dapper:

由於Domain中可能會定義一些接口,這些接口背后的持久化需要在外部實現;如果按照經典DDD的架構,比如倉儲接口是在Domain層定義,而實現則是在基礎層(Infrastructure)中。而從經典DDD的分層架構圖上來看,Domain層是依賴於Infrastructure層的,但是Infrastructure層中又有一些倉儲的實現類要依賴於Domain層;雖然我能理解這種雙向依賴,但很容易會給不少學DDD的人帶來困惑,所以我更加傾向於,把Domain看做是架構的核心,其他一切都是Domain的外圍。這個思想其實和六邊形架構是一個思路。就是從架構上來看,不是上層依賴於下層,而是外層依賴於內層;內層通過定義出接口,外層實現接口,內層只要面向自己定義的接口即可。所以基於這個思路,我會把Forum.Domain中定義的接口,如果用Dapper來實現,那我就定義一個Forum.Domain.Dapper這樣的工程,意思是實現Forum.Domain.Dapper依賴於內層的Forum.Domain。假如以后我們有一個基於EntityFramework的實現,則只要再創建一個Forum.Domain.EntityFramework這樣的工程即可。所以可以看出,Forum.Domain.Dapper這種工程實際上是Forum.Domain對外部的適配器,Forum.Domain里定義好適配接口,Forum.Domain.Dapper這種工程實現這些適配接口。基於這種思想,我們的架構就沒有了上層依賴下層的概念了,而是替換為內外層的關系,內層不依賴外層,外層依賴於內層,內層與外層直接通過適配器接口來交互,或者通過Domain Event也可以。這樣我們就不用再去糾結經典DDD中看似雙向依賴的問題了。

Forum.Domain.Tests:

這個工程就是對Forum.Domain的一個測試工程。每個測試用例會模擬Controller發起Command,然后最后檢查Domain中的狀態是否正確修改。

Forum.QueryServices:

這個工程定義了Query端的所有查詢接口,Forum.Web站點依賴於這個工程中的查詢服務接口;然后這些查詢接口的實現則是放在Forum.QueryServices.Dapper中。Forum.QueryServices與Forum.QueryServices.Dapper之間的關系和Forum.Domain與Forum.Domain.Dapper之間的關系類似,這里就不在重復了。

Forum.Denormalizers.Dapper:

這個工程中的就是所有的Denormalizer,Denormalizer就是負責處理Domain Event,然后更新讀庫。然后由於目前使用Dapper實現數據持久化,所以工程名以Dapper結尾。

Forum.Infrastructure:

這是一個基礎工程,存放所有基礎的公共的東西,比如一些業務無關的服務或配置信息或全局變量等東西;需要強調的是:這里的Forum.Infrastructure和經典DDD中的Infrastructure不是同一個概念。DDD中的Infrastructure是一個邏輯上的分層,領域層中所有的技術支撐實現都在Infrastructure中;而這里的Infrastructure,則僅僅只是一些Common的基礎的公用的東西,Infrastructure不是為了為其它哪一層服務的,它可以被其他任何項目使用;

好了,以上簡單介紹了每個工程的作用和設計目的。下面我們來看看Forum的領域模型的設計吧!

Forum的Domain Model的設計

核心功能需求分析

1. 提供用戶注冊、登錄、注銷三個功能;注冊用戶時需要驗證用戶名是否唯一;

2. 提供發帖、回帖、修改帖子、修改回復,以及回復的回復這些基本核心功能;

3. 系統管理員可以對論壇版塊進行維護;

聚合識別

識別出來的聚合有:論壇賬號、帖子、回復、版塊這四個。

賬號的最少信息應該有:賬號名稱+密碼;

版塊要有名稱即可;

帖子要有標題、內容、發帖人、發帖時間、所屬版塊;

回復要有回復內容、回復時間、回復人、所屬版塊,父回復(可以為空);

場景走查

注冊就是創建賬號(賬號唯一性的設計后面在詳細分析);

登錄本質就是調用Query端的查詢服務查找賬號是否存在,所以不需要Domain做什么處理,注銷也是;

發帖就是創建帖子;

回帖就是創建回復;

修改帖子就是對帖子聚合根做修改;

修改回復就是對回復聚合根做修改;

版塊添加就是創建一個版塊聚合根;

關鍵業務規則識別

1)賬號名稱不能重復;2)帖子必須要有所屬版塊和發帖人;3)回復必須要有一個對應的帖子和回復人;

下面我們分析一下每個業務規則的實現。

  1. 如何實現賬號名稱不能重復?首先它是一條業務規則,所以必須在Domain里實現,而不應該在Command Handler里。然后由於Event Sourcing的架構,天生有一個缺陷就是無法實現唯一性約束這種需求。所以我們需要在Domain中顯式設計出可以表達聚合根索引的東西,我把它們叫做IndexStore,表示是一種聚合根索引的存儲。這個思路非常類似於在經典DDD中,我們有倉儲(Repository)的概念,倉儲維護了所有的聚合根;而我這里的IndexStore則是維護了聚合根的索引信息。有了這個索引信息后,我們就能在注冊新賬號時,在Domain中設計一個RegisterAccountService這樣的領域服務,領域服務里通過AccountIndexStore來檢查賬號名稱是否重復,如果不重復,則將當前賬號名稱添加到AccountIndexStore中,如果重復,則報異常。另外一個非業務的點需要考慮,那就是如何實現並發注冊用戶的處理。我們可以在Command Handler中實現db級別的鎖(但不不需要鎖整個賬號表,而是鎖一個其他表中的某一條記錄),確保同一時刻,不會有兩個Account名稱添加到AccountIndexStore中;我們通過RegisterAccountService把“賬號名稱不能重復”的這個業務規則顯式的表達出來,從而在代碼級別體現領域內實現了這個業務規則。以前,如果沒有用Event Sourcing,我們可能會依賴db的唯一索引來實現這個唯一性,雖然功能上也可以實現,但實際上賬號名稱不能重復的這個業務規則沒有體現在領域內。這點也是我這次通過實現基於Event Sourcing而實現的唯一性驗證而想到的點。
  2. 帖子必須要有所屬版塊和發帖人,這條業務規則很容易保證,只要在帖子聚合根上,對版塊和發帖子判斷是否為空就行了;
  3. 回復必須要有一個對應的帖子和回復人,也是同理,只要在構造函數中判斷是否為空即可;

以注冊新用戶為例,展示代碼實現

客戶端JS通過angularJS提交注冊信息

    $scope.submit = function () {
        if (isStringEmpty($scope.newAccount.accountName)) {
            $scope.errorMsg = '請輸入賬號。';
            return false;
        }
        if (isStringEmpty($scope.newAccount.password)) {
            $scope.errorMsg = '請輸入密碼。';
            return false;
        }
        if (isStringEmpty($scope.newAccount.confirmPassword)) {
            $scope.errorMsg = '請輸入密碼確認。';
            return false;
        }
        if ($scope.newAccount.password != $scope.newAccount.confirmPassword) {
            $scope.errorMsg = '密碼輸入不一致。';
            return false;
        }

        $http({
            method: 'POST',
            url: '/account/register',
            data: $scope.newAccount
        })
        .success(function (result, status, headers, config) {
            if (result.success) {
                $window.location.href = '/home/index';
            } else {
                $scope.errorMsg = result.errorMsg;
            }
        })
        .error(function (result, status, headers, config) {
            $scope.errorMsg = result.errorMsg;
        });
    };

Controller處理請求

[HttpPost]
[AjaxValidateAntiForgeryToken]
[AsyncTimeout(5000)]
public async Task<ActionResult> Register(RegisterModel model, CancellationToken token)
{
    var command = new RegisterNewAccountCommand(model.AccountName, model.Password);
    var result = await _commandService.Execute(command, CommandReturnType.EventHandled);

    if (result.Status == CommandStatus.Failed)
    {
        if (result.ExceptionTypeName == typeof(DuplicateAccountException).Name)
        {
            return Json(new { success = false, errorMsg = "該賬號已被注冊,請用其他賬號注冊。" });
        }
        return Json(new { success = false, errorMsg = result.ErrorMessage });
    }

    _authenticationService.SignIn(result.AggregateRootId, model.AccountName, false);
    return Json(new { success = true });
}

CommandHandler處理Command

[Component(LifeStyle.Singleton)]
public class AccountCommandHandler : ICommandHandler<RegisterNewAccountCommand>
{
    private readonly ILockService _lockService;
    private readonly RegisterAccountService _registerAccountService;

    public AccountCommandHandler(ILockService lockService, RegisterAccountService registerAccountService)
    {
        _lockService = lockService;
        _registerAccountService = registerAccountService;
    }

    public void Handle(ICommandContext context, RegisterNewAccountCommand command)
    {
        _lockService.ExecuteInLock(typeof(Account).Name, () =>
        {
            context.Add(_registerAccountService.RegisterNewAccount(command.Id, command.Name, command.Password));
        });
    }
}

RegisterAccountService領域服務

    /// <summary>提供賬號注冊的領域服務,封裝賬號注冊的業務規則,比如賬號唯一性檢查
    /// </summary>
    [Component(LifeStyle.Singleton)]
    public class RegisterAccountService
    {
        private readonly IIdentityGenerator _identityGenerator;
        private readonly IAccountIndexStore _accountIndexStore;
        private readonly AggregateRootFactory _factory;

        public RegisterAccountService(IIdentityGenerator identityGenerator, AggregateRootFactory factory, IAccountIndexStore accountIndexStore)
        {
            _identityGenerator = identityGenerator;
            _factory = factory;
            _accountIndexStore = accountIndexStore;
        }

        /// <summary>注冊新賬號
        /// </summary>
        /// <param name="accountIndexId"></param>
        /// <param name="accountName"></param>
        /// <param name="accountPassword"></param>
        /// <returns></returns>
        public Account RegisterNewAccount(string accountIndexId, string accountName, string accountPassword)
        {
            //首先創建一個新賬號
            var account = _factory.CreateAccount(accountName, accountPassword);

            //先判斷該賬號是否存在
            var accountIndex = _accountIndexStore.FindByAccountName(account.Name);
            if (accountIndex == null)
            {
                //如果不存在,則添加到賬號索引
                _accountIndexStore.Add(new AccountIndex(accountIndexId, account.Id, account.Name));
            }
            else if (accountIndex.IndexId != accountIndexId)
            {
                //如果存在但和當前的索引ID不同,則認為是賬號有重復
                throw new DuplicateAccountException(accountName);
            }

            return account;
        }
    }

EventHandler處理Domain Event

[Component(LifeStyle.Singleton)]
public class AccountEventHandler : BaseEventHandler, IEventHandler<NewAccountRegisteredEvent>
{
    public void Handle(IEventContext context, NewAccountRegisteredEvent evnt)
    {
        using (var connection = GetConnection())
        {
            connection.Insert(
                new
                {
                    Id = evnt.AggregateRootId,
                    Name = evnt.Name,
                    Password = evnt.Password,
                    CreatedOn = evnt.Timestamp,
                    UpdatedOn = evnt.Timestamp,
                    Version = evnt.Version
                }, Constants.AccountTable);
        }
    }
}

結束語

好了,大概就這些吧。好久沒寫文章了,都不知道該怎么寫了,呵呵。接下來准備再好好分享下ENode,EQueue最近幾個月在不斷完善中我遇到的一些技術問題。對了,大家可以去體驗下這個論壇的功能哦,雖然很簡單,但基本的功能還是有的。http://www.enode.me

下一篇文章准備仔細介紹下ENode中各個環節的冪等的處理,比如Command的冪等處理,Domain Event持久化的冪等處理,Event Handler的冪等處理,Saga Process Manager的冪等處理。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM