網址:https://www.cnblogs.com/shanfeng1000/p/12274400.html
首先,如果你還沒有安裝好rabbitmq,可以參考我的博客:
Ubuntu16.04下,erlang安裝和rabbitmq安裝步驟
另外,我的另外一篇博客有介紹rabbitmq的基礎用法以及使用C#操作rabbitmq,並且對rabbitmq有一個簡單的封裝,這里使用.net core操作rabbitmq也會使用到這些封裝類,所以感興趣的可以看看:
好了現在開始我們的正文
Rabbitmq一般使用
說明一下,這里我們使用的是.net core 2.1
我們先創建一個RabbitMQDemo項目(我創建的是MVC項目),然后使用nuget安裝RabbitMQ.Client:

將上面封裝的rabbitmq操作類添加到項目中:
using System;
using System.Collections.Generic;
using System.Text;
namespace RabbitMQDemo
{
public class QueueOptions
{
/// <summary>
/// 是否持久化
/// </summary>
public bool Durable { get; set; } = true;
/// <summary>
/// 是否自動刪除
/// </summary>
public bool AutoDelete { get; set; } = false;
/// <summary>
/// 參數
/// </summary>
public IDictionary<string, object> Arguments { get; set; } = new Dictionary<string, object>();
}
public class ConsumeQueueOptions : QueueOptions
{
/// <summary>
/// 是否自動提交
/// </summary>
public bool AutoAck { get; set; } = false;
/// <summary>
/// 每次發送消息條數
/// </summary>
public ushort? FetchCount { get; set; }
}
public class ExchangeConsumeQueueOptions : ConsumeQueueOptions
{
/// <summary>
/// 路由值
/// </summary>
public string[] RoutingKeys { get; set; }
/// <summary>
/// 參數
/// </summary>
public IDictionary<string, object> BindArguments { get; set; } = new Dictionary<string, object>();
}
public class ExchangeQueueOptions : QueueOptions
{
/// <summary>
/// 交換機類型
/// </summary>
public string Type { get; set; }
/// <summary>
/// 隊列及路由值
/// </summary>
public (string, string)[] QueueAndRoutingKey { get; set; }
/// <summary>
/// 參數
/// </summary>
public IDictionary<string, object> BindArguments { get; set; } = new Dictionary<string, object>();
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace RabbitMQDemo
{
public static class RabbitMQExchangeType
{
/// <summary>
/// 普通模式
/// </summary>
public const string Common = "";
/// <summary>
/// 路由模式
/// </summary>
public const string Direct = "direct";
/// <summary>
/// 發布/訂閱模式
/// </summary>
public const string Fanout = "fanout";
/// <summary>
/// 匹配訂閱模式
/// </summary>
public const string Topic = "topic";
}
}
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace RabbitMQDemo
{
public abstract class RabbitBase : IDisposable
{
List<AmqpTcpEndpoint> amqpList;
IConnection connection;
protected RabbitBase(params string[] hosts)
{
if (hosts == null || hosts.Length == 0)
{
throw new ArgumentException("invalid hosts!", nameof(hosts));
}
this.amqpList = new List<AmqpTcpEndpoint>();
this.amqpList.AddRange(hosts.Select(host => new AmqpTcpEndpoint(host, Port)));
}
protected RabbitBase(params (string, int)[] hostAndPorts)
{
if (hostAndPorts == null || hostAndPorts.Length == 0)
{
throw new ArgumentException("invalid hosts!", nameof(hostAndPorts));
}
this.amqpList = new List<AmqpTcpEndpoint>();
this.amqpList.AddRange(hostAndPorts.Select(tuple => new AmqpTcpEndpoint(tuple.Item1, tuple.Item2)));
}
/// <summary>
/// 端口
/// </summary>
public int Port { get; set; } = 5672;
/// <summary>
/// 賬號
/// </summary>
public string UserName { get; set; } = ConnectionFactory.DefaultUser;
/// <summary>
/// 密碼
/// </summary>
public string Password { get; set; } = ConnectionFactory.DefaultPass;
/// <summary>
/// 虛擬機
/// </summary>
public string VirtualHost { get; set; } = ConnectionFactory.DefaultVHost;
/// <summary>
/// 釋放
/// </summary>
public virtual void Dispose()
{
//connection?.Close();
//connection?.Dispose();
}
/// <summary>
/// 關閉連接
/// </summary>
public void Close()
{
connection?.Close();
connection?.Dispose();
}
#region Private
/// <summary>
/// 獲取rabbitmq的連接
/// </summary>
/// <returns></returns>
protected IModel GetChannel()
{
if (connection == null)
{
lock (this)
{
if (connection == null)
{
var factory = new ConnectionFactory();
factory.Port = Port;
factory.UserName = UserName;
factory.VirtualHost = VirtualHost;
factory.Password = Password;
connection = factory.CreateConnection(this.amqpList);
}
}
}
return connection.CreateModel();
}
#endregion
}
}
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace RabbitMQDemo
{
public class RabbitMQProducer : RabbitBase
{
public RabbitMQProducer(params string[] hosts) : base(hosts)
{
}
public RabbitMQProducer(params (string, int)[] hostAndPorts) : base(hostAndPorts)
{
}
#region 普通模式、Work模式
/// <summary>
/// 發布消息
/// </summary>
/// <param name="queue"></param>
/// <param name="message"></param>
/// <param name="options"></param>
public void Publish(string queue, string message, QueueOptions options = null)
{
options = options ?? new QueueOptions();
var channel = GetChannel();
channel.QueueDeclare(queue, options.Durable, false, options.AutoDelete, options.Arguments ?? new Dictionary<string, object>());
var buffer = Encoding.UTF8.GetBytes(message);
channel.BasicPublish("", queue, null, buffer);
channel.Close();
}
/// <summary>
/// 發布消息
/// </summary>
/// <param name="queue"></param>
/// <param name="message"></param>
/// <param name="configure"></param>
public void Publish(string queue, string message, Action<QueueOptions> configure)
{
QueueOptions options = new QueueOptions();
configure?.Invoke(options);
Publish(queue, message, options);
}
#endregion
#region 訂閱模式、路由模式、Topic模式
/// <summary>
/// 發布消息
/// </summary>
/// <param name="exchange"></param>
/// <param name="routingKey"></param>
/// <param name="message"></param>
/// <param name="options"></param>
public void Publish(string exchange, string routingKey, string message, ExchangeQueueOptions options = null)
{
options = options ?? new ExchangeQueueOptions();
var channel = GetChannel();
channel.ExchangeDeclare(exchange, string.IsNullOrEmpty(options.Type) ? RabbitMQExchangeType.Fanout : options.Type, options.Durable, options.AutoDelete, options.Arguments ?? new Dictionary<string, object>());
if (options.QueueAndRoutingKey != null)
{
foreach (var t in options.QueueAndRoutingKey)
{
if (!string.IsNullOrEmpty(t.Item1))
{
channel.QueueBind(t.Item1, exchange, t.Item2 ?? "", options.BindArguments ?? new Dictionary<string, object>());
}
}
}
var buffer = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange, routingKey, null, buffer);
channel.Close();
}
/// <summary>
/// 發布消息
/// </summary>
/// <param name="exchange"></param>
/// <param name="routingKey"></param>
/// <param name="message"></param>
/// <param name="configure"></param>
public void Publish(string exchange, string routingKey, string message, Action<ExchangeQueueOptions> configure)
{
ExchangeQueueOptions options = new ExchangeQueueOptions();
configure?.Invoke(options);
Publish(exchange, routingKey, message, options);
}
#endregion
}
}
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
namespace RabbitMQDemo
{
public class RabbitMQConsumer : RabbitBase
{
public RabbitMQConsumer(params string[] hosts) : base(hosts)
{
}
public RabbitMQConsumer(params (string, int)[] hostAndPorts) : base(hostAndPorts)
{
}
public event Action<RecieveResult> Received;
/// <summary>
/// 構造消費者
/// </summary>
/// <param name="channel"></param>
/// <param name="options"></param>
/// <returns></returns>
private IBasicConsumer ConsumeInternal(IModel channel, ConsumeQueueOptions options)
{
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
consumer.Received += (sender, e) =>
{
try
{
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
if (!options.AutoAck)
{
cancellationTokenSource.Token.Register(() =>
{
channel.BasicAck(e.DeliveryTag, false);
});
}
Received?.Invoke(new RecieveResult(e, cancellationTokenSource));
}
catch { }
};
if (options.FetchCount != null)
{
channel.BasicQos(0, options.FetchCount.Value, false);
}
return consumer;
}
#region 普通模式、Work模式
/// <summary>
/// 消費消息
/// </summary>
/// <param name="queue"></param>
/// <param name="options"></param>
public ListenResult Listen(string queue, ConsumeQueueOptions options = null)
{
options = options ?? new ConsumeQueueOptions();
var channel = GetChannel();
channel.QueueDeclare(queue, options.Durable, false, options.AutoDelete, options.Arguments ?? new Dictionary<string, object>());
var consumer = ConsumeInternal(channel, options);
channel.BasicConsume(queue, options.AutoAck, consumer);
ListenResult result = new ListenResult();
result.Token.Register(() =>
{
try
{
channel.Close();
channel.Dispose();
}
catch { }
});
return result;
}
/// <summary>
/// 消費消息
/// </summary>
/// <param name="queue"></param>
/// <param name="configure"></param>
public ListenResult Listen(string queue, Action<ConsumeQueueOptions> configure)
{
ConsumeQueueOptions options = new ConsumeQueueOptions();
configure?.Invoke(options);
return Listen(queue, options);
}
#endregion
#region 訂閱模式、路由模式、Topic模式
/// <summary>
/// 消費消息
/// </summary>
/// <param name="exchange"></param>
/// <param name="queue"></param>
/// <param name="options"></param>
public ListenResult Listen(string exchange, string queue, ExchangeConsumeQueueOptions options = null)
{
options = options ?? new ExchangeConsumeQueueOptions();
var channel = GetChannel();
channel.QueueDeclare(queue, options.Durable, false, options.AutoDelete, options.Arguments ?? new Dictionary<string, object>());
if (options.RoutingKeys != null && !string.IsNullOrEmpty(exchange))
{
foreach (var key in options.RoutingKeys)
{
channel.QueueBind(queue, exchange, key, options.BindArguments);
}
}
var consumer = ConsumeInternal(channel, options);
channel.BasicConsume(queue, options.AutoAck, consumer);
ListenResult result = new ListenResult();
result.Token.Register(() =>
{
try
{
channel.Close();
channel.Dispose();
}
catch { }
});
return result;
}
/// <summary>
/// 消費消息
/// </summary>
/// <param name="exchange"></param>
/// <param name="queue"></param>
/// <param name="configure"></param>
public ListenResult Listen(string exchange, string queue, Action<ExchangeConsumeQueueOptions> configure)
{
ExchangeConsumeQueueOptions options = new ExchangeConsumeQueueOptions();
configure?.Invoke(options);
return Listen(exchange, queue, options);
}
#endregion
}
public class RecieveResult
{
CancellationTokenSource cancellationTokenSource;
public RecieveResult(BasicDeliverEventArgs arg, CancellationTokenSource cancellationTokenSource)
{
this.Body = Encoding.UTF8.GetString(arg.Body);
this.ConsumerTag = arg.ConsumerTag;
this.DeliveryTag = arg.DeliveryTag;
this.Exchange = arg.Exchange;
this.Redelivered = arg.Redelivered;
this.RoutingKey = arg.RoutingKey;
this.cancellationTokenSource = cancellationTokenSource;
}
/// <summary>
/// 消息體
/// </summary>
public string Body { get; private set; }
/// <summary>
/// 消費者標簽
/// </summary>
public string ConsumerTag { get; private set; }
/// <summary>
/// Ack標簽
/// </summary>
public ulong DeliveryTag { get; private set; }
/// <summary>
/// 交換機
/// </summary>
public string Exchange { get; private set; }
/// <summary>
/// 是否Ack
/// </summary>
public bool Redelivered { get; private set; }
/// <summary>
/// 路由
/// </summary>
public string RoutingKey { get; private set; }
public void Commit()
{
if (cancellationTokenSource == null || cancellationTokenSource.IsCancellationRequested) return;
cancellationTokenSource.Cancel();
cancellationTokenSource.Dispose();
cancellationTokenSource = null;
}
}
public class ListenResult
{
CancellationTokenSource cancellationTokenSource;
/// <summary>
/// CancellationToken
/// </summary>
public CancellationToken Token { get { return cancellationTokenSource.Token; } }
/// <summary>
/// 是否已停止
/// </summary>
public bool Stoped { get { return cancellationTokenSource.IsCancellationRequested; } }
public ListenResult()
{
cancellationTokenSource = new CancellationTokenSource();
}
/// <summary>
/// 停止監聽
/// </summary>
public void Stop()
{
cancellationTokenSource.Cancel();
}
}
}
修改Startup,在ConfigureServices中將rabbitmq的生產類RabbitMQProducer加入到DI容器中:
//將rabbitmq的生產類加入到DI容器中
var producer = new RabbitMQProducer("192.168.187.129");
producer.Password = "123456";
producer.UserName = "admin";
services.AddSingleton(producer);//這里我沒有使用集群
至於消息的消費,我們可以將消費者使用一個線程啟動並監聽,但是這里我個人推薦使用.net core 自帶的HostedService去實現,至於消費者的功能,我們就簡單的將消息記錄都文本文檔中,類似日志記錄。
我們創建一個RabbitHostedService類:
RabbitHostedService
同時,我們需要在Startup中將RabbitHostedService注入到容器中:
//注入消費者 services.AddSingleton<IHostedService, RabbitHostedService>();
得到Startup的代碼如下:
Startup
到這里,.net core 集成rabbitmq就寫好了,然后就是發送消息使用了,我們添加一個名為RabbitController的控制器,里面代碼如下:
RabbitController
RabbitController里面有兩個Action,它們返回同一個視圖:
Index
現在可以啟動項目,輸入http://localhost:5000/Rabbit就可以進入頁面測試了:

點擊上面確定之后,你會發現在項目根目錄下生成了一個logs目錄,里面有一個文件,文件里面就是我們發送的消息了
注:如果報錯,可以登錄rabbitmq后台查看賬號虛擬機權限是否存在

Rabbitmq日志記錄
上面是我們使用.net core集成rabbitmq的一種簡單方式,但是不建議在開發時這么使用
可以注意到,上面的例子我直接將RabbitMQProducer注入到容器中,但開發時應該按自己的需求對RabbitMQProducer做一層封裝,然后將封裝類注入到容器中,比如我們要使用rabbitmq做日志記錄,可以記錄到數據庫,也可以記錄到文件中去,但是.net core為我們提供了一整套的日志記錄功能,因此我們只需要將rabbitmq集成進去就可以了
首先,我們需要創建幾個類,將rabbitmq繼承到日志記錄功能中去:
RabbitMQOptions
RabbitLoggerProvider
RabbitLogger
接着,我們修改Startup的服務對象:
Startup
順帶提一下,這個Startup中服務最好使用拓展方法去實現,這里我是為了簡單沒有采用拓展方法,所以在開發時多注意一下吧
另外,我們也可以調整一下RabbitHostedService:
RabbitHostedService
到這里,rabbitmq就被繼承到.net core的日志記錄功能中去了,但是這里面為了避免記錄不要要的日志,我在其中添加了一個限制,rabbitmq只記錄categoryName包含Rabbit的日志,這樣一來,我們就可以在我們的控制器中使用:
RabbitController
現在可以啟動項目,輸入http://localhost:5000/Rabbit就可以進入頁面測試了,可以發現功能和上面是一樣的
細心的朋友可能會發現,本博文內使用的rabbitmq其實是它6中模式中最簡單的一種模式:hello world模式,讀者可以將上面的代碼稍作修改,即可變成其他幾種模式,但在現實開發中,具體使用哪種模式需要根據自己的業務需求定。
其實,有一個很好用的第三方封裝好的插件,可以讓我們很方便的操作rabbitmq,那就是EasyNetQ,可以使用一個消息總棧IBus來進行消息的操作,包含一系列消息模式:Publish/Subscribe, Request/Response和 Send/Receive,這些消息模式也是我們最常用的,所以以后有空再寫
