ABP vNext 封裝了兩種事件總線結構,第一種是 ABP vNext 自己實現的本地事件總線,這種事件總線無法跨項目發布和訂閱。第二種則是分布式事件總線,ABP vNext 自己封裝了一個抽象層進行定義,並使用 RabbitMQ 編寫了一個基本實現。
我們在實際生產中並不能開箱即用,所以集成了Rebus框架,重新開發了消息隊列組件,Rebus的相關介紹資料不多,接下來就給大家介紹下我們是如何集成Rebus到ABP VNext框架里的。
首先我們這個一個標准的服務分為兩個項目,一個是前端API站點,另外一個是API Work站點,后者只用來接收MQ消息進行消費,不提供對外接口。
1.ReBus簡介
Rebus是.NET的精益服務總線實現。它依賴於Newtonsoft JSON.NET,它支持.NET 4.5和.NET Standard 2.0作為平台目標。這意味着無論您使用的是完整的.NET框架還是使用的是.NET Core,您的平台上都非常有可能支持Rebus。
簡單來說它屏蔽了消息隊列工具內部復雜的配置交互邏輯,對外暴露出一系列相關的直接使用方法以謀求達到接近於開箱即用的效果。
Rebus GitHub:https://github.com/rebus-org/Rebus
2.封裝介紹
那如何對其進行改造集成到我們的ABP VNext項目中呢,我來介紹下其內部的核心方法來幫助大家理解其中的消息隊列模塊。
那我們知道消息隊列是分為生產者和消費者的,生產者發布消息,消費者訂閱相關隊列來接收消息。那兩者之間就是通過隊列在進行綁定的,所以在項目啟動的時間我們首先要做的就是聲明綁定隊列。
在這里交換器我們使用Rebus默認聲明的交換器哈。
我們封裝了一個方法來進行隊列的聲明和綁定,里面就是針對於Rebus提供的相關接口的封裝:
public static void UseRabbitMq(this ServiceConfigurationContext context, [NotNull] string queueName) { if (string.IsNullOrEmpty(queueName)) { throw new ArgumentNullException(nameof(queueName)); } context.Services.AddRabbitMqConnection(); context.Services.Configure<AegisRebusOptions>(options => { options.ConfigureLibraryDefault(); options.UseRabbitMq(); }); context.Services.Configure<AegisRebusRabbitMqOptions>(options => { options.QueueName = queueName; }); }
在使用的時候只需要傳入對於的隊列名稱即可,UseRabbitMq()會根據傳入的隊列名稱進行聲明綁定:
public static void UseRabbitMq(this AegisRebusOptions rebusOptions, string connectionString = null, string inputQueueName = null, Action<RabbitMqOptionsBuilder> optionsAction = null) { rebusOptions.Configure(context => { var rabbitMqOptions = context.ServiceProvider .GetRequiredService<IOptions<AegisRebusRabbitMqOptions>>().Value; connectionString ??= rabbitMqOptions.RabbitMqConnection; inputQueueName ??= rabbitMqOptions.QueueName; context.Transport(t => { var builder = t.UseRabbitMq(connectionString, inputQueueName); optionsAction?.Invoke(builder); }); }); }
OK,隊列有了之后,那如何將隊列與Routing key來進行綁定呢,我們在實際使用中一個Routing key就代表了一個消費方法或者接口。為了統一管理Routing key的聲明和綁定,我們設定了一個統一的泛型接口:IDistributedEventHandler
public interface IDistributedEventHandler<in TEvent> : IEventHandler { /// <summary> /// Handler handles the event by implementing this method. /// </summary> /// <param name="eventData">Event data</param> Task HandleEventAsync(TEvent eventData); }
那這個泛型接口有什么用的,首先所有的消費類都需要繼承於它,類似於這樣:
public class TestWebMQHandler : IDistributedEventHandler<TestMQEvent>, ITransientDependency { public async Task HandleEventAsync(TestMQEvent eventData) { string result = eventData.SendMessage; if (result != null) { result = "test"; } await Task.FromResult(result); } }
namespace TestWeb.Domain { public class TestMQEvent { public string SendMessage { get; set; } } }
TestMQEvent這個是我自定義的消息內容類,你可以理解為消息的入參,就是我們要消費的消息。繼承IDistributedEventHandler接口之后,程序默認會根據消息內容類的程序集生成Routing Key,方式就是NameSpace+類名。
那如上這個實例消費類會生成的Routing Key是:TestWeb.Domain.TestMQEvent, TestWeb.Domain,實際效果如下:
OK,Routing Key生成之后又是如何與隊列綁定的呢,也就是消費者如何訂閱的。
項目運行時,消費者的訂閱時統一處理的,我們上面說·所有的消費類都需要繼承 IDistributedEventHandler 這個泛型接口。
我們首先會通過反射拿到所有繼承於IDistributedEventHandler的消息類,類似上面就是TestWebMQHandler,然后獲取對應的消息提類來綁定到隊列上:
private static void AddEventHandlers(IServiceCollection services) { var localHandlers = new List<Type>(); var distributedHandlers = new List<Type>(); services.OnRegistered(context => { if (ReflectionHelper.IsAssignableToGenericType(context.ImplementationType, typeof(ILocalEventHandler<>))) { localHandlers.Add(context.ImplementationType); } else if (ReflectionHelper.IsAssignableToGenericType(context.ImplementationType, typeof(IDistributedEventHandler<>))) { distributedHandlers.Add(context.ImplementationType); } }); services.Configure<LocalEventBusOptions>(options => { options.Handlers.AddIfNotContains(localHandlers); }); services.Configure<DistributedEventBusOptions>(options => { options.Handlers.AddIfNotContains(distributedHandlers); }); }
上面時拿到對應的消費方法類,接着獲取消息內容類,從而調用訂閱方法:
private void Initialize(IEnumerable<Type> handleTypes) { foreach (var handleType in handleTypes) { var interfaces = handleType.GetInterfaces(); foreach (var @interface in interfaces) { if (!typeof(IEventHandler).IsAssignableFrom(@interface)) { continue; } var genericArgs = @interface.GetGenericArguments(); if (genericArgs.Length != 1) continue; var types = _handlerTypes.GetOrAdd(genericArgs[0], new List<Type>()); types.AddIfNotContains(handleType); } } }
public static IServiceProvider UseRebus(this IServiceProvider provider) { var bus = provider.GetRequiredService<IBus>(); var activator = provider.GetRequiredService<IAegisDistributedEventHandlerActivator>(); foreach (var eventType in activator.GetAllEventTypes()) { bus.Subscribe(eventType); } return provider; }
核心的邏輯就是這樣,在使用中調用Rebus提供的發布方法即可。
這樣的使用方式將消費處理剝離出來,而不是將生產和消費的處理程序集成在同一個服務器上,方便快速增加消費者橫向擴容。