一、寫在前面
上篇大致介紹過了領域驅動的主要概念,內容並不詳盡,相關方面的知識大家可以參考園子里湯雪華和陳晴陽的博客,上篇有說過,領域驅動設計重點是建立正確的領域模型,這取決於對業務的理解和抽象能力,本篇將以一個簡單的訂單流程來實踐領域驅動設計,希望能夠給想實踐DDD的人提供一種實現思路。
二、訂單流程
這是一個簡化了的訂單流程,實際情況還有很多細節要考慮。但這不妨礙本文的一個演示目的。
圖中的發布事件即為發布消息至消息隊列,為了達到EventSourcing的目的會在每次發布消息前將其持久化到數據庫。
示例源碼在本文最下面。
三、搭建分層架構解決方案
我們以領域驅動設計的經典分層架構來搭建我們的解決方案。如下圖
Applicaiton:應用層,在這里我們用ServiceStack實現的Web服務來作為應用層(實際情況該層承擔的應該是應用功能的划分和協調,但作為示例將其合並在同一層次)。
Domain:領域層,包含了業務所涉及的領域對象(實體、值對象),技術無關性。
Infrastructure:基礎設施層,數據庫持久化,消息隊列實現,業務無關性。
SampleTests:在這里我們用單元測來作為表現層。
四、基礎設施層
1:首先定義出領域模型
領域模型有一個聚合根的概念,定義模型之前我們先定義一個聚合根的接口。
namespace Infrastructure.Database { /// <summary> /// 聚合根 /// </summary> public interface IAggregateRoot { /// <summary> // 每個聚合根必須擁有一個全局的唯一標識,往往是GUID。 /// </summary> Guid Id { get; set; } } }我們為該聚合根定義一個抽象的實現類,通過使用[Key, DatabaseGenerated(DatabaseGeneratedOption.Identity)]可以使主鍵按排序規則生成。
namespace Infrastructure.Database { /// <summary> /// 實體基類 /// </summary> public abstract class EntityBase<TKey> { [Key, DatabaseGenerated(DatabaseGeneratedOption.Identity)] public TKey Id { get; set; } } /// <summary> /// 實體基類(GUID) /// </summary> public abstract class EntityBase :EntityBase<Guid>, IAggregateRoot { } }
2:CQRS接口及定義
首先是Command
namespace Infrastructure.Commands { public interface ICommand : IReturn<CommandResult> { Guid CommandId { get; } } } namespace Infrastructure.Commands { public interface ICommandHandler<in TCommand> : IHandler<TCommand> where TCommand : ICommand { } } namespace Infrastructure.Commands { public class CommandResult { public CommandResult() { } public CommandResult(bool result = true,string msg = "") { this.Result = result; this.Msg = msg; } public bool Result { get; set; } public string Msg { get; set; } } } namespace Infrastructure.Commands { public abstract class CommandHandlerBase { protected async Task DoHandle<TMessage>(Func<TMessage, Task> handlerAction, TMessage message) where TMessage : ICommand { try { await handlerAction.Invoke(message); } //catch MoreException catch (Exception e) { throw new Exception(e.Message); } } } }然后是Event
namespace Infrastructure.Events { public interface IEvent : IReturnVoid { Guid EventId { get; } } } namespace Infrastructure.Events { public interface IEventHandler<in TEvent> : IHandler<TEvent> where TEvent : IEvent { } } namespace Infrastructure.Events { public abstract class EventHandlerBase { public virtual async Task DoHandle<TMessage>(Func<TMessage, Task> handlerAction, TMessage message) where TMessage : IEvent { try { await handlerAction.Invoke(message); } //catch MoreException catch (Exception e) { throw new Exception(e.Message); } } } }最后是Bus
namespace Infrastructure.Bus { public interface IEventBus { void Publish<T>(T message) where T : IEvent; } } namespace Infrastructure.Bus { public interface ICommandBus { CommandResult Excute<T>(T command) where T : ICommand; Task<CommandResult> ExcuteAsync<T>(T command) where T : ICommand; } }
基礎設施層到這里就算完成了,還有個倉儲的實現上篇有說明,有點要說明的是本文的示例Domain層中並沒有做到真正的純凈,譬如數據庫持久化采用的EF實現,且把上下文放置在了Domain層,若作為開發框架是不建議這樣做的,要達到完全解耦可以參考陳晴陽的開源項目Apworks。
五、領域層
首先定義出所需要的領域模型
namespace Domain.Entitys { /// <summary> /// 訂單實體類 /// </summary> public class Order : EntityBase { public string OrderNo { get; set; } public decimal OrderAmount { get; set; } public DateTime OrderTime { get; set; } public string ProductNo { get; set; } public string UserIdentifier { get; set; } public bool IsPaid { get; set; } } /// <summary> /// 訂單支付實體類 /// </summary> public partial class PayOrder : EntityBase { public decimal PayAmount { get; set; } public string PayResult { get; set; } public string OrderNo { get; set; } } }此處訂單模型繼承抽象類,如此可以保持模型的純凈,你甚至可以根據業務差異性定義多個基類,通常我們會將通用的一些屬性及方法定義在基類中,如IsDeleted【邏輯刪除】、CreateTime、Timestamp【並發控制】等。
為簡化流程示例中僅包含兩個操作,【生成訂單】和【支付訂單】,我們將其定義在領域服務內。
namespace Domain.DomainServices { public interface IOrderService { Task OrderBuild(Order order); Task Pay(Order order); } } namespace Domain.DomainServices { public class OrderService : IOrderService { public IRepository<Order> OrderRepository { private get; set; } public IRepository<PayOrder> PayOrderRepository { private get; set; } public IRepository<EventStore> EventStoreRepository { private get; set; } public IEventBus EventBus { private get; set; } public async Task OrderBuild(Order order) { //生成訂單 await OrderRepository.AddAsync(order); //toEventStore await EventStoreRepository.AddAsync(order.ToBuildOrderReadyEvent().ToEventStore()); //發布生成訂單事件 EventBus.Publish(order.ToBuildOrderReadyEvent()); } public async Task Pay(Order order) { var payOrder = new PayOrder { OrderNo = order.OrderNo, PayAmount = order.OrderAmount, PayResult = "pay success!" }; //支付成功 await PayOrderRepository.AddAsync(payOrder); //更新訂單 var findOrder = await OrderRepository.GetByKeyAsync(order.Id); findOrder.IsPaid = true; await OrderRepository.UpdateAsync(findOrder); //toEventStore await EventStoreRepository.AddAsync(payOrder.ToPaySuccessReadyEvent().ToEventStore()); //發布支付成功事件 EventBus.Publish(payOrder.ToPaySuccessReadyEvent()); } } }要驅動整個流程的訂單發起,我們需要定義一個Command【OrderBuild】,它通常是根據調用端數據DTO轉化而來。
namespace Domain.Commands { [Route("/BuildOrder", "Post")] public class BuildOrder : Command { public string OrderNo { get; set; } public decimal OrderAmount { get; set; } public string ProductNo { get; set; } public string UserIdentifier { get; set; } public Order ToOrder() { return new Order { OrderNo = OrderNo, OrderAmount = OrderAmount, OrderTime = DateTime.Now, ProductNo = ProductNo, UserIdentifier = UserIdentifier, IsPaid = false }; } } }有了Command,接着定義出該命令的處理程序
namespace Domain.Commands.Handlers { public class OrderCommandHandler :CommandHandlerBase, ICommandHandler<BuildOrder> { public IOrderService OrderService { private get; set; } public async Task Handle(BuildOrder command) { await DoHandle(async c => { await OrderService.OrderBuild(command.ToOrder()); }, command); } } }由上面定義的領域服務中可見,OrderBuild和Pay中都發布有事件,事件及其處理程序如下
namespace Domain.Events { public class BuildOrderReady : Event { public Order Entity { get; set; } public EventStore ToEventStore() { return new EventStore { Timestamp = DateTime.Now, Body = JsonConvert.SerializeObject(Entity) }; } } } namespace Domain.Events { public class PaySuccessReady : Event { public PayOrder Entity { get; set; } public EventStore ToEventStore() { return new EventStore { Timestamp = DateTime.Now, Body = JsonConvert.SerializeObject(Entity) }; } } } namespace Domain.Events.Handlers { public class OrderEventHandler : EventHandlerBase, IEventHandler<BuildOrderReady>, IEventHandler<PaySuccessReady> { public IOrderService OrderService { private get; set; } public async Task Handle(BuildOrderReady @event) { await DoHandle(async c => { await OrderService.Pay(@event.Entity); }, @event); } public async Task Handle(PaySuccessReady @event) { //Send Email.. //Send SMS.. } } }可以看到在兩個Event中都包含有ToEventStore方法,此處僅為模擬出將當前Event序列化保存,以供EventSourcing使用。這里有較成熟的框架可以使用,如NEventStore、http://geteventstore.com/。
六、應用層
開頭有說過應用層采用ServiceStack實現的Web服務,優點有3
1:ServiceStack強調數據交換需定義出RequestDto及ResponseDto,這很符合我們CQRS的一個Command機制
2:示例中Event即消息,Publish的Event將在MQ中,通過訂閱去消費,示例采用的消息隊列是RabbitMq(跨平台),這樣一來可以使用其他平台的語言去訂閱該消息並消費,ServiceStack將Rabbitmq的部分功能集成在內。
3:其實是第二點的衍生,當事件經過MQ,有些消息我們可以消費即ACK掉,有些消息我們可以將其存儲在隊列中,如此一來我們可以基於訂閱MQ來實現系統對業務的一個分析和數據處理,如下圖
ServiceStack中服務的定義只需要繼承ServiceStack.Service或者IService,如下
namespace Application.Services { public partial class CommandService : Service { public async Task<CommandResult> Any(BuildOrder command) { return await Handler(command); } } } namespace Application.Services { public partial class EventService : Service { public async Task Any(BuildOrderReady @event) { await Handler(@event); } public async Task Any(PaySuccessReady @event) { await Handler(@event); } } }關於方法名定義成Any是推薦的做法,若要控制其Post或Get等可以在其RequestDto上以 [Route("/BuildOrder", "Post")]標簽的形式拓展。
因ServiceStack要求RequestDto定義的同時須要指定其ResponseDto,以繼承IReturn<ResponseDto>接口來聲明。
示例中我們的Command都是繼承自IReturn<CommandResult>,Event都是繼承自IReturnVoid,如下
namespace Infrastructure.Commands { public interface ICommand : IReturn<CommandResult> { Guid CommandId { get; } } } namespace Infrastructure.Events { public interface IEvent : IReturnVoid { Guid EventId { get; } } }在ServiceStack中需要定義一個繼承自AppHostBase的服務宿主類(姑且這樣叫吧),通常取名叫AppHost,如下
namespace Application.Services.Config { public class AppHost : AppHostBase { public AppHost() : base("CQRS Demo", typeof(AppHost).Assembly) { } public override void Configure(Funq.Container container) { //SwaggerUI配置用於調試 AddPlugin(new SwaggerFeature()); //IOC配置 ServiceLocatorConfig.Configura(container); //rabbitmq配置 var mq = new RabbitMqServer(ConfigurationManager.AppSettings.Get("EventProcessorAddress")) { AutoReconnect = true, DisablePriorityQueues = true, RetryCount = 0 }; container.Register<IMessageService>(c => mq); var mqServer = container.Resolve<IMessageService>(); //注冊eventHandler mq.RegisterHandler<BuildOrderReady>(ServiceController.ExecuteMessage, 1); mq.RegisterHandler<PaySuccessReady>(ServiceController.ExecuteMessage, 1); mqServer.Start(); } } }構造函數中的兩個參數分別代表,服務顯示名稱和指定當前服務定義所在的程序集。
SwaggerUI用於調試服務接口是非常方便的,內置的依賴注入框架Funq功能也不錯。
另外就是rabbitmq的使用需要在Nuget中另外安裝,全稱是:ServiceStack.RabbitMq。值得一提的是服務啟動時,ServiceStack會在你指定的Rabbitmq服務端創建對應的隊列,通常是根據你定義的Event創建如下。
可以看到每個Event創建了4個隊列,分別代表的意思是:
dlq:沒有對應的處理程序或處理失敗的消息。
inq:還未被消費的消息。
outq:處理完畢的消息。
priorityq:優先隊列。
更詳細的可以到ServiceStack Wiki上查看。
優點:在注冊EventHandler時可以指定處理線程個數,如上面指定的是1,此時若同樣的服務有兩個,分別部署在不同服務器上且都訂閱相同消息時,將根據線程數來消費MQ中的消息來達到負載均衡的目的。
//注冊eventHandler mq.RegisterHandler<BuildOrderReady>(ServiceController.ExecuteMessage, 1); mq.RegisterHandler<PaySuccessReady>(ServiceController.ExecuteMessage, 1);但其實針對Rabbitmq封裝一個Client並不麻煩,我們可以按項目需要去實現其Exchanges和Queues,並且可以很靈活的控制Ack等。
七、調試
在單元測試中我們按如下方式調試。
也可以使用SwaggerUI調試,服務運行之后將打開如下頁面
點擊SwaggerUI打開調試頁面
點擊Try it out!按鈕
此時數據庫中應包含一條Order記錄一條PayOrder記錄和兩條EventStore記錄
RabbitMq中
八、源碼
源碼地址:https://github.com/yanghongjie/DomainDrivenDesignSample