從上一篇文章大家可以看出,實現一個自己的消息總線框架是非常重要的內容,消息總線可以將界限上下文之間進行解耦,也可以為大並發訪問提供必要的支持。
消息總線的作用:
1.界限上下文解耦:在DDD第一波文章中,當更新了訂單信息后,我們通過調用經銷商界限上下文的領域模型和倉儲,進行了經銷商信息的更新,這造成了耦合。通過一個消息總線,可以在訂單界限上下文的WebApi服務(來源微服務-生產者)更新了訂單信息后,發布一個事件消息到消息總線的某個隊列中,經銷商界限上下文的WebApi服務(消費者)訂閱這個事件消息,然后交給自己的Handler進行消息處理,更新自己的經銷商信息。這樣就實現了訂單界限上下文與經銷商界限上下文解耦。
2.大並發支持:可以通過消息總線進一步提升下單的性能。我們可以將用戶下單的操作直接交給一個下單命令WebApi接收,下單命令WebApi接收到命令后,直接丟給一個消息總線的隊列,然后立即給前端返回下單結果。這樣用戶就不用等待后續的復雜訂單業務邏輯,加快速度。后續訂單的一系列處理交給消息的Handler進行后續的處理與消息的進一步投遞。
消息總線設計重點:
1.定義消息(事件)的接口:所有需要投遞與處理的消息,都從這個消息接口繼承,因為需要約束消息中必須包含的內容,比如消息的ID、消息產生的時間等。
public interface IEvent { Guid Id { get; set; } DateTime CreateDate { get; set; } }
2.定義消息(事件)處理器接口:當消息投遞到消息總線隊列中后,一定有消費者WebApi接收並處理這個消息,具體的處理方法邏輯在訂閱方處理器中實現,這里先需要定義處理器的接口,便於在消息總線框架中使用。
public interface IEventHandler { Task<bool> HandleAsync<TEvent>(TEvent @event) where TEvent : IEvent; }
從上面代碼可以看出,消息(事件)處理器處理的類型就是從IEvent接口繼承的消息類。
3.定義消息(事件)與消息(事件)處理器關聯接口:一種類型的消息被投遞后,一定要在訂閱方找到這種消息的處理器進行處理,所以一定要定義二者的關聯接口,這樣才能將消息與消息處理器對應起來,才能實現消息被訂閱后的處理。
public interface IEventHandlerExecutionContext { void RegisterEventHandler<TEvent, TEventHandler>() where TEvent : IEvent where TEventHandler : IEventHandler; bool IsRegisterEventHandler<TEvent, TEventHandler>() where TEvent : IEvent where TEventHandler : IEventHandler; Task HandleAsync<TEvent>(TEvent @event) where TEvent : IEvent; }
RegisterEventHandler方法就是建立消息與消息處理器的關聯,這個方法其實是在訂閱方使用,訂閱方告訴消息總線,什么樣的消息應該交給我的哪個處理器進行處理。
IsRegisterEventHandler方法是判斷消息與處理器之間是否已經存在關聯。
HandleAsync方法是通過查找到消息對應的處理器后,然后調用處理器自己的Handle方法進行消息的處理.
4.定義消息發布、訂閱與消息總線接口:消息總線至少要支持兩個功能,一個是生產者能夠發布消息到我的消息總線,另一個是訂閱方需要能夠從我這個消息總線訂閱消息。
public interface IEventPublisher { void Publish<TEvent>(TEvent @event) where TEvent : IEvent; }
從上面代碼可以看出,生產者發布的消息仍然要從IEvent繼承的類型。
public interface IEventSubscriber { void Subscribe<TEvent, TEventHandler>() where TEvent : IEvent where TEventHandler : IEventHandler; }
上面代碼是訂閱方用於從消息總線訂閱消息,從代碼中可以看出,它的最終的實現其實就是建立消息與處理器之間的關聯。
public interface IEventBus:IEventPublisher,IEventSubscriber { }
消息(事件)總線從兩個接口繼承下來,同時支持消息的發布與消息的訂閱。
5.實現事件基類:上面已經訂閱了消息(事件)的接口,這里來實現事件的基類,其實就是實現消息ID與產生的時間:
public class BaseEvent : IEvent { public Guid Id { get; set; } public DateTime CreateDate { get; set; } public BaseEvent() { this.Id = Guid.NewGuid(); this.CreateDate = DateTime.Now; } }
6.實現消息總線基類:消息總線底層的依賴可以是各種消息代理產品,比如RabbitMq、Kafaka或第三方雲平台提供的消息代理產品,通常我們要封裝這些消息代理產品。在封裝之前,我們需要定義頂層的消息總線基類實現,主要的目的是未來依賴於它的具體實現可替換,另外也將消息與消息處理器的關聯接口傳遞進來,便於訂閱方使用。
public abstract class BaseEventBus : IEventBus { protected readonly IEventHandlerExecutionContext eventHandlerExecutionContext; protected BaseEventBus(IEventHandlerExecutionContext eventHandlerExecutionContext) { this.eventHandlerExecutionContext = eventHandlerExecutionContext; } public abstract void Publish<TEvent>(TEvent @event) where TEvent : IEvent; public abstract void Subscribe<TEvent, TEventHandler>() where TEvent : IEvent where TEventHandler : IEventHandler; }
7.實現消息與處理器關聯:消息必須與處理器關聯,訂閱方收到特定類型的消息后,才知道交給哪個處理器處理。
public class EventHandlerExecutionContext : IEventHandlerExecutionContext { private readonly IServiceCollection registry; private readonly IServiceProvider serviceprovider; private Dictionary<Type, List<Type>> registrations = new Dictionary<Type, List<Type>>(); public EventHandlerExecutionContext(IServiceCollection registry,Func<IServiceCollection, IServiceProvider> serviceProviderFactory = null) { this.registry = registry; this.serviceprovider = this.registry.BuildServiceProvider(); } //查找消息關聯的處理器,然后調用處理器的處理方法 public async Task HandleAsync<TEvent>(TEvent @event) where TEvent : IEvent { var eventtype = @event.GetType(); if(registrations.TryGetValue(eventtype,out List<Type> handlertypes) && handlertypes.Count > 0) { using(var childscope = this.serviceprovider.CreateScope()) { foreach(var handlertype in handlertypes) { var handler = Activator.CreateInstance(handlertype) as IEventHandler; await handler.HandleAsync(@event); } } } } //判斷消息與處理器之間是否有關聯 public bool IsRegisterEventHandler<TEvent, TEventHandler>() where TEvent : IEvent where TEventHandler : IEventHandler { if(registrations.TryGetValue(typeof(TEvent),out List<Type> handlertypelist)) { return handlertypelist != null && handlertypelist.Contains(typeof(IEventHandler)); } return false; } //將消息與處理器關聯起來,可以在內存中建立關聯,也可以建立在數據庫單獨表中 public void RegisterEventHandler<TEvent, TEventHandler>() where TEvent : IEvent where TEventHandler : IEventHandler { Utils.DictionaryRegister(typeof(TEvent), typeof(TEventHandler), registrations); } }
上面我們基本上就將消息總線的架子搭建起來了,也實現了基本的功能,下一章我們基於它來實現RabbitMq的消息總線。
QQ討論群:309287205
微服務實戰視頻請關注微信公眾號: