用CQRS+ES實現DDD


這篇文章應該算是對前三篇的一個補充,在寫之前說個題外話,有園友評論這是在用三層架構在寫DDD,我的個人理解DDD是一種設計思想,跟具體用什么架構應該沒有什么關系,DDD也需要分層,也有三層架構的影子在里面。三層架構主要是表現層、業務層和數據層,而DDD已經沒有數據層,三層結構里的模型是貧血的,而DDD卻是充血的。如果你在用三層框架已經有了聚合,實體,值對象的概念,那說明你已經在靠近DDD了,或者你不願相信罷了,當然你可以保留自己的觀點,這里不作爭論,我也不能作出結論,我個人是覺得這種討論也是有意義的,我也會思考之前所介紹的到底是不是DDD,這個答案留給各位讀者吧。總之歡迎形式各樣的評論。

 

接下來我就來介紹一下CQRS(命令和查詢職責分離 )風格的框架。在學習CQRS的時候,有很多人說這個太高大上,難以應用。我想說CQRS不是那么可怕,當然也不是那么簡單。那么就開始慢慢來揭開面紗。首先還是先看看經典DDD在Application層是這么做的,先定義一個接口

public interface IUserService
{
    UserDTO GetUserInfo(string userId);
    IEnumerable<UserDTO> GetAllUsers();
    
    void RegisterUser(UserDTO userData);
    void ModifyContactInfo(UserDTO userData);
    void ModifyPassword(string userId, string oldPwd, string newPwd);
}

通過代碼會發現定義的接口有一點點規律,要么是有返回值,要么就是沒有返回值的,那么他們有什么特點呢?請注意我在寫代碼的時候特意在兩個接口之間加了回車以區分,上面兩個主要為了返回數據,是查詢,下面三個其中一個是創建數據,剩下的是修改數據,是命令。也就是說一個方法要么是執行某種動作的命令,要么是返回數據的查詢且查詢不應該會影響數據,不可能兩者同時存在(可能你並不認同,有一種情況是特殊的,就是當實體標識由數據庫來提供的,那么有時我們需要知道它的標識,但我也建議該方法不應該有返回值,可以用out,或者是給傳輸對象進行賦值,在有些環境下后一種也並不能解決),也就類似一次向服務器發送url請求時,要么是get,要么是post,不可能即是get,又是post。

當前接口只定義了一個DTO,該DTO的描述可能會過於宏大,只有當我們知道需要調用哪個接口時才會知道此DTO有哪些數據,於是當ui層去對DTO賦值往往也會不知所措,你是不是有針對不同的接口去定義相應的DTO的想法呢?至少我有,那么這樣的DTO和接口是不是具有相同功能的表達呢?我已經開始會將上頁分成兩個接口了

再來說說查詢,查詢主要是為了ui呈現數據,經典DDD的查詢一般都是通過repository(具體實現很多情況下會選擇orm),然后將domain model轉成dto,這種方式限制很大,對於一些復雜數據就會顯得很難,如當要查看一個user信息時還要展示他的role信息,這樣就需要通過repository先查出user,然后再通過user.RoleID再查詢role,最終數據轉換成ui需要的model,應用層開發就會有點繁瑣了,不如關系數據庫一句sql來的方便。當然,現在的orm(如nh、ef)提供了級聯查詢,這樣就會在user上定義一個role屬性,雖然是方便了很多,但是這樣的回報也僅僅是為了查詢,對於我們跟蹤狀態一點用也沒有,為什么?當一個用戶修改角色時,需要role對象嗎?不需要,只需要他的ID,因此在聚合之間的引用應該盡量用引用ID,而不是引用對象,所以聚合之間盡量低耦合,“低耦合高內聚”這個標准也能夠更好進行模塊式開發。再有一些匯總查詢,估計repository實現人員快要瘋了,寫應用層的人估計更要瘋,呵呵。使用orm帶來的好處是顯而易見的,但是面對查詢,orm並不是那么完美,盡管現在的orm查詢功能已經很強大。經過以上闡述你可能有了一點想法,讓查詢繞開倉儲。

 

接下來就開始CQRS吧。不細說查詢了,在上述接口重新定義一個名稱叫做IUserQuerySerice,我已經開始注重命名了,去掉里面的命令方法就行了。那么只要針對ui展示數據用的查詢DTO就行了,他也可以叫ReadModel(只讀模型),我個人覺得這個叫法更合適一點。那么實現你用數據庫視圖也行,用sql也好,達到目的就行。還要就是需要定義多少ReadModel,這個仁者見仁,智者見智。

 

重點是命令處理,為C端設計一個接口

public interface ICommandBus
{
    void Send(ICommand command);
}

就這么簡單,但是這帶來了需要大量寫Command,即每有一個操作就需要定義一個命令模型,然后還要寫該命令對應的處理器,還是拿之前的用戶注冊的例子來演示代碼吧

public class RegisterUserCommand : ICommand
{
    public string Name { get; set; }
    public string Password { get; set; }    
    public string Email { get; set; }
}

public class RegisterUserHandler : ICommandHandler<RegisterUserCommand>
{
    private readonly IRegisterUserService _registerUserService;
    private readonly IUserRepository _userRepository;

    public void Handle(RegisterUserCommand command)
    {
        User user = _registerUserService.RegisterNewUser(command.Name, command.Password, command.Email);
        _userRepository.Add(user);
    }
}

這種架構風格帶來了大量的代碼工作,就是需要定義很多Command。CommandBus的具體實現就是運用了訂閱/發布,即一個Command發送過來,系統會去找對應的CommandHandler,這樣的代碼寫起來會顯示更干凈。


CQRS不是一個讓你覺得是多么炫麗的架構,他的這種復雜性其實也是合理的,因為他是為了解決數據顯示的復雜性。

 

接下來我就說說ES。什么是ES?全稱是Event Sourcing,事件源。在未用ES之前,數據庫中保存的聚合只是最后一次完整狀態的數據,他不能反應聚合的歷史變遷,除非你使用了其他的方式。還記得之前我稍微說了一下事件驅動嗎?用了ES,必然要有事件驅動的(目前為止我還沒有其他好的方式),而且還要接受最終一致性。什么是最終一致性?后面再說吧。還是用代碼演示,在這里還是用戶注冊,為了方便這里用戶密碼就先不加密了,領域內的代碼大致就這些 

public class UserCreated : IDomainEvent
{
    public UserCreated(string name, string password)
    {
        this.Name = name;
        this.Password = password;
    }

    public string SourceId { get; set; }     
    public int Version { get; set; }

    public string Name { get; private set; }
    public string Password { get; private set; }
}

public class User : IAggregateRoot
{
    private readonly IList<IDomainEvent> _uncommittedEvents = new List<IDomainEvent>();
    IEnumerable<IDomainEvent> IAggregateRoot.Events
    {
        get { return this._uncommittedEvents; }
    }

    public User(string id)
    {
        this.Id = id;
    }

    public User(string name, string password)
        : this(Guid.NewGuid().ToString())
    {
        OnUserCreated(new UserCreated(name, password));
    }

    private void OnUserCreated(UserCreated @event)
    {
        @event.SourceId = this.Id;
        @event.Version = this.Version + 1;

        Handler(@event);

        this.Version = @event.Version;
        _uncommittedEvents.Add(@event);
    }

    private void Handle(UserCreated @event)
    {
        this.Name = @event.Name;
        this.Password = @event.Password;
    }

    void IAggregateRoot.LoadFrom(IEnumerable<IDomainEvent> events)
    {
        foreach (var @event in events) {
            Handle(@event);
            this.Version = @event.Version;
        }
} public string Id { get; private set; } public int Version { get; private set; } public string Name { get; private set; } public string Password { get; private set; } } public class IRepository { T Get<T>(string id) where T : class, IAggregateRoot; void Save<T>(T aggregate, string commandId) where T : class, IAggregateRoot; }

不知道上面的代碼你能不能夠大致明白。在這里倉儲的功能更為有限,只有獲取和保存聚合。當new一個user時會產生個事件,同時為事件記錄一個版本號,聚合會得到最終的版本號,而且狀態的修改是由事件驅動的。這個時候我們還看不出來事件的作用。別急,簡單看下倉儲的實現。保存聚合到底發生了什么

public class SourcedEvent
{
    public SourcedEvent(string aggregateId, string aggregateName, int version)
    {
        this.AggregateId = aggregateId;
        this.AggregateName = aggregateName;
        this.Version = version;
    }

    public string AggregateId { get; private set; }
    public string AggregateName { get; private set; }
    public int Version { get; private set; }
    public string Payload { get; set; }
    public string CorrelationId { get; set; }
}

public class EventSourcedRepository : IRepository
{
    public void Save<T>(T aggregate, string commandId) where T : class, IAggregateRoot
    {
        var events = aggregate.Events
            .Select(@event => new SourcedEvent(aggregate.Id, typeof(T).Name, @event.Version) {
                CorrelationId = commandId,
                Payload = _serializer.Serialize(@event)
            }).ToArray();

        using (var connection = new SqlConnection()) {
            using (var trans = connection.BeginTransaction()) {
                try {
                    foreach (var @event in events) {
                        //TODO添加事件sql
                    }
                    trans.Commit();
                }
                catch (Exception) {
                    trans.RollBack();
                    throw;
                }
            }
        }
        _eventBus.Publish(aggregate.Events);
    }
}

你會看到此時保存的僅僅是事件,持久化成功了,會將事件發布出去。這樣C端的寫數據庫設計可以簡單到只需要記錄Events的一張表。而且最大的好處在於只會對event進行insert,還是就是他的存儲介質不一定就需要db,甚至文本文件都行(為每個聚合創建一個文件,然后將事件追加,多么簡單),想想都會興奮,有點顛覆吧。

然后你再寫個同步讀數據庫的EventHandler,還是再貼上代碼,已經寫了這么多了,不在乎再寫一個了

public class UserDataSyncHandler : IEventHandler<UserCreated>
{
    public void Handle(UserCreated @event)
    {
        string sql = string.Format("insert user(id, name, password) values('{0}','{1}','{2}')",
            @event.SourceId, @event.Name, @event.Password);
    }
}

至此,大致做法已經介紹完了。在這過程中你會發現用了CQRS+ES架構,可以完全拋棄ORM了,喜歡寫sql的伙伴們可能會更興奮,也許會和我一樣想說“ORM我早就看你有點不爽了”,呵呵。還要說明一下前面所說的最終一致性,就是C端的事件持久化完時此時Q端的數據並沒有同步過來,會存在一點延遲,但這種延遲不會太久,甚至會感覺不到。到了這里我還要有一個感觸就是有了這樣的架構去實現DDD,你還認為聚合模型就是數據模型嗎?或者說他倆是同胞兄弟嗎?

最后再展示下如何通過事件還原聚合,還是上代碼吧,誰讓我如此喜歡用代碼來描述呢

public class EventSourcedRepository : IRepository
{
    public T Get<T>(string id) where T : class, IAggregateRoot
    {
        IEnumerable<IDomainEvent> events;
        using (var connection = new SqlConnection()) {
            //TODO聚合名稱和聚合ID取出事件並對版本號進行升序
        }

        var aggregate = (T)Activator.CreateInstance(typeof(T), id);
        aggregate.LoadFrom(events);

        return aggregate;
    }
}

這樣聚合就可以還原到最后一次的狀態了。就像以前的電影膠片一樣,每個事件對應着一個畫面,放完了也就完了。

 

通過上面的介紹,你應該會了大致的了解了,現在來看這張圖估計你就不會覺得有多么高大上了(先跳過wcf)

 

CQRS+ES的結合帶來了很大的亮點,但是要應用考慮的會很多,復雜度也會很大,如果同步數據,如果事件丟了怎么辦?產生的事件執行順序跟我們的預期不一樣怎么辦?遇到並發怎么辦?實體的id生成策略等等好多問題。有了問題自己的知識范圍也會擴大和提高。總之,CQRS+ES可討論的太多了,我也無法一一列舉。就先寫到這兒了,這一篇應該是這周最長的一篇了,明天周末了,歇兩天。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM