[譯]CQRS介紹


以下內容均為看完原文后自己的理解。並非一字一句翻譯,會盡量保持原文意思。

什么是 CQRS:

CQRS 意思就是命令查詢職責分離(Command Query Responsibility Segregation)。很多人認為 CQRS 是一個完整的架構,但是他們錯了。它只是一個小小的模式。Greg Young 和 Udi Dahan 首先介紹了這種模式。他們是從 Bertrand Meyer 的 “面向對象的軟件結構”一書中得到了 CQS(查詢與命令分離( Command Query Separation )) 模式的設計靈感。CQS 背后的主要靈感是:“一個方法更改對象的狀態或返回一個結果,但是不能同時包含這兩個行為。更正式的說,如果它們之間的引用是透明的且無副作用的,那么這個方法只需要返回一個值。”,因此我們可以把方法分為兩組:

  • Commands : 更改一個對象或整個系統的狀態。
  • Query : 返回結果但並不會改變對象的狀態。

在實際使用中很容易分清哪個是 Command 哪個是 Query 。查詢將返回一個類型,而命令的返回是 void 類型的。這種模式被廣泛使用,它使推理相關對象更容易。另一方面,CQS 僅適用於特定的問題。

許多應用程序使用覺的主流方法,包括模型的讀取和寫入。同一個模型如果擁有讀取和寫入的功能,則可能很維護和優化。

這兩種模式的真正功能是可以分開改變那些沒有狀態的方法。這種分享可以在非常方便的情況下處理性能和調優。你可以從寫入端單獨優化系統的的讀取端。寫入端稱之為領域。領域包含所有行為(業務邏輯)。讀取端專門匯報需求。

這種模式的另一個好處是在大型應用程序中。可以將開發者分為較小的團隊工作在不同方面的系統(讀取或寫入)。例如:在讀取端的開發人員就不需要了解領域模型。

查詢端:

查詢只包含獲取數據的方法。從架構的角度查看這些都會返回 DTO 對象。DTO 對象通常是領域對象。在某些情況下這可能是一個非常痛苦的過程,特別是在獲取復雜的 DTO 對象時。

使用 CQRS 可以避免這種情況的發生。相反,它可能介紹一種新的映射到 DTO 的方法。你可以繞過領域模型通過讀取端直接從數據存儲介質中獲取 DTO 對象。當應用程序獲取數據時,可以通過讀取端調用一個單一的方法來返回一個包含所需數據的 DTO 對象。

讀取層可以直接連接數據庫(數據模型)而且使用存儲過程也不是一個壞主意。直接連接到數據源的查詢很容易維護和優化。它就是使用反規范化數據的意義。這樣做的原因是:數據查詢的次數通常是執行領域模型行為的的許多倍(即領域模型中行為的執行次數比常規的數據查詢少很多)。這種反規范化可以提高應用程序的性能。

命令端:

由於讀取端被分離,所以領域模型只集中處理命令(Command)。現在不再需要公開領域對象的內部狀態,倉儲(即數據持久化的存儲介質)除了GetById方法,只有幾個查詢方法。

命令由客戶端應用程序創建並發送到領域層。命令信息指定一個特定的實體執行某些行為(操作)。命令的命名方法如:ChangeName, DeleteOrder,...。它們指定目標實體執行一些行為返回不同的結果或者執行行為失敗收到錯誤信息。命令通過命令處理程序處理。

public interface ICommand
{
    Guid Id { get; }
}

public class Command : ICommand
{
    public Guid Id { get; private set; }
    public int Version { get; private set; }
    public Command(Guid id,int version)
    {
        Id = id;
        Version = version;
    }
}
 
public class CreateItemCommand:Command
{
    public string Title { get; internal set; }
    public string Description { get;internal set; }
    public DateTime From { get; internal set; }
    public DateTime To { get; internal set; }

    public CreateItemCommand(Guid aggregateId, string title, 
        string description,int version,DateTime from, DateTime to)
        : base(aggregateId,version)
    {
        Title = title;
        Description = description;
        From = from;
        To = to;
    }
}

所有的命令將被發送到命令處理程序並委托給每個命令的命令總線。這表明,進入一個領域實體只有一個入口點。命令處理程序的責任是執行對應的領域模型上的行為。當執行命令時命令處理程序應該有一個連接到倉儲且提供加載所需的實體的能力(在這里稱為聚合根)。

public interface ICommandHandler<TCommand> where TCommand : Command
{
    void Execute(TCommand command);
}

public class CreateItemCommandHandler : ICommandHandler<CreateItemCommand>
{
    private IRepository<DiaryItem> _repository;

    public CreateItemCommandHandler(IRepository<DiaryItem> repository)
    {
        _repository = repository;
    }

    public void Execute(CreateItemCommand command)
    {
        if (command == null)
        {
            throw new ArgumentNullException("command");
        }
        if (_repository == null)
        {
            throw new InvalidOperationException("Repository is not initialized.");
        }
        var aggregate = new DiaryItem(command.Id, command.Title, command.Description,             
                                      command.From, command.To);
        aggregate.Version = -1;
        _repository.Save(aggregate, aggregate.Version);
    }
} 

命令處理程序需要執行以下任務:

  • 從消息基礎設施(命令總線)接收 Command 實例。
  • 它驗證該命令是一個有效的命令。
  • 它確定命令的目標是聚合的實例。
  • 在命令上可以傳遞任何參數,然后在聚合實例上調用適當的方法。
  • 它保存新聚合的狀態到存儲介質。

內部事件:

第一個問題我們要問的是:什么是領域事件。領域事件是在系統中已經發生的事件。事件通常是一個命令的結果。例如:客戶端獲取一個 DTO 對象和改變對象的狀態就會導致事件的發生。適當的命令處理程序加載正確的聚合根並執行適當的行為。然后行為將引發一個事件。事件被特定的訂閱者處理。聚合發布一個事件到一個事件總線(Event Bus)並傳遞事件到適當的事件處理程序。聚合根內處理的事件稱為內部事件。事件處理程序不需要處理任何邏輯從而取代設置狀態。

領域行為:

public void ChangeTitle(string title)
{
    ApplyChange(new ItemRenamedEvent(Id, title));
}

領域事件:

public class ItemCreatedEvent:Event
{
    public string Title { get; internal set; }
    public DateTime From { get; internal set; }
    public DateTime To { get; internal set; }
    public string Description { get;internal set; }

    public ItemCreatedEvent(Guid aggregateId, string title ,
        string description, DateTime from, DateTime to)
    {
        AggregateId = aggregateId;
        Title = title;
        From = from;
        To = to;
        Description = description;
    }
}

public class Event:IEvent
{
    public int Version;
    public Guid AggregateId { get; set; }
    public Guid Id { get; private set; }
} 

內部領域事件處理器:

public void Handle(ItemRenamedEvent e)
{
    Title = e.Title;
} 

事件通常連接到另一個模式被稱為事件溯源(Event Sourcing. ES.)ES是為了記錄在聚合狀態變化時保存事件流並持久化聚合狀態的一種途徑。

如前面所說的,聚合根的每一個狀態的改變都是有事件引發的且聚合根的內部處理程序除了設置正確的狀態沒有其它作用。為了獲取聚合根的狀態我們不得不在內部重新播放(引發)所有的事件。這里我必須指出,事件是只寫的。你不能改變或刪除現有事件。如果你發現某些邏輯在系統中生成錯誤的事件,你必須生成一個新的補償事件並糾正以前的錯誤事件的結果。

外部事件:

外部事件通常用於報告數據庫同步與當前領域的狀態。它是通過內部事件發布到外部領域的。當事件發布的時候適當的事件處理程序會處理這個事件。外部事件可以發布到多個事件處理程序。事件處理程序執行以下任務:

  • 從消息基礎設施(Event Bus)接收事件實例。
  • 它確定事件的目標進程管理器
  • 在事件上可以傳遞任何參數,然后在聚合實例上調用適當的方法。
  • 它保存進程管理器的新狀態到存儲介質。

但是誰可以發布事件呢?通常領域倉儲可以發布外部事件。

示例程序:

我創建了一個非常簡單的示例,演示如何使用 CQRS 模式。這個簡單的示例允許你創建日記記錄和修改它們。

該解決方案有三個項目:

  • Diary.CQRS
  • Diary.CQRS.Configuration
  • Diary.CQRS.Web

第一個是基礎項目,它包含所有領域模型和消息處理對象。現在讓我們進一步了解一下這些主要項目。

Diary.CQRS

如前面所說,這個項目包含所有的領域模型和消息對象在這個示例中。在發送命令時 CQRS 示例的入口點是事件總線。這個 class 只有一個通用的方法 Send(T command) 。該方法使用 CommandHandlerFactory 創建適當的命令處理程序。如果沒有命令處理程序與命令相關聯,那么將會拋出一個異常。在其它情況下,調用 Execute 方法將會執行其行為(即命令處理程序中的行為)。行為創建一個內部事件且這個事件存儲到一個名稱為 _changes 的字段中,這個字段在聚合根基類中聲明。接下來,是由內部處理此事件的事件處理程序更改聚合的狀態。在行為被執行之后,所有聚合的更改將保存在倉儲中。倉儲比較聚合與預期版本不一樣的地方然后作上標記,然后存儲到倉儲中。如果這些版本是不同的,這意味着該對象已被某個人修改了並且將引發 ConcurrencyException 異常。在其它情況下這些改變將存儲在事件倉儲中。

倉儲(Repository):

public class Repository<T> : IRepository<T> where T : AggregateRoot, new()
{
    private readonly IEventStorage _storage;
    private static object _lockStorage = new object();

    public Repository(IEventStorage storage)
    {
        _storage = storage;
    } 

    public void Save(AggregateRoot aggregate, int expectedVersion)
    {
        if (aggregate.GetUncommittedChanges().Any())
        {
            lock (_lockStorage)
            {
                var item = new T();

                if (expectedVersion != -1)
                {
                    item = GetById(aggregate.Id);
                    if (item.Version != expectedVersion)
                    {
                        throw new ConcurrencyException(string.Format("Aggregate {0} has been previously modified",
                                                                     item.Id));
                    }
                }

                _storage.Save(aggregate);
            }
        }
    }
 
    public T GetById(Guid id)
    {
       IEnumerable<Event> events;
       var memento = _storage.GetMemento<BaseMemento>(id);
       if (memento != null)
       {
           events = _storage.GetEvents(id).Where(e=>e.Version>=memento.Version);
       }
       else
       {
           events = _storage.GetEvents(id);
       }
        var obj = new T();
        if(memento!=null)
            ((IOriginator)obj).SetMemento(memento);
        
        obj.LoadsFromHistory(events);
        return obj;
    }
}

InMemoryEventStorage:

在這個簡單的示例中,我已經創建了一個 InMemoryEventStorage 用於存儲所有的事件到內存中。這個類實現了 IEventStorage 接口,此接口包含 4 個方法。

public IEnumerable<Event> GetEvents(Guid aggregateId)
{
    var events = _events.Where(p => p.AggregateId == aggregateId).Select(p => p);
    if (events.Count() == 0)
    {
        throw new AggregateNotFoundException(string.Format(
          "Aggregate with Id: {0} was not found", aggregateId));
    }
    return events;
} 

此方法返回聚合的所有事件且當聚合沒有事件時將拋出一個錯誤(即聚合是不存在的)。

 

public void Save(AggregateRoot aggregate)
{
    var uncommittedChanges = aggregate.GetUncommittedChanges();
    var version = aggregate.Version;
    
    foreach (var @event in uncommittedChanges)
    {
        version++;
        if (version > 2)
        {
            if (version % 3 == 0)
            {
                var originator = (IOriginator)aggregate;
                var memento = originator.GetMemento();
                memento.Version = version;
                SaveMemento(memento);
            }
        }
        @event.Version=version;
        _events.Add(@event);
    }
    foreach (var @event in uncommittedChanges)
    {
        var desEvent = Converter.ChangeTo(@event, @event.GetType());
        _eventBus.Publish(desEvent);
    }
}

 

這個方法存儲事件到內存且為此聚合創建三個事件副本(備忘)。這些副本將為這個聚合保存所有狀態和版本。使用副本增加應用程序的性能。因為這是加載所有不重要的事件。

當所有事件都被存儲,它們將被事件總線發布且由外部處事程序執行。

public T GetMemento<T>(Guid aggregateId) where T : BaseMemento
{
    var memento = _mementos.Where(m => m.Id == aggregateId).Select(m=>m).LastOrDefault();
    if (memento != null)
        return (T) memento;
    return null;
}

返回一個聚合的副本(備忘)。

 

public void SaveMemento(BaseMemento memento)
{
    _mementos.Add(memento);
}

存儲聚合的副本(備忘)。

 

聚合根(Aggregate Root):

聚合根是所有聚合的基類。這個類實現了 IEventProvider 接口。它包含有關 _changes 列表中的所有未提交的更改的信息。這個類還有一個 ApplyChange 方法執行適當的內部事件處理程序的方法。 LoadFromHistory 方法加載並應用內部事件。

public abstract class AggregateRoot:IEventProvider
{
    private readonly List<Event> _changes;

    public Guid Id { get; internal set; }
    public int Version { get; internal set; }
    public int EventVersion { get; protected set; }

    protected AggregateRoot()
    {
        _changes = new List<Event>();
    }

    public IEnumerable<Event> GetUncommittedChanges()
    {
        return _changes;
    }

    public void MarkChangesAsCommitted()
    {
        _changes.Clear();
    }
 
    public void LoadsFromHistory(IEnumerable<Event> history)
    {
        foreach (var e in history) ApplyChange(e, false);
        Version = history.Last().Version;
        EventVersion = Version;
    }

    protected void ApplyChange(Event @event)
    {
        ApplyChange(@event, true);
    }

    private void ApplyChange(Event @event, bool isNew)
    {
        dynamic d = this;
        
        d.Handle(Converter.ChangeTo(@event,@event.GetType()));
        if (isNew)
        {
            _changes.Add(@event);
        }
    }      
}

事件總線(EventBus):

事件描述系統狀態的變化。 事件的主要目的是更新所讀取的領域模型。為了這個目的我創建了 EventBus 類。EventBus 類的唯一行為就是發布事件到訂閱服務器,一個事件可以發布到多個訂閱服務器。在本例中不需要手動訂閱。事件處理工廠返回所有 EventHandler 的一個列表並可以處理當前的事件。

public class EventBus:IEventBus
{
    private IEventHandlerFactory _eventHandlerFactory;

    public EventBus(IEventHandlerFactory eventHandlerFactory)
    {
        _eventHandlerFactory = eventHandlerFactory;
    }
    
    public void Publish<T>(T @event) where T : Event
    {
        var handlers = _eventHandlerFactory.GetHandlers<T>();
        foreach (var eventHandler in handlers)
        {
            eventHandler.Handle(@event);
        }
    }
}

 

事件處理器(Event Handlers): 

事件處理程序的主要目的是接收事件並更新所讀取的領域模型。在下面的例子你可以看到 ItemCreatedEventHandler 。它處理 ItemCreatedEvent 。使用事件中的信息創建一個新對象且存儲它到報表數據庫。

public class ItemCreatedEventHandler : IEventHandler<ItemCreatedEvent>
{
    private readonly IReportDatabase _reportDatabase;
    public ItemCreatedEventHandler(IReportDatabase reportDatabase)
    {
        _reportDatabase = reportDatabase;
    }
    public void Handle(ItemCreatedEvent handle)
    {
        DiaryItemDto item = new DiaryItemDto()
            {
                Id = handle.AggregateId,
                Description =  handle.Description,
                From = handle.From,
                Title = handle.Title,
                To=handle.To,
                Version =  handle.Version
            };

        _reportDatabase.Add(item);
    }
}

 

Diary.CQRS.Web:

這個例子作為用戶界面的 CQRS 的例子。這個 Web UI 項目是一個簡單的 ASP.NET MVC4 應用程序,只有一個 HomeController 和 6 個 ActionResult 方法:

 ActionResult Index() - 這個方法返回 Index View 包含所有日記項的列表。

 ActionResult Delete(Guid id) - 這個方法創建一個新的 DeleteItemCommand 且發送它到 CommandBus 。當一個命令發送時,並返回 Index View。

 ActionResult Add() - 當添加一條新日記將返回一個 Add View 。

 ActionResult Add(DiaryItemDto item) - 這個方法創建一個新的 CreateItemCommand 並將其發送到 CommandBus 。當新的項被添加將返回 Index View 。

 ActionResult Edit(Guid id) - 返回選中日記項的 Edit View 。

 ActionResult Edit(DiaryItemDto item) - 這個方法創建一個新的 CreateItemCommand 並將其發送到 CommandBus 。當一個列表項被成功更新時,將返回 Index View 。在 ConcurrencyError 的情況下,將返回 Edit View 並顯示一條異常信息。

在下面的圖片中你可以看到主屏幕和日記條目的列表。

什么時候使用 CQRS:

一般情況下,DDD+CQRS,都會應用到大型系統中,這些系統復雜且需要互相協作。

什么時候不用 CQRS:

一般的小項目使用 DDD + CQRS 則沒有這個必要,小項目使用簡單的數據庫驅動開發更來的快。 

 

原文地址:http://www.codeproject.com/Articles/555855/Introduction-to-CQRS


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM