[.NET領域驅動設計實戰系列]專題十:DDD擴展內容:全面剖析CQRS模式實現


一、引言

   前面介紹的所有專題都是基於經典的領域驅動實現的,然而,領域驅動除了經典的實現外,還可以基於CQRS模式來進行實現。本專題將全面剖析如何基於CQRS模式(Command Query Responsibility Segregation,命令查詢職責分離)來實現領域驅動設計。

二、CQRS是什么?

   在介紹具體的實現之前,對於之前不了解CQRS的朋友來說,首先第一個問題應該是:什么是CQRS啊?你倒是詳細介紹完CQRS后再介紹具體實現啊?既然大家會有這樣的問題,所以本專題首先全面介紹下什么是CQRS。

  2.1 CQRS發展歷程

  在介紹CQRS之前,我覺得有必要先了解一下CQS(即Command Query Separation,命令查詢分離)模式。我們可以理解CQRS是在DDD的實踐中基於CQS理論而出現的一種體系結構模式。CQS模式最早由軟件大師Bertrand Meyer(Eiffel語言之父,面向對象開-閉原則OCP提出者)提出,他認為,對象的行為僅有兩種:命令和查詢,不存在第三種情況。根據CQS的思想,任何方法都可以拆分為命令和查詢兩部分。例如下面的方法:

        private int _number = 0; public int Add(int factor) { _number += factor; return _number; }

  在上面的方法中,執行了一個命令,即對變量_number加上一個因子factor,同時又執行了一個查詢,即查詢返回_number的值。根據CQS的思想,該方法可以拆成Command和Query兩個方法:

private int _number = 0;
private void AddCommand(int factor)
{
    _number += factor;
}

private int QueryValue()
{
    return _number;
}

  命令和查詢分離使得我們可以更好地把握對象的細節,更好地理解哪些操作會改變系統的狀態。從而使的系統具有更好的擴展性,並獲得更好的性能。

  CQRS根據CQS思想,並結合領域驅動設計思想,由Grey Young在CQRS, Task Based UIs, Event Sourcing agh! 這篇文章中提出。CQRS將之前只需要定義一個對象拆分成兩個對象,分離的原則按照對象中方法是執行命令還是執行查詢來進行拆分的。

  2.2 CQRS結構

  由前面的介紹可知,采用CQRS模式實現的系統結構可以分為兩個部分:命令部分和查詢部分。其系統結構如下圖所示:

  從上面系統結構圖可以發現,采用CQRS實現的領域驅動設計與經典DDD有很大的不同。采用CQRS實現的DDD結構大體分為兩部分,查詢部分和命令部分,並且維護着兩個數據庫實例,一個專門用來進行查詢,另一個用來響應命令操作。然后通過EventHandler操作將命令改變的狀態同步到用來查詢的數據庫實例中。從這個描述中,我們可能會聯想到數據庫級別主從讀寫分離。然而數據讀寫分離是在數據庫層面來實現讀寫分離的機制,而CQRS是在業務邏輯層面來實現讀寫分離機制。兩者是站在兩個不同的層面對讀寫分離進行實現的。

三、為什么需要引入CQRS模式

   前面我們已經詳細介紹了CQRS模式,相信經過前面的介紹,大家對CQRS模式一定有一些了解了,但為什么要引入CQRS模式呢?

  在傳統的實現中,對DB執行增、刪、改、查所有操作都會放在對應的倉儲中,並且這些操作都公用一份領域實體對象。對於一些簡單的系統,使用傳統的設計方式並沒有什么不妥,但在一些大型復雜的系統中,傳統的實現方式也會存在一些問題:

  • 使用同一個領域實體來進行數據讀寫可能會遇到資源競爭的情況。所以經常要處理鎖的問題,在寫入數據的時候,需要加鎖,讀取數據的時候需要判斷是否允許臟讀。這樣使得系統的邏輯性和復雜性增加,並會影響系統的吞吐量。
  • 在大數據量同時進行讀寫的情況下,可能出現性能的瓶頸。
  • 使用同一個領域實體來進行數據庫讀寫可能會太粗糙。在大多是情況下,比如編輯操作,可能只需要更新個別字段,這時卻需要將整個對象都穿進去。還有在查詢的時候,表現層可能只需要個別字段,但需要查詢和返回整個領域實體,再把領域實體對象轉換從對應的DTO對象。
  • 讀寫操作都耦合在一起,不利於對問題的跟蹤和分析,如果讀寫操作分離的話,如果是由於狀態改變的問題就只需要去分析寫操作相關的邏輯就可以了,如果是關於數據的不正確,則只需要關心查詢操作的相關邏輯即可。

  針對上面的這些問題,采用CQRS模式的系統都可以解決。由於CQRS模式中將查詢和命令進行分析,所以使得兩者分工明確,各自負責不同的部分,並且在業務上將命令和查詢分離能夠提高系統的性能和可擴展性。既然CQRS這么好,那是不是所有系統都應該基於CQRS模式去實現呢?顯然不是的,CQRS也有其使用場景:

  1. 系統的業務邏輯比較復雜的情況下。因為本來業務邏輯就比較復雜了,如果再把命令操作和查詢操作綁定同一個業務實體的話,這樣會導致后期的需求變更難於進行擴展下去。
  2. 需要對系統中查詢性能和寫入性能分開進行優化的情況下,尤其讀/寫比例非常高的情況下。例如,在很多系統中讀操作的請求數遠大於寫操作,此時,就可以考慮將寫操作抽離出來進行單獨擴展。
  3. 系統在將來隨着時間不斷變化的情況下。

  然而,CQRS也有其不適用的場景:

  • 業務邏輯比較簡單的情況下,此時采用CQRS反而會把系統搞的復雜。
  • 系統用戶訪問量都比較小的情況下,並且需求以后不怎么會變更的情況下。針對這樣的系統,完全可以用傳統的實現方式快速將系統實現出來,沒必要引入CQRS來增加系統的復雜度。

四、事件溯源

  在CQRS中,查詢方面,直接通過方法查詢數據庫,然后通過DTO將數據返回,這個方面的操作相對比較簡單。而命令方面,是通過發送具體Command,接着由CommandBus來分發到具體的CommandHandle來進行處理,CommandHandle在進行處理時,並沒有直接將對象的狀態保存到外部持久化結構中,而僅僅是從領域對象中獲得產生的一系列領域事件,並將這些事件保存到Event Store中,同時將事件發布到事件總線Event Bus進行下一步處理;接着Event Bus同樣進行協調,將具體的事件交給具體的Event Handle進行處理,最后Event Handler再把對象的狀態保存到對應Query數據庫中。

  上面過程正是CQRS系統中的調用順序。從中可以發現,采用CQRS實現的系統存在兩個數據庫實例,一個是Event Store,該數據庫實例用來保存領域對象中發生的一系列的領域事件,簡單來說就是保存領域事件的數據庫。另一個是Query Database,該數據庫就是存儲具體的領域對象數據的,查詢操作可以直接對該數據庫進行查詢。由於,我們在Event Store中記錄領域對象發生的所有事件,這樣我們就可以通過查詢該數據庫實例來獲得領域對象之前的所有狀態了。所謂Event Sourcing,就是指的的是:通過事件追溯對象的起源,它允許通過記錄下來的事件,將領域模型恢復到之前的任意一個時間點。

  通過Event來記錄領域對象所發生的所有狀態,這樣利用系統的跟蹤並能夠方便地回滾到某一歷史狀態。經過上面的描述,感覺事件溯源一般用於系統的維護。例如,我們可以設計一個同步服務,該服務程序從Event Store數據庫查詢出領域對象的歷史數據,從而打印生成一個歷史報表,如歷史價格報表等。但正是的CQRS系統中如何使用Event Sourcing的呢?

  在前面介紹CQRS系統的調用順序中,我們講到,由Event Handler將對象的狀態保存到對應的Query數據庫中,這里有一個問題,對象的狀態怎么獲得呢?對象狀態的獲得正是由Event sourcing機制來獲得,因為用戶發送的僅僅是Command,Command中並不包含對象的狀態數據,所以此時需要通過Event Sourcing機制來查詢Event Store來還原對象的狀態,還原根據就是對應的Id,該Id是通過命令傳入的。Event Sourcing的調用需要放在CommandHandle中,因為CommandHandle需要先獲得領域對象,這樣才能把領域對象與命令對象來進行對比,從而獲得領域對象中產生的一系列領域事件。

五、快照

   然而,當隨着時間的推移,領域事件變得越來越多時,通過Event Sourcing機制來還原對象狀態的過程會非常耗時,因為每一次都需要從最早發生的事件開始。那有沒有好的一個方式來解決這個問題呢?答案是肯定的,即在Event Sourcing中引入快照(Snapshots)實現。實現原理就是——沒產生N個領域事件,則對對象做一次快照。這樣,領域對象溯源的時候,可以先從快照中獲得最近一次的快照,然后再逐個應用快照之后所有產生的領域事件,而不需要每次溯源都從最開始的事件開始對對象重建,這樣就大大加快了對象重建的過程。

六、CQRS模式實現和剖析

  前面介紹了那么多CQRS的內容,下面就具體通過一個例子來演示下CQRS系統的實現。

  命令部分的實現

  

    // 應用程序初始化操作,將依賴的對象通過依賴注入框架StructureMap進行注入
    public sealed class ServiceLocator
    {
        private static readonly ICommandBus _commandBus;
        private static readonly IStorage _queryStorage;
        private static readonly bool IsInitialized;
        private static readonly object LockThis = new object();
        
        static ServiceLocator()
        {
            if (!IsInitialized)
            {
                lock (LockThis)
                {
                    // 依賴注入
                    ContainerBootstrapper.BootstrapStructureMap();

                    _commandBus = ContainerBootstrapper.Container.GetInstance<ICommandBus>();
                    _queryStorage = ContainerBootstrapper.Container.GetInstance<IStorage>();
                    IsInitialized = true;
                }
            }
        }

        public static ICommandBus CommandBus
        {
            get { return _commandBus; }
        }

        public static IStorage QueryStorage
        {
            get { return _queryStorage; }
        }
    }

    class ContainerBootstrapper
    {
        private static Container _container;
        public static void BootstrapStructureMap()
        {
            _container = new Container(x =>
            {
                x.For(typeof (IDomainRepository<>)).Singleton().Use(typeof (DomainRepository<>));
                x.For<IEventStorage>().Singleton().Use<InMemoryEventStorage>();
                x.For<IEventBus>().Use<EventBus>();
                x.For<ICommandBus>().Use<CommandBus>();
                x.For<IStorage>().Use<InMemoryStorage>();
                x.For<IEventHandlerFactory>().Use<StructureMapEventHandlerFactory>();
                x.For<ICommandHandlerFactory>().Use<StructureMapCommandHandlerFactory>();
            });
        }

        public static Container Container 
        {
            get { return _container;}
        }
    }

public class HomeController : Controller
    {
         [HttpPost]
        public ActionResult Add(DiaryItemDto item)
        {
            // 發布CreateItemCommand到CommandBus中
            ServiceLocator.CommandBus.Send(new CreateItemCommand(Guid.NewGuid(), item.Title, item.Description, -1, item.From, item.To));

            return RedirectToAction("Index");
        }    
    }

 // CommandBus 的實現
    public class CommandBus : ICommandBus
    {
        private readonly ICommandHandlerFactory _commandHandlerFactory;

        public CommandBus(ICommandHandlerFactory commandHandlerFactory)
        {
            _commandHandlerFactory = commandHandlerFactory;
        }

        public void Send<T>(T command) where T : Command
        {
            // 獲得對應的CommandHandle來對命令進行處理
            var handlers = _commandHandlerFactory.GetHandlers<T>();

            foreach (var handler in handlers)
            {
                // 處理命令
                handler.Execute(command);
            }
        }       
    }

// 對CreateItemCommand處理類
    public class CreateItemCommandHandler : ICommandHandler<CreateItemCommand>
    {
        private readonly IDomainRepository<DiaryItem> _domainRepository;

        public CreateItemCommandHandler(IDomainRepository<DiaryItem> domainRepository)
        {
            _domainRepository = domainRepository;
        }

        // 具體處理邏輯
        public void Execute(CreateItemCommand command)
        {
            if (command == null)
            {
                throw new ArgumentNullException("command");
            }
            if (_domainRepository == null)
            {
                throw new InvalidOperationException("domainRepository is not initialized.");
            }

            var aggregate = new DiaryItem(command.ID, command.Title, command.Description, command.From, command.To)
            {
                Version = -1
            };

            // 將對應的領域實體進行保存
            _domainRepository.Save(aggregate, aggregate.Version);
        }
    }

 // IDomainRepository的實現類
    public class DomainRepository<T> : IDomainRepository<T> where T : AggregateRoot, new()
    {
             // 並沒有直接對領域實體進行保存,而是先保存領域事件進EventStore,然后在Publish事件到EventBus進行處理
        // 然后EventBus把事件分配給對應的事件處理器進行處理,由事件處理器來把領域對象保存到QueryDatabase中
        public void Save(AggregateRoot aggregate, int expectedVersion)
        {
            if (aggregate.GetUncommittedChanges().Any())
            {
                _storage.Save(aggregate);
            }
        }
    }

 // Event Store的實現,這里保存在內存中,通常是保存到具體的數據庫中,如SQL Server、Mongodb等
    public class InMemoryEventStorage : IEventStorage
    {
         // 領域事件的保存
        public void Save(AggregateRoot aggregate)
        {
            // 獲得對應領域實體未提交的事件
            var uncommittedChanges = aggregate.GetUncommittedChanges();
            var version = aggregate.Version;

            
            foreach (var @event in uncommittedChanges)
            {
                version++;
                // 沒3個事件創建一次快照
                if (version > 2)
                {
                    if (version % 3 == 0)
                    {
                        var originator = (ISnapshotOrignator)aggregate;
                        var snapshot = originator.CreateSnapshot();
                        snapshot.Version = version;
                        SaveSnapshot(snapshot);
                    }
                }

                @event.Version = version;
                // 保存事件到EventStore中
                _events.Add(@event);
            }

            // 保存事件完成之后,再將該事件發布到EventBus 做進一步處理
            foreach (var @event in uncommittedChanges)
            {
                var desEvent = TypeConverter.ChangeTo(@event, @event.GetType());
                _eventBus.Publish(desEvent);
            }
        }
    }

  // EventBus的實現
    public class EventBus : IEventBus
    {
        private readonly IEventHandlerFactory _eventHandlerFactory;

        public EventBus(IEventHandlerFactory eventHandlerFactory)
        {
            _eventHandlerFactory = eventHandlerFactory;
        }

        public void Publish<T>(T @event) where T : DomainEvent
        {
            // 獲得對應的EventHandle來處理事件
            var handlers = _eventHandlerFactory.GetHandlers<T>();
            foreach (var eventHandler in handlers)
            {
                // 對事件進行處理
                eventHandler.Handle(@event);
            }
        }
    }

// DiaryItemCreatedEvent的事件處理類
    public class DiaryIteamCreatedEventHandler : IEventHandler<DiaryItemCreatedEvent>
    {
        private readonly IStorage _storage;

        public DiaryIteamCreatedEventHandler(IStorage storage)
        {
            _storage = storage;
        }

        public void Handle(DiaryItemCreatedEvent @event)
        {
            var item = new DiaryItemDto()
            {
                Id = @event.SourceId,
                Description = @event.Description,
                From = @event.From,
                Title = @event.Title,
                To = @event.To,
                Version = @event.Version
            };

            // 將領域對象持久化到QueryDatabase中
            _storage.Add(item);
        }
    }
    

  上面代碼主要演示了Command部分的實現,從代碼可以看出,首先我們需要通過ServiceLocator類來對依賴注入對象進行注入,然后UI層通過CommandBus把對應的命令發布到CommandBus中進行處理,命令總線再查找對應的CommandHandler來對命令進行處理,接着CommandHandler調用倉儲類來保存領域對象對應的事件,保存事件成功后再將事件發布到事件總線中進行處理,然后由對應的事件處理程序將領域對象保存到QueryDatabase中。這樣就完成了命令部分的操作,從中可以發現,命令部分的實現和CQRS系統中的系統結構圖的處理過程是一樣的。然而創建日志命令並沒有涉及事件溯源操作,因為創建命令並需要重建領域對象,此時的領域對象是通過創建日志命令來獲得的,但在修改和刪除命令中涉及了事件溯源,因為此時需要根據命令對象的ID來重建領域對象。具體的實現可以參考源碼。

  下面讓我們再看看查詢部分的實現。

  查詢部分的實現代碼:

 public class HomeController : Controller
    {
        // 查詢部分
        public ActionResult Index()
        {
            // 直接獲得QueryDatabase對象來查詢所有日志
            var model = ServiceLocator.QueryStorage.GetItems();
            return View(model);
        }
    }

 public class InMemoryStorage : IStorage
    {
        private static readonly List<DiaryItemDto> Items = new List<DiaryItemDto>();

        public DiaryItemDto GetById(Guid id)
        {
            return Items.FirstOrDefault(a => a.Id == id);
        }

        public void Add(DiaryItemDto item)
        {
            Items.Add(item);
        }

        public void Delete(Guid id)
        {
            Items.RemoveAll(i => i.Id == id);
        }

        public List<DiaryItemDto> GetItems()
        {
            return Items;
        }
    }

  從上面代碼可以看出,查詢部分的代碼實現相對比較簡單,UI層直接通過QueryDatabase來查詢領域對象,然后由UI層進行渲染出來顯示。

  到此,一個簡單的CQRS系統就完成了,然而在項目中,UI層並不會直接CommandBus和QueryDatabase進行引用,而是通過對應的CommandService和QueryService來進行協調,具體的系統結構如下圖所示(只是在CommandBus和Query Database前加入了一個SOA的服務層來進行協調,這樣有利於系統擴展,可以通過SOA服務來進行請求路由,將不同請求路由不同的系統中,這樣會可以實現多個系統進行一個整合):

  關於該CQRS系統的演示效果,大家可以自行去Github或MSDN中進行下載,具體的下載地址將會本專題最后給出。

七、總結

   到這里,本專題關於CQRS的介紹就結束了,並且本專題也是領域驅動設計系列的最后一篇了。本系列專題的內容主要是參考daxnet的ByteartRetail案例,由於daxnet在寫這個案例的時候並沒有一步一步介紹其創建過程,對於一些領域驅動的初學者來說,直接去學習這個案例未免會有點困難,導致學習興趣降低,從而放棄領域驅動的學習。為了解決這些問題,所以,本人對ByteartRetail案例進行剖析,並參考該案例一步步實現自己的領域驅動案例OnlineStore。希望本系列可以幫助大家打開領域驅動的大門。

  由於現在NO-SQL在互聯網行業的應用已經非常流行,以至於面試的時候經常會被問到你用過的非關系數據庫有哪些?所以本人也不想Out,所以在最近2個月的時候學習了一些No-SQL的內容,所以,接下來,我將會開啟一個NO-SQL系列,記錄自己這段時間來學習NO-SQL的一些心得和體會。

 

  本專題所有源碼下載:

  Github地址:https://github.com/lizhi5753186/CQRSDemo

   MSDN地址:https://code.msdn.microsoft.com/CQRS-1f05ebe5

     本文參考鏈接:

     http://www.codeproject.com/Articles/555855/Introduction-to-CQRS 

     http://www.cnblogs.com/daxnet/archive/2010/08/02/1790299.html

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM