最近在看微軟eShopOnContainers 項目,看到事件總線覺得不錯,和大家分享一下
看完此文你將獲得什么?
- eShop中是如何設計事件總線的
- 實現一個InMemory事件總線eShop中是沒有InMemory實現的,這算是一個小小小的挑戰
發布訂閱模式
發布訂閱模式可以讓應用程序組件之間解耦,這是我們使用這種模式最重要的理由之一,如果你完全不知道這個東西,建議你先通過搜索引擎了解一下這種模式,網上的資料很多這里就不再贅述了。
eShop中的EventBus就是基於這種模式的發布/訂閱。
發布訂閱模式核心概念有三個:發布者、訂閱者、調度中心,這些概念在消息隊列中就是生產者、消費者、MQ實例。
在eShop中有兩個EventBus的實現:
- 基於RabbitMq的
EventBusRabbitMQ
- 基於AzureServiceBus的
EventBusServiceBus
。
從IEventBus
開始
先來看一看,所有EventBus的接口IEventBus
public interface IEventBus
{
void Publish(IntegrationEvent @event);
void Subscribe<T, TH>()
where T : IntegrationEvent
where TH : IIntegrationEventHandler<T>;
void SubscribeDynamic<TH>(string eventName)
where TH : IDynamicIntegrationEventHandler;
void UnsubscribeDynamic<TH>(string eventName)
where TH : IDynamicIntegrationEventHandler;
void Unsubscribe<T, TH>()
where TH : IIntegrationEventHandler<T>
where T : IntegrationEvent;
}
嗯,乍一看看是有點眼暈的,仔細看它的核心功能只有三個:
- Publish 發布
- Subscribe 訂閱
- Unsubscribe 取消訂閱
這對應着發布訂閱模式的基本概念,不過對於事件總線的接口添加了許多約束:
- 發布的內容(消息)必須是
IntegrationEvent
及其子類 - 訂閱事件必須指明要訂閱事件的類型,並附帶處理器類型
- 處理器必須是
IIntegrationEventHandler
的實現類
Ok,看到這里先不要管Dynamic
相關的方法,然后記住這個兩個關鍵點:
- 事件必須繼承
IntegrationEvent
- 處理器必須實現
IIntegrationEventHandler<T>
且T
是IntegrationEvent
子類
另外,看下 IntegrationEvent
有什么
public class IntegrationEvent
{
public IntegrationEvent()
{
Id = Guid.NewGuid();
CreationDate = DateTime.UtcNow;
}
public Guid Id { get; }
public DateTime CreationDate { get; }
}
IEventBusSubscriptionsManager是什么
public interface IEventBusSubscriptionsManager
{
bool IsEmpty { get; }
event EventHandler<string> OnEventRemoved;
void AddDynamicSubscription<TH>(string eventName)
where TH : IDynamicIntegrationEventHandler;
void AddSubscription<T, TH>()
where T : IntegrationEvent
where TH : IIntegrationEventHandler<T>;
void RemoveSubscription<T, TH>()
where TH : IIntegrationEventHandler<T>
where T : IntegrationEvent;
void RemoveDynamicSubscription<TH>(string eventName)
where TH : IDynamicIntegrationEventHandler;
bool HasSubscriptionsForEvent<T>() where T : IntegrationEvent;
bool HasSubscriptionsForEvent(string eventName);
Type GetEventTypeByName(string eventName);
void Clear();
IEnumerable<SubscriptionInfo> GetHandlersForEvent<T>() where T : IntegrationEvent;
IEnumerable<SubscriptionInfo> GetHandlersForEvent(string eventName);
string GetEventKey<T>();
}
這個接口看起來稍顯復雜些,我們來簡化下看看:
public interface IEventBusSubscriptionsManager
{
void AddSubscription<T, TH>()
void RemoveSubscription<T, TH>()
IEnumerable<SubscriptionInfo> GetHandlersForEvent<T>()
}
最終,這三個方法就是我們要關注的,添加訂閱、移除訂閱、獲取指定事件的訂閱信息。
SubscriptionInfo
是什么?
public bool IsDynamic { get; }
public Type HandlerType{ get; }
SubscriptionInfo
中只有兩個信息,這是不是一個Dynamic類型的Event以及這個Event所對應的處理器的類型。
這是你可能會有另一個疑問:
這個和IEventBus
有什么關系?
-
IEventBusSubscriptionsManager
含有更多功能:查看是否有訂閱,獲取事件的Type,獲取事件的處理器等等 -
IEventBusSubscriptionsManager
由IEventBus
使用,在RabbitMq和ServiceBus的實現中,都使用Manager去存儲事件的信息,例如下面的代碼:public void Subscribe<T, TH>() where T : IntegrationEvent where TH : IIntegrationEventHandler<T> { // 查詢事件的全名 var eventName = _subsManager.GetEventKey<T>(); //向mq添加注冊 DoInternalSubscription(eventName); // 向manager添加訂閱 _subsManager.AddSubscription<T, TH>(); } private void DoInternalSubscription(string eventName) { var containsKey = _subsManager.HasSubscriptionsForEvent(eventName); if (!containsKey) { if (!_persistentConnection.IsConnected) { _persistentConnection.TryConnect(); } using (var channel = _persistentConnection.CreateModel()) { channel.QueueBind(queue: _queueName, exchange: BROKER_NAME, routingKey: eventName); } } }
查詢事件的名字是manager做的,訂閱的時候是先向mq添加訂閱,之后又加到manager中,manager管理着訂閱的基本信息。
另外一個重要功能是獲取事件的處理器信息,在rabbit mq的實現中,ProcessEvent方法中用manager獲取了事件的處理器,再用依賴注入獲得處理器的實例,反射調用Handle
方法處理事件信息:
private async Task ProcessEvent(string eventName, string message)
{
// 從manager查詢信息
if (_subsManager.HasSubscriptionsForEvent(eventName))
{
using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME))
{
// 從manager獲取處理器
var subscriptions = _subsManager.GetHandlersForEvent(eventName);
foreach (var subscription in subscriptions)
{
// Di + 反射調用,處理事件(兩個都是,只是針對是否是dynamic做了不同的處理)
if (subscription.IsDynamic)
{
var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler;
dynamic eventData = JObject.Parse(message);
await handler.Handle(eventData);
}
else
{
var eventType = _subsManager.GetEventTypeByName(eventName);
var integrationEvent = JsonConvert.DeserializeObject(message, eventType);
var handler = scope.ResolveOptional(subscription.HandlerType);
var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
}
}
}
}
}
IEventBusSubscriptionsManager的默認實現
在eShop中只有一個實現就是InMemoryEventBusSubscriptionsManager
類
這個類中有兩個重要的字段
private readonly Dictionary<string, List<SubscriptionInfo>> _handlers;
private readonly List<Type> _eventTypes;
他們分別存儲了事件列表和事件處理器信息詞典
接下來就是實現一個
基於內存的事件總線
了
我們要做什么呢?IEventBusSubscriptionsManager 已經有了InMemory的實現了,我們可以直接拿來用,所以我們只需要自己實現一個EventBus就好了
先貼出最終代碼:
public class InMemoryEventBus : IEventBus
{
private readonly IServiceProvider _provider;
private readonly ILogger<InMemoryEventBus> _logger;
private readonly ISubscriptionsManager _manager;
private readonly IList<IntegrationEvent> _events;
public InMemoryEventBus(
IServiceProvider provider,
ILogger<InMemoryEventBus> logger,
ISubscriptionsManager manager)
{
_provider = provider;
_logger = logger;
_manager = manager;
}
public void Publish(IntegrationEvent e)
{
var eventType = e.GetType();
var handlers = _manager.GetHandlersForEvent(eventType.FullName);
foreach (var handlerInfo in handlers)
{
var handler = _provider.GetService(handlerInfo.HandlerType);
var method = handlerInfo.HandlerType.GetMethod("Handle");
method.Invoke(handler, new object[] { e });
}
}
public void Subscribe<T, TH>()
where T : IntegrationEvent
where TH : IIntegrationEventHandler<T>
{
_manager.AddSubscription<T, TH>();
}
public void SubscribeDynamic<TH>(string eventName) where TH : IDynamicIntegrationEventHandler
{
throw new NotImplementedException();
}
public void Unsubscribe<T, TH>()
where T : IntegrationEvent
where TH : IIntegrationEventHandler<T>
{
_manager.RemoveSubscription<T, TH>();
}
public void UnsubscribeDynamic<TH>(string eventName) where TH : IDynamicIntegrationEventHandler
{
throw new NotImplementedException();
}
}
首先構造函數中聲明我們要使用的東西:
public InMemoryEventBus(
IServiceProvider provider,
ILogger<InMemoryEventBus> logger,
ISubscriptionsManager manager)
{
_provider = provider;
_logger = logger;
_manager = manager;
}
這里要注意的就是IServiceProvider provider
這是 DI容器,當我們在切實處理事件的時候我們選擇從DI獲取處理器的實例,而不是反射創建,這要做的好處在於,處理器可以依賴於其它東西,並且可以是單例的
public void Subscribe<T, TH>()
where T : IntegrationEvent
where TH : IIntegrationEventHandler<T>
{
_manager.AddSubscription<T, TH>();
}
public void Unsubscribe<T, TH>()
where T : IntegrationEvent
where TH : IIntegrationEventHandler<T>
{
_manager.RemoveSubscription<T, TH>();
}
訂閱和取消訂閱很簡單,因為我們是InMemory的所以只調用了manager的方法。
接下來就是最重要的Publish方法,實現Publish有兩種方式:
-
使用額外的線程和Queue讓發布和處理異步
-
為了簡單起見,我們先寫個簡單易懂的同步的
public void Publish(IntegrationEvent e) { // 首先要拿到集成事件的Type信息 var eventType = e.GetType(); // 獲取屬於這個事件的處理器列表,可能有很多,注意獲得的是SubscriptionInfo var handlers = _manager.GetHandlersForEvent(eventType.FullName); // 不解釋循環 foreach (var handlerInfo in handlers) { // 從DI中獲取類型的實例 var handler = _provider.GetService(handlerInfo.HandlerType); // 拿到Handle方法 var method = handlerInfo.HandlerType.GetMethod("Handle"); // 調用方法 method.Invoke(handler, new object[] { e }); } }
OK,我們的InMemoryEventBus就寫好了!
要實踐這個InMemoryEventBus,那么還需要一個IntegrationEvent
的子類,和一個IIntegrationEventHandler<T>
的實現類,這些都不難,例如我們做一個添加用戶的事件,A在添加用戶后,發起一個事件並將新用戶的名字作為事件數據,B去訂閱事件,並在自己的處理器中處理名字信息。
思路是這樣的:
-
寫一個
AddUserEvent:IntegrationEvent
,里面有一個UserId和一個UserName
。 -
寫一個
AddUserEventHandler:IIntegrationEventHandler<AddUserEvent>
,在Handle
方法中輸出UserId和Name到日志。 -
注冊DI,你要注冊下面這些服務:
IEventBus=>InMemoryEventBus ISubscriptionsManager=>InMemorySubscriptionsManager AddUserEventHandler=>AddUserEventHandler
-
在Startup中為剛剛寫的事件和處理器添加訂閱(在這里已經可以獲取到IEventBus實例了)
-
寫一個Api接口或是什么,調用IEventBus的Publish方法,new 一個新的
AddUserEvent
作為參數傳進去。
OK!到這里一個切實可用的InMemoryEventBus就可以使用了。