前言
基礎篇已經更新完了,從本篇開始我們進入,中級篇(學習部分源代碼)我會挑一些我個人認為比較重要的知識點結合部分開源項目進行源碼講解,咱們廢話不說直接上車。
Abp vNext的事件總線分2種,一種是本地事件總線,一種是分布式事件總線,本節主要講解本地事件總線,下一節講分布式事件總線。
事件總線所處模塊在 Volo.Abp.EventBus 中。
正文
看Abp源代碼我們先從Module
下手,這塊代碼實現的比較簡單,就是在類注冊的時候判斷是否繼承ILocalEventHandler/IDistributedEventHandler
,將其記錄下來並賦值給對應的Options
配置類當中。
也就是說當我們定義如下Handler
的時候,會在依賴注入的時候,都會將其類型 (Type) 添加到事件總線的配置類當中。
public class ProjectDeleteHandler : ILocalEventHandler<ProjectDeleteEvent>,
ITransientDependency
{
// ...
}
在單元測試EventBusTestBase
類我們找一個LocalEventBus的調用類,直接看PublishAsync
的調用來自於IEventBus
,根據繼承我們先找到ILocalEventBus
,也就是下面這些,這里注意EventBusBase
類型,Abp有分布式事件總線和本地事件總線,這里抽離了一個事件基類提供使用。
public interface IEventBus
{
/// <summary>
/// Triggers an event.
/// </summary>
/// <typeparam name="TEvent">Event type</typeparam>
/// <param name="eventData">Related data for the event</param>
/// <param name="onUnitOfWorkComplete">True, to publish the event at the end of the current unit of work, if available</param>
/// <returns>The task to handle async operation</returns>
Task PublishAsync<TEvent>(TEvent eventData, bool onUnitOfWorkComplete = true)
where TEvent : class;
/// <summary>
/// Triggers an event.
/// </summary>
/// <param name="eventType">Event type</param>
/// <param name="eventData">Related data for the event</param>
/// <param name="onUnitOfWorkComplete">True, to publish the event at the end of the current unit of work, if available</param>
/// <returns>The task to handle async operation</returns>
Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true);
/// ......
}
還是順着我們的PublishAsync
向下看,我們可以看到調用了TriggerHandlersAsync
函數名字就是觸發處理器
這個Trigger感覺有點js的味道.
public virtual async Task PublishAsync(LocalEventMessage localEventMessage)
{
await TriggerHandlersAsync(localEventMessage.EventType, localEventMessage.EventData);
}
TriggerHandlersAsync
內部主要就是調用GetHandlerFactories
獲取事件工廠,而GetHandlerFactories
內部的數據來自於HandlerFactories
HandlerFactories
的數據內容在LocalEventBus
構造函數中通過調用SubscribeHandlers
函數進行填充,最后在TriggerHandlerAsync
中完成調用。
protected override IEnumerable<EventTypeWithEventHandlerFactories> GetHandlerFactories(Type eventType)
{
var handlerFactoryList = new List<EventTypeWithEventHandlerFactories>();
foreach (var handlerFactory in HandlerFactories.Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key)))
{
handlerFactoryList.Add(new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value));
}
return handlerFactoryList.ToArray();
}
var method = typeof(ILocalEventHandler<>)
.MakeGenericType(eventType)
.GetMethod(
nameof(ILocalEventHandler<object>.HandleEventAsync),
new[] { eventType }
);
await ((Task)method.Invoke(eventHandlerWrapper.EventHandler, new[] { eventData }));
在整套的運行中還牽扯到異步等待,異常處理,工作單元,構造事件處理器工廠,事件銷毀等。不過核心的處理順序就這樣。
解析
這里也想大家推薦本系列的講解倉庫Dppt
(https://github.com/MrChuJiu/Dppt/tree/master/src)該倉庫源於Easy.Core.Flow
是我20年創建的用於對知識點的解析,后面代碼都會上傳到這個倉庫.
接下來對上面的Abp代碼進行一個簡單的分析,簡單來看本地EventBus就是通過某個key來反射調用某個函數.簡單點直接開整.
我們也寫一個IEventBus
然后讓LocalEventBus
繼承IEventBus
,這里我不需要EventBusBase和ILocalEventBus
因為我就是一個本地evetbus的包,總代碼300行左右我就不全部拿上來了,大家可以去倉庫自己下載。
public interface IEventBus
{
Task PublishAsync<TEvent>(TEvent eventData);
void Unsubscribe(Type eventType, IEventHandlerFactory factory);
}
public class LocalEventBus : IEventBus
{
public async Task PublishAsync<TEvent>(TEvent eventData)
{
await TriggerHandlersAsync(typeof(TEvent), eventData);
}
protected virtual async Task TriggerHandlersAsync(Type eventType, object eventData)
{
foreach (var handlerFactories in GetHandlerFactories(eventType))
{
foreach (var handlerFactory in handlerFactories.EventHandlerFactories)
{
await TriggerHandlerAsync(handlerFactory, handlerFactories.EventType, eventData);
}
}
}
protected virtual async Task TriggerHandlerAsync(IEventHandlerFactory asyncHandlerFactory, Type eventType, object eventData)
{
using (var eventHandlerWrapper = asyncHandlerFactory.GetHandler())
{
var handlerType = eventHandlerWrapper.EventHandler.GetType();
var method = typeof(ILocalEventHandler<>)
.MakeGenericType(eventType)
.GetMethod(
nameof(ILocalEventHandler<object>.HandleEventAsync),
new[] { eventType }
);
await ((Task)method.Invoke(eventHandlerWrapper.EventHandler, new[] { eventData }));
}
}
}
因為我們我們既沒有模塊化也沒Autofac我們就簡單點來注冊Handler,讓外部把Handler告訴我們,當然你掃描程序集也是可以的,但是沒必要,這里我們的步驟等於Abp在模塊里面的配置Options。
public static class DpptEventBusRegistrar
{
public static void AddDpptEventBus(this IServiceCollection services, List<Type> types) {
services.AddSingleton<IEventBus, LocalEventBus>();
var localHandlers = types;/*.Where(s => typeof(ILocalEventHandler<>).IsAssignableFrom(s)).ToList();*/
foreach (var item in localHandlers)
{
services.AddTransient(item);
}
services.Configure<LocalEventBusOptions>(options =>
{
options.Handlers.AddIfNotContains(localHandlers);
});
}
}
源碼里面要注意好以下幾點,EventBus是單例的因為全局引用,但是Handler是瞬時,因為每次執行完就可以了。
測試
新建一個空項目注冊一個Handler.測試結果放在下面了
public class WeatherQueryHandler : ILocalEventHandler<WeatherQueryEto>
{
public Task HandleEventAsync(WeatherQueryEto eventData)
{
Console.WriteLine(eventData.Name);
return Task.CompletedTask;
}
}
public class WeatherQueryEto
{
public string Name { get; set; }
}
[HttpGet]
public IEnumerable<WeatherForecast> Get()
{
var rng = new Random();
// 測試發送
_eventBus.PublishAsync(new WeatherQueryEto() { Name = "測試"+ rng.Next(-20, 55) });
return Enumerable.Range(1, 5).Select(index => new WeatherForecast
{
Date = DateTime.Now.AddDays(index),
TemperatureC = rng.Next(-20, 55),
Summary = Summaries[rng.Next(Summaries.Length)]
})
.ToArray();
}
結語
本次挑選了一個比較簡單的示例來講,整個EventBus我應該分成3篇 下一篇我來講分布式EventBus。
最后歡迎各位讀者關注我的博客, https://github.com/MrChuJiu/Dppt/tree/master/src 歡迎大家Star
另外這里有個社區地址(https://github.com/MrChuJiu/Dppt/discussions),如果大家有技術點希望我提前檔期可以寫在這里,希望本項目助力我們一起成長