网址:https://www.cnblogs.com/shanfeng1000/p/12274400.html
首先,如果你还没有安装好rabbitmq,可以参考我的博客:
Ubuntu16.04下,erlang安装和rabbitmq安装步骤
另外,我的另外一篇博客有介绍rabbitmq的基础用法以及使用C#操作rabbitmq,并且对rabbitmq有一个简单的封装,这里使用.net core操作rabbitmq也会使用到这些封装类,所以感兴趣的可以看看:
好了现在开始我们的正文
Rabbitmq一般使用
说明一下,这里我们使用的是.net core 2.1
我们先创建一个RabbitMQDemo项目(我创建的是MVC项目),然后使用nuget安装RabbitMQ.Client:
将上面封装的rabbitmq操作类添加到项目中:

using System; using System.Collections.Generic; using System.Text; namespace RabbitMQDemo { public class QueueOptions { /// <summary> /// 是否持久化 /// </summary> public bool Durable { get; set; } = true; /// <summary> /// 是否自动删除 /// </summary> public bool AutoDelete { get; set; } = false; /// <summary> /// 参数 /// </summary> public IDictionary<string, object> Arguments { get; set; } = new Dictionary<string, object>(); } public class ConsumeQueueOptions : QueueOptions { /// <summary> /// 是否自动提交 /// </summary> public bool AutoAck { get; set; } = false; /// <summary> /// 每次发送消息条数 /// </summary> public ushort? FetchCount { get; set; } } public class ExchangeConsumeQueueOptions : ConsumeQueueOptions { /// <summary> /// 路由值 /// </summary> public string[] RoutingKeys { get; set; } /// <summary> /// 参数 /// </summary> public IDictionary<string, object> BindArguments { get; set; } = new Dictionary<string, object>(); } public class ExchangeQueueOptions : QueueOptions { /// <summary> /// 交换机类型 /// </summary> public string Type { get; set; } /// <summary> /// 队列及路由值 /// </summary> public (string, string)[] QueueAndRoutingKey { get; set; } /// <summary> /// 参数 /// </summary> public IDictionary<string, object> BindArguments { get; set; } = new Dictionary<string, object>(); } }

using System; using System.Collections.Generic; using System.Text; namespace RabbitMQDemo { public static class RabbitMQExchangeType { /// <summary> /// 普通模式 /// </summary> public const string Common = ""; /// <summary> /// 路由模式 /// </summary> public const string Direct = "direct"; /// <summary> /// 发布/订阅模式 /// </summary> public const string Fanout = "fanout"; /// <summary> /// 匹配订阅模式 /// </summary> public const string Topic = "topic"; } }

using RabbitMQ.Client; using System; using System.Collections.Generic; using System.Linq; using System.Text; namespace RabbitMQDemo { public abstract class RabbitBase : IDisposable { List<AmqpTcpEndpoint> amqpList; IConnection connection; protected RabbitBase(params string[] hosts) { if (hosts == null || hosts.Length == 0) { throw new ArgumentException("invalid hosts!", nameof(hosts)); } this.amqpList = new List<AmqpTcpEndpoint>(); this.amqpList.AddRange(hosts.Select(host => new AmqpTcpEndpoint(host, Port))); } protected RabbitBase(params (string, int)[] hostAndPorts) { if (hostAndPorts == null || hostAndPorts.Length == 0) { throw new ArgumentException("invalid hosts!", nameof(hostAndPorts)); } this.amqpList = new List<AmqpTcpEndpoint>(); this.amqpList.AddRange(hostAndPorts.Select(tuple => new AmqpTcpEndpoint(tuple.Item1, tuple.Item2))); } /// <summary> /// 端口 /// </summary> public int Port { get; set; } = 5672; /// <summary> /// 账号 /// </summary> public string UserName { get; set; } = ConnectionFactory.DefaultUser; /// <summary> /// 密码 /// </summary> public string Password { get; set; } = ConnectionFactory.DefaultPass; /// <summary> /// 虚拟机 /// </summary> public string VirtualHost { get; set; } = ConnectionFactory.DefaultVHost; /// <summary> /// 释放 /// </summary> public virtual void Dispose() { //connection?.Close(); //connection?.Dispose(); } /// <summary> /// 关闭连接 /// </summary> public void Close() { connection?.Close(); connection?.Dispose(); } #region Private /// <summary> /// 获取rabbitmq的连接 /// </summary> /// <returns></returns> protected IModel GetChannel() { if (connection == null) { lock (this) { if (connection == null) { var factory = new ConnectionFactory(); factory.Port = Port; factory.UserName = UserName; factory.VirtualHost = VirtualHost; factory.Password = Password; connection = factory.CreateConnection(this.amqpList); } } } return connection.CreateModel(); } #endregion } }

using RabbitMQ.Client; using System; using System.Collections.Generic; using System.Linq; using System.Text; namespace RabbitMQDemo { public class RabbitMQProducer : RabbitBase { public RabbitMQProducer(params string[] hosts) : base(hosts) { } public RabbitMQProducer(params (string, int)[] hostAndPorts) : base(hostAndPorts) { } #region 普通模式、Work模式 /// <summary> /// 发布消息 /// </summary> /// <param name="queue"></param> /// <param name="message"></param> /// <param name="options"></param> public void Publish(string queue, string message, QueueOptions options = null) { options = options ?? new QueueOptions(); var channel = GetChannel(); channel.QueueDeclare(queue, options.Durable, false, options.AutoDelete, options.Arguments ?? new Dictionary<string, object>()); var buffer = Encoding.UTF8.GetBytes(message); channel.BasicPublish("", queue, null, buffer); channel.Close(); } /// <summary> /// 发布消息 /// </summary> /// <param name="queue"></param> /// <param name="message"></param> /// <param name="configure"></param> public void Publish(string queue, string message, Action<QueueOptions> configure) { QueueOptions options = new QueueOptions(); configure?.Invoke(options); Publish(queue, message, options); } #endregion #region 订阅模式、路由模式、Topic模式 /// <summary> /// 发布消息 /// </summary> /// <param name="exchange"></param> /// <param name="routingKey"></param> /// <param name="message"></param> /// <param name="options"></param> public void Publish(string exchange, string routingKey, string message, ExchangeQueueOptions options = null) { options = options ?? new ExchangeQueueOptions(); var channel = GetChannel(); channel.ExchangeDeclare(exchange, string.IsNullOrEmpty(options.Type) ? RabbitMQExchangeType.Fanout : options.Type, options.Durable, options.AutoDelete, options.Arguments ?? new Dictionary<string, object>()); if (options.QueueAndRoutingKey != null) { foreach (var t in options.QueueAndRoutingKey) { if (!string.IsNullOrEmpty(t.Item1)) { channel.QueueBind(t.Item1, exchange, t.Item2 ?? "", options.BindArguments ?? new Dictionary<string, object>()); } } } var buffer = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange, routingKey, null, buffer); channel.Close(); } /// <summary> /// 发布消息 /// </summary> /// <param name="exchange"></param> /// <param name="routingKey"></param> /// <param name="message"></param> /// <param name="configure"></param> public void Publish(string exchange, string routingKey, string message, Action<ExchangeQueueOptions> configure) { ExchangeQueueOptions options = new ExchangeQueueOptions(); configure?.Invoke(options); Publish(exchange, routingKey, message, options); } #endregion } }

using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.Text; using System.Threading; namespace RabbitMQDemo { public class RabbitMQConsumer : RabbitBase { public RabbitMQConsumer(params string[] hosts) : base(hosts) { } public RabbitMQConsumer(params (string, int)[] hostAndPorts) : base(hostAndPorts) { } public event Action<RecieveResult> Received; /// <summary> /// 构造消费者 /// </summary> /// <param name="channel"></param> /// <param name="options"></param> /// <returns></returns> private IBasicConsumer ConsumeInternal(IModel channel, ConsumeQueueOptions options) { EventingBasicConsumer consumer = new EventingBasicConsumer(channel); consumer.Received += (sender, e) => { try { CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); if (!options.AutoAck) { cancellationTokenSource.Token.Register(() => { channel.BasicAck(e.DeliveryTag, false); }); } Received?.Invoke(new RecieveResult(e, cancellationTokenSource)); } catch { } }; if (options.FetchCount != null) { channel.BasicQos(0, options.FetchCount.Value, false); } return consumer; } #region 普通模式、Work模式 /// <summary> /// 消费消息 /// </summary> /// <param name="queue"></param> /// <param name="options"></param> public ListenResult Listen(string queue, ConsumeQueueOptions options = null) { options = options ?? new ConsumeQueueOptions(); var channel = GetChannel(); channel.QueueDeclare(queue, options.Durable, false, options.AutoDelete, options.Arguments ?? new Dictionary<string, object>()); var consumer = ConsumeInternal(channel, options); channel.BasicConsume(queue, options.AutoAck, consumer); ListenResult result = new ListenResult(); result.Token.Register(() => { try { channel.Close(); channel.Dispose(); } catch { } }); return result; } /// <summary> /// 消费消息 /// </summary> /// <param name="queue"></param> /// <param name="configure"></param> public ListenResult Listen(string queue, Action<ConsumeQueueOptions> configure) { ConsumeQueueOptions options = new ConsumeQueueOptions(); configure?.Invoke(options); return Listen(queue, options); } #endregion #region 订阅模式、路由模式、Topic模式 /// <summary> /// 消费消息 /// </summary> /// <param name="exchange"></param> /// <param name="queue"></param> /// <param name="options"></param> public ListenResult Listen(string exchange, string queue, ExchangeConsumeQueueOptions options = null) { options = options ?? new ExchangeConsumeQueueOptions(); var channel = GetChannel(); channel.QueueDeclare(queue, options.Durable, false, options.AutoDelete, options.Arguments ?? new Dictionary<string, object>()); if (options.RoutingKeys != null && !string.IsNullOrEmpty(exchange)) { foreach (var key in options.RoutingKeys) { channel.QueueBind(queue, exchange, key, options.BindArguments); } } var consumer = ConsumeInternal(channel, options); channel.BasicConsume(queue, options.AutoAck, consumer); ListenResult result = new ListenResult(); result.Token.Register(() => { try { channel.Close(); channel.Dispose(); } catch { } }); return result; } /// <summary> /// 消费消息 /// </summary> /// <param name="exchange"></param> /// <param name="queue"></param> /// <param name="configure"></param> public ListenResult Listen(string exchange, string queue, Action<ExchangeConsumeQueueOptions> configure) { ExchangeConsumeQueueOptions options = new ExchangeConsumeQueueOptions(); configure?.Invoke(options); return Listen(exchange, queue, options); } #endregion } public class RecieveResult { CancellationTokenSource cancellationTokenSource; public RecieveResult(BasicDeliverEventArgs arg, CancellationTokenSource cancellationTokenSource) { this.Body = Encoding.UTF8.GetString(arg.Body); this.ConsumerTag = arg.ConsumerTag; this.DeliveryTag = arg.DeliveryTag; this.Exchange = arg.Exchange; this.Redelivered = arg.Redelivered; this.RoutingKey = arg.RoutingKey; this.cancellationTokenSource = cancellationTokenSource; } /// <summary> /// 消息体 /// </summary> public string Body { get; private set; } /// <summary> /// 消费者标签 /// </summary> public string ConsumerTag { get; private set; } /// <summary> /// Ack标签 /// </summary> public ulong DeliveryTag { get; private set; } /// <summary> /// 交换机 /// </summary> public string Exchange { get; private set; } /// <summary> /// 是否Ack /// </summary> public bool Redelivered { get; private set; } /// <summary> /// 路由 /// </summary> public string RoutingKey { get; private set; } public void Commit() { if (cancellationTokenSource == null || cancellationTokenSource.IsCancellationRequested) return; cancellationTokenSource.Cancel(); cancellationTokenSource.Dispose(); cancellationTokenSource = null; } } public class ListenResult { CancellationTokenSource cancellationTokenSource; /// <summary> /// CancellationToken /// </summary> public CancellationToken Token { get { return cancellationTokenSource.Token; } } /// <summary> /// 是否已停止 /// </summary> public bool Stoped { get { return cancellationTokenSource.IsCancellationRequested; } } public ListenResult() { cancellationTokenSource = new CancellationTokenSource(); } /// <summary> /// 停止监听 /// </summary> public void Stop() { cancellationTokenSource.Cancel(); } } }
修改Startup,在ConfigureServices中将rabbitmq的生产类RabbitMQProducer加入到DI容器中:
//将rabbitmq的生产类加入到DI容器中 var producer = new RabbitMQProducer("192.168.187.129"); producer.Password = "123456"; producer.UserName = "admin"; services.AddSingleton(producer);//这里我没有使用集群
至于消息的消费,我们可以将消费者使用一个线程启动并监听,但是这里我个人推荐使用.net core 自带的HostedService去实现,至于消费者的功能,我们就简单的将消息记录都文本文档中,类似日志记录。
我们创建一个RabbitHostedService类:

同时,我们需要在Startup中将RabbitHostedService注入到容器中:
//注入消费者 services.AddSingleton<IHostedService, RabbitHostedService>();
得到Startup的代码如下:

到这里,.net core 集成rabbitmq就写好了,然后就是发送消息使用了,我们添加一个名为RabbitController的控制器,里面代码如下:

RabbitController里面有两个Action,它们返回同一个视图:

现在可以启动项目,输入http://localhost:5000/Rabbit就可以进入页面测试了:
点击上面确定之后,你会发现在项目根目录下生成了一个logs目录,里面有一个文件,文件里面就是我们发送的消息了
注:如果报错,可以登录rabbitmq后台查看账号虚拟机权限是否存在
Rabbitmq日志记录
上面是我们使用.net core集成rabbitmq的一种简单方式,但是不建议在开发时这么使用
可以注意到,上面的例子我直接将RabbitMQProducer注入到容器中,但开发时应该按自己的需求对RabbitMQProducer做一层封装,然后将封装类注入到容器中,比如我们要使用rabbitmq做日志记录,可以记录到数据库,也可以记录到文件中去,但是.net core为我们提供了一整套的日志记录功能,因此我们只需要将rabbitmq集成进去就可以了
首先,我们需要创建几个类,将rabbitmq继承到日志记录功能中去:



接着,我们修改Startup的服务对象:

顺带提一下,这个Startup中服务最好使用拓展方法去实现,这里我是为了简单没有采用拓展方法,所以在开发时多注意一下吧
另外,我们也可以调整一下RabbitHostedService:

到这里,rabbitmq就被继承到.net core的日志记录功能中去了,但是这里面为了避免记录不要要的日志,我在其中添加了一个限制,rabbitmq只记录categoryName包含Rabbit的日志,这样一来,我们就可以在我们的控制器中使用:

现在可以启动项目,输入http://localhost:5000/Rabbit就可以进入页面测试了,可以发现功能和上面是一样的
细心的朋友可能会发现,本博文内使用的rabbitmq其实是它6中模式中最简单的一种模式:hello world模式,读者可以将上面的代码稍作修改,即可变成其他几种模式,但在现实开发中,具体使用哪种模式需要根据自己的业务需求定。
其实,有一个很好用的第三方封装好的插件,可以让我们很方便的操作rabbitmq,那就是EasyNetQ,可以使用一个消息总栈IBus来进行消息的操作,包含一系列消息模式:Publish/Subscribe, Request/Response和 Send/Receive,这些消息模式也是我们最常用的,所以以后有空再写