事件總線是通過一個中間服務,剝離了常規事件的發布與訂閱(消費)強依賴關系的一種技術實現。事件總線的基礎知識可參考聖傑的博客【事件總線知多少】
本片博客不再詳細概述事件總線基礎知識,核心點放置使用Autofac組件實現事件總線與事件事件處理對象的解耦,並在實際業務場景中使用事件總線解決需求。
案例需求
這里還是先面向業務來針對性的探究下事件總線在實際業務場景里的用處有多大,再來講解后續的Autofac解耦。
在基礎數據管理模塊里,需要對產品類別刪除后也將相應的產品類別標簽、產品類別下的產品進行刪除。
我們過往的做法是在產品類別刪除的業務邏輯后繼續編寫刪除產品類別標簽、產品類別對應的產品,類似於下面的代碼。
private void EventBusTest1(long productCategoryId)
{
_logger.LogDebug($"刪除產品類別{productCategoryId}.");
_logger.LogDebug("刪除產品類別標簽..");
_logger.LogDebug("刪除產品類別與產品關系..");
}
這種做法本身可以實現我們的實際需求,但是試想如果這個時候我需要再加一個功能,針對刪除產品類別后郵件通知管理員。我們發現要實現此功能得繼續在之前的代碼塊中加入通知郵件的相關服務。如下:
private void EventBusTest1(long productCategoryId) { _logger.LogDebug($"刪除產品類別{productCategoryId}."); _logger.LogDebug("刪除產品類別標簽.."); _logger.LogDebug("刪除產品類別與產品關系.."); _logger.LogDebug("發送郵件通知管理員.."); }
上面的范例代碼是將業務代碼采用Logger打印日志模擬出來。如上代碼,實際工作中也因為經常性對某一事件進行業務邏輯補充而無限擴張部分代碼塊,到后來還要進行大面積重構。
ok,如何讓我們產品因后續某項事件處理補充而不影響原有的代碼塊呢?不着急,捋一捋邏輯。
我們先思考下,刪除產品類別是不是一個事件,那么它是一定有事件源對象,事件源對象即是我們針對刪除產品類別所需業務處理的參數。無它我們什么也做不了,事件源EventData就是這么重要,腦海里想象下某個微信公眾號,事件源就是這個公眾號發布的文章。那么上面的代碼中,入參productCategoryId就是一個事件源(當然我們可以包裝成一個對象,補充多一點信息)
那么有了EventData后還是要把業務邏輯從聚合狀態分解,不然還是聚合在一起處理和原來的處理方式一樣。理想情況下是刪除產品類別本身是一個Handler,刪除產品類別也是一個Handler,產出產品類別與產品關系及發送郵件通知也是如此。只要觸發刪除產品類別,則這些Handler也一並會執行。
同一個事件源但凡綁定了多個Handler,則這些Handler會根據事件源進行各自的邏輯處理。

上面對EventData和EventHandler進行了簡單介紹,但是我們要怎么觸發呢?事件源與處理程序對象如何映射呢?
答案是采取EventBus統一對事件源進行事件觸發,事件源於處理程序對象映射在一個字典里。觸發事件時EventBus從字典中獲取事件源對應的處理對象來分發處理。
哦豁,概念終於嘮叨完了,看看使用了事件總線后是如何處理的吧,上碼說話。。
先定義事件源接口
/// <summary>
/// 定義事件源接口,所有的事件源都要實現該接口
/// </summary>
public interface IEventData
{
/// <summary>
/// 事件發生的時間
/// </summary>
DateTime EventTime { get; set; }
/// <summary>
/// 觸發事件的對象
/// </summary>
object EventSource { get; set; }
}
再定義事件源基類
/// <summary>
/// 事件源基類:描述事件信息,用於參數傳遞
/// </summary>
public class EventData : IEventData
{
/// <summary>
/// 事件發生的時間
/// </summary>
public DateTime EventTime { get; set; }
/// <summary>
/// 觸發事件的對象
/// </summary>
public Object EventSource { get; set; }
public EventData()
{
EventTime = DateTime.Now;
}
}
定義事件處理接口
/// <summary>
/// 定義事件處理器公共接口,所有的事件處理都要實現該接口
/// </summary>
public interface IEventHandler: IDependency
{
}
/// <summary>
/// 泛型事件處理器接口
/// </summary>
/// <typeparam name="TEventData"></typeparam>
public interface IEventHandler<TEventData> : IEventHandler where TEventData : IEventData
{
/// <summary>
/// 事件處理器實現該方法來處理事件
/// </summary>
/// <param name="eventData"></param>
void HandleEvent(TEventData eventData);
}
定義事件源與事件處理對象存儲容器接口
/// <summary>
/// 定義事件源與事件處理對象存儲容器接口
/// </summary>
public interface IEventStore
{
void AddRegister<T, TH>(string keyName) where T : IEventData where TH : IEventHandler;
void AddRegister(Type eventData, string handlerName, Type eventHandler);
void RemoveRegister<T, TH>() where T : IEventData where TH : IEventHandler;
void RemoveRegister(Type eventData, Type eventHandler);
bool HasRegisterForEvent<T>() where T : IEventData;
bool HasRegisterForEvent(Type eventData);
IEnumerable<Type> GetHandlersForEvent<T>() where T : IEventData;
IEnumerable<Type> GetHandlersForEvent(Type eventData);
Type GetEventTypeByName(string eventName);
bool IsEmpty { get; }
void Clear();
}
實現IEventStore,這里將事件處理對象與事件源映射信息存儲在內存中(無需持久化)
public class InMemoryEventStore : IEventStore
{
/// <summary>
/// 定義鎖對象
/// </summary>
private static readonly object LockObj = new object();
private readonly ConcurrentDictionary<ValueTuple<Type, string>, Type> _eventAndHandlerMapping;
public InMemoryEventStore()
{
_eventAndHandlerMapping = new ConcurrentDictionary<ValueTuple<Type, string>, Type>();
}
public void AddRegister<T, TH>(string keyName) where T : IEventData where TH : IEventHandler
{
AddRegister(typeof(T), keyName, typeof(TH));
}
public void AddRegister(Type eventData, string handlerName, Type eventHandler)
{
lock (LockObj)
{
var mapperKey = new ValueTuple<Type, string>(eventData, handlerName);
//是否存在事件參數對應的事件處理對象
if (!HasRegisterForEvent(eventData))
{
_eventAndHandlerMapping.TryAdd(mapperKey, eventHandler);
}
else
{
_eventAndHandlerMapping[mapperKey] = eventHandler;
}
}
}
public void RemoveRegister<T, TH>() where T : IEventData where TH : IEventHandler
{
var handlerToRemove = FindRegisterToRemove(typeof(T), typeof(TH));
RemoveRegister(typeof(T), handlerToRemove);
}
public void RemoveRegister(Type eventData, Type eventHandler)
{
if (eventHandler != null)
{
lock (LockObj)
{
//移除eventHandler
var eventHandelerBind = _eventAndHandlerMapping.FirstOrDefault(p => p.Value == eventHandler);
if (eventHandelerBind.Value != null)
{
Type removedHandlers;
_eventAndHandlerMapping.TryRemove(eventHandelerBind.Key, out removedHandlers);
}
}
}
}
private Type FindRegisterToRemove(Type eventData, Type eventHandler)
{
if (!HasRegisterForEvent(eventData))
{
return null;
}
return _eventAndHandlerMapping.FirstOrDefault(p => p.Value == eventHandler).Value;
}
public bool HasRegisterForEvent<T>() where T : IEventData
{
var mapperDto = _eventAndHandlerMapping.FirstOrDefault(p => p.Key.Item1 == typeof(T));
return mapperDto.Value != null ? true : false;
}
public bool HasRegisterForEvent(ValueTuple<Type, string> mapperKey)
{
return _eventAndHandlerMapping.ContainsKey(mapperKey);
}
public bool HasRegisterForEvent(Type eventData)
{
return _eventAndHandlerMapping.Count(p => p.Key.Item1 == eventData) > 0 ? true : false;
}
public IEnumerable<Type> GetHandlersForEvent<T>() where T : IEventData
{
return GetHandlersForEvent(typeof(T));
}
public IEnumerable<Type> GetHandlersForEvent(Type eventData)
{
if (HasRegisterForEvent(eventData))
{
var items = _eventAndHandlerMapping
.Where(p => p.Key.Item1 == eventData)
.Select(p => p.Value).ToList();
return items;
}
return new List<Type>();
}
public Type GetEventTypeByName(string eventName)
{
return _eventAndHandlerMapping.Keys.FirstOrDefault(eh => eh.Item2 == eventName).Item1;
}
public bool IsEmpty => !_eventAndHandlerMapping.Keys.Any();
public void Clear() => _eventAndHandlerMapping.Clear();
}
在定義EventHandler,我們這里定義3個EventHandler,分別對應產品類別刪除、產品類別標簽刪除、產品類別與產品關系信息刪除
public class DeleteProductCategoryEventHandler : IEventHandler<DeleteProductCategoryEventData>
{
private readonly ILogger _logger;
public DeleteProductCategoryEventHandler(ILogger<DeleteProductCategoryEventHandler> logger)
{
_logger = logger;
}
public void HandleEvent(DeleteProductCategoryEventData eventData)
{
_logger.LogDebug($"刪除產品類別{eventData.ProductCategoryId}..");
}
}
public class DeleteProductCategoryTagEventHandler : IEventHandler<DeleteProductCategoryEventData>
{
private readonly ILogger _logger;
public DeleteProductCategoryTagEventHandler(ILogger<DeleteProductCategoryTagEventHandler> logger)
{
_logger = logger;
}
public void HandleEvent(DeleteProductCategoryEventData eventData)
{
_logger.LogDebug($"刪除產品類別{eventData.ProductCategoryId}標簽..");
}
}
public class DeleteProductCategoryRelEventHandler : IEventHandler<DeleteProductCategoryEventData>
{
private readonly ILogger _logger;
public DeleteProductCategoryRelEventHandler(ILogger<DeleteProductCategoryRelEventHandler> logger)
{
_logger = logger;
}
public void HandleEvent(DeleteProductCategoryEventData eventData)
{
_logger.LogDebug($"刪除產品類別{eventData.ProductCategoryId}與產品關系..");
}
}
還有最重要的EventBus,我們使用它來同一觸發Handler,EventBus會從EventStore獲取到事件源映射的Handler集合,並通過DI容器實例化對象后執行事件。
EventBus
/// <summary>
/// 事件總線
/// </summary>
public class EventBus : IEventBus
{
private readonly IEventStore _eventStore;
public static EventBus Default { get; } = new EventBus();
public EventBus()
{
_eventStore = ContainerManager.Current.ResolveNamed<IEventStore>(AppConst.IN_MEMORY_EVENT_STORE);
}
public void Trigger<TEventData>(TEventData eventData) where TEventData : IEventData
{
//從事件映射集合里獲取匹配當前EventData(事件源數據)的Handler
List<Type> handlerTypes = _eventStore.GetHandlersForEvent(eventData.GetType()).ToList();
if (handlerTypes.Count <= 0) return;
//循環執行事件處理函數
foreach (var handlerType in handlerTypes)
{
var handlerInterface = handlerType.GetInterface("IEventHandler`1");
//這里需要根據Name才能Resolve,因為注入服務時候使用了命名方式(解決同一事件參數多個事件處理類綁定問題)+
var eventHandler = ContainerManager.Current.Container.ResolveNamed(handlerType.Name, handlerInterface);
if (eventHandler.GetType().FullName == handlerType.FullName)
{
var handler = eventHandler as IEventHandler<TEventData>;
handler?.HandleEvent(eventData);
}
}
}
public void Trigger<TEventData>(Type eventHandlerType, TEventData eventData) where TEventData : IEventData
{
if (_eventStore.HasRegisterForEvent<TEventData>())
{
var handlers = _eventStore.GetHandlersForEvent<TEventData>();
if (handlers.Any(th => th == eventHandlerType))
{
//獲取類型實現的泛型接口
var handlerInterface = eventHandlerType.GetInterface("IEventHandler`1");
var eventHandlers = ContainerManager.Current.Container.Resolve(handlerInterface);
//循環遍歷,僅當解析的實例類型與映射字典中事件處理類型一致時,才觸發事件
if (eventHandlers.GetType() == eventHandlerType)
{
var handler = eventHandlers as IEventHandler<TEventData>;
handler?.HandleEvent(eventData);
}
}
}
}
public Task TriggerAsycn<TEventData>(Type eventHandlerType, TEventData eventData) where TEventData : IEventData
{
return Task.Run(() => Trigger(eventHandlerType, eventData));
}
public Task TriggerAsync<TEventData>(TEventData eventData) where TEventData : IEventData
{
return Task.Run(() => Trigger(eventData));
}
public void UnRegister<TEventData>(Type handlerType) where TEventData : IEventData
{
_eventStore.RemoveRegister(typeof(TEventData), handlerType);
}
public void UnRegisterAll<TEventData>() where TEventData : IEventData
{
//獲取所有映射的EventHandler
List<Type> handlerTypes = _eventStore.GetHandlersForEvent(typeof(TEventData)).ToList();
foreach (var handlerType in handlerTypes)
{
_eventStore.RemoveRegister(typeof(TEventData), handlerType);
}
}
}
當然最重要的是如何使用DI容器注入事件處理程序以及相關依賴服務,這里要注意Autofac對同一個容器只允許Build一次或Update一次。即不允許在程序運行時動態注入服務。且針對同一接口除非使用命名方式注入服務,否則默認一個接口映射一個服務。Autofac這個特性極大程度限制了同一事件源多個Handler的情況,沒辦法我還是想了個辦法,參考AutoMapper的Profile映射方式,定義一個用於映射的類,如下:
public class EventRegisterService : IEventRegisterService
{
public void RegisterEventHandler(IEventStore _eventStore, ContainerBuilder builder)
{
builder.RegisterType<DeleteProductCategoryEventHandler>().Named<IEventHandler<DeleteProductCategoryEventData>>("DeleteProductCategoryEventHandler");
_eventStore.AddRegister(typeof(DeleteProductCategoryEventData), "DeleteProductCategoryEventHandler", typeof(DeleteProductCategoryEventHandler));
builder.RegisterType<DeleteProductCategoryTagEventHandler>().Named<IEventHandler<DeleteProductCategoryEventData>>("DeleteProductCategoryTagEventHandler");
_eventStore.AddRegister(typeof(DeleteProductCategoryEventData), "DeleteProductCategoryTagEventHandler", typeof(DeleteProductCategoryTagEventHandler));
builder.RegisterType<DeleteProductCategoryRelEventHandler>().Named<IEventHandler<DeleteProductCategoryEventData>>("DeleteProductCategoryRelEventHandler");
_eventStore.AddRegister(typeof(DeleteProductCategoryEventData), "DeleteProductCategoryRelEventHandler", typeof(DeleteProductCategoryRelEventHandler));
}
}
通過上面的映射后,在程序初始化時可使用C#反射的特性執行 RegisterEventHandler實現注入同一事件源多個事件處理服務,簡直完美~~
下面是Autofac注入的代碼:
/// <summary>
/// 通過反射注入事件處理對象
/// </summary>
/// <param name="assemblieLst"></param>
/// <param name="_eventStore"></param>
private static void RegisterAllEventHandlerFromAssembly(List<Assembly> assemblieLst, IEventStore _eventStore)
{
var baseType = typeof(IEventRegisterService);
foreach (var assem in assemblieLst)
{
assem.GetTypes().Each(p =>
{
if (baseType.IsAssignableFrom(p) && p.IsClass)
{
//反射執行注冊方法
object obj = Activator.CreateInstance(p);
MethodInfo method = obj.GetType().GetMethod(_eventBusBindMethod);
method.Invoke(obj, new object[] { _eventStore, Builder });
}
});
}
}
/// <summary>
/// 注冊服務
/// </summary>
/// <param name="builder"></param>
/// <param name="assemblieLst"></param>
private static IContainer RegisterDependencyService(List<Assembly> assemblieLst, IServiceCollection services)
{
//1、注入依賴服務
Builder = new ContainerBuilder();
Builder.Populate(services);
var baseType = typeof(IDependency);
foreach (var assem in assemblieLst)
{
Builder.RegisterAssemblyTypes(assem)
.Where(type => baseType.IsAssignableFrom(type) && !type.IsAbstract)
.AsImplementedInterfaces()
.SingleInstance();
}
//2、注入AutoMaper配置
List<Type> loadedProfiles = RetrieveProfiles(assemblieLst);
Builder.RegisterTypes(loadedProfiles.ToArray());
//3、注入事件總線
var eventStore = new InMemoryEventStore();
Builder.Register(c => eventStore).Named<IEventStore>(AppConst.IN_MEMORY_EVENT_STORE).SingleInstance();
RegisterAllEventHandlerFromAssembly(assemblieLst, eventStore);
//4、生成容器
IContainer container = Builder.Build();
RegisterAutoMapper(container, loadedProfiles);
return container;
}
OK,看下最終效果,將觸發事件寫在Action中

F5跑起來,Debug目錄下出來了相應的日志:

內容如下:

總結
事件總線是個好東西,使用它可以將業務分散處理,唯一麻煩點在於需要保證業務處理的原子性問題。以前我們直接一個事務套起來就完事,現在可能要采取類似微服務中的“最終一致性”方案,所以合適的業務采用合適的技術。
時間總線也是領域驅動設計里重要的知識點,掌握它很有必要,微服務中也是采取它配合消息隊列實現跨服務處理領域事件。這里使用Autofac也是因為自己的架構本身就是使用它做默認的DI容器,在與事件總線融合中也發現了Autofac的局限性,那就是運行時注冊,如果能運行時注冊服務的話真的是不要太爽了。
最后如果有長沙的小伙伴可以加入.NET長沙社區,大家共同探討.NET Core、Redis、Docker等流行技術,共同進步!
微信一群已加滿,大家可加入微信二群,也可以掃下面支付碼請我喝杯奶茶 o(∩_∩)o 哈哈~~


