EventBus In eShop -- 解析微軟微服務架構Demo(四)


引言

大家好像對分析源碼厭倦了,說實在我也會厭倦,不過不看是無法分析其后面的東西,從易到難是一個必要的過程。

今天說下EventBus,前幾天園里的大神已經把其解刨,我今天就借着大神的肩膀,分析下在eShop項目中EventBus的實現。

最近發覺轉發文章不寫出處的,特此加上鏈接:http://inday.cnblogs.com

解析源碼

我們知道使用EventBus是為了解除Publisher和Subscriber之間的依賴性,這樣我們的Publisher就不需要知道有多少Subscribers,只需要通過EventBus進行注冊管理就好了,在eShop項目中,有一個這樣的接口IEventBus(eShopOnContainers\src\BuildingBlocks\EventBus\EventBus\Abstractions)

public interface IEventBus
{
    void Subscribe<T, TH>(Func<TH> handler)
        where T : IntegrationEvent
        where TH : IIntegrationEventHandler<T>;
    void Unsubscribe<T, TH>()
        where TH : IIntegrationEventHandler<T>
        where T : IntegrationEvent;

    void Publish(IntegrationEvent @event);
}

我們可以看到這個接口定義了EventBus所需的一些操作, 對比大神的EventBus,相關功能都是一致的,我們看下它的實現類:EventBusRabbitMQ,從名字上可以看出,這是一個通過RabbitMQ來進行管理的EventBus,我們可以看到它使用了IEventBusSubscriptionsManager進行訂閱存儲,也就是大神文中的:

private readonly ConcurrentDictionary<Type, List<Type>> _eventAndHandlerMapping;

微軟在Demo中把其提取出了接口,把一些常用方法給提煉了出來,但是核心還是Dictionary<string, List<Delegate>>, 使用Dictionary進行Map映射。通過Subscribe和UnSubscribe進行訂閱和取消,使用Publish方法進行發布操作。

public void Subscribe<T, TH>(Func<TH> handler)
    where T : IntegrationEvent
    where TH : IIntegrationEventHandler<T>
{
    var eventName = typeof(T).Name;
    var containsKey = _subsManager.HasSubscriptionsForEvent<T>();
    if (!containsKey)
    {
        if (!_persistentConnection.IsConnected)
        {
            _persistentConnection.TryConnect();
        }

        using (var channel = _persistentConnection.CreateModel())
        {
            channel.QueueBind(queue: _queueName,
                                exchange: BROKER_NAME,
                                routingKey: eventName);
        }
    }

    _subsManager.AddSubscription<T, TH>(handler);

}

我們看到在訂閱的時候,EventBus會檢查下在Map中是否有相應的注冊,如果沒有的話首先回去RabbitMQ中創建一個新的channel進行綁定,隨后在Map中進行注冊映射。

UnSubscribe則直接從Map中取消映射,通過OnEventRemoved事件判斷Map下此映射的subscriber是否為空,為空則從RabbitMQ中關閉channel。

在RabbitMQ的構造方法中,我們看到這樣一個創建:CreateConsumerChannel(),這里創建了一個EventingBasicConsumer,當Queue中有新的消息時會通過ProcessEvent執行Map中注冊的handler(subscribers),看圖可能更清晰些:

 

Event In eShop

在ProcessEvent方法中,回去Map中找尋subscribers,然后通過動態反射進行執行:

private async Task ProcessEvent(string eventName, string message)
{

    if (_subsManager.HasSubscriptionsForEvent(eventName))
    { 
        var eventType = _subsManager.GetEventTypeByName(eventName);
        var integrationEvent = JsonConvert.DeserializeObject(message, eventType);
        var handlers = _subsManager.GetHandlersForEvent(eventName);

        foreach (var handlerfactory in handlers)
        {
            var handler = handlerfactory.DynamicInvoke();
            var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
            await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
        }
    }
}

微軟通過簡單的代碼解耦了Publisher和Subscribers之間的依賴關系,我們引用大神的總結:

image

應用

在catalog.api中,微軟出現了EventBus,我在上一篇中也提到了,這是我的一個疑惑,因為在catalog中並沒有訂閱操作,直接執行了Publish操作,原先以為是一個空操作,后來看了Basket.Api我才知道為何微軟要用RabbitMQ。

使用RabbitMQ,我們不僅是從類之間的解耦,更可以跨項目,跨語言,跨平台的解耦,publisher僅僅需要把消息體(IntegrationEvent)傳送到RabbitMQ,Consumer從Queue中獲取消息體,然后推送到Subscribers執行相應的操作。我們看下Basket.Api.Startup.cs:

protected virtual void ConfigureEventBus(IApplicationBuilder app)
{
    var catalogPriceHandler = app.ApplicationServices
        .GetService<IIntegrationEventHandler<ProductPriceChangedIntegrationEvent>>();

    var orderStartedHandler = app.ApplicationServices
        .GetService<IIntegrationEventHandler<OrderStartedIntegrationEvent>>();

    var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();

    eventBus.Subscribe<ProductPriceChangedIntegrationEvent, ProductPriceChangedIntegrationEventHandler>
        (() => app.ApplicationServices.GetRequiredService<ProductPriceChangedIntegrationEventHandler>());

    eventBus.Subscribe<OrderStartedIntegrationEvent, OrderStartedIntegrationEventHandler>
        (() => app.ApplicationServices.GetRequiredService<OrderStartedIntegrationEventHandler>());
}

在這個方法里,我們看到了Subscribe操作,想想之前的提問有點搞笑,不過研究明白了也不錯,對吧!

總結

今天我們看了EventBus在Demo中的應用,總結一下。

1、EventBus可以很好的解耦訂閱者和發布者之間的依賴

2、使用RabbitMQ能夠跨項目、跨平台、跨語言的解耦訂閱者和發布者

雖然在Demo中我們看到對訂閱者的管理是通過Dictionary內存的方式,所以我們的Subscribe僅僅只在Basket.Api中看到,但微軟是通過IEventBusSubscriptionsManager接口定義的,我們可以通過自己的需求來進行定制,可以做成分布式的,比如使用memcached。

寫在最后

每個月到下旬就會比較忙,所以文章發布會比較慢,但我也會堅持學習完eShop的,為了學習,我建了個群,大家可以進來一起學習,有什么建議和問題都可以進來哦。

eShop雖好,但不建議大家放到生產環境,畢竟是一個Demo,而且目前還是ALPHA版本,用來學習是一個很好的教材,這就是一個大雜燴,學習中你會學到很多新的東西,大家如果看好core的發展,可以一起研究下。

QQ群:376248054


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM