在CQRS架構中,一個比較重要的內容就是當命令處理器從命令隊列中接收到相關的命令數據后,通過調用領域對象邏輯,然后將當前事件的對象數據持久化到事件存儲中。主要的用途是能夠快速持久化對象此次的狀態,另外也可以通過未來最終一致性的需求,通過事件數據將對象還原到一個特定的狀態,這個狀態通常是通過對象事件的版本來進行還原的。
要實現一個事件存儲的框架,我們通常需要實現以下幾個方面:
1.對象事件的存儲表
我們通常將對象某個變化的事件數據存儲到數據庫的表中,通常采用關系型數據庫進行存儲,這里使用SQL Server。
CREATE TABLE [dbo].[DomainCommandAndEventObject]( [Id] [uniqueidentifier] NULL, [AggregationRootId] [uniqueidentifier] NULL, [AssemblyQualifiedAggreateRooType] [nvarchar](500) NULL, [AssemblyQualifiedCommandAndEventType] [nvarchar](500) NULL, [CreateDate] [datetime] NULL, [Version] [int] NULL, [Data] [varbinary](max) NULL )
AggregationRootId是當前聚合根對象的Id;AssemblyQualifiedAggreateRooType是當前聚合根對象的FQDN名,在C#代碼中對應名稱空間+類名(例如:Order.Domain.POCOModels.Orders, Order.Domain, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null);AssemblyQualifiedCommandAndEventType是操作當前聚合根的事件類型的FQDN名字,在C#代碼中對應名稱空間+類名(例如:Events.OrderCommands.CreateOrderCommand, Events, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null),Version對應的是針對某個聚合根當前事件操作的版本,通常對一個聚合根進行操作,版本就加1,Data則包括當前事件操作后,對象的當前狀態數據。
2.重構Event用以支持存儲
前面文章實現的事件只是用於標識消息,在事件需要存儲時,我們需要更多的屬性,包括聚合根ID,聚合根類型,操作聚合根的事件類型,版本號等。
public interface IEvent { Guid Id { get; set; } DateTime CreateDate { get; set; } Guid AggregationRootId { get; set; } string AssemblyQualifiedAggreateRooType { get; set; } string AssemblyQualifiedCommandAndEventType { get; set; } int Version { get; set; } }
public class BaseEvent : IEvent { public Guid Id { get; set; } public DateTime CreateDate { get; set; } public Guid AggregationRootId { get; set; } public string AssemblyQualifiedAggreateRooType { get; set; } public string AssemblyQualifiedCommandAndEventType { get; set; } public int Version { get; set; } public BaseEvent() { this.Id = Guid.NewGuid(); this.CreateDate = DateTime.Now; } }
3.實現存儲的事件對象
其實這里要實現的就是將事件和事件對象之間做相互的轉換,用於未來存儲事件或將事件反序列化成事件對象進行使用。
public class EventObject:BaseEvent { public byte[] Data { get; set; } public static EventObject FromDomainEvent(IEvent idomainevent) { var domaineventobject = new EventObject(); domaineventobject.Id = idomainevent.Id; domaineventobject.CreateDate = idomainevent.CreateDate; domaineventobject.Version = idomainevent.Version; domaineventobject.AggregationRootId = idomainevent.AggregationRootId; domaineventobject.AssemblyQualifiedAggreateRooType = idomainevent.AssemblyQualifiedAggreateRooType; domaineventobject.AssemblyQualifiedCommandAndEventType = idomainevent.AssemblyQualifiedCommandAndEventType; domaineventobject.Data = XmlObjectSerializer.Serialize(idomainevent); return domaineventobject; } public IEvent ToDomainEvent() { Type type = Type.GetType(this.AssemblyQualifiedAggreateRooType); var domainevent = (IEvent)XmlObjectSerializer.Deserialize(type, this.Data); domainevent.Id = this.Id; return domainevent; } }
FromDomainEvent方法就是將事件信息轉換為以后要存儲的事件對象,ToDomainEvent就是將事件對象轉換為事件。
4.實現事件存儲
實現事件存儲就是將領域事件對象存儲到我們前面創建的數據庫表中。為了能夠快速存儲,我們並不采用ORM框架,而是直接使用ADO.NET完成事件對象的存儲。
public void SaveEvent(IEvent idomainevent) { try { var domaineventobject = EventObject.FromDomainEvent(idomainevent); conn.Open(); SqlParameter sqlparm = new SqlParameter("@AggregationRootId", System.Data.SqlDbType.UniqueIdentifier); sqlparm.Value = idomainevent.AggregationRootId; cmd = new SqlCommand("select count(*) from DomainCommandAndEventObject where AggregationRootId=@AggregationRootId", conn); cmd.Parameters.Add(sqlparm); var count = cmd.ExecuteScalar(); if(count!=null) { domaineventobject.Version = int.Parse(count.ToString()); } SqlParameter[] sqlparams = new SqlParameter[7]; sqlparams[0] = new SqlParameter("@Id", System.Data.SqlDbType.UniqueIdentifier); sqlparams[0].Value = domaineventobject.Id; sqlparams[1] = new SqlParameter("@AggregationRootId", System.Data.SqlDbType.UniqueIdentifier); sqlparams[1].Value = domaineventobject.AggregationRootId; sqlparams[2] = new SqlParameter("@AssemblyQualifiedAggreateRooType", System.Data.SqlDbType.NVarChar); sqlparams[2].Value = domaineventobject.AssemblyQualifiedAggreateRooType; sqlparams[3] = new SqlParameter("@AssemblyQualifiedCommandAndEventType", System.Data.SqlDbType.NVarChar); sqlparams[3].Value = domaineventobject.AssemblyQualifiedCommandAndEventType; sqlparams[4] = new SqlParameter("@CreateDate", System.Data.SqlDbType.DateTime); sqlparams[4].Value = domaineventobject.CreateDate; sqlparams[5] = new SqlParameter("@Version", System.Data.SqlDbType.Int); sqlparams[5].Value = domaineventobject.Version; sqlparams[6] = new SqlParameter("@Data", System.Data.SqlDbType.VarBinary); sqlparams[6].Value = domaineventobject.Data; cmd = new SqlCommand("insert DomainCommandAndEventObject values
(@Id,@AggregationRootId,@AssemblyQualifiedAggreateRooType,@AssemblyQualifiedCommandAndEventType,@CreateDate,@Version,@Data)", conn); foreach(var sqlparam in sqlparams) { cmd.Parameters.Add(sqlparam); } cmd.ExecuteNonQuery(); } catch(Exception error) { throw error; } finally { cmd.Dispose(); conn.Close(); }
這樣,我們基本就實現了事件與存儲方面的基礎內容。
QQ討論群:309287205
微服務實戰視頻請關注微信公眾號: