RabbitMQ的簡單封裝


一般在工作中,都是直接使用已經封裝好的mq的程序集進行功能開發。所以很多時候都沒有去了解rabbitmq到底是如何封裝(實現使用的)。所以心血來潮,簡單記錄下自己對rabbitmq的簡單封裝

整體的思路是:約定消息體均繼承值Command,消息業務類均繼承於Handler,並且業務實體均實現Handle方法。消息發布者發送command;消費者接收到消息時通過反射,觸發對應的消費業務。

一,定義消息實體相關

 1 /// <summary>
 2     /// command消息接口
 3     /// </summary>
 4     public interface ICommand
 5     {
 6     }
 7     /// <summary>
 8     /// 消息基類
 9     /// </summary>
10     public class Command : ICommand
11     {
12         public Command() {
13             Id = Guid.NewGuid();
14             Time = DateTime.UtcNow;
15         }
16         /// <summary>
17         /// Id
18         /// </summary>
19         public Guid Id { get; set; }
20         /// <summary>
21         /// 消息時間
22         /// </summary>
23         public DateTime Time { get; set; }
24     }

實際的消息體

 1     /// <summary>
 2     /// 類別變更command
 3     /// </summary>
 4     public class CategoryChangedCommand : Command
 5     {
 6         /// <summary>
 7         /// 類別Id
 8         /// </summary>
 9         public Guid CategoryId { get; set; }
10         /// <summary>
11         /// Sku id
12         /// </summary>
13         public Guid SkuId { get; set; }
14     }

二、定義消息處理類

 1     /// <summary>
 2     /// 消息處理接口
 3     /// </summary>
 4     /// <typeparam name="T"></typeparam>
 5     public interface IHandler<T> where T : Command
 6     {
 7         /// <summary>
 8         /// 處理消息
 9         /// </summary>
10         /// <param name="t"></param>
11         void Handle(T t);
12     }
13     /// <summary>
14     /// 消息處理基類
15     /// </summary>
16     /// <typeparam name="T"></typeparam>
17     public abstract class Handler<T> : IHandler<T> where T : Command
18     {
19         public abstract void Handle(T t);
20 
21     }

實現處理類

    /// <summary>
    /// 類別變更處理類
    /// </summary>
    public class CategoryChangeHandler : Handler<CategoryChangedCommand>
    {
        public override void Handle(CategoryChangedCommand t)
        {
            Console.WriteLine("sku 類別變化,修改對應listing的類別");
        }
    }

三、通過反射,定義mq消費者(重,以下代碼只是實現簡單的direct方式的消費,僅作參考輔助理解)

消費者一般隨着程序啟動建立,所以一般都是在startup.cs中進行初始化啟動監聽消費。

1、初始化消費類別字典,字典的約定為command與對應的Handler作為一對。(如果是支持fanout,則可以設置成,一個command類別對應多個Handler)

    private void InitHandler()
        {
            Assembly assembly = Assembly.LoadFrom(Path.Combine(AppContext.BaseDirectory, "SimpleNetCore.RabbitmqCommon.dll"));
            var types = assembly.GetTypes().Where(p => p.IsClass && !p.IsAbstract && p.GetInterfaces().Any(x => x.Name == "IHandler`1"));
            foreach (var type in types)
            {
                var handleMethod = type.GetMethod("Handle");
                if (handleMethod != null)
                {
                    var parameter = handleMethod.GetParameters()[0];
                    var parameterType = parameter.ParameterType;
                    _DicHandlerType.Add(parameterType, type);
                }
            }
        }

 

2、創建消費者

 1   var factory = new ConnectionFactory()
 2             {
 3                 HostName = mqConfig.Host,
 4                 VirtualHost = mqConfig.VirtualHost,
 5                 UserName = mqConfig.UserName,
 6                 Password = mqConfig.Password
 7             };
 8             foreach (var item in _DicHandlerType)
 9             {
10                 var connection = factory.CreateConnection();
11 
12 
13                 var channel = connection.CreateModel();
14                 channel.QueueDeclare(item.Key.FullName, true, false, false, null);
15                 channel.ExchangeDeclare(item.Key.FullName, ExchangeType.Direct, true, false, null);
16                 ///定義事件消費者
17                 EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
18                 consumer.Received += Consumer_Received; ;
19                 //消費
20                 channel.BasicConsume(item.Key.FullName, false, consumer);
21                 //此處不關閉connection,channel,保持開啟持續消費
22 
23 
24             }

3、消費事件方法,通過反射調用真正的業務類進行業務處理

 1         private void Consumer_Received(object sender, BasicDeliverEventArgs e)
 2         {
 3             var message = Encoding.UTF8.GetString(e.Body.ToArray());
 4             #region 業務處理
 5             Console.WriteLine(message);
 6             #endregion
 7 
 8             EventingBasicConsumer consumer = sender as EventingBasicConsumer;
 9             string exchangeName = e.Exchange;
10             var typeItem = _DicHandlerType.FirstOrDefault(p => p.Key.FullName == exchangeName);
11             if (typeItem.Key != null)
12             {
13                 var t = JsonHelper.DeserialType(message, typeItem.Key);
14                 var obj = Activator.CreateInstance(typeItem.Value);
15                 var method = typeItem.Value.GetMethod("Handle");
16                 method.Invoke(obj, new object[] { t });
17             }
18             string routeKey = e.RoutingKey;
19             //設置已經被消費
20             consumer.Model.BasicAck(e.DeliveryTag, false);
21            
22         }

 

四、定義MQ消息生產者(重)

 1     /// <summary>
 2     /// 消息發布者接口
 3     /// </summary>
 4     public interface IBusFactory
 5     {
 6         /// <summary>
 7         /// 發送command--針對的是direct方式
 8         /// </summary>
 9         /// <typeparam name="T"></typeparam>
10         /// <param name="command"></param>
11         void SendCommand<T>(T command) where T : ICommand;
12     }
13     /// <summary>
14     /// 消息發布者
15     /// </summary>
16     public class BusFactory : IBusFactory
17     {
18         private RabbitmqConfig _config;
19         private ConnectionFactory _factory;
20         public BusFactory(IOptions<RabbitmqConfig> config)
21         {
22             this._config = config.Value;
23             this._factory = new ConnectionFactory()
24             {
25                 HostName = _config.Host,
26                 VirtualHost = _config.VirtualHost,
27                 UserName = _config.UserName,
28                 Password = _config.Password,
29             };
30         }
31         public void SendCommand<T>(T command) where T : ICommand
32         {
33             string queueName = command.GetType().FullName;
34             string exchangeName = command.GetType().FullName;
35             string routeKey = command.GetType().FullName;
36             using (var connection = _factory.CreateConnection())
37             {
38                 using (var channel = connection.CreateModel())
39                 {
40                     //定義交換機
41                     channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, true, false, null);
42                     //定義隊列
43                     channel.QueueDeclare(queueName, true, false, false, null);
44                     //綁定
45                     channel.QueueBind(queueName, exchangeName, routeKey, null);
46                     string message = JsonHelper.Serial(command);
47                     byte[] data = Encoding.UTF8.GetBytes(message);
48                     //發送消息
49                     channel.BasicPublish(exchangeName, routeKey, null, data);
50                 }
51             }
52         }
53     }

 

附 mq配置類

 1     /// <summary>
 2     /// rabbitmq 配置類
 3     /// </summary>
 4     public class RabbitmqConfig
 5     {
 6         /// <summary>
 7         /// 主機地址
 8         /// </summary>
 9         public string Host { get; set; }
10         /// <summary>
11         /// 用戶名稱
12         /// </summary>
13         public string UserName { get; set; }
14         /// <summary>
15         /// 密碼
16         /// </summary>
17         public string Password { get; set; }
18         /// <summary>
19         /// 虛擬主機名稱
20         /// </summary>
21         public string VirtualHost { get; set; }
22     }
 1     /// <summary>
 2     /// rabbitmq 配置類
 3     /// </summary>
 4     public class RabbitmqConfig
 5     {
 6         /// <summary>
 7         /// 主機地址
 8         /// </summary>
 9         public string Host { get; set; }
10         /// <summary>
11         /// 用戶名稱
12         /// </summary>
13         public string UserName { get; set; }
14         /// <summary>
15         /// 密碼
16         /// </summary>
17         public string Password { get; set; }
18         /// <summary>
19         /// 虛擬主機名稱
20         /// </summary>
21         public string VirtualHost { get; set; }
22     }

對應配置文件

1  "RabbitmqConfig": {
2     "Host":"127.0.0.1",
3     "UserName": "qingy",
4     "Password": "r3295",
5     "VirtualHost": "vTest"
6   }

 

 

 

----

以上就是非常簡單的封裝實現。僅作為參考!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM