一般在工作中,都是直接使用已经封装好的mq的程序集进行功能开发。所以很多时候都没有去了解rabbitmq到底是如何封装(实现使用的)。所以心血来潮,简单记录下自己对rabbitmq的简单封装
整体的思路是:约定消息体均继承值Command,消息业务类均继承于Handler,并且业务实体均实现Handle方法。消息发布者发送command;消费者接收到消息时通过反射,触发对应的消费业务。
一,定义消息实体相关
1 /// <summary> 2 /// command消息接口 3 /// </summary> 4 public interface ICommand 5 { 6 } 7 /// <summary> 8 /// 消息基类 9 /// </summary> 10 public class Command : ICommand 11 { 12 public Command() { 13 Id = Guid.NewGuid(); 14 Time = DateTime.UtcNow; 15 } 16 /// <summary> 17 /// Id 18 /// </summary> 19 public Guid Id { get; set; } 20 /// <summary> 21 /// 消息时间 22 /// </summary> 23 public DateTime Time { get; set; } 24 }
实际的消息体
1 /// <summary> 2 /// 类别变更command 3 /// </summary> 4 public class CategoryChangedCommand : Command 5 { 6 /// <summary> 7 /// 类别Id 8 /// </summary> 9 public Guid CategoryId { get; set; } 10 /// <summary> 11 /// Sku id 12 /// </summary> 13 public Guid SkuId { get; set; } 14 }
二、定义消息处理类
1 /// <summary> 2 /// 消息处理接口 3 /// </summary> 4 /// <typeparam name="T"></typeparam> 5 public interface IHandler<T> where T : Command 6 { 7 /// <summary> 8 /// 处理消息 9 /// </summary> 10 /// <param name="t"></param> 11 void Handle(T t); 12 } 13 /// <summary> 14 /// 消息处理基类 15 /// </summary> 16 /// <typeparam name="T"></typeparam> 17 public abstract class Handler<T> : IHandler<T> where T : Command 18 { 19 public abstract void Handle(T t); 20 21 }
实现处理类
/// <summary> /// 类别变更处理类 /// </summary> public class CategoryChangeHandler : Handler<CategoryChangedCommand> { public override void Handle(CategoryChangedCommand t) { Console.WriteLine("sku 类别变化,修改对应listing的类别"); } }
三、通过反射,定义mq消费者(重,以下代码只是实现简单的direct方式的消费,仅作参考辅助理解)
消费者一般随着程序启动建立,所以一般都是在startup.cs中进行初始化启动监听消费。
1、初始化消费类别字典,字典的约定为command与对应的Handler作为一对。(如果是支持fanout,则可以设置成,一个command类别对应多个Handler)
private void InitHandler() { Assembly assembly = Assembly.LoadFrom(Path.Combine(AppContext.BaseDirectory, "SimpleNetCore.RabbitmqCommon.dll")); var types = assembly.GetTypes().Where(p => p.IsClass && !p.IsAbstract && p.GetInterfaces().Any(x => x.Name == "IHandler`1")); foreach (var type in types) { var handleMethod = type.GetMethod("Handle"); if (handleMethod != null) { var parameter = handleMethod.GetParameters()[0]; var parameterType = parameter.ParameterType; _DicHandlerType.Add(parameterType, type); } } }
2、创建消费者
1 var factory = new ConnectionFactory() 2 { 3 HostName = mqConfig.Host, 4 VirtualHost = mqConfig.VirtualHost, 5 UserName = mqConfig.UserName, 6 Password = mqConfig.Password 7 }; 8 foreach (var item in _DicHandlerType) 9 { 10 var connection = factory.CreateConnection(); 11 12 13 var channel = connection.CreateModel(); 14 channel.QueueDeclare(item.Key.FullName, true, false, false, null); 15 channel.ExchangeDeclare(item.Key.FullName, ExchangeType.Direct, true, false, null); 16 ///定义事件消费者 17 EventingBasicConsumer consumer = new EventingBasicConsumer(channel); 18 consumer.Received += Consumer_Received; ; 19 //消费 20 channel.BasicConsume(item.Key.FullName, false, consumer); 21 //此处不关闭connection,channel,保持开启持续消费 22 23 24 }
3、消费事件方法,通过反射调用真正的业务类进行业务处理
1 private void Consumer_Received(object sender, BasicDeliverEventArgs e) 2 { 3 var message = Encoding.UTF8.GetString(e.Body.ToArray()); 4 #region 业务处理 5 Console.WriteLine(message); 6 #endregion 7 8 EventingBasicConsumer consumer = sender as EventingBasicConsumer; 9 string exchangeName = e.Exchange; 10 var typeItem = _DicHandlerType.FirstOrDefault(p => p.Key.FullName == exchangeName); 11 if (typeItem.Key != null) 12 { 13 var t = JsonHelper.DeserialType(message, typeItem.Key); 14 var obj = Activator.CreateInstance(typeItem.Value); 15 var method = typeItem.Value.GetMethod("Handle"); 16 method.Invoke(obj, new object[] { t }); 17 } 18 string routeKey = e.RoutingKey; 19 //设置已经被消费 20 consumer.Model.BasicAck(e.DeliveryTag, false); 21 22 }
四、定义MQ消息生产者(重)
1 /// <summary> 2 /// 消息发布者接口 3 /// </summary> 4 public interface IBusFactory 5 { 6 /// <summary> 7 /// 发送command--针对的是direct方式 8 /// </summary> 9 /// <typeparam name="T"></typeparam> 10 /// <param name="command"></param> 11 void SendCommand<T>(T command) where T : ICommand; 12 } 13 /// <summary> 14 /// 消息发布者 15 /// </summary> 16 public class BusFactory : IBusFactory 17 { 18 private RabbitmqConfig _config; 19 private ConnectionFactory _factory; 20 public BusFactory(IOptions<RabbitmqConfig> config) 21 { 22 this._config = config.Value; 23 this._factory = new ConnectionFactory() 24 { 25 HostName = _config.Host, 26 VirtualHost = _config.VirtualHost, 27 UserName = _config.UserName, 28 Password = _config.Password, 29 }; 30 } 31 public void SendCommand<T>(T command) where T : ICommand 32 { 33 string queueName = command.GetType().FullName; 34 string exchangeName = command.GetType().FullName; 35 string routeKey = command.GetType().FullName; 36 using (var connection = _factory.CreateConnection()) 37 { 38 using (var channel = connection.CreateModel()) 39 { 40 //定义交换机 41 channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, true, false, null); 42 //定义队列 43 channel.QueueDeclare(queueName, true, false, false, null); 44 //绑定 45 channel.QueueBind(queueName, exchangeName, routeKey, null); 46 string message = JsonHelper.Serial(command); 47 byte[] data = Encoding.UTF8.GetBytes(message); 48 //发送消息 49 channel.BasicPublish(exchangeName, routeKey, null, data); 50 } 51 } 52 } 53 }
附 mq配置类
1 /// <summary> 2 /// rabbitmq 配置类 3 /// </summary> 4 public class RabbitmqConfig 5 { 6 /// <summary> 7 /// 主机地址 8 /// </summary> 9 public string Host { get; set; } 10 /// <summary> 11 /// 用户名称 12 /// </summary> 13 public string UserName { get; set; } 14 /// <summary> 15 /// 密码 16 /// </summary> 17 public string Password { get; set; } 18 /// <summary> 19 /// 虚拟主机名称 20 /// </summary> 21 public string VirtualHost { get; set; } 22 }
1 /// <summary> 2 /// rabbitmq 配置类 3 /// </summary> 4 public class RabbitmqConfig 5 { 6 /// <summary> 7 /// 主机地址 8 /// </summary> 9 public string Host { get; set; } 10 /// <summary> 11 /// 用户名称 12 /// </summary> 13 public string UserName { get; set; } 14 /// <summary> 15 /// 密码 16 /// </summary> 17 public string Password { get; set; } 18 /// <summary> 19 /// 虚拟主机名称 20 /// </summary> 21 public string VirtualHost { get; set; } 22 }
对应配置文件
1 "RabbitmqConfig": { 2 "Host":"127.0.0.1", 3 "UserName": "qingy", 4 "Password": "r3295", 5 "VirtualHost": "vTest" 6 }
----
以上就是非常简单的封装实现。仅作为参考!