看eShopOnContainers學一個EventBus


最近在看微軟eShopOnContainers 項目,看到事件總線覺得不錯,和大家分享一下

看完此文你將獲得什么?

  1. eShop中是如何設計事件總線的
  2. 實現一個InMemory事件總線eShop中是沒有InMemory實現的,這算是一個小小小的挑戰

發布訂閱模式

發布訂閱模式可以讓應用程序組件之間解耦,這是我們使用這種模式最重要的理由之一,如果你完全不知道這個東西,建議你先通過搜索引擎了解一下這種模式,網上的資料很多這里就不再贅述了。

eShop中的EventBus就是基於這種模式的發布/訂閱
發布訂閱模式核心概念有三個:發布者、訂閱者、調度中心,這些概念在消息隊列中就是生產者、消費者、MQ實例

在eShop中有兩個EventBus的實現:

  • 基於RabbitMq的EventBusRabbitMQ
  • 基於AzureServiceBus的EventBusServiceBus

IEventBus開始

先來看一看,所有EventBus的接口IEventBus

public interface IEventBus
{
    void Publish(IntegrationEvent @event);

    void Subscribe<T, TH>()
        where T : IntegrationEvent
        where TH : IIntegrationEventHandler<T>;

    void SubscribeDynamic<TH>(string eventName)
        where TH : IDynamicIntegrationEventHandler;

    void UnsubscribeDynamic<TH>(string eventName)
        where TH : IDynamicIntegrationEventHandler;

    void Unsubscribe<T, TH>()
        where TH : IIntegrationEventHandler<T>
        where T : IntegrationEvent;
}

嗯,乍一看看是有點眼暈的,仔細看它的核心功能只有三個:

  1. Publish 發布
  2. Subscribe 訂閱
  3. Unsubscribe 取消訂閱

這對應着發布訂閱模式的基本概念,不過對於事件總線的接口添加了許多約束:

  1. 發布的內容(消息)必須是IntegrationEvent及其子類
  2. 訂閱事件必須指明要訂閱事件的類型,並附帶處理器類型
  3. 處理器必須是IIntegrationEventHandler的實現類

Ok,看到這里先不要管Dynamic相關的方法,然后記住這個兩個關鍵點:

  1. 事件必須繼承IntegrationEvent
  2. 處理器必須實現IIntegrationEventHandler<T>TIntegrationEvent子類

另外,看下 IntegrationEvent有什么

public class IntegrationEvent
{
    public IntegrationEvent()
    {
        Id = Guid.NewGuid();
        CreationDate = DateTime.UtcNow;
    }

    public Guid Id  { get; }
    public DateTime CreationDate { get; }
}

IEventBusSubscriptionsManager是什么

public interface IEventBusSubscriptionsManager
{
    bool IsEmpty { get; }
    event EventHandler<string> OnEventRemoved;
    void AddDynamicSubscription<TH>(string eventName)
       where TH : IDynamicIntegrationEventHandler;

    void AddSubscription<T, TH>()
       where T : IntegrationEvent
       where TH : IIntegrationEventHandler<T>;

    void RemoveSubscription<T, TH>()
         where TH : IIntegrationEventHandler<T>
         where T : IntegrationEvent;
    void RemoveDynamicSubscription<TH>(string eventName)
        where TH : IDynamicIntegrationEventHandler;

    bool HasSubscriptionsForEvent<T>() where T : IntegrationEvent;
    bool HasSubscriptionsForEvent(string eventName);
    Type GetEventTypeByName(string eventName);
    void Clear();
    IEnumerable<SubscriptionInfo> GetHandlersForEvent<T>() where T : IntegrationEvent;
    IEnumerable<SubscriptionInfo> GetHandlersForEvent(string eventName);
    string GetEventKey<T>();
}

這個接口看起來稍顯復雜些,我們來簡化下看看:

public interface IEventBusSubscriptionsManager
{
    void AddSubscription<T, TH>()
    void RemoveSubscription<T, TH>()
    IEnumerable<SubscriptionInfo> GetHandlersForEvent<T>() 
}

最終,這三個方法就是我們要關注的,添加訂閱、移除訂閱、獲取指定事件的訂閱信息。

SubscriptionInfo是什么?

public bool IsDynamic { get; }
public Type HandlerType{ get; }

SubscriptionInfo中只有兩個信息,這是不是一個Dynamic類型的Event以及這個Event所對應的處理器的類型。

這是你可能會有另一個疑問:

這個和IEventBus有什么關系?

  1. IEventBusSubscriptionsManager含有更多功能:查看是否有訂閱,獲取事件的Type,獲取事件的處理器等等

  2. IEventBusSubscriptionsManagerIEventBus使用,在RabbitMq和ServiceBus的實現中,都使用Manager去存儲事件的信息,例如下面的代碼:

     public void Subscribe<T, TH>()
         where T : IntegrationEvent
         where TH : IIntegrationEventHandler<T>
     {
         // 查詢事件的全名
         var eventName = _subsManager.GetEventKey<T>();
    
         //向mq添加注冊
         DoInternalSubscription(eventName);
    
         // 向manager添加訂閱
         _subsManager.AddSubscription<T, TH>();
     }
    
     private void DoInternalSubscription(string eventName)
     {
         var containsKey = _subsManager.HasSubscriptionsForEvent(eventName);
         if (!containsKey)
         {
             if (!_persistentConnection.IsConnected)
             {
                 _persistentConnection.TryConnect();
             }
    
             using (var channel = _persistentConnection.CreateModel())
             {
                 channel.QueueBind(queue: _queueName,
                                     exchange: BROKER_NAME,
                                     routingKey: eventName);
             }
         }
     }
    

查詢事件的名字是manager做的,訂閱的時候是先向mq添加訂閱,之后又加到manager中,manager管理着訂閱的基本信息。

另外一個重要功能是獲取事件的處理器信息,在rabbit mq的實現中,ProcessEvent方法中用manager獲取了事件的處理器,再用依賴注入獲得處理器的實例,反射調用Handle方法處理事件信息:

    private async Task ProcessEvent(string eventName, string message)
    {
        // 從manager查詢信息
        if (_subsManager.HasSubscriptionsForEvent(eventName))
        {
            using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME))
            {

                // 從manager獲取處理器
                var subscriptions = _subsManager.GetHandlersForEvent(eventName);
                foreach (var subscription in subscriptions)
                {

                    // Di + 反射調用,處理事件(兩個都是,只是針對是否是dynamic做了不同的處理)
                    if (subscription.IsDynamic)
                    { 
                        var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler;
                        dynamic eventData = JObject.Parse(message);
                        await handler.Handle(eventData);
                    }
                    else
                    {
                        var eventType = _subsManager.GetEventTypeByName(eventName);
                        var integrationEvent = JsonConvert.DeserializeObject(message, eventType);
                        var handler = scope.ResolveOptional(subscription.HandlerType);
                        var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
                        await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
                    }
                }
            }
        }
    }

IEventBusSubscriptionsManager的默認實現

在eShop中只有一個實現就是InMemoryEventBusSubscriptionsManager

這個類中有兩個重要的字段

    private readonly Dictionary<string, List<SubscriptionInfo>> _handlers;
    private readonly List<Type> _eventTypes;

他們分別存儲了事件列表和事件處理器信息詞典

接下來就是實現一個

基於內存的事件總線

我們要做什么呢?IEventBusSubscriptionsManager 已經有了InMemory的實現了,我們可以直接拿來用,所以我們只需要自己實現一個EventBus就好了

先貼出最終代碼:

public class InMemoryEventBus : IEventBus
{
    private readonly IServiceProvider _provider;
    private readonly ILogger<InMemoryEventBus> _logger;
    private readonly ISubscriptionsManager _manager;
    private readonly IList<IntegrationEvent> _events;
    public InMemoryEventBus(
        IServiceProvider provider,
        ILogger<InMemoryEventBus> logger, 
        ISubscriptionsManager manager)
    {
        _provider = provider;
        _logger = logger;
        _manager = manager;
    }

