【NetCore】RabbitMQ 封裝事件總線基本功能


RabbitMQ 封裝

代碼

https://gitee.com/wosperry/wosperry-rabbit-mqtest/tree/master

參考Abp事件總線的用法,對拷貝的Demo進行簡單封裝

定義 RabbitMQOptions 用於配置

我希望可以通過修改appsettings.json配置RabbitMQ的地址、密碼等信息

{ 
  "MyRabbitMQOptions": {
    "UserName": "admin",
    "Password": "admin",
    "Host": "192.168.124.220",
    "Port": 5672,
    "ExchangeName": "PerryExchange"
  }
}

    public class MyRabbitMQOptions
    {
        public string UserName { get; set; }
        public string Password { get; set; }
        public string Host { get; set; }
        public int Port { get; set; }
        public string ExchangeName { get; set; } = "";
    }

定義 QueueNameAttribute 控制隊列名字

由於一開始寫的時候,直接使用類的FullName為隊列名稱,不太友好,所以單獨加了一個特性,用於更改隊列名

    /// <summary>
    /// 定義隊列名字,優先級高於類完整名
    /// </summary>
    [AttributeUsage(AttributeTargets.Class, AllowMultiple = false, Inherited = false)]
    public class QueueNameAttribute : Attribute
    {
        public string QueueName { get; }
        public QueueNameAttribute(string queueName)
        {
            QueueName = queueName;
        }
    }

定義 IMyPublisher<T>,通過注入某個類型的 IMyPublisher<T>,自動序列化對象並發布到配置好的MQ里

我希望可以一次配置之后,在使用到的地方直接注入一個服務,調用某一個方法,即可發布消息,不需要在意具體怎么實現的。於是定義一個接口,只添加了一個方法。
然后在接口實現類里完成實體轉換成消息內容並發送到RabbitMQ中去。
具體實現可以看代碼注釋,關鍵地方都加了描述

    /// <summary>
    /// 用於注入使用
    /// </summary>
    public interface IMyEventHandler<T> : IMyEventHandler where T : class
    {
        Task PublishAsync(T data, Encoding encoding = null);
    } 
    /// <summary>
    /// 方便程序尋找IMyEventHandler的實現
    /// </summary>
    public interface IMyEventHandler
    {
        void Begin(IConnection connection);
    }
    public class MyPublisher<T> : IMyPublisher<T>, IDisposable where T : class
    {
        private readonly MyRabbitMQOptions _myOptions;
        private readonly IConnection _connection;
        private readonly IModel _channel;
        private readonly string _queueName;


        /// <summary>
        /// 非注入時使用此構造方法
        /// </summary>
        public MyPublisher(IConnection connection)
        {
            _connection = connection;
        }

        /// <summary>
        /// 依賴注入自動走這個構造方法
        /// </summary>
        /// <param name="optionsMonitor"></param>
        /// <param name="factory"></param>
        public MyPublisher(IOptionsMonitor<MyRabbitMQOptions> optionsMonitor, ConnectionFactory factory)
        {
            _myOptions = optionsMonitor.CurrentValue;
            _connection = factory.CreateConnection();

            // 創建通道
            _channel = _connection.CreateModel();

            // 聲明一個Exchange
            _channel.ExchangeDeclare(_myOptions.ExchangeName, ExchangeType.Direct, false, false, null);


            var type = typeof(T);
            // 獲取類上的QueueNameAttribute特性,如果不存在則使用類的完整名
            var attr = type.GetCustomAttribute<QueueNameAttribute>();
            _queueName = string.IsNullOrWhiteSpace(attr?.QueueName) ? type.FullName : attr.QueueName;

            // 聲明一個隊列 
            _channel.QueueDeclare(_queueName, false, false, false, null);

            //將隊列綁定到交換機
            _channel.QueueBind(_queueName, _myOptions.ExchangeName, _queueName, null);
        }

        /// <summary>
        /// 發布消息
        /// </summary>
        public Task PublishAsync(T data, Encoding encoding = null)
        {
            // 對象轉 object[] 發送
            var msg = JsonConvert.SerializeObject(data);
            byte[] bytes = (encoding ?? Encoding.UTF8).GetBytes(msg);
            _channel.BasicPublish(_myOptions.ExchangeName, _queueName, null, bytes);

            return Task.CompletedTask;
        }

        public void Dispose()
        {
            // 結束
            _channel.Close();
            _connection.Close();
        }
    }

定義 IMyEventHandler<T> ,供 NetCore 項目注入使用,配置后,可以在程序啟動的時候,找到該接口所有的實現類,並開啟消費者

我希望可以像Abp那樣,通過一個抽象類基類,自動獲取所有實現了這個抽象類的派生類,自動開啟對應的消息監聽,監聽到消息后,自動轉回對象類型,並執行派生類里的某個重寫方法。
具體實現可以查看代碼注釋

    /// <summary>
    /// Handler的配置
    /// </summary>
    public class MyEventHandlerOptions
    {
        /// <summary>
        /// 禁用 byte[] 解析
        /// </summary>
        public bool DisableDeserializeObject { get; set; } = false;
        /// <summary>
        /// 配置Encoding
        /// </summary>
        public Encoding Encoding { get; set; } = Encoding.UTF8;
    }
    public abstract class MyEventHandler<T> : IMyEventHandler<T> where T : class
    {
        private IModel _channel;
        private string _queueName;
        private EventingBasicConsumer _consumer;
        public MyEventHandlerOptions Options = new()
        {
            DisableDeserializeObject = false
        };

        public void Begin(IConnection connection)
        {
            var type = typeof(T);
            // 獲取類上的QueueNameAttribute特性,如果不存在則使用類的完整名
            var attr = type.GetCustomAttribute<QueueNameAttribute>();
            _queueName = string.IsNullOrWhiteSpace(attr?.QueueName) ? type.FullName : attr.QueueName;

            //創建通道
            _channel = connection.CreateModel();

            _consumer = new EventingBasicConsumer(_channel);
            _consumer.Received += MyReceivedHandler;
            //消費者
            _channel.BasicConsume(_queueName, false, _consumer);
        }

        // 收到消息后
        private void MyReceivedHandler(object sender, BasicDeliverEventArgs e)
        {
            try
            {
                // 反序列化為對象
                var message = Options.Encoding.GetString(e.Body);
                T data = null;
                // 如果未配置禁用則不解析,后面抽象方法的data參數會始終為空
                if (!Options.DisableDeserializeObject)
                {
                    data = JsonConvert.DeserializeObject<T>(message);
                }

                OnReceivedAsync(data, message).Wait();

                // 確認該消息已被消費
                _channel?.BasicAck(e.DeliveryTag, false);
            }
            catch (Exception ex)
            {
                OnConsumerException(ex);
            }
        }

        /// <summary>
        /// 收到消息 
        /// </summary>
        /// <param name="data">解析后的對象</param>
        /// <param name="message">消息原文</param> 
        /// <remarks>Options.DisableDeserializeObject為true時,data始終為null</remarks>
        public abstract Task OnReceivedAsync(T data, string message);

        /// <summary>
        /// 異常
        /// </summary>
        /// <param name="ex">派生類不重寫的話,異常被隱藏</param>
        public virtual void OnConsumerException(Exception ex)
        {

        }
    }


給依賴注入寫一些拓展方法

這里是因為如果要宿主機程序配置Ioc容器注冊的話,需要在宿主程序寫好些個代碼,為了減少使用時候的配置,把所有相關的Ioc容器注冊步驟封裝到拓展方法,減少使用時的代碼量

另外,事件消息消費者的啟動在有兩個方法,實現邏輯是先在app尚未被創建之前,把需要啟動的EventHandler注冊到容器里(可以直接使用程序集的所有類)
在app創建之后,調用UseMyEventHandler時,會從Ioc容器里找到所有的實現,並調用他們的Begin方法。

    public static class MyRabbiteMQExtensions
    {
        /// <summary>
        /// 初始化消息隊列,並添加Publisher到IoC容器
        /// </summary>
        /// <remarks>從Configuration讀取"MyRabbbitMQOptions配置項"</remarks>
        public static IServiceCollection AddMyRabbitMQ(this IServiceCollection services, IConfiguration configuration)
        {
            #region 配置項
            // 從Configuration讀取"MyRabbbitMQOptions配置項
            var optionSection = configuration.GetSection("MyRabbitMQOptions");

            // 這個myOptions是當前方法使用
            MyRabbitMQOptions myOptions = new();
            optionSection.Bind(myOptions);

            // 加了這行,才可以注入IOptions<MyRabbitMQOptions>或者IOptionsMonitor<MyRabbitMQOptions>
            services.Configure<MyRabbitMQOptions>(optionSection);
            #endregion

            // 加了這行,才可以注入任意類型參數的 IMyPublisher<> 使用
            services.AddTransient(typeof(IMyPublisher<>), typeof(MyPublisher<>));

            // 創建一個工廠對象,並配置單例注入
            services.AddSingleton(new ConnectionFactory
            {
                UserName = myOptions.UserName,
                Password = myOptions.Password,
                HostName = myOptions.Host,
                Port = myOptions.Port
            });

            return services;
        }

        /// <summary>
        /// IServiceCollection的拓展方法,用於發現自定義的EventHandler並添加到服務容器
        /// </summary> 
        /// <param name="types">包含了自定義Handler的類集合,可以使用assembly.GetTypes()</param> 
        /// <remarks>遍歷所有types,將繼承自IMyEventHandler的類注冊到容器</remarks>
        public static IServiceCollection AddMyRabbitMQEventHandlers(this IServiceCollection services, Type[] types)
        {
            var baseType = typeof(IMyEventHandler);

            foreach (var type in types)
            {
                // baseType可以放type,並且type不是baseType
                if (baseType.IsAssignableFrom(type) && baseType != type)
                {
                    // 瞬態注入配置
                    services.AddTransient(typeof(IMyEventHandler), type);
                }
            }

            return services;
        }

        /// <summary>
        /// 給app拓展方法
        /// </summary>
        /// <remarks>
        /// 在IoC容器里獲取到所有繼承自IMyEvetnHandler的實現類,並開啟消費者
        /// </remarks>
        public static IApplicationBuilder UseMyEventHandler(this IApplicationBuilder app)
        {
            var handlers = app.ApplicationServices.GetServices(typeof(IMyEventHandler));
            var factory = app.ApplicationServices.GetService<ConnectionFactory>();

            // 遍歷調用自定義的Begin方法
            foreach (var h in handlers)
            {
                var handler = h as IMyEventHandler;
                handler?.Begin(factory.CreateConnection());
            }

            return app;
        }
    }

在Net6 WebApi中使用

這個項目是引用了上方封裝的一個獨立的宿主項目,
如果僅使用發布者,則只需要提供appsettings.json里的配置,和 services.AddMyRabbitMQ(builder.Configuration); 方法即可。

如果需要使用事件隊列消費者,則需要添加兩個方法調用,一是把EventHandler添加到容器中,二是需要在app創建后調用UseMyEventHandler();
代碼執行如上文所說,會從Ioc容器里找到所有的實現,並調用他們的Begin方法開啟監聽。

program.cs

    var builder = WebApplication.CreateBuilder(args);
    builder.Services.AddControllers();


    // 添加MyRabbitMQ到services
    builder.Services.AddMyRabbitMQ(builder.Configuration);

    // 當前程序集所有類型
    var currentAssemblyTypes = Assembly.GetExecutingAssembly().GetTypes();
    builder.Services.AddMyRabbitMQEventHandlers(currentAssemblyTypes ); 

    // 上面參數可以像下面這樣配置
    // builder.Services.AddMyRabbitMQEventHandlers(new Type[]{typeof(MyHandler1), typeof(MyHandler2), typeof(MyHandler3)}); 
    // builder.Services.AddMyRabbitMQEventHandlers(typeof(MyHandler1), typeof(MyHandler2), typeof(MyHandler3)); 

    var app = builder.Build();

    // 使用 MyEventHandler
    app.UseMyEventHandler();


    app.MapControllers();
    app.Run();

定義ETO
其實是一個普通的類
這個類有兩個用處,

一是要給是在使用時注入IPublisher的時候,指定消息的實體是什么。
二是寫消息消費者時,需要給BaseEventHandler指定消息的實體是什么。這樣在接收到消息之后,程序就知道該反序列化成什么類型的對象。

    [QueueName("perry.test")]
    public class PerryTest
    {
        public Guid Id { get; set; }
        public string? Name { get; set; }
        public int Count { get; set; }
        public string? Remark { get; set; }
    }

定義EventHandler

只需要繼承了MyEventHandler並提供一個普通類型,則可以監聽這個類型的消息,隊列名字可以通過[QueueName("隊列名")]修改

    public class PerryTestEventHandler : MyEventHandler<PerryTest>
    {
        public override Task OnReceivedAsync(PerryTest data, string message)
        {
            Console.WriteLine(message);
            return Task.CompletedTask;
        }

        public override void OnConsumerException(Exception ex)
        {
            Console.WriteLine(ex.Message);
        }
    }

控制器中檢查是否可以正常使用

指定類型注入IMyPublisher<>,即可使用PublishAsync(data)方法發送消息,隊列名字可以通過[QueueName("隊列名")]修改

    [Route("api")]
    [ApiController]
    public class TestController : ControllerBase
    {
        public IMyPublisher<PerryTest> TestPublisher { get; }

        public TestController(IMyPublisher<PerryTest> testPublisher)
        {
            TestPublisher = testPublisher;
        }
        [HttpGet("test")]
        public async Task<string> TestAsync()
        {
            var data = new PerryTest()
            {
                Id = Guid.NewGuid(),
                Name = "AAA",
                Count = 123,
                Remark = "哈哈哈"
            };

            await TestPublisher.PublishAsync(data);

            return "發送了一個消息";
        }
    }

運行截圖

參考 .NET Core 使用RabbitMQ,拷貝了一些Demo

文章里的生產者Demo

    //創建連接工廠
    ConnectionFactory factory = new ConnectionFactory
    {
        UserName = "admin",//用戶名
        Password = "admin",//密碼
        HostName = "192.168.157.130"//rabbitmq ip
    };

    //創建連接
    var connection = factory.CreateConnection();
    //創建通道
    var channel = connection.CreateModel();
    //聲明一個隊列
    channel.QueueDeclare("hello", false, false, false, null);

    Console.WriteLine("\nRabbitMQ連接成功,請輸入消息,輸入exit退出!");

    string input;
    do
    {
        input = Console.ReadLine();
        var sendBytes = Encoding.UTF8.GetBytes(input);
        //發布消息
        channel.BasicPublish("", "hello", null, sendBytes);
    } while (input.Trim().ToLower()!="exit");
    channel.Close();
    connection.Close();

文章里的消費者Demo

    //創建連接工廠
    ConnectionFactory factory = new ConnectionFactory
    {
        UserName = "admin",//用戶名
        Password = "admin",//密碼
        HostName = "192.168.157.130"//rabbitmq ip
    };

    //創建連接
    var connection = factory.CreateConnection();
    //創建通道
    var channel = connection.CreateModel();

    //事件基本消費者
    EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

    //接收到消息事件
    consumer.Received += (ch, ea) =>
    {
        var message = Encoding.UTF8.GetString(ea.Body);
        Console.WriteLine($"收到消息: {message}");
        //確認該消息已被消費
        channel.BasicAck(ea.DeliveryTag, false);
    };
    //啟動消費者 設置為手動應答消息
    channel.BasicConsume("hello", false, consumer);
    Console.WriteLine("消費者已啟動");
    Console.ReadKey();
    channel.Dispose();
    connection.Close();

這次封裝的總結

  1. 上網找個Demo
  2. 先按最簡單的寫法,寫完能正常使用的,功能獨立的代碼。其實是提醒自己,不要陷入到雜七雜八的各項優化中去,先寫完實現再考慮怎么去改成更加好的。
  3. 腦袋里想着,我要怎么樣使用這個功能,怎么樣才能讓用的時候寫的代碼少一些,配置簡單一些
  4. 檢查哪些內容應該分離到配置項,然后抽離出去
  5. 考慮要支持哪些類型的項目,如果想要支持低版本,可能需要降級一些依賴包
  6. 支持構造函數注入的話,要注意最多參數的構造函數給依賴注入使用,依賴注入用的構造函數不好被非注入時使用的話,考慮多提供一個方法給非注入時使用。
  7. 最后,希望這篇文章能幫助到正在學習MQ或者ABP的事件總線的朋友們

轉載請注明出處 https://www.cnblogs.com/wosperry/p/15645927.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM