一般在工作中,都是直接使用已經封裝好的mq的程序集進行功能開發。所以很多時候都沒有去了解rabbitmq到底是如何封裝(實現使用的)。所以心血來潮,簡單記錄下自己對rabbitmq的簡單封裝
整體的思路是:約定消息體均繼承值Command,消息業務類均繼承於Handler,並且業務實體均實現Handle方法。消息發布者發送command;消費者接收到消息時通過反射,觸發對應的消費業務。
一,定義消息實體相關
1 /// <summary> 2 /// command消息接口 3 /// </summary> 4 public interface ICommand 5 { 6 } 7 /// <summary> 8 /// 消息基類 9 /// </summary> 10 public class Command : ICommand 11 { 12 public Command() { 13 Id = Guid.NewGuid(); 14 Time = DateTime.UtcNow; 15 } 16 /// <summary> 17 /// Id 18 /// </summary> 19 public Guid Id { get; set; } 20 /// <summary> 21 /// 消息時間 22 /// </summary> 23 public DateTime Time { get; set; } 24 }
實際的消息體
1 /// <summary> 2 /// 類別變更command 3 /// </summary> 4 public class CategoryChangedCommand : Command 5 { 6 /// <summary> 7 /// 類別Id 8 /// </summary> 9 public Guid CategoryId { get; set; } 10 /// <summary> 11 /// Sku id 12 /// </summary> 13 public Guid SkuId { get; set; } 14 }
二、定義消息處理類
1 /// <summary> 2 /// 消息處理接口 3 /// </summary> 4 /// <typeparam name="T"></typeparam> 5 public interface IHandler<T> where T : Command 6 { 7 /// <summary> 8 /// 處理消息 9 /// </summary> 10 /// <param name="t"></param> 11 void Handle(T t); 12 } 13 /// <summary> 14 /// 消息處理基類 15 /// </summary> 16 /// <typeparam name="T"></typeparam> 17 public abstract class Handler<T> : IHandler<T> where T : Command 18 { 19 public abstract void Handle(T t); 20 21 }
實現處理類
/// <summary> /// 類別變更處理類 /// </summary> public class CategoryChangeHandler : Handler<CategoryChangedCommand> { public override void Handle(CategoryChangedCommand t) { Console.WriteLine("sku 類別變化,修改對應listing的類別"); } }
三、通過反射,定義mq消費者(重,以下代碼只是實現簡單的direct方式的消費,僅作參考輔助理解)
消費者一般隨着程序啟動建立,所以一般都是在startup.cs中進行初始化啟動監聽消費。
1、初始化消費類別字典,字典的約定為command與對應的Handler作為一對。(如果是支持fanout,則可以設置成,一個command類別對應多個Handler)
private void InitHandler() { Assembly assembly = Assembly.LoadFrom(Path.Combine(AppContext.BaseDirectory, "SimpleNetCore.RabbitmqCommon.dll")); var types = assembly.GetTypes().Where(p => p.IsClass && !p.IsAbstract && p.GetInterfaces().Any(x => x.Name == "IHandler`1")); foreach (var type in types) { var handleMethod = type.GetMethod("Handle"); if (handleMethod != null) { var parameter = handleMethod.GetParameters()[0]; var parameterType = parameter.ParameterType; _DicHandlerType.Add(parameterType, type); } } }
2、創建消費者
1 var factory = new ConnectionFactory() 2 { 3 HostName = mqConfig.Host, 4 VirtualHost = mqConfig.VirtualHost, 5 UserName = mqConfig.UserName, 6 Password = mqConfig.Password 7 }; 8 foreach (var item in _DicHandlerType) 9 { 10 var connection = factory.CreateConnection(); 11 12 13 var channel = connection.CreateModel(); 14 channel.QueueDeclare(item.Key.FullName, true, false, false, null); 15 channel.ExchangeDeclare(item.Key.FullName, ExchangeType.Direct, true, false, null); 16 ///定義事件消費者 17 EventingBasicConsumer consumer = new EventingBasicConsumer(channel); 18 consumer.Received += Consumer_Received; ; 19 //消費 20 channel.BasicConsume(item.Key.FullName, false, consumer); 21 //此處不關閉connection,channel,保持開啟持續消費 22 23 24 }
3、消費事件方法,通過反射調用真正的業務類進行業務處理
1 private void Consumer_Received(object sender, BasicDeliverEventArgs e) 2 { 3 var message = Encoding.UTF8.GetString(e.Body.ToArray()); 4 #region 業務處理 5 Console.WriteLine(message); 6 #endregion 7 8 EventingBasicConsumer consumer = sender as EventingBasicConsumer; 9 string exchangeName = e.Exchange; 10 var typeItem = _DicHandlerType.FirstOrDefault(p => p.Key.FullName == exchangeName); 11 if (typeItem.Key != null) 12 { 13 var t = JsonHelper.DeserialType(message, typeItem.Key); 14 var obj = Activator.CreateInstance(typeItem.Value); 15 var method = typeItem.Value.GetMethod("Handle"); 16 method.Invoke(obj, new object[] { t }); 17 } 18 string routeKey = e.RoutingKey; 19 //設置已經被消費 20 consumer.Model.BasicAck(e.DeliveryTag, false); 21 22 }
四、定義MQ消息生產者(重)
1 /// <summary> 2 /// 消息發布者接口 3 /// </summary> 4 public interface IBusFactory 5 { 6 /// <summary> 7 /// 發送command--針對的是direct方式 8 /// </summary> 9 /// <typeparam name="T"></typeparam> 10 /// <param name="command"></param> 11 void SendCommand<T>(T command) where T : ICommand; 12 } 13 /// <summary> 14 /// 消息發布者 15 /// </summary> 16 public class BusFactory : IBusFactory 17 { 18 private RabbitmqConfig _config; 19 private ConnectionFactory _factory; 20 public BusFactory(IOptions<RabbitmqConfig> config) 21 { 22 this._config = config.Value; 23 this._factory = new ConnectionFactory() 24 { 25 HostName = _config.Host, 26 VirtualHost = _config.VirtualHost, 27 UserName = _config.UserName, 28 Password = _config.Password, 29 }; 30 } 31 public void SendCommand<T>(T command) where T : ICommand 32 { 33 string queueName = command.GetType().FullName; 34 string exchangeName = command.GetType().FullName; 35 string routeKey = command.GetType().FullName; 36 using (var connection = _factory.CreateConnection()) 37 { 38 using (var channel = connection.CreateModel()) 39 { 40 //定義交換機 41 channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, true, false, null); 42 //定義隊列 43 channel.QueueDeclare(queueName, true, false, false, null); 44 //綁定 45 channel.QueueBind(queueName, exchangeName, routeKey, null); 46 string message = JsonHelper.Serial(command); 47 byte[] data = Encoding.UTF8.GetBytes(message); 48 //發送消息 49 channel.BasicPublish(exchangeName, routeKey, null, data); 50 } 51 } 52 } 53 }
附 mq配置類
1 /// <summary> 2 /// rabbitmq 配置類 3 /// </summary> 4 public class RabbitmqConfig 5 { 6 /// <summary> 7 /// 主機地址 8 /// </summary> 9 public string Host { get; set; } 10 /// <summary> 11 /// 用戶名稱 12 /// </summary> 13 public string UserName { get; set; } 14 /// <summary> 15 /// 密碼 16 /// </summary> 17 public string Password { get; set; } 18 /// <summary> 19 /// 虛擬主機名稱 20 /// </summary> 21 public string VirtualHost { get; set; } 22 }
1 /// <summary> 2 /// rabbitmq 配置類 3 /// </summary> 4 public class RabbitmqConfig 5 { 6 /// <summary> 7 /// 主機地址 8 /// </summary> 9 public string Host { get; set; } 10 /// <summary> 11 /// 用戶名稱 12 /// </summary> 13 public string UserName { get; set; } 14 /// <summary> 15 /// 密碼 16 /// </summary> 17 public string Password { get; set; } 18 /// <summary> 19 /// 虛擬主機名稱 20 /// </summary> 21 public string VirtualHost { get; set; } 22 }
對應配置文件
1 "RabbitmqConfig": { 2 "Host":"127.0.0.1", 3 "UserName": "qingy", 4 "Password": "r3295", 5 "VirtualHost": "vTest" 6 }
----
以上就是非常簡單的封裝實現。僅作為參考!