本文主要圍繞RabbitMQ消息隊列和EventBus事件總線做筆記,其中有很多自己的理解和觀點,有不對之處還望大神指出,我也學習一下。
1,消息隊列
什么是消息隊列就不說了,這里只介紹為什么用它!!!
RabbitMQ提供了可靠的消息機制、跟蹤機制和靈活的消息路由,支持消息集群和分布式部署。適用於排隊算法、秒殺活動、消息分發、異步處理、數據同步、處理耗時任務、CQRS等應用場景。它的以上好處在高並發等三高場景是十分必要的。
前提:使用RabbitMQ必須考慮高可用性
1、高可用:如果使用消息隊列,基本要配合集群的,因為如果MQ服務器崩了,那就整個服務災難了。
2、數據安全:必須保證數據不能丟失,也就是要考慮好最終一致性,做好補償機制。
3、合理的消費。
2,事件總線
事件總線是對發布-訂閱模式的一種實現。它是一種集中式事件處理機制,允許不同的組件之間進行彼此通信而又不需要相互依賴,達到一種解耦的目的。
說人話就是:集中對消息隊列中處理的事件進行訂閱和綁定管理。
實現事件總線的關鍵是:
- 事件總線維護一個事件源與事件處理的映射字典;
- 通過單例模式,確保事件總線的唯一入口;
- 利用反射完成事件源與事件處理的初始化綁定;
- 提供統一的事件注冊、取消注冊和觸發接口;
1 public class EventBus 2 { 3 /// <summary> 4 /// EventBus 單例 5 /// </summary> 6 public static EventBus Default { get; private set; } 7 8 /// <summary> 9 /// 事件與事件的處理程序映射 10 /// </summary> 11 private static readonly ConcurrentDictionary<Type, List<Type>> m_EventAndHandlersMapping = new ConcurrentDictionary<Type, List<Type>>(); 12 13 /// <summary> 14 /// 加鎖對象 15 /// </summary> 16 private object m_lockObj; 17 18 static EventBus() 19 { 20 Default = new EventBus(); 21 } 22 23 private EventBus() 24 { 25 m_lockObj = new object(); 26 } 27 28 /// <summary> 29 /// 注冊事件源和事件處理程序 30 /// </summary> 31 /// <typeparam name="TEvent">事件類型</typeparam> 32 /// <typeparam name="THandler">處理程序類型</typeparam> 33 public void RegisterHandler<T, THandler>() 34 where T : EventBase 35 where THandler : IEventHandler<T> 36 { 37 AddMapping(typeof(T), typeof(THandler)); 38 } 39 40 /// <summary> 41 /// 新增映射 事件源+事件處理程序 42 /// </summary> 43 /// <param name="eventType"></param> 44 /// <param name="handlerType"></param> 45 private void AddMapping(Type eventType, Type handlerType) 46 { 47 var handlers = m_EventAndHandlersMapping.GetOrAdd(eventType, type => new List<Type>()); 48 lock (m_lockObj) 49 { 50 if (!handlers.Contains(handlerType)) 51 { 52 handlers.Add(handlerType); 53 } 54 } 55 } 56 57 /// <summary> 58 /// 異步觸發事件 59 /// </summary> 60 /// <param name="e"></param> 61 /// <returns></returns> 62 public async Task TriggerAsync<TEvent>(TEvent e) 63 where TEvent : EventBase 64 { 65 List<Type> value; 66 if (m_EventAndHandlersMapping.TryGetValue(e.GetType(), out value)) 67 { 68 foreach (var handler in value.OrderBy(p => p.Name)) 69 { 70 var instance = IocManager.ServiceProvider.GetService(handler) as IEventHandler<TEvent>; 71 await instance?.HandleEvent(e); 72 } 73 } 74 } 75 }
上邊就是一個簡單的事件總線管理類,主要任務就是對一個事件源與事件處理的映射,包括事件處理程序的觸發。
3,事件源+事件處理
為了集中對事件進行處理,就需要進一步提取基類來處理,如下所示:
1 /// <summary> 2 /// 事件處理程序基類 3 /// </summary> 4 public interface IBaseEventHandler 5 { 6 } 7 8 /// <summary> 9 /// 事件處理程序 10 /// 解釋:where T : EventBase 表示傳入的類型必須繼承 EventBase類 11 /// </summary> 12 public interface IEventHandler<T> : IBaseEventHandler 13 where T : EventBase 14 { 15 /// <summary> 16 /// 處理事件 17 /// </summary> 18 /// <param name="e">事件傳遞的數據</param> 19 Task HandleEvent(T e); 20 }
然后還需要聲明一個事件基類來傳遞數據信息,這里聲明一個強類型的帶數據的事件
1 /// <summary> 2 /// 事件基類 3 /// </summary> 4 public abstract class EventBase 5 { 6 protected EventBase() 7 : this(null) 8 { 9 } 10 protected EventBase(object source) 11 { 12 this.Source = source; 13 } 14 /// <summary> 15 /// 引發事件的源 16 /// </summary> 17 public object Source { get; private set; } 18 } 19 20 /// <summary> 21 /// 聲明一個強類型的帶數據的事件 傳遞數據 22 /// </summary> 23 public class EventWithData<TData> : EventBase 24 { 25 26 /// <summary> 27 /// 初始化 EventBase 28 /// </summary> 29 /// <param name="data">傳遞的數據</param> 30 public EventWithData(TData data) 31 { 32 this.Data = data; 33 } 34 35 /// <summary> 36 /// 事件傳遞的數據 37 /// </summary> 38 public TData Data { get; private set; } 39 40 /// <summary> 41 /// 創建 EventWithData'T' 42 /// </summary> 43 /// <param name="data">數據</param> 44 /// <returns></returns> 45 public static EventWithData<TData> New(TData data) 46 { 47 return new EventWithData<TData>(data); 48 } 49 }
到這里關於事件總線的設計就初步完成了,接下來是關於消息隊列 發布+訂閱的設計。