.net core使用rabbitmq消息队列


网址:https://www.cnblogs.com/shanfeng1000/p/12274400.html

 

 首先,如果你还没有安装好rabbitmq,可以参考我的博客:

  Ubuntu16.04下,erlang安装和rabbitmq安装步骤

  Ubuntu16.04下,rabbimq集群搭建

  另外,我的另外一篇博客有介绍rabbitmq的基础用法以及使用C#操作rabbitmq,并且对rabbitmq有一个简单的封装,这里使用.net core操作rabbitmq也会使用到这些封装类,所以感兴趣的可以看看:

  C# .net 环境下使用rabbitmq消息队列

  好了现在开始我们的正文

  Rabbitmq一般使用

  说明一下,这里我们使用的是.net core 2.1

  我们先创建一个RabbitMQDemo项目(我创建的是MVC项目),然后使用nuget安装RabbitMQ.Client:

  

   将上面封装的rabbitmq操作类添加到项目中:   

  
using System;
using System.Collections.Generic;
using System.Text;

namespace RabbitMQDemo
{
    public class QueueOptions
    {
        /// <summary>
        /// 是否持久化
        /// </summary>
        public bool Durable { get; set; } = true;
        /// <summary>
        /// 是否自动删除
        /// </summary>
        public bool AutoDelete { get; set; } = false;
        /// <summary>
        /// 参数
        /// </summary>
        public IDictionary<string, object> Arguments { get; set; } = new Dictionary<string, object>();
    }
    public class ConsumeQueueOptions : QueueOptions
    {
        /// <summary>
        /// 是否自动提交
        /// </summary>
        public bool AutoAck { get; set; } = false;
        /// <summary>
        /// 每次发送消息条数
        /// </summary>
        public ushort? FetchCount { get; set; }
    }
    public class ExchangeConsumeQueueOptions : ConsumeQueueOptions
    {
        /// <summary>
        /// 路由值
        /// </summary>
        public string[] RoutingKeys { get; set; }
        /// <summary>
        /// 参数
        /// </summary>
        public IDictionary<string, object> BindArguments { get; set; } = new Dictionary<string, object>();
    }
    public class ExchangeQueueOptions : QueueOptions
    {
        /// <summary>
        /// 交换机类型
        /// </summary>
        public string Type { get; set; }
        /// <summary>
        /// 队列及路由值
        /// </summary>
        public (string, string)[] QueueAndRoutingKey { get; set; }
        /// <summary>
        /// 参数
        /// </summary>
        public IDictionary<string, object> BindArguments { get; set; } = new Dictionary<string, object>();
    }
}
  
using System;
using System.Collections.Generic;
using System.Text;

namespace RabbitMQDemo
{
    public static class RabbitMQExchangeType
    {
        /// <summary>
        /// 普通模式
        /// </summary>
        public const string Common = "";
        /// <summary>
        /// 路由模式
        /// </summary>
        public const string Direct = "direct";
        /// <summary>
        /// 发布/订阅模式
        /// </summary>
        public const string Fanout = "fanout";
        /// <summary>
        /// 匹配订阅模式
        /// </summary>
        public const string Topic = "topic";
    }
}
  
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace RabbitMQDemo
{
    public abstract class RabbitBase : IDisposable
    {
        List<AmqpTcpEndpoint> amqpList;
        IConnection connection;

        protected RabbitBase(params string[] hosts)
        {
            if (hosts == null || hosts.Length == 0)
            {
                throw new ArgumentException("invalid hosts!", nameof(hosts));
            }

            this.amqpList = new List<AmqpTcpEndpoint>();
            this.amqpList.AddRange(hosts.Select(host => new AmqpTcpEndpoint(host, Port)));
        }
        protected RabbitBase(params (string, int)[] hostAndPorts)
        {
            if (hostAndPorts == null || hostAndPorts.Length == 0)
            {
                throw new ArgumentException("invalid hosts!", nameof(hostAndPorts));
            }

            this.amqpList = new List<AmqpTcpEndpoint>();
            this.amqpList.AddRange(hostAndPorts.Select(tuple => new AmqpTcpEndpoint(tuple.Item1, tuple.Item2)));
        }

        /// <summary>
        /// 端口
        /// </summary>
        public int Port { get; set; } = 5672;
        /// <summary>
        /// 账号
        /// </summary>
        public string UserName { get; set; } = ConnectionFactory.DefaultUser;
        /// <summary>
        /// 密码
        /// </summary>
        public string Password { get; set; } = ConnectionFactory.DefaultPass;
        /// <summary>
        /// 虚拟机
        /// </summary>
        public string VirtualHost { get; set; } = ConnectionFactory.DefaultVHost;

        /// <summary>
        /// 释放
        /// </summary>
        public virtual void Dispose()
        {
            //connection?.Close();
            //connection?.Dispose();
        }
        /// <summary>
        /// 关闭连接
        /// </summary>
        public void Close()
        {
            connection?.Close();
            connection?.Dispose();
        }

        #region Private
        /// <summary>
        /// 获取rabbitmq的连接
        /// </summary>
        /// <returns></returns>
        protected IModel GetChannel()
        {
            if (connection == null)
            {
                lock (this)
                {
                    if (connection == null)
                    {
                        var factory = new ConnectionFactory();
                        factory.Port = Port;
                        factory.UserName = UserName;
                        factory.VirtualHost = VirtualHost;
                        factory.Password = Password;
                        connection = factory.CreateConnection(this.amqpList);
                    }
                }
            }
            return connection.CreateModel();
        }

        #endregion
    }
}
  
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace RabbitMQDemo
{
    public class RabbitMQProducer : RabbitBase
    {
        public RabbitMQProducer(params string[] hosts) : base(hosts)
        {

        }
        public RabbitMQProducer(params (string, int)[] hostAndPorts) : base(hostAndPorts)
        {

        }

        #region 普通模式、Work模式
        /// <summary>
        /// 发布消息
        /// </summary>
        /// <param name="queue"></param>
        /// <param name="message"></param>
        /// <param name="options"></param>
        public void Publish(string queue, string message, QueueOptions options = null)
        {
            options = options ?? new QueueOptions();
            var channel = GetChannel();
            channel.QueueDeclare(queue, options.Durable, false, options.AutoDelete, options.Arguments ?? new Dictionary<string, object>());
            var buffer = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish("", queue, null, buffer);
            channel.Close();
        }
        /// <summary>
        /// 发布消息
        /// </summary>
        /// <param name="queue"></param>
        /// <param name="message"></param>
        /// <param name="configure"></param>
        public void Publish(string queue, string message, Action<QueueOptions> configure)
        {
            QueueOptions options = new QueueOptions();
            configure?.Invoke(options);
            Publish(queue, message, options);
        }
        #endregion
        #region 订阅模式、路由模式、Topic模式
        /// <summary>
        /// 发布消息
        /// </summary>
        /// <param name="exchange"></param>
        /// <param name="routingKey"></param>
        /// <param name="message"></param>
        /// <param name="options"></param>
        public void Publish(string exchange, string routingKey, string message, ExchangeQueueOptions options = null)
        {
            options = options ?? new ExchangeQueueOptions();
            var channel = GetChannel();
            channel.ExchangeDeclare(exchange, string.IsNullOrEmpty(options.Type) ? RabbitMQExchangeType.Fanout : options.Type, options.Durable, options.AutoDelete, options.Arguments ?? new Dictionary<string, object>());
            if (options.QueueAndRoutingKey != null)
            {
                foreach (var t in options.QueueAndRoutingKey)
                {
                    if (!string.IsNullOrEmpty(t.Item1))
                    {
                        channel.QueueBind(t.Item1, exchange, t.Item2 ?? "", options.BindArguments ?? new Dictionary<string, object>());
                    }
                }
            }
            var buffer = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange, routingKey, null, buffer);
            channel.Close();
        }
        /// <summary>
        /// 发布消息
        /// </summary>
        /// <param name="exchange"></param>
        /// <param name="routingKey"></param>
        /// <param name="message"></param>
        /// <param name="configure"></param>
        public void Publish(string exchange, string routingKey, string message, Action<ExchangeQueueOptions> configure)
        {
            ExchangeQueueOptions options = new ExchangeQueueOptions();
            configure?.Invoke(options);
            Publish(exchange, routingKey, message, options);
        }
        #endregion
    }
}
  
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace RabbitMQDemo
{
    public class RabbitMQConsumer : RabbitBase
    {
        public RabbitMQConsumer(params string[] hosts) : base(hosts)
        {

        }
        public RabbitMQConsumer(params (string, int)[] hostAndPorts) : base(hostAndPorts)
        {

        }

        public event Action<RecieveResult> Received;

        /// <summary>
        /// 构造消费者
        /// </summary>
        /// <param name="channel"></param>
        /// <param name="options"></param>
        /// <returns></returns>
        private IBasicConsumer ConsumeInternal(IModel channel, ConsumeQueueOptions options)
        {
            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
            consumer.Received += (sender, e) =>
            {
                try
                {
                    CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
                    if (!options.AutoAck)
                    {
                        cancellationTokenSource.Token.Register(() =>
                        {
                            channel.BasicAck(e.DeliveryTag, false);
                        });
                    }
                    Received?.Invoke(new RecieveResult(e, cancellationTokenSource));
                }
                catch { }
            };
            if (options.FetchCount != null)
            {
                channel.BasicQos(0, options.FetchCount.Value, false);
            }
            return consumer;
        }

        #region 普通模式、Work模式
        /// <summary>
        /// 消费消息
        /// </summary>
        /// <param name="queue"></param>
        /// <param name="options"></param>
        public ListenResult Listen(string queue, ConsumeQueueOptions options = null)
        {
            options = options ?? new ConsumeQueueOptions();
            var channel = GetChannel();
            channel.QueueDeclare(queue, options.Durable, false, options.AutoDelete, options.Arguments ?? new Dictionary<string, object>());
            var consumer = ConsumeInternal(channel, options);
            channel.BasicConsume(queue, options.AutoAck, consumer);
            ListenResult result = new ListenResult();
            result.Token.Register(() =>
            {
                try
                {
                    channel.Close();
                    channel.Dispose();
                }
                catch { }
            });
            return result;
        }
        /// <summary>
        /// 消费消息
        /// </summary>
        /// <param name="queue"></param>
        /// <param name="configure"></param>
        public ListenResult Listen(string queue, Action<ConsumeQueueOptions> configure)
        {
            ConsumeQueueOptions options = new ConsumeQueueOptions();
            configure?.Invoke(options);
            return Listen(queue, options);
        }
        #endregion
        #region 订阅模式、路由模式、Topic模式
        /// <summary>
        /// 消费消息
        /// </summary>
        /// <param name="exchange"></param>
        /// <param name="queue"></param>
        /// <param name="options"></param>
        public ListenResult Listen(string exchange, string queue, ExchangeConsumeQueueOptions options = null)
        {
            options = options ?? new ExchangeConsumeQueueOptions();
            var channel = GetChannel();
            channel.QueueDeclare(queue, options.Durable, false, options.AutoDelete, options.Arguments ?? new Dictionary<string, object>());
            if (options.RoutingKeys != null && !string.IsNullOrEmpty(exchange))
            {
                foreach (var key in options.RoutingKeys)
                {
                    channel.QueueBind(queue, exchange, key, options.BindArguments);
                }
            }
            var consumer = ConsumeInternal(channel, options);
            channel.BasicConsume(queue, options.AutoAck, consumer);
            ListenResult result = new ListenResult();
            result.Token.Register(() =>
            {
                try
                {
                    channel.Close();
                    channel.Dispose();
                }
                catch { }
            });
            return result;
        }
        /// <summary>
        /// 消费消息
        /// </summary>
        /// <param name="exchange"></param>
        /// <param name="queue"></param>
        /// <param name="configure"></param>
        public ListenResult Listen(string exchange, string queue, Action<ExchangeConsumeQueueOptions> configure)
        {
            ExchangeConsumeQueueOptions options = new ExchangeConsumeQueueOptions();
            configure?.Invoke(options);
            return Listen(exchange, queue, options);
        }
        #endregion
    }
    public class RecieveResult
    {
        CancellationTokenSource cancellationTokenSource;
        public RecieveResult(BasicDeliverEventArgs arg, CancellationTokenSource cancellationTokenSource)
        {
            this.Body = Encoding.UTF8.GetString(arg.Body);
            this.ConsumerTag = arg.ConsumerTag;
            this.DeliveryTag = arg.DeliveryTag;
            this.Exchange = arg.Exchange;
            this.Redelivered = arg.Redelivered;
            this.RoutingKey = arg.RoutingKey;
            this.cancellationTokenSource = cancellationTokenSource;
        }

        /// <summary>
        /// 消息体
        /// </summary>
        public string Body { get; private set; }
        /// <summary>
        /// 消费者标签
        /// </summary>
        public string ConsumerTag { get; private set; }
        /// <summary>
        /// Ack标签
        /// </summary>
        public ulong DeliveryTag { get; private set; }
        /// <summary>
        /// 交换机
        /// </summary>
        public string Exchange { get; private set; }
        /// <summary>
        /// 是否Ack
        /// </summary>
        public bool Redelivered { get; private set; }
        /// <summary>
        /// 路由
        /// </summary>
        public string RoutingKey { get; private set; }

        public void Commit()
        {
            if (cancellationTokenSource == null || cancellationTokenSource.IsCancellationRequested) return;

            cancellationTokenSource.Cancel();
            cancellationTokenSource.Dispose();
            cancellationTokenSource = null;
        }
    }
    public class ListenResult
    {
        CancellationTokenSource cancellationTokenSource;

        /// <summary>
        /// CancellationToken
        /// </summary>
        public CancellationToken Token { get { return cancellationTokenSource.Token; } }
        /// <summary>
        /// 是否已停止
        /// </summary>
        public bool Stoped { get { return cancellationTokenSource.IsCancellationRequested; } }

        public ListenResult()
        {
            cancellationTokenSource = new CancellationTokenSource();
        }

        /// <summary>
        /// 停止监听
        /// </summary>
        public void Stop()
        {
            cancellationTokenSource.Cancel();
        }
    }
}

  修改Startup,在ConfigureServices中将rabbitmq的生产类RabbitMQProducer加入到DI容器中:   

  //将rabbitmq的生产类加入到DI容器中
  var producer = new RabbitMQProducer("192.168.187.129");
  producer.Password = "123456";
  producer.UserName = "admin";
  services.AddSingleton(producer);//这里我没有使用集群

   至于消息的消费,我们可以将消费者使用一个线程启动并监听,但是这里我个人推荐使用.net core 自带的HostedService去实现,至于消费者的功能,我们就简单的将消息记录都文本文档中,类似日志记录。

  我们创建一个RabbitHostedService类:   

     RabbitHostedService

  同时,我们需要在Startup中将RabbitHostedService注入到容器中:  

  //注入消费者
  services.AddSingleton<IHostedService, RabbitHostedService>();

  得到Startup的代码如下:  

     Startup

  到这里,.net core 集成rabbitmq就写好了,然后就是发送消息使用了,我们添加一个名为RabbitController的控制器,里面代码如下:   

     RabbitController

   RabbitController里面有两个Action,它们返回同一个视图:    

     Index

  现在可以启动项目,输入http://localhost:5000/Rabbit就可以进入页面测试了:

  

    点击上面确定之后,你会发现在项目根目录下生成了一个logs目录,里面有一个文件,文件里面就是我们发送的消息了

    注:如果报错,可以登录rabbitmq后台查看账号虚拟机权限是否存在

  

 

   Rabbitmq日志记录

  上面是我们使用.net core集成rabbitmq的一种简单方式,但是不建议在开发时这么使用

  可以注意到,上面的例子我直接将RabbitMQProducer注入到容器中,但开发时应该按自己的需求对RabbitMQProducer做一层封装,然后将封装类注入到容器中,比如我们要使用rabbitmq做日志记录,可以记录到数据库,也可以记录到文件中去,但是.net core为我们提供了一整套的日志记录功能,因此我们只需要将rabbitmq集成进去就可以了

  首先,我们需要创建几个类,将rabbitmq继承到日志记录功能中去:  

     RabbitMQOptions
     RabbitLoggerProvider
     RabbitLogger

  接着,我们修改Startup的服务对象:  

     Startup

  顺带提一下,这个Startup中服务最好使用拓展方法去实现,这里我是为了简单没有采用拓展方法,所以在开发时多注意一下吧

  另外,我们也可以调整一下RabbitHostedService:  

     RabbitHostedService

  到这里,rabbitmq就被继承到.net core的日志记录功能中去了,但是这里面为了避免记录不要要的日志,我在其中添加了一个限制,rabbitmq只记录categoryName包含Rabbit的日志,这样一来,我们就可以在我们的控制器中使用:  

     RabbitController

  现在可以启动项目,输入http://localhost:5000/Rabbit就可以进入页面测试了,可以发现功能和上面是一样的

  细心的朋友可能会发现,本博文内使用的rabbitmq其实是它6中模式中最简单的一种模式:hello world模式,读者可以将上面的代码稍作修改,即可变成其他几种模式,但在现实开发中,具体使用哪种模式需要根据自己的业务需求定。 

  其实,有一个很好用的第三方封装好的插件,可以让我们很方便的操作rabbitmq,那就是EasyNetQ,可以使用一个消息总栈IBus来进行消息的操作,包含一系列消息模式:Publish/Subscribe, Request/Response和 Send/Receive,这些消息模式也是我们最常用的,所以以后有空再写


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM