你一定看得懂的 DDD+CQRS+EDA+ES 核心思想與極簡可運行代碼示例


前言

隨着分布式架構微服務的興起,DDD(領域驅動設計)、CQRS(命令查詢職責分離)、EDA(事件驅動架構)、ES(事件溯源)等概念也一並成為時下的火熱概念,我也在早些時候閱讀了一些大佬的分析文,學習相關概念,不過一直有種霧里看花、似懂非懂的感覺。經過一段時間的學習和研究大佬的代碼后,自己設計實現了一套我消化理解后的代碼。為了突出重點,避免受到大量實現細節的干擾,當然也是懶(這才是主要原因),其中的所有基礎設施都使用了現成的庫。所實現的研究成果也做成了傻瓜式一鍵體驗(我對對着黑框框敲命令沒什么興趣,能點兩下鼠標搞定的事我絕不在鍵盤上敲又臭又長的命令,敲命令能敲出優越感的人我覺得應該是抖M)。

正文

DDD(領域驅動設計)

這一定是最群魔亂舞的一個概念,每個大佬都能講出一大篇演講稿,但都或多或少存在差異或分歧,在我初看 DDD 時,我就被整懵了,這到底是咋回事?

現在回過頭來看,DDD 其實是一個高階思想概念,並不能指導開發者如何敲鍵盤,是指導人如何思考領域問題,而不是指導人思考出具體的領域的。正是因為中間隔了一層虛幻飄渺的概念,導致不同的人得出了不同的結論。還好 DDD 存在一些比較具體容易落實的概念,現在就來講下我對這些常見基礎概念的理解和我編碼時的基本原則,希望大家能在看大佬的文章時不用一臉懵逼,也進行下心得交流。

Entity(實體)

實體是一個存儲數據的類,如果類中包含自身的合法性驗證規則之類的方法,一般稱之為充血模型,相對的單純保存數據的則稱為貧血模型(有時也叫做 POCO 類)。實體有一個重要性質,相等性是由標識屬性決定的,這個標識可以是一個簡單的 int 型的 Id,也可以是多個內部數據的某種組合(類似數據庫表的復合字段主鍵)。除標識外的其他東西均不對兩個實體對象的相等性產生影響。並且實體的數據屬性是可更改的。

有很多大佬認為實體應該是充血的,但在我看來,貧血的似乎更好,因為需求的不穩定性可能導致這些規則並不穩定,或規則本身並不唯一,在不同場合可能需要不同規則。這時候充血模型無論怎么辦都很別扭,如果把規則定義和校驗交給外部組件,這些需求就很容易滿足,比如使用 FluentValidate 為一種實體定義多套規則或對內部的規則條目按情況重新組合。

ValueObject(值對象)

值對象也是用來存儲數據的類。與實體相對,值對象沒有標識屬性,其相等性由所有內部屬性決定,當且僅當兩個值對象實例的所有屬性一一相等時,這兩個值對象相等。並且值對象的所有屬性為只讀,僅能在構造函數中進行唯一一次設置,如果希望修改某個值對象的某一屬性,唯一的辦法是使用新的值對象替換舊的值對象。並且值對象經常作為實體的屬性存在。

這個概念看起來和實體特別相似,都是用來存儲數據的,但也有些性質上的根本不同。網上的大佬通常會為值對象編寫基類,但我認為,值對象和實體在代碼實現上並沒有這么大的區別。可以看作整數和小數在計算機中表現為不同的數據類型,但在數學概念上他們沒有區別,僅僅只是因為離散的計算機系統無法完美表示連續的數學數字而產生的縫合怪。我傾向於根據類的代碼定義所表現出來的性質與誰相符就將其視為誰,而不是看實現的接口或繼承的基類。因為需求的不確定性會導致他們可能會發生轉換,根據代碼進行自我描述來判斷可以避免很多潛在的麻煩。

Aggregate,Aggregate Root(聚合及聚合根)

聚合根表示一個領域所操作的頂級實體類型,其他附屬數據都是聚合根的內部屬性,聚合根和其所屬的其他實體的組合稱為聚合。這是一個純概念性的東西。對領域實體的操作必須從聚合根開始,也就是說確保數據完整性的基本單位是聚合。大佬的代碼中經常會用一個空接口來表示聚合根,如果某個實體實現了這個接口,就表示這個實體可以是一個聚合根。請注意,聚合根不一定必須是頂級類型,也可以是其他實體的一個屬性。這表示一個實體在,某些情況下是聚合根,而其他情況下是另一個聚合根的內部屬性。也就是說實體之間並非嚴格的樹狀關系,而是一般有向圖狀關系。

我認為定義這樣的空接口實際意不大,反而可能造成一些誤會。如果某個實體由於需求變動導致不再會成為聚合根,那這個實體事實上將不再是聚合根,但人是會犯錯的,很可能忘記去掉聚合根接口,這時代碼與事實將產生矛盾。所以我認為聚合根應該基於事實而不是代碼。當一個實體不再會作為聚合根使用時,將相關代碼刪除,就同時表示它不再是聚合根,閱讀代碼的人也因為看不到相關代碼而自動認為它不是聚合根。在代碼中的體現方式與下一個的概念有關。

Repository(倉儲)

倉儲表示對聚合根的持久化的抽象,在代碼上可表現為聲明了增刪查改的相關方法的接口,而倉儲的實現類負責具體解決如何對聚合根實體進行增刪查改。例如在倉儲內部使用數據庫完成具體工作。

如果一個倉儲負責管理一個聚合根實體的持久化或者說存取,那這個實體就是一個事實上的聚合根。那么在這里,就可以在代碼操作上將看到某個實體被倉儲管理等價為這個實體是聚合根,反之就不是。也就是說,如果將某個實體的倉儲的最后一個實際使用代碼刪除,這個實體就在事實上不再是聚合根,此時代碼表現與事實將完美同步,不再會產生矛盾。至於由於沒看到某個實體的倉儲而將實體誤認為不是聚合根,這其實並沒有任何問題。這說明在你所關注的領域中這個實體確實不是聚合根,而這個實體可能作為聚合根使用的領域你根本不關心,所以看不到,那這個實體是否在其他領域作為聚合根使用對你而言其實是無所謂的。

Domain Service(領域服務)

這就涉及到業務代碼的編寫了。如果一個業務需要由多個聚合根配合完成,也就是需要多個倉儲,那么就應該將這些對倉儲的調用封裝進一個服務,統一對外暴露提供服務。

如果這些倉儲操作需要具有事務性,也可以在這里進行協調管理。如果某個業務只需要一個倉儲參與,要不要專門封裝一個服務就看你高興了。

CQRS(命令查詢職責分離)

CQRS 本質上是一種指導思想,指導開發者如何設計一個低耦合高可擴展架構的思想。傳統的 CURD 將對數據的操作分為 讀、寫、改、刪,將他們封裝在一起導致他們將緊密耦合在相同的數據源中,不利於擴展。CQRS 則將對數據的操作分為會改變數據源的和不會改變數據源的,前者稱為命令,后者稱為查詢。將他們分別封裝能讓他們各自使用不同的數據源,提高可擴展性。

其中命令是一個會改變數據源,但不返回任何值的方法;查詢是會返回值,但絕不會改變數據源的方法。但是在我的編碼中,命令是可以返回值的,至於要返回什么,根據實際情況調整。比如最簡單的返回一個 bool 表示操作是否成功以決定接下來的業務流程該走向何方,這是很常見的情況。所以在我的概念里,一個方法是命令還是查詢實際上只看這個方法是否會改變數據源,要封裝在一起還是分別封裝都無所謂。建議分開封裝到不同的倉儲中,通過倉儲關聯到具體的數據源,命令和查詢的倉儲關聯到不同的數據源的時候,自然就完成了讀寫分離。通過起名來明示方法的目的應該可以輕松分辨一個方法屬於命令還是查詢。只要腦子里有這個概念,要實現擴展辦法多的是。

事件驅動架構(EDA)

可以說所有圖形界面(Gui)編程都是清一色的事件驅動架構,這東西一點也不稀奇。說白了,EDA 就是一種被動架構,通過某些事情的發生來觸發某些操作的執行,否則系統就隨時待命,按兵不動。

EDA 的實現需要一個中介才能實現,在 Windows 中,這個東西叫做 Windows 消息隊列(消息循環)和事件處理器。同樣的,在非 Gui 編程中也需要這倆東西,但通常被稱為消息總線和消息消費者。在分布式系統中,這個中介將不與系統在同一進程甚至不在同一設備中,稱為分布式消息總線。這樣在開發時可以分成兩撥,一撥負責寫生產並發送事件的代碼,一撥負責寫接收事件信息並進行處理的代碼。他們之間的溝通僅限於交流關心的事件叫什么以及事件攜帶了什么信息。至於產生的消息是如何送到正確的消費端並觸發消費處理器的,那是消息總線的事。如果一個消息總線需要這兩撥人了解中間的過程甚至需要自己去實現,那這個消息總線是個廢品,也起不到什么解耦的效果,甚至是個拖后腿的東西。

EDA + CQRS

當他們結合在一起,就產生了命令或查詢的發起和實際處理實現可以分離的效果。命令的發起方向命令總線發送一條命令消息並帶上必要參數,消費方收到消息后獲取參數完成任務並返回結果。命令可以看作一種特殊的事件,命令只由一個命令處理器處理,並可向發送方返回一個處理結果;事件由所有對同種事件感興趣的事件處理器處理,不向事件發送方返回任何結果。

事件處理器的執行順序是不確定的,所以任何事件處理器都必須獨立完成事件處理。如果兩個事件處理之間存在因果依賴,應該在前置事件處理后由事件處理器發布新事件,並由后置事件處理器去處理前置事件產生的新事件,而不是讓它們處理同一事件。

ES(事件溯源)

事件溯源表示能追查一個事件的源頭,甚至與之相關的其他事件的概念,說句大白話就是刨祖墳。ES 對歷史狀態回溯的需求有着天然的支持,最常見的如撤銷重做。而 ES 一般會配合 EDA 使用,ES 保存 EDA 產生的事件信息,並且這些信息有只讀性和因果連貫性。這順便能讓我們對系統中的實體究竟是如何一步一步變成現在這個樣子有一個清晰的了解。畢竟實體具有可變性,實體信息一旦改變,舊的信息就會丟失,ES 剛好彌補了這個缺陷。

代碼展示說明

此處的事件消息中介使用 MediatR 實現。

接口

DDD 相關

實體

定義一個實體的基本要素,實現接口的類就是實體,值對象沒有接口或基類,只看代碼所展現的性質是否符合值對象的定義,聚合根沒有接口或基類,只看實體是否被倉儲使用,領域服務說白了就是個打包封裝,根據情況來決定,例如重構時提取方法即可視為封裝服務。在此處可簡單認為沒有實現實體接口的數據類是值對象:

/// <summary>
/// 實體接口
/// </summary>
public interface IEntity {}

/// <summary>
/// 泛型實體接口,約束Id屬性
/// </summary>
public interface IEntity<TKey> : IEntity
    where TKey : IEquatable<TKey>
{
    TKey Id { get; set; }
}

倉儲接口

倉儲接口細分為可讀倉儲和可寫倉儲,可寫倉儲有一個分支為可批量提交倉儲,表示修改操作會在調用提交保存方法后批量保存,也就是事務(就是用來替代操作單元的,這東西就有一個提交操作,名字也莫名其妙,我曾經一直無法理解這東西是干嘛的),接口聲明參考 EF Core,示例實現也基於 EF Core。由於已經公開了查詢接口類型的 Set 屬性,使用者可以任意自定義查詢。

public interface IBulkOperableVariableRepository<TResult, TVariableRepository, TEntity>
    where TEntity : IEntity
    where TVariableRepository : IVariableRepository<TEntity>
{
    TResult SaveChanges();
    Task<TResult> SaveChangesAsync(CancellationToken cancellationToken);
}

public interface IBulkOperableVariableRepository<TVariableRepository, TEntity>
    where TEntity : IEntity
    where TVariableRepository : IVariableRepository<TEntity>
{
    void SaveChanges();
    Task SaveChangesAsync(CancellationToken cancellationToken);
}

public interface IReadOnlyRepository<TEntity>
    where TEntity : IEntity
{
    IQueryable<TEntity> Set { get; }
    TEntity Find(TEntity entity, bool ignoreNullValue);
    Task<TEntity> FindAsync(TEntity entity, bool ignoreNullValue);

}
public interface IReadOnlyRepository<TEntity, TKey> : IReadOnlyRepository<TEntity>
    where TEntity : IEntity<TKey>
    where TKey : IEquatable<TKey>
{
    TEntity Find(TKey key);
    Task<TEntity> FindAsync(TKey key);
    IQueryable<TEntity> Find(IEnumerable<TKey> keys);
}

public interface IVariableRepository<TEntity>
    where TEntity : IEntity
{
    void Add(TEntity entity);
    Task AddAsync(TEntity entity, CancellationToken cancellationToken);
    void Update(TEntity entity);
    Task UpdateAsync(TEntity entity, CancellationToken cancellationToken);
    void Delete(TEntity entity, bool isSoftDelete);
    Task DeleteAsync(TEntity entity, bool isSoftDelete, CancellationToken cancellationToken);
    void AddRange(IEnumerable<TEntity> entities);
    Task AddRangeAsync(IEnumerable<TEntity> entities, CancellationToken cancellationToken);
    void UpdateRange(IEnumerable<TEntity> entities);
    Task UpdateRangeAsync(IEnumerable<TEntity> entities, CancellationToken cancellationToken);
    void DeleteRange(IEnumerable<TEntity> entities, bool isSoftDelete);
    Task DeleteRangeAsync(IEnumerable<TEntity> entities, bool isSoftDelete, CancellationToken cancellationToken);
}
public interface IVariableRepository<TEntity, TKey> : IVariableRepository<TEntity>
    where TEntity : IEntity<TKey>
    where TKey : IEquatable<TKey>
{
    void Delete(TKey key, bool isSoftDelete);
    Task DeleteAsync(TKey key, bool isSoftDelete, CancellationToken cancellationToken);
    void DeleteRange(IEnumerable<TKey> keys, bool isSoftDelete);
    Task DeleteRangeAsync(IEnumerable<TKey> keys, bool isSoftDelete, CancellationToken cancellationToken);
}

public interface IRepository<TEntity> : IVariableRepository<TEntity>, IReadOnlyRepository<TEntity>
    where TEntity : IEntity
{
}

public interface IRepository<TEntity, TKey> : IRepository<TEntity>, IVariableRepository<TEntity, TKey>, IReadOnlyRepository<TEntity, TKey>
    where TEntity : IEntity<TKey>
    where TKey : IEquatable<TKey>
{
}

EF Core 專用特化版倉儲接口

public interface IEFCoreRepository<TEntity, TDbContext> : IReadOnlyRepository<TEntity>, IVariableRepository<TEntity>, IBulkOperableVariableRepository<int, IEFCoreRepository<TEntity, TDbContext>, TEntity>
    where TEntity : class, IEntity
    where TDbContext : DbContext
{ }

public interface IEFCoreRepository<TEntity, TKey, TDbContext> : IEFCoreRepository<TEntity, TDbContext>, IReadOnlyRepository<TEntity, TKey>, IVariableRepository<TEntity, TKey>
    where TEntity : class, IEntity<TKey>
    where TKey : IEquatable<TKey>
    where TDbContext : DbContext
{ }

CQRS+EDA 相關:

命令接口

分為帶返回值命令和無返回值命令。

public interface ICommand<out TResult> : ICommand
{
}

public interface ICommand : IMessage
{
}

命令總線接口

同樣分為帶返回值和無返回值

public interface ICommandBus<in TCommand>
    where TCommand : ICommand
{
    Task SendCommandAsync(TCommand command, CancellationToken cancellationToken);
}

public interface ICommandBus<in TCommand, TResult> : ICommandBus<TCommand>
    where TCommand : ICommand<TResult>
{
    new Task<TResult> SendCommandAsync(TCommand command, CancellationToken cancellationToken);
}

命令處理器接口

同上

public interface ICommandHandler<in TCommand>
    where TCommand : ICommand
{
    Task Handle(TCommand command, CancellationToken cancellationToken);
}

public interface ICommandHandler<in TCommand, TResult> : ICommandHandler<TCommand>
    where TCommand : ICommand<TResult>
{
    new Task<TResult> Handle(TCommand command, CancellationToken cancellationToken);
}

命令存儲接口

可用於歷史命令追溯,返回值可用於返回存儲是否成功或其他必要信息。

public interface ICommandStore
{
    void Save(ICommand command);

    Task SaveAsync(ICommand command, CancellationToken cancellationToken);
}

public interface ICommandStore<TResult> : ICommandStore
{
    new TResult Save(ICommand command);

    new Task<TResult> SaveAsync(ICommand command, CancellationToken cancellationToken);
}

事件接口

沒有返回值

public interface IEvent : IMessage
{
}

事件總線接口

同上

public interface IEventBus
{
    void PublishEvent(IEvent @event);

    Task PublishEventAsync(IEvent @event, CancellationToken cancellationToken);
}

public interface IEventBus<TResult> : IEventBus
{
    new TResult PublishEvent(IEvent @event);

    new Task<TResult> PublishEventAsync(IEvent @event, CancellationToken cancellationToken);
}

事件處理器接口

同上

public interface IEventHandler<in TEvent>
    where TEvent : IEvent
{
    Task Handle(TEvent @event, CancellationToken cancellationToken);
}

事件存儲接口

同命令存儲接口

public interface IEventStore
{
    void Save(IEvent @event);

    Task SaveAsync(IEvent @event, CancellationToken cancellationToken = default);
}

public interface IEventStore<TResult> : IEventStore
{
    new TResult Save(IEvent @event);

    new Task<TResult> SaveAsync(IEvent @event, CancellationToken cancellationToken = default);
}

(命令、事件)消息基礎接口

public interface IMessage
{
    Guid Id { get; }

    DateTimeOffset Timestamp { get; }
}

相關接口定義完畢。

實現

EF Core 泛型倉儲

未知主鍵的實體使用實體對象為條件查找時,使用動態生成表達式的方法。

public class EFCoreRepository<TEntity, TKey, TDbContext> : EFCoreRepository<TEntity, TDbContext>, IEFCoreRepository<TEntity, TKey, TDbContext>
    where TEntity : class, IEntity<TKey>
    where TKey : IEquatable<TKey>
    where TDbContext : DbContext
{
    public EFCoreRepository(TDbContext dbContext) : base(dbContext)
    {
    }

    public virtual void Delete(TKey key, bool isSoftDelete)
    {
        var entity = Find(key);
        Delete(entity, isSoftDelete);
    }

    public virtual Task DeleteAsync(TKey key, bool isSoftDelete, CancellationToken cancellationToken = default)
    {
        Delete(key, isSoftDelete);
        return Task.CompletedTask;
    }

    public virtual void DeleteRange(IEnumerable<TKey> keys, bool isSoftDelete)
    {
        var entities = Find(keys).ToArray();
        dbSet.AttachRange(entities);
        DeleteRange(entities, isSoftDelete);
    }

    public virtual Task DeleteRangeAsync(IEnumerable<TKey> keys, bool isSoftDelete, CancellationToken cancellationToken = default)
    {
        DeleteRange(keys, isSoftDelete);
        return Task.CompletedTask;
    }

    public virtual TEntity Find(TKey key)
    {
        return Set.SingleOrDefault(x => x.Id.Equals(key));
    }

    public virtual IQueryable<TEntity> Find(IEnumerable<TKey> keys)
    {
        return Set.Where(x => keys.Contains(x.Id));
    }

    public override TEntity Find(TEntity entity, bool ignoreNullValue)
    {
        return base.Find(entity, ignoreNullValue);
    }

    public virtual Task<TEntity> FindAsync(TKey key)
    {
        return Set.SingleOrDefaultAsync(x => x.Id.Equals(key));
    }

    public override Task<TEntity> FindAsync(TEntity entity, bool ignoreNullValue)
    {
        return base.FindAsync(entity, ignoreNullValue);
    }
}

public class EFCoreRepository<TEntity, TDbContext> : IEFCoreRepository<TEntity, TDbContext>
    where TEntity : class, IEntity
    where TDbContext : DbContext
{
    protected readonly TDbContext dbContext;
    protected readonly DbSet<TEntity> dbSet;

    protected virtual void ProcessChangedEntity()
    {
        var changedEntities = dbContext.ChangeTracker.Entries()
            .Where(x => x.State == EntityState.Added || x.State == EntityState.Modified);
        foreach (var entity in changedEntities)
        {
            (entity as IOptimisticConcurrencySupported)?.GenerateNewConcurrencyStamp();
        }

        var changedEntitiesGroups = changedEntities.GroupBy(x => x.State);
        foreach (var group in changedEntitiesGroups)
        {
            switch (group)
            {
                case var entities when entities.Key == EntityState.Added:
                    foreach (var entity in entities)
                    {
                        if (entity is IActiveControllable)
                        {
                            (entity as IActiveControllable).Active ??= true;
                        }
                    }
                    break;
                case var entities when entities.Key == EntityState.Modified:
                    foreach (var entity in entities)
                    {
                        (entity as IEntity)?.ProcessCreationInfoWhenModified(dbContext);

                        if (entity is IActiveControllable && (entity as IActiveControllable).Active == null)
                        {
                            entity.Property(nameof(IActiveControllable.Active)).IsModified = false;
                        }
                    }
                    break;
                default:
                    break;
            }
        }
    }

    protected virtual void ResetDeletedMark(params TEntity[] entities)
    {
        foreach (var entity in entities)
        {
            if (entity is ILogicallyDeletable)
            {
                (entity as ILogicallyDeletable).IsDeleted = false;
            }
        }
    }

    public EFCoreRepository(TDbContext dbContext)
    {
        this.dbContext = dbContext;
        dbSet = this.dbContext.Set<TEntity>();
    }

    public virtual void Add(TEntity entity)
    {
        dbSet.Add(entity);
    }

    public virtual Task AddAsync(TEntity entity, CancellationToken cancellationToken = default)
    {
        return dbSet.AddAsync(entity, cancellationToken).AsTask();
    }

    public virtual void AddRange(IEnumerable<TEntity> entities)
    {
        dbSet.AddRange(entities);
    }

    public virtual Task AddRangeAsync(IEnumerable<TEntity> entities, CancellationToken cancellationToken = default)
    {
        return dbSet.AddRangeAsync(entities, cancellationToken);
    }

    public virtual void Delete(TEntity entity, bool isSoftDelete)
    {
        dbSet.Attach(entity);
        if (isSoftDelete)
        {
            if (entity is ILogicallyDeletable)
            {
                (entity as ILogicallyDeletable).IsDeleted = true;
            }
            else
            {
                throw new InvalidOperationException($"要求軟刪除的實體不實現{nameof(ILogicallyDeletable)}接口。");
            }
        }
        else
        {
            dbSet.Remove(entity);
        }
    }

    public virtual Task DeleteAsync(TEntity entity, bool isSoftDelete, CancellationToken cancellationToken = default)
    {
        Delete(entity, isSoftDelete);
        return Task.CompletedTask;
    }

    public virtual void DeleteRange(IEnumerable<TEntity> entities, bool isSoftDelete)
    {
        dbSet.AttachRange(entities);
        foreach (var entity in entities)
        {
            Delete(entity, isSoftDelete);
        }
    }

    public virtual Task DeleteRangeAsync(IEnumerable<TEntity> entities, bool isSoftDelete, CancellationToken cancellationToken = default)
    {
        DeleteRange(entities, isSoftDelete);
        return Task.CompletedTask;
    }

    public virtual TEntity Find(TEntity entity, bool ignoreNullValue)
    {
        var exp = GenerateWhere(dbContext, entity, ignoreNullValue);

        return Set.SingleOrDefault(exp);
    }

    public virtual Task<TEntity> FindAsync(TEntity entity, bool ignoreNullValue)
    {
        var exp = GenerateWhere(dbContext, entity, ignoreNullValue);

        return Set.SingleOrDefaultAsync(exp);
    }

    public virtual int SaveChanges()
    {
        ProcessChangedEntity();
        return dbContext.SaveChanges();
    }

    public virtual Task<int> SaveChangesAsync(CancellationToken cancellationToken = default)
    {
        ProcessChangedEntity();
        return dbContext.SaveChangesAsync(cancellationToken);
    }

    public virtual IQueryable<TEntity> Set => dbSet.AsNoTracking();

    public virtual void Update(TEntity entity)
    {
        ResetDeletedMark(entity);
        dbSet.Update(entity);
    }

    public virtual Task UpdateAsync(TEntity entity, CancellationToken cancellationToken = default)
    {
        Update(entity);
        return Task.CompletedTask;
    }

    public virtual void UpdateRange(IEnumerable<TEntity> entities)
    {
        ResetDeletedMark(entities.ToArray());
        dbSet.UpdateRange(entities);
    }

    public virtual Task UpdateRangeAsync(IEnumerable<TEntity> entities, CancellationToken cancellationToken = default)
    {
        UpdateRange(entities);
        return Task.CompletedTask;
    }

    static private Expression<Func<TEntity, bool>> GenerateWhere(TDbContext dbContext, TEntity entity, bool ignoreNullValue)
    {
        //查找實體類型主鍵
        var model = dbContext.Model.FindEntityType(typeof(TEntity));
        var key = model.FindPrimaryKey();

        //查找所有主鍵屬性,如果沒有主鍵就使用所有實體屬性
        IEnumerable<PropertyInfo> props;
        if (key != null)
        {
            props = key.Properties.Select(x => x.PropertyInfo);
        }
        else
        {
            props = model.GetProperties().Select(x => x.PropertyInfo);
        }

        //生成表達式參數
        ParameterExpression parameter = Expression.Parameter(typeof(TEntity), "x");

        //初始化提取實體類型所有屬性信息生成屬性訪問表達式並包裝備用
        var keyValues = props.Select(x => new { key = x, value = x.GetValue(entity), propExp = Expression.Property(parameter, x) });
        //初始化存儲由基礎類型組成的屬性信息(只要個空集合,實際數據在后面的循環中填充)
        var primitiveKeyValues = keyValues.Take(0).Where(x => IsPrimitiveType(x.key.PropertyType));
        //初始化基礎類型屬性的相等比較表達式存儲集合(只要個空集合,實際數據在后面的循環中填充)
        var equals = primitiveKeyValues.Take(0).Select(x => Expression.Equal(x.propExp, Expression.Constant(x.value)));
        //初始化復雜類型屬性存儲集合
        var notPrimitiveKeyValues = primitiveKeyValues;

        //如果還有元素,說明上次用於提取信息的復雜屬性內部還存在復雜屬性,接下來用提取到的基礎類型屬性信息生成相等比較表達式並合並到存儲集合然后繼續提取剩下的復雜類型屬性的內部屬性
        while (keyValues.Count() > 0)
        {
            if (ignoreNullValue)
            {
                keyValues = keyValues.Where(x => x.value != null);
            }
            //提取由基礎類型組成的屬性信息
            primitiveKeyValues = keyValues.Where(x => IsPrimitiveType(x.key.PropertyType));
            //生成基礎類型屬性的相等比較表達式
            equals = equals.Concat(primitiveKeyValues.Select(x => Expression.Equal(x.propExp, Expression.Constant(x.value))));
            //提取復雜類型屬性
            notPrimitiveKeyValues = keyValues.Except(primitiveKeyValues);
            //分別提取各個復雜類型屬性內部的屬性信息繼續生成內部屬性訪問表達式
            keyValues =
                from kv in notPrimitiveKeyValues
                from propInfo in kv.value.GetType().GetProperties()
                select new { key = propInfo, value = propInfo.GetValue(kv.value), propExp = Expression.Property(kv.propExp, propInfo) };
        }

        //如果相等比較表達式有多個,將所有相等比較表達式用 && 運算連接起來
        var and = equals.First();
        foreach (var eq in equals.Skip(1))
        {
            and = Expression.AndAlso(and, eq);
        }

        //生成完整的過濾條件表達式,形如:  (TEntity x) => { return x.a == ? && x.b == ? && x.obj1.m == ? && x.obj1.n == ? && x.obj2.u.v == ?; }
        var exp = Expression.Lambda<Func<TEntity, bool>>(and, parameter);

        //判斷某個類型是否是基礎數據類型
        static bool IsPrimitiveType(Type type)
        {
            var primitiveTypes = new[] {
                typeof(sbyte)
                ,typeof(byte)
                ,typeof(short)
                ,typeof(ushort)
                ,typeof(int)
                ,typeof(uint)
                ,typeof(long)
                ,typeof(ulong)
                ,typeof(float)
                ,typeof(double)
                ,typeof(decimal)
                ,typeof(char)
                ,typeof(string)
                ,typeof(bool)
                ,typeof(DateTime)
                ,typeof(DateTimeOffset)
                //,typeof(Enum)
                ,typeof(Guid)};

            var tmp =
                type.IsDerivedFrom(typeof(Nullable<>))
                ? Nullable.GetUnderlyingType(type)
                : type;

            return tmp.IsEnum || primitiveTypes.Contains(tmp);
        }

        return exp;
    }
}

命令

命令基類

public abstract class MediatRCommand : MediatRCommand<Unit>, ICommand, IRequest
{
}

public abstract class MediatRCommand<TResult> : ICommand<TResult>, IRequest<TResult>
{
    public Guid Id { get; }

    public DateTimeOffset Timestamp { get; }

    public MediatRCommand()
    {
        Id = Guid.NewGuid();
        Timestamp = DateTimeOffset.Now;
    }
}

示例具體命令,命令只包含參數信息,如何使用參數信息完成任務是命令處理器的事

public class ListUserCommand : MediatRCommand<IPagedList<ApplicationUser>>
{
    public PageInfo PageInfo { get; }
    public QueryFilter QueryFilter { get; }
    public ListUserCommand(PageInfo pageInfo, QueryFilter queryFilter)
    {
        PageInfo = pageInfo;
        QueryFilter = queryFilter;
    }
}

命令總線

public class MediatRCommandBus<TCommand, TResult> : ICommandBus<TCommand, TResult>
    where TCommand : MediatRCommand<TResult>
{
    private readonly IMediator mediator;
    private readonly ICommandStore commandStore;

    public MediatRCommandBus(IMediator mediator, ICommandStore commandStore)
    {
        this.mediator = mediator;
        this.commandStore = commandStore;
    }

    public virtual Task<TResult> SendCommandAsync(TCommand command, CancellationToken cancellationToken = default)
    {
        commandStore?.SaveAsync(command, cancellationToken);
        return mediator.Send(command, cancellationToken);
    }

    Task ICommandBus<TCommand>.SendCommandAsync(TCommand command, CancellationToken cancellationToken)
    {
        return SendCommandAsync(command, cancellationToken);
    }
}

public class MediatRCommandBus<TCommand> : MediatRCommandBus<MediatRCommand<Unit>, Unit>
    where TCommand : MediatRCommand<Unit>
{
    public MediatRCommandBus(IMediator mediator, ICommandStore commandStore) : base(mediator, commandStore)
    {
    }
}

命令處理器

命令處理器基類

public abstract class MediatRCommandHandler<TCommand, TResult> : ICommandHandler<TCommand, TResult>, IRequestHandler<TCommand, TResult>
where TCommand : MediatRCommand<TResult>
{
    public abstract Task<TResult> Handle(TCommand command, CancellationToken cancellationToken = default);

    Task ICommandHandler<TCommand>.Handle(TCommand command, CancellationToken cancellationToken)
    {
        return Handle(command, cancellationToken);
    }
}

public abstract class MediatRCommandHandler<TCommand> : MediatRCommandHandler<TCommand, Unit>
    where TCommand : MediatRCommand
{
}

具體命令處理器示例,使用注入的倉儲查詢數據,ApplicationUser 在這里就是事實上的聚合根實體。

public class ListUserCommandHandler : MediatRCommandHandler<ListUserCommand, IPagedList<ApplicationUser>>
{
    private IEFCoreRepository<ApplicationUser, int, ApplicationIdentityDbContext> repository;

    public ListUserCommandHandler(IEFCoreRepository<ApplicationUser, int, ApplicationIdentityDbContext> repository)
    {
        this.repository = repository;
    }

    public override Task<IPagedList<ApplicationUser>> Handle(ListUserCommand command, CancellationToken cancellationToken = default)
    {
        return repository.Set
            .OrderBy(x => x.Id)
            .ToPagedListAsync(command.PageInfo.PageNumber, command.PageInfo.PageSize);
    }
}

命令存儲

什么都沒干,實際使用時可以使用數據庫保存相關信息。

public class InProcessCommandStore : ICommandStore<bool>
{
    public bool Save(ICommand command)
    {
        return SaveAsync(command).Result;
    }

    public Task<bool> SaveAsync(ICommand command, CancellationToken cancellationToken = default)
    {
        return Task.FromResult(true);
    }

    void ICommandStore.Save(ICommand command)
    {
        Save(command);
    }

    Task ICommandStore.SaveAsync(ICommand command, CancellationToken cancellationToken)
    {
        return SaveAsync(command, cancellationToken);
    }
}

事件部分和命令基本相同,具體代碼可以到文章末尾下載項目代碼查看。

使用

在 Startup.ConfigureServices 方法中注冊相關服務,事件總線和命令總線都使用 MediatR 實現。.Net Core 內置 DI 支持注冊泛型服務,所以某個實體在實際使用時注入泛型倉儲就表示這個實體是聚合根,不用提前定義具體的聚合根實體倉儲,所以刪除使用代碼相當於刪除了倉儲定義。

services.AddScoped(typeof(ICommandBus<>), typeof(MediatRCommandBus<>));
services.AddScoped(typeof(ICommandBus<,>), typeof(MediatRCommandBus<,>));
services.AddScoped(typeof(ICommandStore), typeof(InProcessCommandStore));
services.AddScoped(typeof(IEventBus), typeof(MediatREventBus));
services.AddScoped(typeof(IEventBus<>), typeof(MediatREventBus<>));
services.AddScoped(typeof(IEventStore), typeof(InProcessEventStore));
services.AddScoped(typeof(IEFCoreRepository<,>), typeof(EFCoreRepository<,>));
services.AddScoped(typeof(IEFCoreRepository<,,>), typeof(EFCoreRepository<,,>));
services.AddMediatR(typeof(ListUserCommandHandler).GetTypeInfo().Assembly);

示例使用比較簡單,就不定義服務了,如果需要定義服務,那么使用服務的一般是命令處理器,倉儲由服務使用。這里命令處理器直接使用倉儲。在控制器中注入命令總線,向命令總線發送命令就可以獲取結果。MediatR 會自動根據發送的命令類型查找匹配的命令處理器去調用。

[ApiController]
[Route("api/[controller]")]
public class UsersController : ControllerBase
{
    private readonly ICommandBus<ListUserCommand, IPagedList<ApplicationUser>> _commandBus;
    private readonly IMapper _mapper;

    public UsersController(ICommandBus<ListUserCommand, IPagedList<ApplicationUser>> commandBus, IMapper mapper)
    {
        _commandBus = commandBus;
        _mapper = mapper;
    }

    /// <summary>
    /// 獲取用戶列表
    /// </summary>
    /// <param name="page">頁碼</param>
    /// <param name="size">每頁條目數</param>
    /// <returns>用戶列表</returns>
    [HttpGet]
    [Produces("application/json")] //聲明接口響應 json 數據
    public async Task<IActionResult> GetAsync(int? page, int? size)
    {
        var cmd = new ListUserCommand(new PageInfo(page ?? 1, size ?? 10), new QueryFilter());
        var users = await _commandBus.SendCommandAsync(cmd, default);

        return new JsonResult(
            new
            {
                rows = users.Select(u => _mapper.Map<ApplicationUserDto>(u)),
                total = users.PageCount, //總頁數
                page = users.PageNumber, //當前頁碼
                records = users.TotalItemCount //總記錄數
            }
        );
    }
}

使用就是這么簡單。使用者根本不需要知道命令處理器的存在,把命令發送到總線,等着接收結果就可以了。

事件一般由命令處理器引發,可以改造命令處理器用 DI 注入事件總線,然后在命令處理器中向事件總線發送事件,事件總線就會自動觸發相應的事件處理器。

結語

完整的流程大概就是:控制器使用注入的服務執行業務流程,業務服務向命令總線發送命令,命令總線觸發處理器處理命令,命令處理器向事件總線發送事件,事件總線觸發事件處理器處理事件,事件處理器在處理事件后向事件總線發送新的事件觸發后續事件處理器繼續處理新的事件(如果需要),直到最后不發送事件的事件處理器完成處理。整個流程完結。在此過程中總線會自動調用注入的總線消息存儲來持久化命令和事件,至此,一個環環相扣的極簡 DDD+CQRS+EDA+ES 架構搭建完成!

想要實際體驗的朋友可以到文章末尾下載項目並運行體驗。啟動調試后訪問 /swagger 然后嘗試體驗調用 api/users 接口。

轉載請完整保留以下內容並在顯眼位置標注,未經授權刪除以下內容進行轉載盜用的,保留追究法律責任的權利!

本文地址:https://www.cnblogs.com/coredx/p/12364960.html

完整源代碼:Github

里面有各種小東西,這只是其中之一,不嫌棄的話可以Star一下。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM