看博文的朋友,本文有些過時了,還有些BUG,如果想了解更多用法,看看這篇吧:.net core使用rabbitmq消息隊列 (二)
首先,如果你還沒有安裝好rabbitmq,可以參考我的博客:
Ubuntu16.04下,erlang安裝和rabbitmq安裝步驟
另外,我的另外一篇博客有介紹rabbitmq的基礎用法以及使用C#操作rabbitmq,並且對rabbitmq有一個簡單的封裝,這里使用.net core操作rabbitmq也會使用到這些封裝類,所以感興趣的可以看看:
好了現在開始我們的正文
Rabbitmq一般使用
說明一下,這里我們使用的是.net core 2.1
我們先創建一個RabbitMQDemo項目(我創建的是MVC項目),然后使用nuget安裝RabbitMQ.Client:
將上面封裝的rabbitmq操作類添加到項目中:

using System;
using System.Collections.Generic;
using System.Text;

namespace RabbitMQDemo
{
    /// <summary>
    /// Settings used when a queue (or exchange) is declared.
    /// </summary>
    public class QueueOptions
    {
        /// <summary>
        /// Whether the declaration is durable. Defaults to <c>true</c>.
        /// </summary>
        public bool Durable { get; set; } = true;

        /// <summary>
        /// Whether the declaration is auto-delete. Defaults to <c>false</c>.
        /// </summary>
        public bool AutoDelete { get; set; } = false;

        /// <summary>
        /// Extra declaration arguments. Defaults to an empty dictionary.
        /// </summary>
        public IDictionary<string, object> Arguments { get; set; } = new Dictionary<string, object>();
    }

    /// <summary>
    /// Queue settings for a consumer.
    /// </summary>
    public class ConsumeQueueOptions : QueueOptions
    {
        /// <summary>
        /// Whether messages are acknowledged automatically. Defaults to <c>false</c>.
        /// </summary>
        public bool AutoAck { get; set; } = false;

        /// <summary>
        /// Number of messages delivered per batch; <c>null</c> (the default) leaves it unset.
        /// </summary>
        public ushort? FetchCount { get; set; }
    }

    /// <summary>
    /// Consumer settings for a queue that is bound to an exchange.
    /// </summary>
    public class ExchangeConsumeQueueOptions : ConsumeQueueOptions
    {
        /// <summary>
        /// Routing keys to bind with. Not initialised by default (<c>null</c>).
        /// </summary>
        public string[] RoutingKeys { get; set; }

        /// <summary>
        /// Extra binding arguments. Defaults to an empty dictionary.
        /// </summary>
        public IDictionary<string, object> BindArguments { get; set; } = new Dictionary<string, object>();
    }

    /// <summary>
    /// Publisher settings for an exchange.
    /// </summary>
    public class ExchangeQueueOptions : QueueOptions
    {
        /// <summary>
        /// Exchange type. Not initialised by default (<c>null</c>).
        /// </summary>
        public string Type { get; set; }

        /// <summary>
        /// (queue, routing key) pairs. Not initialised by default (<c>null</c>).
        /// </summary>
        public (string, string)[] QueueAndRoutingKey { get; set; }

        /// <summary>
        /// Extra binding arguments. Defaults to an empty dictionary.
        /// </summary>
        public IDictionary<string, object> BindArguments { get; set; } = new Dictionary<string, object>();
    }
}

using System;
using System.Collections.Generic;
using System.Text;

namespace RabbitMQDemo
{
    /// <summary>
    /// Exchange type names used by this wrapper.
    /// </summary>
    public static class RabbitMQExchangeType
    {
        /// <summary>Plain mode (empty exchange type).</summary>
        public const string Common = "";

        /// <summary>Routing mode.</summary>
        public const string Direct = "direct";

        /// <summary>Publish/subscribe mode.</summary>
        public const string Fanout = "fanout";

        /// <summary>Pattern-matching subscribe mode.</summary>
        public const string Topic = "topic";
    }
}

using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace RabbitMQDemo
{
    /// <summary>
    /// Base class for the producer and consumer wrappers. Holds the connection
    /// settings and a single lazily created connection from which channels are opened.
    /// </summary>
    public abstract class RabbitBase : IDisposable
    {
        /// <summary>Port used when <see cref="Port"/> holds a non-positive value.</summary>
        private const int DefaultPort = 5672;

        /// <summary>Private lock target; guards creation and release of <see cref="connection"/>.</summary>
        private readonly object gate = new object();

        /// <summary>
        /// Configured nodes. A <c>null</c> port means "use <see cref="Port"/> when connecting",
        /// so a port assigned after construction is honoured.
        /// </summary>
        private readonly List<(string Host, int? Port)> nodes;

        private IConnection connection;

        /// <summary>
        /// Creates an instance for the given hosts; each host uses <see cref="Port"/>.
        /// </summary>
        /// <param name="hosts">One or more host names.</param>
        /// <exception cref="ArgumentException"><paramref name="hosts"/> is null or empty.</exception>
        protected RabbitBase(params string[] hosts)
        {
            if (hosts == null || hosts.Length == 0)
            {
                throw new ArgumentException("invalid hosts!", nameof(hosts));
            }

            this.nodes = hosts.Select(host => (host, (int?)null)).ToList();
        }

        /// <summary>
        /// Creates an instance for the given (host, port) pairs.
        /// </summary>
        /// <param name="hostAndPorts">One or more (host, port) pairs.</param>
        /// <exception cref="ArgumentException"><paramref name="hostAndPorts"/> is null or empty.</exception>
        protected RabbitBase(params (string, int)[] hostAndPorts)
        {
            if (hostAndPorts == null || hostAndPorts.Length == 0)
            {
                throw new ArgumentException("invalid hosts!", nameof(hostAndPorts));
            }

            this.nodes = hostAndPorts.Select(pair => (pair.Item1, (int?)pair.Item2)).ToList();
        }

        /// <summary>
        /// Port used for hosts that were given without an explicit port.
        /// </summary>
        public int Port { get; set; } = DefaultPort;

        /// <summary>
        /// User name.
        /// </summary>
        public string UserName { get; set; } = ConnectionFactory.DefaultUser;

        /// <summary>
        /// Password.
        /// </summary>
        public string Password { get; set; } = ConnectionFactory.DefaultPass;

        /// <summary>
        /// Virtual host.
        /// </summary>
        public string VirtualHost { get; set; } = ConnectionFactory.DefaultVHost;

        /// <summary>
        /// Releases the connection. Equivalent to <see cref="Close"/>.
        /// </summary>
        public virtual void Dispose()
        {
            Close();
        }

        /// <summary>
        /// Closes and disposes the connection if one is open. Safe to call repeatedly;
        /// a later <see cref="GetChannel"/> call opens a new connection.
        /// </summary>
        public void Close()
        {
            IConnection current;
            lock (gate)
            {
                current = connection;
                connection = null;
            }

            if (current == null)
            {
                return;
            }

            try
            {
                current.Close();
            }
            finally
            {
                // Dispose even when Close() throws so the connection object is always released.
                current.Dispose();
            }
        }

        #region Private
        /// <summary>
        /// Opens a new channel, creating the shared connection on first use.
        /// </summary>
        /// <returns>A new channel; the caller is responsible for closing it.</returns>
        protected IModel GetChannel()
        {
            IConnection current;
            lock (gate)
            {
                if (connection == null)
                {
                    connection = CreateConnection();
                }

                current = connection;
            }

            return current.CreateModel();
        }

        /// <summary>
        /// Builds the endpoint list from the current property values and connects.
        /// </summary>
        private IConnection CreateConnection()
        {
            // A non-positive Port (e.g. an unset options value of 0) falls back to the default.
            int effectivePort = Port > 0 ? Port : DefaultPort;

            var factory = new ConnectionFactory
            {
                Port = effectivePort,
                UserName = UserName,
                VirtualHost = VirtualHost,
                Password = Password
            };

            List<AmqpTcpEndpoint> endpoints = nodes
                .Select(node => new AmqpTcpEndpoint(node.Host, node.Port ?? effectivePort))
                .ToList();

            return factory.CreateConnection(endpoints);
        }
        #endregion
    }
}

using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace RabbitMQDemo
{
    /// <summary>
    /// Publishes UTF-8 text messages, either straight to a queue or through an exchange.
    /// Every publish opens its own channel and releases it before returning.
    /// </summary>
    public class RabbitMQProducer : RabbitBase
    {
        public RabbitMQProducer(params string[] hosts) : base(hosts)
        {
        }

        public RabbitMQProducer(params (string, int)[] hostAndPorts) : base(hostAndPorts)
        {
        }

        #region Plain mode, Work mode
        /// <summary>
        /// Declares <paramref name="queue"/> and publishes <paramref name="message"/> to it
        /// through the default ("") exchange.
        /// </summary>
        /// <param name="queue">Queue name, also used as the routing key.</param>
        /// <param name="message">Message text, encoded as UTF-8.</param>
        /// <param name="options">Queue declaration settings; defaults are used when null.</param>
        public void Publish(string queue, string message, QueueOptions options = null)
        {
            options = options ?? new QueueOptions();

            // Encode first so an invalid message fails before a channel is opened.
            byte[] buffer = Encoding.UTF8.GetBytes(message);

            // The using block releases the channel even when declare or publish throws.
            using (var channel = GetChannel())
            {
                channel.QueueDeclare(queue, options.Durable, false, options.AutoDelete, options.Arguments ?? new Dictionary<string, object>());
                channel.BasicPublish("", queue, null, buffer);
                channel.Close();
            }
        }

        /// <summary>
        /// Publishes to a queue, letting <paramref name="configure"/> adjust default options.
        /// </summary>
        /// <param name="queue">Queue name.</param>
        /// <param name="message">Message text.</param>
        /// <param name="configure">Optional callback that mutates a fresh <see cref="QueueOptions"/>.</param>
        public void Publish(string queue, string message, Action<QueueOptions> configure)
        {
            QueueOptions options = new QueueOptions();
            configure?.Invoke(options);
            Publish(queue, message, options);
        }
        #endregion

        #region Subscribe mode, Routing mode, Topic mode
        /// <summary>
        /// Declares <paramref name="exchange"/>, binds any queues listed in the options,
        /// and publishes <paramref name="message"/> with <paramref name="routingKey"/>.
        /// </summary>
        /// <param name="exchange">Exchange name.</param>
        /// <param name="routingKey">Routing key for the message.</param>
        /// <param name="message">Message text, encoded as UTF-8.</param>
        /// <param name="options">
        /// Exchange settings; defaults are used when null. An empty
        /// <see cref="ExchangeQueueOptions.Type"/> is treated as fanout.
        /// </param>
        public void Publish(string exchange, string routingKey, string message, ExchangeQueueOptions options = null)
        {
            options = options ?? new ExchangeQueueOptions();

            // Encode first so an invalid message fails before a channel is opened.
            byte[] buffer = Encoding.UTF8.GetBytes(message);
            string exchangeType = string.IsNullOrEmpty(options.Type) ? RabbitMQExchangeType.Fanout : options.Type;

            // The using block releases the channel even when declare, bind or publish throws.
            using (var channel = GetChannel())
            {
                channel.ExchangeDeclare(exchange, exchangeType, options.Durable, options.AutoDelete, options.Arguments ?? new Dictionary<string, object>());

                if (options.QueueAndRoutingKey != null)
                {
                    foreach (var t in options.QueueAndRoutingKey)
                    {
                        // Entries without a queue name are skipped.
                        if (!string.IsNullOrEmpty(t.Item1))
                        {
                            channel.QueueBind(t.Item1, exchange, t.Item2 ?? "", options.BindArguments ?? new Dictionary<string, object>());
                        }
                    }
                }

                channel.BasicPublish(exchange, routingKey, null, buffer);
                channel.Close();
            }
        }

        /// <summary>
        /// Publishes through an exchange, letting <paramref name="configure"/> adjust default options.
        /// </summary>
        /// <param name="exchange">Exchange name.</param>
        /// <param name="routingKey">Routing key for the message.</param>
        /// <param name="message">Message text.</param>
        /// <param name="configure">Optional callback that mutates a fresh <see cref="ExchangeQueueOptions"/>.</param>
        public void Publish(string exchange, string routingKey, string message, Action<ExchangeQueueOptions> configure)
        {
            ExchangeQueueOptions options = new ExchangeQueueOptions();
            configure?.Invoke(options);
            Publish(exchange, routingKey, message, options);
        }
        #endregion
    }
}

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace RabbitMQDemo
{
    /// <summary>
    /// Consumes messages from a queue and raises <see cref="Received"/> for each delivery.
    /// </summary>
    public class RabbitMQConsumer : RabbitBase
    {
        public RabbitMQConsumer(params string[] hosts) : base(hosts)
        {
        }

        public RabbitMQConsumer(params (string, int)[] hostAndPorts) : base(hostAndPorts)
        {
        }

        /// <summary>
        /// Raised for every delivered message. In manual-ack mode the handler calls
        /// <see cref="RecieveResult.Commit"/> to acknowledge the message.
        /// </summary>
        public event Action<RecieveResult> Received;

        /// <summary>
        /// Builds the consumer object that forwards deliveries to <see cref="Received"/>
        /// and applies the fetch count, if one is configured.
        /// </summary>
        /// <param name="channel">Channel the consumer is attached to.</param>
        /// <param name="options">Consumer settings.</param>
        /// <returns>The consumer to pass to BasicConsume.</returns>
        private IBasicConsumer ConsumeInternal(IModel channel, ConsumeQueueOptions options)
        {
            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
            consumer.Received += (sender, e) =>
            {
                try
                {
                    // The token source only carries the ack callback, so it is not needed
                    // in auto-ack mode; RecieveResult.Commit() is a no-op for a null source.
                    CancellationTokenSource ackSource = null;
                    if (!options.AutoAck)
                    {
                        ackSource = new CancellationTokenSource();
                        ackSource.Token.Register(() =>
                        {
                            channel.BasicAck(e.DeliveryTag, false);
                        });
                    }

                    Received?.Invoke(new RecieveResult(e, ackSource));
                }
                catch (Exception ex)
                {
                    // Still best-effort (a failing handler must not break the consumer),
                    // but the failure is reported instead of being silently dropped.
                    System.Diagnostics.Trace.TraceError($"RabbitMQConsumer: handling delivery {e.DeliveryTag} failed: {ex}");
                }
            };

            if (options.FetchCount != null)
            {
                channel.BasicQos(0, options.FetchCount.Value, false);
            }

            return consumer;
        }

        /// <summary>
        /// Starts consuming <paramref name="queue"/> on <paramref name="channel"/> and returns
        /// a handle whose Stop() closes and disposes that channel.
        /// </summary>
        private ListenResult StartConsuming(IModel channel, string queue, ConsumeQueueOptions options)
        {
            var consumer = ConsumeInternal(channel, options);
            channel.BasicConsume(queue, options.AutoAck, consumer);

            ListenResult result = new ListenResult();
            result.Token.Register(() =>
            {
                try
                {
                    channel.Close();
                    channel.Dispose();
                }
                catch (Exception ex)
                {
                    // Shutdown is best-effort; report and continue.
                    System.Diagnostics.Trace.TraceWarning($"RabbitMQConsumer: closing channel for queue '{queue}' failed: {ex}");
                }
            });
            return result;
        }

        #region Plain mode, Work mode
        /// <summary>
        /// Declares <paramref name="queue"/> and starts consuming from it.
        /// </summary>
        /// <param name="queue">Queue name.</param>
        /// <param name="options">Consumer settings; defaults are used when null.</param>
        /// <returns>A handle that stops the listener.</returns>
        public ListenResult Listen(string queue, ConsumeQueueOptions options = null)
        {
            options = options ?? new ConsumeQueueOptions();
            var channel = GetChannel();
            try
            {
                channel.QueueDeclare(queue, options.Durable, false, options.AutoDelete, options.Arguments ?? new Dictionary<string, object>());
                return StartConsuming(channel, queue, options);
            }
            catch
            {
                // Do not leak the channel when setup fails.
                channel.Dispose();
                throw;
            }
        }

        /// <summary>
        /// Starts consuming from a queue, letting <paramref name="configure"/> adjust default options.
        /// </summary>
        /// <param name="queue">Queue name.</param>
        /// <param name="configure">Optional callback that mutates a fresh <see cref="ConsumeQueueOptions"/>.</param>
        /// <returns>A handle that stops the listener.</returns>
        public ListenResult Listen(string queue, Action<ConsumeQueueOptions> configure)
        {
            ConsumeQueueOptions options = new ConsumeQueueOptions();
            configure?.Invoke(options);
            return Listen(queue, options);
        }
        #endregion

        #region Subscribe mode, Routing mode, Topic mode
        /// <summary>
        /// Declares <paramref name="queue"/>, binds it to <paramref name="exchange"/> once per
        /// configured routing key, and starts consuming from it.
        /// </summary>
        /// <param name="exchange">Exchange name; no binding is made when null or empty.</param>
        /// <param name="queue">Queue name.</param>
        /// <param name="options">Consumer settings; defaults are used when null.</param>
        /// <returns>A handle that stops the listener.</returns>
        public ListenResult Listen(string exchange, string queue, ExchangeConsumeQueueOptions options = null)
        {
            options = options ?? new ExchangeConsumeQueueOptions();
            var channel = GetChannel();
            try
            {
                channel.QueueDeclare(queue, options.Durable, false, options.AutoDelete, options.Arguments ?? new Dictionary<string, object>());

                if (options.RoutingKeys != null && !string.IsNullOrEmpty(exchange))
                {
                    foreach (var key in options.RoutingKeys)
                    {
                        channel.QueueBind(queue, exchange, key, options.BindArguments);
                    }
                }

                return StartConsuming(channel, queue, options);
            }
            catch
            {
                // Do not leak the channel when setup fails.
                channel.Dispose();
                throw;
            }
        }

        /// <summary>
        /// Starts consuming from an exchange-bound queue, letting <paramref name="configure"/>
        /// adjust default options.
        /// </summary>
        /// <param name="exchange">Exchange name.</param>
        /// <param name="queue">Queue name.</param>
        /// <param name="configure">Optional callback that mutates a fresh <see cref="ExchangeConsumeQueueOptions"/>.</param>
        /// <returns>A handle that stops the listener.</returns>
        public ListenResult Listen(string exchange, string queue, Action<ExchangeConsumeQueueOptions> configure)
        {
            ExchangeConsumeQueueOptions options = new ExchangeConsumeQueueOptions();
            configure?.Invoke(options);
            return Listen(exchange, queue, options);
        }
        #endregion
    }

    /// <summary>
    /// One delivered message, with its body decoded as UTF-8 text.
    /// </summary>
    public class RecieveResult
    {
        // Carries the ack callback; null when there is nothing to acknowledge or after Commit().
        CancellationTokenSource cancellationTokenSource;

        /// <summary>
        /// Copies the delivery details from <paramref name="arg"/>.
        /// </summary>
        /// <param name="arg">Delivery event data.</param>
        /// <param name="cancellationTokenSource">
        /// Source whose cancellation runs the acknowledgement; may be null.
        /// </param>
        public RecieveResult(BasicDeliverEventArgs arg, CancellationTokenSource cancellationTokenSource)
        {
            this.Body = Encoding.UTF8.GetString(arg.Body);
            this.ConsumerTag = arg.ConsumerTag;
            this.DeliveryTag = arg.DeliveryTag;
            this.Exchange = arg.Exchange;
            this.Redelivered = arg.Redelivered;
            this.RoutingKey = arg.RoutingKey;
            this.cancellationTokenSource = cancellationTokenSource;
        }

        /// <summary>
        /// Message body.
        /// </summary>
        public string Body { get; private set; }

        /// <summary>
        /// Consumer tag.
        /// </summary>
        public string ConsumerTag { get; private set; }

        /// <summary>
        /// Delivery tag used for the acknowledgement.
        /// </summary>
        public ulong DeliveryTag { get; private set; }

        /// <summary>
        /// Exchange reported on the delivery.
        /// </summary>
        public string Exchange { get; private set; }

        /// <summary>
        /// Redelivered flag reported on the delivery.
        /// </summary>
        public bool Redelivered { get; private set; }

        /// <summary>
        /// Routing key.
        /// </summary>
        public string RoutingKey { get; private set; }

        /// <summary>
        /// Acknowledges the message. Does nothing when there is no ack callback
        /// or the message has already been committed.
        /// </summary>
        public void Commit()
        {
            var source = cancellationTokenSource;
            if (source == null || source.IsCancellationRequested)
            {
                return;
            }

            try
            {
                // Cancelling runs the registered ack callback.
                source.Cancel();
            }
            finally
            {
                // Release the source even when the ack itself throws.
                source.Dispose();
                cancellationTokenSource = null;
            }
        }
    }

    /// <summary>
    /// Handle returned by Listen; stopping it triggers the callbacks registered on <see cref="Token"/>.
    /// </summary>
    public class ListenResult
    {
        CancellationTokenSource cancellationTokenSource;

        /// <summary>
        /// Token that is cancelled when <see cref="Stop"/> is called.
        /// </summary>
        public CancellationToken Token { get { return cancellationTokenSource.Token; } }

        /// <summary>
        /// Whether <see cref="Stop"/> has been called.
        /// </summary>
        public bool Stoped { get { return cancellationTokenSource.IsCancellationRequested; } }

        public ListenResult()
        {
            cancellationTokenSource = new CancellationTokenSource();
        }

        /// <summary>
        /// Stops listening.
        /// </summary>
        public void Stop()
        {
            cancellationTokenSource.Cancel();
        }
    }
}
修改Startup,在ConfigureServices中將rabbitmq的生產類RabbitMQProducer加入到DI容器中:
// Add the RabbitMQ producer to the DI container
var producer = new RabbitMQProducer("192.168.187.129")
{
    UserName = "admin",
    Password = "123456"
};
services.AddSingleton(producer); // no cluster is used here
至於消息的消費,我們可以將消費者使用一個線程啟動並監聽,但是這里我個人推薦使用.net core 自帶的HostedService去實現,至於消費者的功能,我們就簡單的將消息記錄到文本文檔中,類似日志記錄。
我們創建一個RabbitHostedService類:

using Microsoft.Extensions.Hosting;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.IO;

namespace RabbitMQDemo
{
    /// <summary>
    /// Hosted service that listens on "queue1" and appends each message to a daily log file.
    /// </summary>
    public class RabbitHostedService : IHostedService
    {
        RabbitMQConsumer consumer;

        public RabbitHostedService()
        {
            consumer = new RabbitMQConsumer("192.168.187.129")
            {
                UserName = "admin",
                Password = "123456"
            };
        }

        /// <summary>
        /// Service start: subscribes the log writer and begins listening.
        /// </summary>
        /// <param name="cancellationToken"></param>
        /// <returns></returns>
        public async Task StartAsync(CancellationToken cancellationToken)
        {
            await Task.Run(() =>
            {
                consumer.Received += AppendToLog;
                consumer.Listen("queue1", ConfigureQueue);
            });
        }

        /// <summary>
        /// Service stop: closes the consumer, giving up when the token is cancelled first.
        /// </summary>
        /// <param name="cancellationToken"></param>
        /// <returns></returns>
        public async Task StopAsync(CancellationToken cancellationToken)
        {
            Task closing = Task.Run(() => consumer.Close());
            Task cancelled = Task.Delay(Timeout.Infinite, cancellationToken);

            await Task.WhenAny(closing, cancelled);
            cancellationToken.ThrowIfCancellationRequested();
        }

        /// <summary>
        /// Queue settings used for listening.
        /// </summary>
        private static void ConfigureQueue(ConsumeQueueOptions options)
        {
            options.AutoAck = false;
            options.Arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };
            options.AutoDelete = false;
            options.Durable = true;
        }

        /// <summary>
        /// Appends the message to logs/yyyyMMdd.log under the current directory, then commits it.
        /// </summary>
        private static void AppendToLog(RecieveResult result)
        {
            // Log directory
            string directory = Path.Combine(Directory.GetCurrentDirectory(), "logs");
            if (!Directory.Exists(directory))
            {
                Directory.CreateDirectory(directory);
            }

            // Log file and line
            string file = Path.Combine(directory, $"{DateTime.Now:yyyyMMdd}.log");
            string line = $"{DateTime.Now:yyyy-MM-dd HH:mm:ss}接收到消息:{result.Body}{Environment.NewLine}";
            File.AppendAllText(file, line);

            // Commit
            result.Commit();
        }
    }
}
同時,我們需要在Startup中將RabbitHostedService注入到容器中:
// Register the consumer (RabbitHostedService) as an IHostedService singleton
services.AddSingleton<IHostedService, RabbitHostedService>();
得到Startup的代碼如下:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.HttpsPolicy;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

namespace RabbitMQDemo
{
    /// <summary>
    /// Application startup: registers services and builds the HTTP request pipeline.
    /// </summary>
    public class Startup
    {
        public Startup(IConfiguration configuration) => Configuration = configuration;

        public IConfiguration Configuration { get; }

        // This method gets called by the runtime. Use this method to add services to the container.
        public void ConfigureServices(IServiceCollection services)
        {
            services.Configure<CookiePolicyOptions>(ConfigureCookiePolicy);

            RegisterProducer(services);

            // Register the consumer
            services.AddSingleton<IHostedService, RabbitHostedService>();

            services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_1);
        }

        // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
        public void Configure(IApplicationBuilder app, Microsoft.Extensions.Hosting.IHostingEnvironment env)
        {
            if (!env.IsDevelopment())
            {
                app.UseExceptionHandler("/Home/Error");
                app.UseHsts();
            }
            else
            {
                app.UseDeveloperExceptionPage();
            }

            app.UseHttpsRedirection();
            app.UseStaticFiles();
            app.UseCookiePolicy();

            app.UseMvc(routes => routes.MapRoute("default", "{controller=Home}/{action=Index}/{id?}"));
        }

        private static void ConfigureCookiePolicy(CookiePolicyOptions options)
        {
            // This lambda determines whether user consent for non-essential cookies is needed for a given request.
            options.CheckConsentNeeded = context => true;
            options.MinimumSameSitePolicy = SameSiteMode.None;
        }

        /// <summary>
        /// Adds the RabbitMQ producer to the DI container (no cluster is used here).
        /// </summary>
        private static void RegisterProducer(IServiceCollection services)
        {
            var producer = new RabbitMQProducer("192.168.187.129")
            {
                UserName = "admin",
                Password = "123456"
            };
            services.AddSingleton(producer);
        }
    }
}
到這里,.net core 集成rabbitmq就寫好了,然后就是發送消息使用了,我們添加一個名為RabbitController的控制器,里面代碼如下:

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using RabbitMQDemo.Models;

namespace RabbitMQDemo.Controllers
{
    /// <summary>
    /// Demo controller that publishes submitted text to "queue1".
    /// </summary>
    public class RabbitController : Controller
    {
        RabbitMQProducer producer;

        public RabbitController(RabbitMQProducer producer)
        {
            this.producer = producer;
        }

        /// <summary>
        /// Home page.
        /// </summary>
        /// <returns></returns>
        public IActionResult Index() => View();

        /// <summary>
        /// Message submission: publishes a non-empty message, then shows the Index view again.
        /// </summary>
        /// <param name="message"></param>
        /// <returns></returns>
        public IActionResult Submit(string message)
        {
            if (string.IsNullOrEmpty(message))
            {
                return View("Index");
            }

            // Send the message
            producer.Publish("queue1", message, ConfigureQueue);
            return View("Index");
        }

        /// <summary>
        /// Queue settings used for publishing.
        /// </summary>
        private static void ConfigureQueue(QueueOptions options)
        {
            options.Arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };
            options.AutoDelete = false;
            options.Durable = true;
        }
    }
}
RabbitController里面有兩個Action,它們返回同一個視圖:

@* Page without a layout: a form that submits the "message" text area to the Submit action of the Rabbit controller. *@
@{
    Layout = null;
}
@using (Html.BeginForm("Submit", "Rabbit"))
{
    @Html.DisplayName("消息:");
    <br />
    @Html.TextArea("message", new { @class = "multieditbox" })
    <br />
    <input type="submit" class="buttoncss" value="確定" />
}
<style type="text/css">
    /* Submit button. The second color and the four per-side border declarations
       come later in the rule and therefore override the earlier color/border values. */
    .buttoncss {
        font-family: "tahoma", "宋體"; /*www.52css.com*/
        font-size: 9pt;
        color: #003399;
        border: 1px #003399 solid;
        color: #006699;
        border-bottom: #93bee2 1px solid;
        border-left: #93bee2 1px solid;
        border-right: #93bee2 1px solid;
        border-top: #93bee2 1px solid;
        background-image: url(../images/bluebuttonbg.gif);
        background-color: #e8f4ff;
        font-style: normal;
        width: 60px;
        height: 22px;
    }

    /* Message text area. */
    .multieditbox {
        background: #f8f8f8;
        border-bottom: #b7b7b7 1px solid;
        border-left: #b7b7b7 1px solid;
        border-right: #b7b7b7 1px solid;
        border-top: #b7b7b7 1px solid;
        color: #000000;
        cursor: text;
        font-family: "arial";
        font-size: 9pt;
        padding: 1px; /*www.52css.com*/
        width: 200px;
        height: 80px;
    }
</style>
現在可以啟動項目,輸入http://localhost:5000/Rabbit就可以進入頁面測試了:
點擊上面確定之后,你會發現在項目根目錄下生成了一個logs目錄,里面有一個文件,文件里面就是我們發送的消息了
注:如果報錯,可以登錄rabbitmq后台查看賬號虛擬機權限是否存在
Rabbitmq日志記錄
上面是我們使用.net core集成rabbitmq的一種簡單方式,但是不建議在開發時這么使用
可以注意到,上面的例子我直接將RabbitMQProducer注入到容器中,但開發時應該按自己的需求對RabbitMQProducer做一層封裝,然后將封裝類注入到容器中,比如我們要使用rabbitmq做日志記錄,可以記錄到數據庫,也可以記錄到文件中去,但是.net core為我們提供了一整套的日志記錄功能,因此我們只需要將rabbitmq集成進去就可以了
首先,我們需要創建幾個類,將rabbitmq集成到日志記錄功能中去:

using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace RabbitMQDemo
{
    /// <summary>
    /// Connection and queue settings shared by the logger and consumer options.
    /// Port, user name, password and virtual host default to 5672, "guest", "guest" and "/",
    /// so copying an unconfigured options object onto a producer or consumer does not
    /// replace usable values with 0 or null.
    /// </summary>
    public abstract class RabbitMQOptions
    {
        /// <summary>
        /// Server nodes. Not initialised by default (<c>null</c>).
        /// </summary>
        public string[] Hosts { get; set; }

        /// <summary>
        /// Port. Defaults to 5672.
        /// </summary>
        public int Port { get; set; } = 5672;

        /// <summary>
        /// User name. Defaults to "guest".
        /// </summary>
        public string UserName { get; set; } = "guest";

        /// <summary>
        /// Password. Defaults to "guest".
        /// </summary>
        public string Password { get; set; } = "guest";

        /// <summary>
        /// Virtual host. Defaults to "/".
        /// </summary>
        public string VirtualHost { get; set; } = "/";

        /// <summary>
        /// Whether the queue is durable. Defaults to <c>true</c>.
        /// </summary>
        public bool Durable { get; set; } = true;

        /// <summary>
        /// Whether the queue is auto-delete. Defaults to <c>false</c>.
        /// </summary>
        public bool AutoDelete { get; set; } = false;

        /// <summary>
        /// Queue name.
        /// </summary>
        public string Queue { get; set; }

        /// <summary>
        /// Exchange name.
        /// </summary>
        public string Exchange { get; set; }

        /// <summary>
        /// Exchange type; leave empty for plain mode.
        /// </summary>
        public string Type { get; set; }

        /// <summary>
        /// Extra declaration arguments. Defaults to an empty dictionary.
        /// </summary>
        public IDictionary<string, object> Arguments { get; set; } = new Dictionary<string, object>();
    }

    /// <summary>
    /// Settings for the RabbitMQ logger.
    /// </summary>
    public class RabbitMQLoggerOptions : RabbitMQOptions
    {
        /// <summary>
        /// Minimum level that is logged. Defaults to <see cref="LogLevel.Information"/>.
        /// </summary>
        public LogLevel MinLevel { get; set; } = LogLevel.Information;

        /// <summary>
        /// Category filter. Defaults to "Rabbit".
        /// </summary>
        public string Category { get; set; } = "Rabbit";
    }

    /// <summary>
    /// Settings for the RabbitMQ consumer.
    /// </summary>
    public class RabbitMQConsumerOptions : RabbitMQOptions
    {
        /// <summary>
        /// Whether messages are acknowledged automatically. Defaults to <c>false</c>.
        /// </summary>
        public bool AutoAck { get; set; } = false;

        /// <summary>
        /// Number of messages delivered per batch; <c>null</c> (the default) leaves it unset.
        /// </summary>
        public ushort? FetchCount { get; set; }
    }
}

using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace RabbitMQDemo
{
    /// <summary>
    /// Logger provider that hands out one cached <see cref="RabbitLogger"/> per category name.
    /// </summary>
    public class RabbitLoggerProvider : ILoggerProvider
    {
        private readonly RabbitMQLoggerOptions loggerOptions;

        // One logger per category; without the cache every CreateLogger call would build
        // another RabbitLogger together with its own producer.
        private readonly System.Collections.Concurrent.ConcurrentDictionary<string, RabbitLogger> loggers =
            new System.Collections.Concurrent.ConcurrentDictionary<string, RabbitLogger>(StringComparer.Ordinal);

        /// <summary>
        /// Captures the options value that is current at construction time.
        /// </summary>
        /// <param name="options">Monitor supplying the logger options.</param>
        public RabbitLoggerProvider(IOptionsMonitor<RabbitMQLoggerOptions> options)
        {
            loggerOptions = options.CurrentValue;
        }

        /// <summary>
        /// Returns the logger for <paramref name="categoryName"/>, creating it on first request.
        /// </summary>
        /// <param name="categoryName">Category name; null is treated as an empty name.</param>
        /// <returns>The cached logger for that category.</returns>
        public ILogger CreateLogger(string categoryName)
        {
            // Dictionary keys cannot be null.
            string key = categoryName ?? string.Empty;

            // Under a race the factory may run more than once; only one result is kept.
            return loggers.GetOrAdd(key, name => new RabbitLogger(name, loggerOptions));
        }

        /// <summary>
        /// Disposes every cached logger and empties the cache.
        /// </summary>
        public void Dispose()
        {
            foreach (var entry in loggers)
            {
                if (loggers.TryRemove(entry.Key, out RabbitLogger logger))
                {
                    logger.Dispose();
                }
            }
        }
    }
}

using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace RabbitMQDemo
{
    /// <summary>
    /// Logger that publishes log messages to the configured RabbitMQ queue.
    /// Only entries at or above the configured minimum level, from a category whose
    /// name contains the configured category text, are published.
    /// </summary>
    public class RabbitLogger : ILogger, IDisposable
    {
        string category;
        RabbitMQLoggerOptions loggerOptions;
        RabbitMQProducer producer;

        /// <summary>
        /// Creates the logger and its producer from <paramref name="options"/>.
        /// </summary>
        /// <param name="category">Category name of this logger.</param>
        /// <param name="options">Connection, queue and filter settings.</param>
        /// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
        public RabbitLogger(string category, RabbitMQLoggerOptions options)
        {
            this.category = category;
            this.loggerOptions = options ?? throw new ArgumentNullException(nameof(options));

            producer = new RabbitMQProducer(options.Hosts);
            producer.Password = options.Password;
            producer.UserName = options.UserName;
            producer.Port = options.Port;
            producer.VirtualHost = options.VirtualHost;
        }

        /// <summary>
        /// Scopes are not supported; a shared no-op disposable is returned so that
        /// disposing a scope never disposes the logger itself.
        /// </summary>
        public IDisposable BeginScope<TState>(TState state)
        {
            return NullScope.Instance;
        }

        /// <summary>
        /// Closes the producer's connection.
        /// </summary>
        public void Dispose()
        {
            producer.Close();
        }

        /// <summary>
        /// Whether an entry at <paramref name="logLevel"/> would be published.
        /// </summary>
        /// <param name="logLevel">Level to test.</param>
        /// <returns>
        /// <c>true</c> when the level is not <see cref="LogLevel.None"/>, is at least the configured
        /// minimum, and this logger's category contains the configured category text
        /// (case-insensitive). An unset category filter matches every category.
        /// </returns>
        public bool IsEnabled(LogLevel logLevel)
        {
            if (logLevel == LogLevel.None || logLevel < loggerOptions.MinLevel)
            {
                return false;
            }

            string filter = loggerOptions.Category;
            if (string.IsNullOrEmpty(filter))
            {
                return true;
            }

            return category != null && category.Contains(filter, StringComparison.OrdinalIgnoreCase);
        }

        /// <summary>
        /// Publishes the formatted entry to the configured queue when it passes <see cref="IsEnabled"/>.
        /// </summary>
        /// <typeparam name="TState">Type of the state object.</typeparam>
        /// <param name="logLevel">Entry level.</param>
        /// <param name="eventId">Event id (not used).</param>
        /// <param name="state">Entry state.</param>
        /// <param name="exception">Exception attached to the entry, if any.</param>
        /// <param name="formatter">Produces the message text; when null, <c>state.ToString()</c> is used.</param>
        public void Log<TState>(LogLevel logLevel, EventId eventId, TState state, Exception exception, Func<TState, Exception, string> formatter)
        {
            if (!IsEnabled(logLevel))
            {
                return;
            }

            string message = formatter != null
                ? formatter(state, exception)
                : (state != null ? state.ToString() : null);
            message = message ?? "";

            if (exception != null)
            {
                // Append the exception itself; appending the formatter output to state.ToString()
                // only repeated the message text.
                message += Environment.NewLine + exception;
            }

            // Send the message
            producer.Publish(loggerOptions.Queue, message, options =>
            {
                options.Arguments = loggerOptions.Arguments;
                options.AutoDelete = loggerOptions.AutoDelete;
                options.Durable = loggerOptions.Durable;
            });
        }

        /// <summary>
        /// Disposable that does nothing; returned from <see cref="BeginScope{TState}"/>.
        /// </summary>
        private sealed class NullScope : IDisposable
        {
            public static readonly NullScope Instance = new NullScope();

            private NullScope()
            {
            }

            public void Dispose()
            {
            }
        }
    }
}
接着,我們修改Startup的服務對象:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.HttpsPolicy;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

namespace RabbitMQDemo
{
    /// <summary>
    /// Application startup: registers the RabbitMQ logger and consumer and builds the HTTP request pipeline.
    /// </summary>
    public class Startup
    {
        public Startup(IConfiguration configuration) => Configuration = configuration;

        public IConfiguration Configuration { get; }

        // This method gets called by the runtime. Use this method to add services to the container.
        public void ConfigureServices(IServiceCollection services)
        {
            services.Configure<CookiePolicyOptions>(ConfigureCookiePolicy);

            // Configure message publishing
            services.Configure<RabbitMQLoggerOptions>(ConfigureLogger);

            // Add RabbitLoggerProvider to the container
            services.AddSingleton<ILoggerProvider, RabbitLoggerProvider>();

            // Configure message consumption
            services.Configure<RabbitMQConsumerOptions>(ConfigureConsumer);

            // Register the consumer
            services.AddSingleton<IHostedService, RabbitHostedService>();

            services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_1);
        }

        // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
        public void Configure(IApplicationBuilder app, Microsoft.Extensions.Hosting.IHostingEnvironment env)
        {
            if (!env.IsDevelopment())
            {
                app.UseExceptionHandler("/Home/Error");
                app.UseHsts();
            }
            else
            {
                app.UseDeveloperExceptionPage();
            }

            app.UseHttpsRedirection();
            app.UseStaticFiles();
            app.UseCookiePolicy();

            app.UseMvc(routes => routes.MapRoute("default", "{controller=Home}/{action=Index}/{id?}"));
        }

        private static void ConfigureCookiePolicy(CookiePolicyOptions options)
        {
            // This lambda determines whether user consent for non-essential cookies is needed for a given request.
            options.CheckConsentNeeded = context => true;
            options.MinimumSameSitePolicy = SameSiteMode.None;
        }

        /// <summary>
        /// Settings for the RabbitMQ logger.
        /// </summary>
        private static void ConfigureLogger(RabbitMQLoggerOptions options)
        {
            options.Category = "Rabbit";
            options.Hosts = new string[] { "192.168.187.129" };
            options.MinLevel = Microsoft.Extensions.Logging.LogLevel.Information;
            options.Password = "123456";
            options.Port = 5672;
            options.Queue = "queue1";
            options.UserName = "admin";
            options.VirtualHost = "/";
            options.Arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };
            options.AutoDelete = false;
            options.Durable = true;
        }

        /// <summary>
        /// Settings for the RabbitMQ consumer.
        /// </summary>
        private static void ConfigureConsumer(RabbitMQConsumerOptions options)
        {
            options.Hosts = new string[] { "192.168.187.129" };
            options.Password = "123456";
            options.Port = 5672;
            options.Queue = "queue1";
            options.UserName = "admin";
            options.VirtualHost = "/";
            options.Arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };
            options.AutoDelete = false;
            options.Durable = true;
            options.AutoAck = false;
        }
    }
}
順帶提一下,這個Startup中服務最好使用拓展方法去實現,這里我是為了簡單沒有采用拓展方法,所以在開發時多注意一下吧
另外,我們也可以調整一下RabbitHostedService:

using Microsoft.Extensions.Hosting;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.IO;
using Microsoft.Extensions.Options;

namespace RabbitMQDemo
{
    /// <summary>
    /// Hosted service that listens on the configured queue and appends each message to a daily log file.
    /// </summary>
    public class RabbitHostedService : IHostedService
    {
        RabbitMQConsumerOptions consumerOptions;
        RabbitMQConsumer consumer;

        public RabbitHostedService(IOptions<RabbitMQConsumerOptions> options)
        {
            consumerOptions = options.Value;
            consumer = new RabbitMQConsumer(consumerOptions.Hosts)
            {
                Password = consumerOptions.Password,
                UserName = consumerOptions.UserName,
                VirtualHost = consumerOptions.VirtualHost,
                Port = consumerOptions.Port
            };
        }

        /// <summary>
        /// Service start: subscribes the log writer and begins listening.
        /// </summary>
        /// <param name="cancellationToken"></param>
        /// <returns></returns>
        public async Task StartAsync(CancellationToken cancellationToken)
        {
            await Task.Run(() =>
            {
                consumer.Received += AppendToLog;
                consumer.Listen(consumerOptions.Queue, ApplyQueueSettings);
            });
        }

        /// <summary>
        /// Service stop: closes the consumer, giving up when the token is cancelled first.
        /// </summary>
        /// <param name="cancellationToken"></param>
        /// <returns></returns>
        public async Task StopAsync(CancellationToken cancellationToken)
        {
            Task closing = Task.Run(() => consumer.Close());
            Task cancelled = Task.Delay(Timeout.Infinite, cancellationToken);

            await Task.WhenAny(closing, cancelled);
            cancellationToken.ThrowIfCancellationRequested();
        }

        /// <summary>
        /// Copies the configured queue settings onto the listen options.
        /// </summary>
        private void ApplyQueueSettings(ConsumeQueueOptions options)
        {
            options.AutoAck = consumerOptions.AutoAck;
            options.Arguments = consumerOptions.Arguments;
            options.AutoDelete = consumerOptions.AutoDelete;
            options.Durable = consumerOptions.Durable;
        }

        /// <summary>
        /// Appends the message to logs/yyyyMMdd.log under the current directory, then commits it.
        /// </summary>
        private static void AppendToLog(RecieveResult result)
        {
            // Log directory
            string directory = Path.Combine(Directory.GetCurrentDirectory(), "logs");
            if (!Directory.Exists(directory))
            {
                Directory.CreateDirectory(directory);
            }

            // Log file and line
            string file = Path.Combine(directory, $"{DateTime.Now:yyyyMMdd}.log");
            string line = $"{DateTime.Now:yyyy-MM-dd HH:mm:ss}接收到消息:{result.Body}{Environment.NewLine}";
            File.AppendAllText(file, line);

            // Commit
            result.Commit();
        }
    }
}
到這里,rabbitmq就被集成到.net core的日志記錄功能中去了,但是這里面為了避免記錄不必要的日志,我在其中添加了一個限制,rabbitmq只記錄categoryName包含Rabbit的日志,這樣一來,我們就可以在我們的控制器中使用:

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging;
using RabbitMQDemo.Models;

namespace RabbitMQDemo.Controllers
{
    /// <summary>
    /// Demo controller that writes the submitted text to the log at every level.
    /// </summary>
    public class RabbitController : Controller
    {
        ILogger logger;

        public RabbitController(ILoggerFactory loggerFactory)
        {
            // The logger's categoryName is typeof(RabbitController).FullName, i.e. RabbitMQDemo.Controllers.RabbitController
            logger = loggerFactory.CreateLogger<RabbitController>();
        }

        /// <summary>
        /// Home page.
        /// </summary>
        /// <returns></returns>
        public IActionResult Index() => View();

        /// <summary>
        /// Message submission: logs a non-empty message once per level, then shows the Index view again.
        /// </summary>
        /// <param name="message"></param>
        /// <returns></returns>
        public IActionResult Submit(string message)
        {
            if (string.IsNullOrEmpty(message))
            {
                return View("Index");
            }

            // Send the message; this also exercises the log-level filter
            logger.LogCritical($"Log from Critical:{message}");
            logger.LogDebug($"Log from Debug:{message}");
            logger.LogError($"Log from Error:{message}");
            logger.LogInformation($"Log from Information:{message}");
            logger.LogTrace($"Log from Trace:{message}");
            logger.LogWarning($"Log from Warning:{message}");

            return View("Index");
        }
    }
}
現在可以啟動項目,輸入http://localhost:5000/Rabbit就可以進入頁面測試了,可以發現功能和上面是一樣的
細心的朋友可能會發現,本博文內使用的rabbitmq其實是它6種模式中最簡單的一種模式:hello world模式,讀者可以將上面的代碼稍作修改,即可變成其他幾種模式,但在現實開發中,具體使用哪種模式需要根據自己的業務需求定。
其實,有一個很好用的第三方封裝好的插件,可以讓我們很方便的操作rabbitmq,那就是EasyNetQ,可以使用一個消息總線IBus來進行消息的操作,包含一系列消息模式:Publish/Subscribe, Request/Response和 Send/Receive,這些消息模式也是我們最常用的,所以以后有空再寫