源碼解析-Abp vNext丨分布式事件總線DistributedEventBus


前言

上一節咱們講了LocalEventBus,本節來講本地事件總線(DistributedEventBus),采用的RabbitMQ進行實現。

Volo.Abp.EventBus.RabbitMQ模塊內部代碼並不多,RabbitMQ的操作都集中在Volo.Abp.RabbitMQ這個包中。

正文

我們從模塊定義開始看,項目啟動的時候分別讀取了appsetting.json的配置參數和調用了RabbitMqDistributedEventBusInitialize函數。

    public class AbpEventBusRabbitMqModule : AbpModule
    {
        public override void ConfigureServices(ServiceConfigurationContext context)
        {
            var configuration = context.Services.GetConfiguration();

            Configure<AbpRabbitMqEventBusOptions>(configuration.GetSection("RabbitMQ:EventBus"));
        }

        public override void OnApplicationInitialization(ApplicationInitializationContext context)
        {
            context
                .ServiceProvider
                .GetRequiredService<RabbitMqDistributedEventBus>()
                .Initialize();
        }
    }

Initialize函數中我們根據 MessageConsumerFactory.Create向內部進行查閱可以看到最終調用方法為RabbitMqMessageConsumer.TryCreateChannelAsync並且在其內部我們可以看到下面代碼,這里定義了消費的回調函數。反推Initialize方法其實是在啟動一個消費者。

      public void Initialize()
        {
            Consumer = MessageConsumerFactory.Create(
                new ExchangeDeclareConfiguration(
                    AbpRabbitMqEventBusOptions.ExchangeName,
                    type: "direct",
                    durable: true
                ),
                new QueueDeclareConfiguration(
                    AbpRabbitMqEventBusOptions.ClientName,
                    durable: true,
                    exclusive: false,
                    autoDelete: false
                ),
                AbpRabbitMqEventBusOptions.ConnectionName
            );

            Consumer.OnMessageReceived(ProcessEventAsync);

            SubscribeHandlers(AbpDistributedEventBusOptions.Handlers);
        }

 var consumer = new AsyncEventingBasicConsumer(Channel);
                consumer.Received += HandleIncomingMessageAsync;

繼續向下看Consumer.OnMessageReceived(ProcessEventAsync);該方法向一個並發安全集合輸入一個委托事件,並該事件會在上面的HandleIncomingMessageAsync會調中觸發故確定為消費者的執行邏輯,而ProcessEventAsync其實還是走了我們在講LocalEventBus哪一套,尋找Handler執行函數。

SubscribeHandlers還是上節講的基類的函數,這里要注意內部調用的Subscribe該方法中的 Consumer.BindAsync會根據為消費者Bind路由,這樣才能觸發事件處理函數。


       public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
        {
            var handlerFactories = GetOrCreateHandlerFactories(eventType);

            if (factory.IsInFactories(handlerFactories))
            {
                return NullDisposable.Instance;
            }

            handlerFactories.Add(factory);

            if (handlerFactories.Count == 1) //TODO: Multi-threading!
            {
                Consumer.BindAsync(EventNameAttribute.GetNameOrDefault(eventType));
            }

            return new EventHandlerFactoryUnregistrar(this, eventType, factory);
        }

看完了事件消費者我們來看看事件發布,直接看PublishAsync函數就完事了,整個函數非常簡單,都是RabbitMQ的操作語法,這里的路由Key是在EventNameAttribute.GetNameOrDefault(eventType);函數中通過讀取ETO上指定注解Name來指定的。

protected Task PublishAsync(
            string eventName,
            byte[] body,
            IBasicProperties properties,
            Dictionary<string, object> headersArguments = null,
            Guid? eventId = null)
        {
            using (var channel = ConnectionPool.Get(AbpRabbitMqEventBusOptions.ConnectionName).CreateModel())
            {
                channel.ExchangeDeclare(
                    AbpRabbitMqEventBusOptions.ExchangeName,
                    "direct",
                    durable: true
                );

                if (properties == null)
                {
                    properties = channel.CreateBasicProperties();
                    properties.DeliveryMode = RabbitMqConsts.DeliveryModes.Persistent;
                }

                if (properties.MessageId.IsNullOrEmpty())
                {
                    properties.MessageId = (eventId ?? GuidGenerator.Create()).ToString("N");
                }

                SetEventMessageHeaders(properties, headersArguments);

                channel.BasicPublish(
                    exchange: AbpRabbitMqEventBusOptions.ExchangeName,
                    routingKey: eventName,
                    mandatory: true,
                    basicProperties: properties,
                    body: body
                );
            }

            return Task.CompletedTask;
        }

解析

整個分布式事件的實現其實非常簡單,在事件發生時發布者只需要定義好路由名稱和消息內容發送RabbitMQ中,而消費者則是在項目運行的時候的通過調用Initialize就啟動起來了。

這里我們也同樣根據整個原理自己實現一下這個流程。

Dppt.EventBus分別定義IDistributedEventBus、DistributedEventBusOptions、IDistributedEventHandler分別用於采用分布式事件總線調用、配置選項用於存儲處理程序Handler、定義分布式處理程序抽象。

新建Dppt.EventBus.RabbitMQ類庫先簡單對RabbitMQ進行一個簡單的封裝

public class RabbitMqConnections : IRabbitMqConnections
    {
        private readonly IConnectionFactory _connectionFactory;
        private readonly ILogger<RabbitMqConnections> _logger;
        IConnection _connection;
        bool _disposed;
        public RabbitMqConnections(IConnectionFactory connectionFactory, ILogger<RabbitMqConnections> logger)
        {
            _connectionFactory = connectionFactory;
            _logger = logger;
        }


        public bool IsConnected
        {
            get
            {
                return _connection != null && _connection.IsOpen && !_disposed;
            }
        }

        public void TryConnect() {

            _connection = _connectionFactory.CreateConnection();

        }


        public IModel CreateModel()
        {
            if (!IsConnected)
            {
                throw new InvalidOperationException("No RabbitMQ connections are available to perform this action");
            }

            return _connection.CreateModel();
        }


        public void Dispose()
        {
            if (_disposed) return;

            _disposed = true;

            try
            {
                _connection.Dispose();
            }
            catch (IOException ex)
            {
                _logger.LogCritical(ex.ToString());
            }
        }

    }

然后我們分別定義ExchangeDeclareConfiguration、QueueDeclareConfiguration用於記錄配置信息。

開始處理RabbitMqEventBus處理程序首先是發布事件,大體代碼如下就是往RabbitMQ里面丟消息。

        /// <summary>
        /// rabbmitmq 連接服務
        /// </summary>
        public readonly IRabbitMqConnections _rabbitMqConnections;


public Task PublishAsync<TEvent>(TEvent eventData)
        {
            var eventName = EventNameAttribute.GetNameOrDefault(typeof(TEvent));
            var body = JsonSerializer.Serialize(eventData);
            return PublishAsync(eventName, body, null, null);
        }

        public Task PublishAsync(string eventName, string body, IBasicProperties properties, Dictionary<string, object> headersArguments = null, Guid? eventId = null)
        {

            if (!_rabbitMqConnections.IsConnected)
            {
                _rabbitMqConnections.TryConnect();
            }
            using (var channel = _rabbitMqConnections.CreateModel())
            {
                // durable 設置隊列持久化  
                channel.ExchangeDeclare(RabbitMqEventBusOptions.ExchangeName, "direct", durable: true);

                if (properties == null)
                {
                    properties = channel.CreateBasicProperties();
                    // 設置消息持久化
                    properties.DeliveryMode = 2;
                }

                if (properties.MessageId.IsNullOrEmpty())
                {
                    // 消息的唯一性標識
                    properties.MessageId = (eventId ?? Guid.NewGuid()).ToString("N");
                }

                SetEventMessageHeaders(properties, headersArguments);

                channel.BasicPublish(
                   exchange: RabbitMqEventBusOptions.ExchangeName,
                   routingKey: eventName,
                   mandatory: true,
                   basicProperties: properties,
                   body: Encoding.UTF8.GetBytes(body)
               );

            }

            return Task.CompletedTask;
        }

      private void SetEventMessageHeaders(IBasicProperties properties, Dictionary<string, object> headersArguments)
        {
            if (headersArguments == null)
            {
                return;
            }

            properties.Headers ??= new Dictionary<string, object>();

            foreach (var header in headersArguments)
            {
                properties.Headers[header.Key] = header.Value;
            }
        }


然后就是消費者的處理,我們同樣定義Initialize函數,並簡化部分封裝代碼,完成消費者啟動。

 public void Initialize()
        {

            Exchange = new ExchangeDeclareConfiguration(RabbitMqEventBusOptions.ExchangeName,"direct",true);
            Queue = new QueueDeclareConfiguration(RabbitMqEventBusOptions.ClientName, true, false, false);

            // 啟動一個消費者
            if (!_rabbitMqConnections.IsConnected)
            {
                _rabbitMqConnections.TryConnect();
            }

            try
            {

                Channel = _rabbitMqConnections.CreateModel();



                Channel.ExchangeDeclare(
                  exchange: Exchange.ExchangeName,
                  type: Exchange.Type,
                  durable: Exchange.Durable,
                  autoDelete: Exchange.AutoDelete,
                  arguments: Exchange.Arguments
              );


                Channel.QueueDeclare(
                   queue: Queue.QueueName,
                   durable: Queue.Durable,
                   exclusive: Queue.Exclusive,
                   autoDelete: Queue.AutoDelete,
                   arguments: Queue.Arguments
               );

                var consumer = new AsyncEventingBasicConsumer(Channel);
                consumer.Received += HandleIncomingMessageAsync;

                Channel.BasicConsume(
                    queue: Queue.QueueName,
                    autoAck: false,
                    consumer: consumer
                );

                SubscribeHandlers(DistributedEventBusOptions.Handlers);
            }
            catch (Exception ex)
            {
                Console.WriteLine("Error:" + ex.Message);
            }
        }

參數配置這邊主要是讀取AppSetting信息和索要Handler

 public static class DpptEventBusRabbitMqRegistrar
    {
        public static void AddDpptEventBusRabbitMq(this IServiceCollection services, IConfiguration configuration, List<Type> types)
        {
     
            services.AddSingleton<IRabbitMqConnections>(sp =>
            {
                var logger = sp.GetRequiredService<ILogger<RabbitMqConnections>>();

                var factory = new ConnectionFactory()
                {
                    HostName = configuration["RabbitMQ:EventBusConnection"],
                    VirtualHost = configuration["RabbitMQ:EventBusVirtualHost"],
                    DispatchConsumersAsync = true,
                    AutomaticRecoveryEnabled = true
            };

                if (!string.IsNullOrEmpty(configuration["RabbitMQ:EventBusUserName"]))
                {
                    factory.UserName = configuration["RabbitMQ:EventBusUserName"];
                }

                if (!string.IsNullOrEmpty(configuration["RabbitMQ:EventBusPassword"]))
                {
                    factory.Password = configuration["RabbitMQ:EventBusPassword"];
                }

                return new RabbitMqConnections(factory, logger);
            });

            var distributedHandlers = types;
            foreach (var item in distributedHandlers)
            {
                services.AddSingleton(item);
            }

            services.Configure<DistributedEventBusOptions>(options =>
            {
                options.Handlers.AddIfNotContains(distributedHandlers);
            });

            services.Configure<DpptRabbitMqEventBusOptions>(options => {

                options.ExchangeName = configuration["RabbitMQ:EventBus:ExchangeName"];
                options.ClientName = configuration["RabbitMQ:EventBus:ClientName"];
            });

            services.AddSingleton<IDistributedEventBus, RabbitMqDistributedEventBus>();

          
        }
    }

測試

新建一個空項目,進行插件注冊,然后創建ETO和Handler進行測試。

64

測試結果放在下面了。

62

63

結語

本次挑選了一個比較簡單的示例來講,整個EventBus我應該分成3篇 下一篇我來講分布式事務。

最后歡迎各位讀者關注我的博客, https://github.com/MrChuJiu/Dppt/tree/master/src 歡迎大家Star

另外這里有個社區地址(https://github.com/MrChuJiu/Dppt/discussions),如果大家有技術點希望我提前檔期可以寫在這里,希望本項目助力我們一起成長


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM