一、問題背景
最近離職來到了一家新的公司,原先是在乙方工作,這回到了甲方,在這一個月中,發現目前的業務很大一部分是靠輪詢實現的,例如:通過輪詢判斷數據處於B狀態了,則輪詢到數據后執行某種動作,這個其實是非常浪費的,並且對於數據的實時性也會不怎么友好,基於以上的情況,在某天開車堵車時候,想到了之前偶然了解過的事件總線(EventBus),對比了公司當前的場景后,覺得事件總線應該是可以滿足需求的(PS:只是我覺得這個有問題,很多人不覺得有問題),那既然想到了,那就想自己是否可以做個事件總線的輪子
二、什么是事件總線
我們知道事件是由一個Publisher跟一個或多個的Subsriber組成,但是在實際的使用過程中,我們會發現,Subsriber必須知道Publisher是誰才可以注冊事件,進而達到目的,那這其實就是一種耦合,為了解決這個問題,就出現了事件總線的模式,事件總線允許不同的模塊之間進行彼此通信而又不需要相互依賴,如下圖所示,通過EventBus,讓Publisher以及Subsriber都只需要對事件源(EventData)進行關注,不用管Publisher是誰,那么EventBus主要是做了一些什么事呢?
三、EventBus做了什么事?
1、EventBus實現了對於事件的注冊以及取消注冊的管理
2、EventBus內部維護了一份事件源與事件處理程序的對應關系,並且通過這個對應關系在事件發布的時候可以找到對應的處理程序去執行
3、EventBus應該要支持默認就注冊事件源與處理程序的關系,而不需要開發人員手動去注冊(這里可以讓開發人員去控制自動還是手動)
四、具體實現思路
首先在事件總線中,存在注冊、取消注冊以及觸發事件這三種行為,所以我們可以將這三種行為抽象一個接口出來,最終的接口代碼如下:
using System; using System.Collections.Generic; using System.Text; using System.Threading.Tasks; namespace MEventBus.Core { public interface IEventBus { #region 接口注冊 void Register<TEventData>(Type handlerType) where TEventData : IEventData; void Register(Type eventType, Type handlerType); void Register(string eventType, Type handlerType); #endregion #region 接口取消注冊 void Unregister<TEventData>(Type handler) where TEventData : IEventData; void Unregister(Type eventType, Type handlerType); void Unregister(string eventType, Type handlerType); #endregion void Trigger(string pubKey, IEventData eventData); Task TriggerAsync(string pubKey, IEventData eventData); Task TriggerAsync<TEventData>(TEventData eventData) where TEventData : IEventData; void Trigger<TEventData>(TEventData eventData) where TEventData : IEventData; } }
在以上代碼中發現有些方法是有IEventData約束的,這邊IEventData就是約束入參行為,原則上規定,每次觸發的EventData都需要繼承IEventData,而注冊的行為也是直接跟入參類型相關,具體代碼如下:
using System; using System.Collections.Generic; using System.Text; namespace MEventBus.Core { public interface IEventData { string Id { get; set; } DateTime EventTime { get; set; } object EventSource { get; set; } } }
接下來我們看下具體的實現代碼
using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.IO; using System.Linq; using System.Reflection; using System.Text; using System.Threading.Tasks; namespace MEventBus.Core { public class EventBus : IEventBus { private static ConcurrentDictionary<string, List<Type>> dicEvent = new ConcurrentDictionary<string, List<Type>>(); private IResolve _iresolve { get; set; } public EventBus(IResolve resolve) { _iresolve = resolve; InitRegister(); } public void InitRegister() { if (dicEvent.Count > 0) { return; } //_iresolve = ioc_container; dicEvent = new ConcurrentDictionary<string, List<Type>>(); //自動掃描類型並且注冊 foreach (var file in Directory.GetFiles(AppDomain.CurrentDomain.BaseDirectory, "*.dll")) { var ass = Assembly.LoadFrom(file); foreach (var item in ass.GetTypes().Where(p => p.GetInterfaces().Contains(typeof(IEventHandler)))) { if (item.IsClass) { foreach (var item1 in item.GetInterfaces()) { foreach (var item2 in item1.GetGenericArguments()) { if (item2.GetInterfaces().Contains(typeof(IEventData))) { Register(item2, item); } } } } } } } //注冊以及取消注冊的時候需要加鎖處理 private static readonly object obj = new object(); #region 注冊事件 public void Register<TEventData>(Type handlerType) where TEventData : IEventData { //將數據存儲到mapDic var dataType = typeof(TEventData).FullName; Register(dataType, handlerType); } public void Register(Type eventType, Type handlerType) { var dataType = eventType.FullName; Register(dataType, handlerType); } public void Register(string pubKey, Type handlerType) { lock (obj) { //將數據存儲到dicEvent if (dicEvent.Keys.Contains(pubKey) == false) { dicEvent[pubKey] = new List<Type>(); } if (dicEvent[pubKey].Exists(p => p.GetType() == handlerType) == false) { //IEventHandler obj = Activator.CreateInstance(handlerType) as IEventHandler; dicEvent[pubKey].Add(handlerType); } } } #endregion #region 取消事件注冊 public void Unregister<TEventData>(Type handler) where TEventData : IEventData { var dataType = typeof(TEventData); Unregister(dataType, handler); } public void Unregister(Type eventType, Type handlerType) { string _key = eventType.FullName; Unregister(_key, handlerType); } public void Unregister(string eventType, Type handlerType) { lock (obj) { if (dicEvent.Keys.Contains(eventType)) { if (dicEvent[eventType].Exists(p => p.GetType() == handlerType)) { dicEvent[eventType].Remove(dicEvent[eventType].Find(p => p.GetType() == handlerType)); } } } } #endregion #region Trigger觸發 //trigger時候需要記錄到數據庫 public void Trigger<TEventData>(TEventData eventData) where TEventData : IEventData { var dataType = eventData.GetType().FullName; //獲取當前的EventData綁定的所有Handler Notify(dataType, eventData); } public void Trigger(string pubKey, IEventData eventData) { //獲取當前的EventData綁定的所有Handler Notify(pubKey, eventData); } public async Task TriggerAsync<TEventData>(TEventData eventData) where TEventData : IEventData { await Task.Factory.StartNew(new Action(()=> { var dataType = eventData.GetType().FullName; Notify(dataType, eventData); })); } public async Task TriggerAsync(string pubKey, IEventData eventData) { await Task.Factory.StartNew(new Action(() => { var dataType = eventData.GetType().FullName; Notify(pubKey, eventData); })); } //通知每成功執行一個就需要記錄到數據庫 private void Notify<TEventData>(string eventType, TEventData eventData) where TEventData : IEventData { //獲取當前的EventData綁定的所有Handler var handlerTypes = dicEvent[eventType]; foreach (var handlerType in handlerTypes) { var resolveObj = _iresolve.Resolve(handlerType); IEventHandler<TEventData> handler = resolveObj as IEventHandler<TEventData>; handler.Handle(eventData); } } #endregion } }
代碼說明:
1、如上的EventBus是繼承了IEventBus后的具體實現,小伙伴可能看到在構造函數里,有一個接口參數IResolve,這個主要是為了將解析的過程進行解耦,由於在一些WebApi的項目中,更加多的是使用IOC的機制進行對象的創建,那基於IResolve就可以實現不同的對象創建方式(內置的是通過反射實現)
2、InitRegister方法通過遍歷當前目錄下的dll文件,去尋找所有實現了IEventHandler<IEventData>接口的信息,並且自動注冊到EventBus中,所以在實際使用過程中,應該是沒有機會去適用register注冊的
3、觸發機制實現了同步以及異步的調用,這個從方法命名中就可以看出來
五、程序Demo
TestHandler2(繼承IEventHandler)
using System; using System.Collections.Generic; using System.Text; using System.Threading; using System.Windows.Forms; using MEventBus.Core; namespace MEventBusHandler.Test { public class TestHandler2 : IEventHandler<TestEventData> { public void Handle(TestEventData eventData) { Thread.Sleep(2000); MessageBox.Show(eventData.EventTime.ToString()); } } }
TestEventData(繼承EventData,EventData是繼承了IEventData的代碼)
using MEventBus.Core; using System; using System.Collections.Generic; using System.Text; namespace MEventBusHandler.Test { public class TestEventData : EventData { } }
調用代碼
using MEventBus.Core; using MEventBusHandler.Test; using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.Linq; using System.Text; using System.Threading.Tasks; using System.Windows.Forms; namespace MEventBus.Test { public partial class Form1 : Form { public Form1() { InitializeComponent(); TestHandler.OnOut += TestHandler_OnOut; } private void TestHandler_OnOut(object sender, EventArgs e) { MessageBox.Show("Hello World"); } private void button1_Click(object sender, EventArgs e) { var task = new MEventBus.Core.EventBus(new ReflectResolve()).TriggerAsync(new TestEventData()); task.ContinueWith((obj) => { MessageBox.Show("事情全部做完"); }); } private void button2_Click(object sender, EventArgs e) { new EventBus(new ReflectResolve()).Trigger(new TestEventData()); } } }
執行結果
我在真正的Demo中,其實是注冊了2個handler,可以在后續公布的項目地址里看到
六、總結
從有這個想法開始,到最終實現這個事件總線,大概總共花了2,3天的時間(PS:晚上回家獨自默默干活),目前只能說是有一個初步可以使用的版本,並且還存在着一些問題:
1、在.NetFrameWork下(目前公司還不想升級到.NetCore,吐血。。),如果使用AutoFac創建EventBus(單例模式下),如果Handler也使用AutoFac進行創建,會出現要么對象創建失敗,要么handler里的對象與調用方的對象不是同一個實例,為了解決這個問題,我讓EventBus不再是單例模式,將dicEvent變成了靜態,暫時表面解決
2、未考慮跨進程的實現(感覺用savorboard大佬的CAP就可以了)
3、目前這個東西在一個小的新項目里使用,暫時在測試環境還算沒啥問題,各位小伙伴如果有類似需求,可以做個參考
由於個人原因,在測試上可能會有所不夠,如果有什么bug的話,還請站內信告知,感謝(ps:文字表達弱雞,技術渣渣,各位多多包涵)
最后:附上項目地址:https://gitee.com/OneMango/MEventBus
作者: Mango
出處: http://www.cnblogs.com/OMango/
關於自己:專注.Net桌面開發以及Web后台開發,對.NetCore、微服務、DevOps,K8S等感興趣,最近到了個甲方公司准備休養一段時間
本文版權歸作者和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文鏈接,如有問題, 可站內信告知.