    public void Publish(IntegrationEvent e)
    {

        var eventType = e.GetType();
        var handlers = _manager.GetHandlersForEvent(eventType.FullName);

        foreach (var handlerInfo in handlers)
        {
            var handler = _provider.GetService(handlerInfo.HandlerType);

            var method = handlerInfo.HandlerType.GetMethod("Handle");

            method.Invoke(handler, new object[] { e });
        }
    }

    public void Subscribe<T, TH>()
        where T : IntegrationEvent
        where TH : IIntegrationEventHandler<T>
    {

        _manager.AddSubscription<T, TH>();

    }

    public void SubscribeDynamic<TH>(string eventName) where TH : IDynamicIntegrationEventHandler
    {
        throw new NotImplementedException();
    }

    public void Unsubscribe<T, TH>()
        where T : IntegrationEvent
        where TH : IIntegrationEventHandler<T>
    {
        _manager.RemoveSubscription<T, TH>();
    }

    public void UnsubscribeDynamic<TH>(string eventName) where TH : IDynamicIntegrationEventHandler
    {
        throw new NotImplementedException();
    }
}

首先構造函數中聲明我們要使用的東西:

public InMemoryEventBus(
    IServiceProvider provider,
    ILogger<InMemoryEventBus> logger, 
    ISubscriptionsManager manager)
{
    _provider = provider;
    _logger = logger;
    _manager = manager;
}

這里要注意的就是IServiceProvider provider這是 DI容器,當我們在切實處理事件的時候我們選擇從DI獲取處理器的實例,而不是反射創建,這要做的好處在於,處理器可以依賴於其它東西,並且可以是單例的

public void Subscribe<T, TH>()
    where T : IntegrationEvent
    where TH : IIntegrationEventHandler<T>
{

    _manager.AddSubscription<T, TH>();

}

public void Unsubscribe<T, TH>()
    where T : IntegrationEvent
    where TH : IIntegrationEventHandler<T>
{
    _manager.RemoveSubscription<T, TH>();
}

訂閱和取消訂閱很簡單,因為我們是InMemory的所以只調用了manager的方法。

接下來就是最重要的Publish方法,實現Publish有兩種方式:

  1. 使用額外的線程和Queue讓發布和處理異步

  2. 為了簡單起見,我們先寫個簡單易懂的同步的

     public void Publish(IntegrationEvent e)
     {
         // 首先要拿到集成事件的Type信息
         var eventType = e.GetType();
    
         // 獲取屬於這個事件的處理器列表,可能有很多,注意獲得的是SubscriptionInfo
         var handlers = _manager.GetHandlersForEvent(eventType.FullName);
    
         // 不解釋循環
         foreach (var handlerInfo in handlers)
         {
             // 從DI中獲取類型的實例
             var handler = _provider.GetService(handlerInfo.HandlerType);
    
             // 拿到Handle方法
             var method = handlerInfo.HandlerType.GetMethod("Handle");
    
             // 調用方法
             method.Invoke(handler, new object[] { e });
         }
     }
    

OK,我們的InMemoryEventBus就寫好了!

要實踐這個InMemoryEventBus,那么還需要一個IntegrationEvent的子類,和一個IIntegrationEventHandler<T>的實現類,這些都不難,例如我們做一個添加用戶的事件,A在添加用戶后,發起一個事件並將新用戶的名字作為事件數據,B去訂閱事件,並在自己的處理器中處理名字信息。

思路是這樣的:

  1. 寫一個 AddUserEvent:IntegrationEvent,里面有一個UserId和一個UserName

  2. 寫一個AddUserEventHandler:IIntegrationEventHandler<AddUserEvent>,在Handle方法中輸出UserId和Name到日志。

  3. 注冊DI,你要注冊下面這些服務:

     IEventBus=>InMemoryEventBus
     ISubscriptionsManager=>InMemorySubscriptionsManager
     AddUserEventHandler=>AddUserEventHandler
    
  4. 在Startup中為剛剛寫的事件和處理器添加訂閱(在這里已經可以獲取到IEventBus實例了)

  5. 寫一個Api接口或是什么,調用IEventBus的Publish方法,new 一個新的AddUserEvent作為參數傳進去。

OK!到這里一個切實可用的InMemoryEventBus就可以使用了。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM