.net core使用rabbitmq消息隊列


  看博文的朋友,本文有些過時了,還有些BUG,如果想了解更多用法,看看這篇吧.net core使用rabbitmq消息隊列 (二)

  

  首先,如果你還沒有安裝好rabbitmq,可以參考我的博客:

  Ubuntu16.04下,erlang安裝和rabbitmq安裝步驟

  Ubuntu16.04下,rabbimq集群搭建

  另外,我的另外一篇博客有介紹rabbitmq的基礎用法以及使用C#操作rabbitmq,並且對rabbitmq有一個簡單的封裝,這里使用.net core操作rabbitmq也會使用到這些封裝類,所以感興趣的可以看看:

  C# .net 環境下使用rabbitmq消息隊列

  好了現在開始我們的正文

  Rabbitmq一般使用

  說明一下,這里我們使用的是.net core 2.1

  我們先創建一個RabbitMQDemo項目(我創建的是MVC項目),然后使用nuget安裝RabbitMQ.Client:

  

   將上面封裝的rabbitmq操作類添加到項目中:   

  
using System;
using System.Collections.Generic;
using System.Text;

namespace RabbitMQDemo
{
    public class QueueOptions
    {
        /// <summary>
        /// 是否持久化
        /// </summary>
        public bool Durable { get; set; } = true;
        /// <summary>
        /// 是否自動刪除
        /// </summary>
        public bool AutoDelete { get; set; } = false;
        /// <summary>
        /// 參數
        /// </summary>
        public IDictionary<string, object> Arguments { get; set; } = new Dictionary<string, object>();
    }
    public class ConsumeQueueOptions : QueueOptions
    {
        /// <summary>
        /// 是否自動提交
        /// </summary>
        public bool AutoAck { get; set; } = false;
        /// <summary>
        /// 每次發送消息條數
        /// </summary>
        public ushort? FetchCount { get; set; }
    }
    public class ExchangeConsumeQueueOptions : ConsumeQueueOptions
    {
        /// <summary>
        /// 路由值
        /// </summary>
        public string[] RoutingKeys { get; set; }
        /// <summary>
        /// 參數
        /// </summary>
        public IDictionary<string, object> BindArguments { get; set; } = new Dictionary<string, object>();
    }
    public class ExchangeQueueOptions : QueueOptions
    {
        /// <summary>
        /// 交換機類型
        /// </summary>
        public string Type { get; set; }
        /// <summary>
        /// 隊列及路由值
        /// </summary>
        public (string, string)[] QueueAndRoutingKey { get; set; }
        /// <summary>
        /// 參數
        /// </summary>
        public IDictionary<string, object> BindArguments { get; set; } = new Dictionary<string, object>();
    }
}
QueueOptions
  
using System;
using System.Collections.Generic;
using System.Text;

namespace RabbitMQDemo
{
    public static class RabbitMQExchangeType
    {
        /// <summary>
        /// 普通模式
        /// </summary>
        public const string Common = "";
        /// <summary>
        /// 路由模式
        /// </summary>
        public const string Direct = "direct";
        /// <summary>
        /// 發布/訂閱模式
        /// </summary>
        public const string Fanout = "fanout";
        /// <summary>
        /// 匹配訂閱模式
        /// </summary>
        public const string Topic = "topic";
    }
}
RabbitMQDemo
  
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace RabbitMQDemo
{
    public abstract class RabbitBase : IDisposable
    {
        List<AmqpTcpEndpoint> amqpList;
        IConnection connection;

        protected RabbitBase(params string[] hosts)
        {
            if (hosts == null || hosts.Length == 0)
            {
                throw new ArgumentException("invalid hosts!", nameof(hosts));
            }

            this.amqpList = new List<AmqpTcpEndpoint>();
            this.amqpList.AddRange(hosts.Select(host => new AmqpTcpEndpoint(host, Port)));
        }
        protected RabbitBase(params (string, int)[] hostAndPorts)
        {
            if (hostAndPorts == null || hostAndPorts.Length == 0)
            {
                throw new ArgumentException("invalid hosts!", nameof(hostAndPorts));
            }

            this.amqpList = new List<AmqpTcpEndpoint>();
            this.amqpList.AddRange(hostAndPorts.Select(tuple => new AmqpTcpEndpoint(tuple.Item1, tuple.Item2)));
        }

        /// <summary>
        /// 端口
        /// </summary>
        public int Port { get; set; } = 5672;
        /// <summary>
        /// 賬號
        /// </summary>
        public string UserName { get; set; } = ConnectionFactory.DefaultUser;
        /// <summary>
        /// 密碼
        /// </summary>
        public string Password { get; set; } = ConnectionFactory.DefaultPass;
        /// <summary>
        /// 虛擬機
        /// </summary>
        public string VirtualHost { get; set; } = ConnectionFactory.DefaultVHost;

        /// <summary>
        /// 釋放
        /// </summary>
        public virtual void Dispose()
        {
            //connection?.Close();
            //connection?.Dispose();
        }
        /// <summary>
        /// 關閉連接
        /// </summary>
        public void Close()
        {
            connection?.Close();
            connection?.Dispose();
        }

        #region Private
        /// <summary>
        /// 獲取rabbitmq的連接
        /// </summary>
        /// <returns></returns>
        protected IModel GetChannel()
        {
            if (connection == null)
            {
                lock (this)
                {
                    if (connection == null)
                    {
                        var factory = new ConnectionFactory();
                        factory.Port = Port;
                        factory.UserName = UserName;
                        factory.VirtualHost = VirtualHost;
                        factory.Password = Password;
                        connection = factory.CreateConnection(this.amqpList);
                    }
                }
            }
            return connection.CreateModel();
        }

        #endregion
    }
}
RabbitBase
  
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace RabbitMQDemo
{
    public class RabbitMQProducer : RabbitBase
    {
        public RabbitMQProducer(params string[] hosts) : base(hosts)
        {

        }
        public RabbitMQProducer(params (string, int)[] hostAndPorts) : base(hostAndPorts)
        {

        }

        #region 普通模式、Work模式
        /// <summary>
        /// 發布消息
        /// </summary>
        /// <param name="queue"></param>
        /// <param name="message"></param>
        /// <param name="options"></param>
        public void Publish(string queue, string message, QueueOptions options = null)
        {
            options = options ?? new QueueOptions();
            var channel = GetChannel();
            channel.QueueDeclare(queue, options.Durable, false, options.AutoDelete, options.Arguments ?? new Dictionary<string, object>());
            var buffer = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish("", queue, null, buffer);
            channel.Close();
        }
        /// <summary>
        /// 發布消息
        /// </summary>
        /// <param name="queue"></param>
        /// <param name="message"></param>
        /// <param name="configure"></param>
        public void Publish(string queue, string message, Action<QueueOptions> configure)
        {
            QueueOptions options = new QueueOptions();
            configure?.Invoke(options);
            Publish(queue, message, options);
        }
        #endregion
        #region 訂閱模式、路由模式、Topic模式
        /// <summary>
        /// 發布消息
        /// </summary>
        /// <param name="exchange"></param>
        /// <param name="routingKey"></param>
        /// <param name="message"></param>
        /// <param name="options"></param>
        public void Publish(string exchange, string routingKey, string message, ExchangeQueueOptions options = null)
        {
            options = options ?? new ExchangeQueueOptions();
            var channel = GetChannel();
            channel.ExchangeDeclare(exchange, string.IsNullOrEmpty(options.Type) ? RabbitMQExchangeType.Fanout : options.Type, options.Durable, options.AutoDelete, options.Arguments ?? new Dictionary<string, object>());
            if (options.QueueAndRoutingKey != null)
            {
                foreach (var t in options.QueueAndRoutingKey)
                {
                    if (!string.IsNullOrEmpty(t.Item1))
                    {
                        channel.QueueBind(t.Item1, exchange, t.Item2 ?? "", options.BindArguments ?? new Dictionary<string, object>());
                    }
                }
            }
            var buffer = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange, routingKey, null, buffer);
            channel.Close();
        }
        /// <summary>
        /// 發布消息
        /// </summary>
        /// <param name="exchange"></param>
        /// <param name="routingKey"></param>
        /// <param name="message"></param>
        /// <param name="configure"></param>
        public void Publish(string exchange, string routingKey, string message, Action<ExchangeQueueOptions> configure)
        {
            ExchangeQueueOptions options = new ExchangeQueueOptions();
            configure?.Invoke(options);
            Publish(exchange, routingKey, message, options);
        }
        #endregion
    }
}
RabbitMQProducer
  
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace RabbitMQDemo
{
    public class RabbitMQConsumer : RabbitBase
    {
        public RabbitMQConsumer(params string[] hosts) : base(hosts)
        {

        }
        public RabbitMQConsumer(params (string, int)[] hostAndPorts) : base(hostAndPorts)
        {

        }

        public event Action<RecieveResult> Received;

        /// <summary>
        /// 構造消費者
        /// </summary>
        /// <param name="channel"></param>
        /// <param name="options"></param>
        /// <returns></returns>
        private IBasicConsumer ConsumeInternal(IModel channel, ConsumeQueueOptions options)
        {
            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
            consumer.Received += (sender, e) =>
            {
                try
                {
                    CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
                    if (!options.AutoAck)
                    {
                        cancellationTokenSource.Token.Register(() =>
                        {
                            channel.BasicAck(e.DeliveryTag, false);
                        });
                    }
                    Received?.Invoke(new RecieveResult(e, cancellationTokenSource));
                }
                catch { }
            };
            if (options.FetchCount != null)
            {
                channel.BasicQos(0, options.FetchCount.Value, false);
            }
            return consumer;
        }

        #region 普通模式、Work模式
        /// <summary>
        /// 消費消息
        /// </summary>
        /// <param name="queue"></param>
        /// <param name="options"></param>
        public ListenResult Listen(string queue, ConsumeQueueOptions options = null)
        {
            options = options ?? new ConsumeQueueOptions();
            var channel = GetChannel();
            channel.QueueDeclare(queue, options.Durable, false, options.AutoDelete, options.Arguments ?? new Dictionary<string, object>());
            var consumer = ConsumeInternal(channel, options);
            channel.BasicConsume(queue, options.AutoAck, consumer);
            ListenResult result = new ListenResult();
            result.Token.Register(() =>
            {
                try
                {
                    channel.Close();
                    channel.Dispose();
                }
                catch { }
            });
            return result;
        }
        /// <summary>
        /// 消費消息
        /// </summary>
        /// <param name="queue"></param>
        /// <param name="configure"></param>
        public ListenResult Listen(string queue, Action<ConsumeQueueOptions> configure)
        {
            ConsumeQueueOptions options = new ConsumeQueueOptions();
            configure?.Invoke(options);
            return Listen(queue, options);
        }
        #endregion
        #region 訂閱模式、路由模式、Topic模式
        /// <summary>
        /// 消費消息
        /// </summary>
        /// <param name="exchange"></param>
        /// <param name="queue"></param>
        /// <param name="options"></param>
        public ListenResult Listen(string exchange, string queue, ExchangeConsumeQueueOptions options = null)
        {
            options = options ?? new ExchangeConsumeQueueOptions();
            var channel = GetChannel();
            channel.QueueDeclare(queue, options.Durable, false, options.AutoDelete, options.Arguments ?? new Dictionary<string, object>());
            if (options.RoutingKeys != null && !string.IsNullOrEmpty(exchange))
            {
                foreach (var key in options.RoutingKeys)
                {
                    channel.QueueBind(queue, exchange, key, options.BindArguments);
                }
            }
            var consumer = ConsumeInternal(channel, options);
            channel.BasicConsume(queue, options.AutoAck, consumer);
            ListenResult result = new ListenResult();
            result.Token.Register(() =>
            {
                try
                {
                    channel.Close();
                    channel.Dispose();
                }
                catch { }
            });
            return result;
        }
        /// <summary>
        /// 消費消息
        /// </summary>
        /// <param name="exchange"></param>
        /// <param name="queue"></param>
        /// <param name="configure"></param>
        public ListenResult Listen(string exchange, string queue, Action<ExchangeConsumeQueueOptions> configure)
        {
            ExchangeConsumeQueueOptions options = new ExchangeConsumeQueueOptions();
            configure?.Invoke(options);
            return Listen(exchange, queue, options);
        }
        #endregion
    }
    public class RecieveResult
    {
        CancellationTokenSource cancellationTokenSource;
        public RecieveResult(BasicDeliverEventArgs arg, CancellationTokenSource cancellationTokenSource)
        {
            this.Body = Encoding.UTF8.GetString(arg.Body);
            this.ConsumerTag = arg.ConsumerTag;
            this.DeliveryTag = arg.DeliveryTag;
            this.Exchange = arg.Exchange;
            this.Redelivered = arg.Redelivered;
            this.RoutingKey = arg.RoutingKey;
            this.cancellationTokenSource = cancellationTokenSource;
        }

        /// <summary>
        /// 消息體
        /// </summary>
        public string Body { get; private set; }
        /// <summary>
        /// 消費者標簽
        /// </summary>
        public string ConsumerTag { get; private set; }
        /// <summary>
        /// Ack標簽
        /// </summary>
        public ulong DeliveryTag { get; private set; }
        /// <summary>
        /// 交換機
        /// </summary>
        public string Exchange { get; private set; }
        /// <summary>
        /// 是否Ack
        /// </summary>
        public bool Redelivered { get; private set; }
        /// <summary>
        /// 路由
        /// </summary>
        public string RoutingKey { get; private set; }

        public void Commit()
        {
            if (cancellationTokenSource == null || cancellationTokenSource.IsCancellationRequested) return;

            cancellationTokenSource.Cancel();
            cancellationTokenSource.Dispose();
            cancellationTokenSource = null;
        }
    }
    public class ListenResult
    {
        CancellationTokenSource cancellationTokenSource;

        /// <summary>
        /// CancellationToken
        /// </summary>
        public CancellationToken Token { get { return cancellationTokenSource.Token; } }
        /// <summary>
        /// 是否已停止
        /// </summary>
        public bool Stoped { get { return cancellationTokenSource.IsCancellationRequested; } }

        public ListenResult()
        {
            cancellationTokenSource = new CancellationTokenSource();
        }

        /// <summary>
        /// 停止監聽
        /// </summary>
        public void Stop()
        {
            cancellationTokenSource.Cancel();
        }
    }
}
RabbitMQConsumer

  修改Startup,在ConfigureServices中將rabbitmq的生產類RabbitMQProducer加入到DI容器中:   

  //將rabbitmq的生產類加入到DI容器中
  var producer = new RabbitMQProducer("192.168.187.129");
  producer.Password = "123456";
  producer.UserName = "admin";
  services.AddSingleton(producer);//這里我沒有使用集群

   至於消息的消費,我們可以將消費者使用一個線程啟動並監聽,但是這里我個人推薦使用.net core 自帶的HostedService去實現,至於消費者的功能,我們就簡單的將消息記錄都文本文檔中,類似日志記錄。

  我們創建一個RabbitHostedService類:   

  
using Microsoft.Extensions.Hosting;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.IO;

namespace RabbitMQDemo
{
    public class RabbitHostedService : IHostedService
    {
        RabbitMQConsumer consumer;

        public RabbitHostedService()
        {
            consumer = new RabbitMQConsumer("192.168.187.129");
            consumer.Password = "123456";
            consumer.UserName = "admin";
        }

        /// <summary>
        /// 服務啟動
        /// </summary>
        /// <param name="cancellationToken"></param>
        /// <returns></returns>
        public async Task StartAsync(CancellationToken cancellationToken)
        {
            await Task.Run(() =>
            {
                consumer.Received += new Action<RecieveResult>(result =>
                {
                    //文件路徑
                    string path = Path.Combine(Directory.GetCurrentDirectory(), "logs");
                    if (!Directory.Exists(path))
                    {
                        Directory.CreateDirectory(path);
                    }

                    //文件
                    string fileName = Path.Combine(path, $"{DateTime.Now.ToString("yyyyMMdd")}.log");
                    string message = $"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}接收到消息:{result.Body}{Environment.NewLine}";
                    File.AppendAllText(fileName, message);

                    //提交
                    result.Commit();
                });
                
                consumer.Listen("queue1", options =>
                {
                    options.AutoAck = false;
                    options.Arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };
                    options.AutoDelete = false;
                    options.Durable = true;
                });
            });
        }
        /// <summary>
        /// 服務停止
        /// </summary>
        /// <param name="cancellationToken"></param>
        /// <returns></returns>
        public async Task StopAsync(CancellationToken cancellationToken)
        {
            var task = Task.Run(() =>
            {
                consumer.Close();
            });

            await Task.WhenAny(task, Task.Delay(-1, cancellationToken));
            cancellationToken.ThrowIfCancellationRequested();
        }
    }
}
RabbitHostedService

  同時,我們需要在Startup中將RabbitHostedService注入到容器中:  

  //注入消費者
  services.AddSingleton<IHostedService, RabbitHostedService>();

  得到Startup的代碼如下:  

  
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.HttpsPolicy;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

namespace RabbitMQDemo
{
    public class Startup
    {
        public Startup(IConfiguration configuration)
        {
            Configuration = configuration;
        }

        public IConfiguration Configuration { get; }

        // This method gets called by the runtime. Use this method to add services to the container.
        public void ConfigureServices(IServiceCollection services)
        {
            services.Configure<CookiePolicyOptions>(options =>
            {
                // This lambda determines whether user consent for non-essential cookies is needed for a given request.
                options.CheckConsentNeeded = context => true;
                options.MinimumSameSitePolicy = SameSiteMode.None;
            });

            //將rabbitmq的生產類加入到DI容器中
            var producer = new RabbitMQProducer("192.168.187.129");
            producer.Password = "123456";
            producer.UserName = "admin";
            services.AddSingleton(producer);//這里我沒有使用集群
            //注入消費者
            services.AddSingleton<IHostedService, RabbitHostedService>();

            services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_1);
        }

        // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
        public void Configure(IApplicationBuilder app, Microsoft.Extensions.Hosting.IHostingEnvironment env)
        {
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }
            else
            {
                app.UseExceptionHandler("/Home/Error");
                app.UseHsts();
            }

            app.UseHttpsRedirection();
            app.UseStaticFiles();
            app.UseCookiePolicy();

            app.UseMvc(routes =>
            {
                routes.MapRoute(
                    name: "default",
                    template: "{controller=Home}/{action=Index}/{id?}");
            });
        }
    }
}
Startup

  到這里,.net core 集成rabbitmq就寫好了,然后就是發送消息使用了,我們添加一個名為RabbitController的控制器,里面代碼如下:   

  
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using RabbitMQDemo.Models;

namespace RabbitMQDemo.Controllers
{
    public class RabbitController : Controller
    {
        RabbitMQProducer producer;
        public RabbitController(RabbitMQProducer producer)
        {
            this.producer = producer;
        }

        /// <summary>
        /// 首頁
        /// </summary>
        /// <returns></returns>
        public IActionResult Index()
        {
            return View();
        }
        /// <summary>
        /// 消息提交
        /// </summary>
        /// <param name="message"></param>
        /// <returns></returns>
        public IActionResult Submit(string message)
        {
            if (!string.IsNullOrEmpty(message))
            {
                //發送消息
                producer.Publish("queue1", message, options =>
                {
                    options.Arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };
                    options.AutoDelete = false;
                    options.Durable = true;
                });
            }
            return View("Index");
        }
    }
}
RabbitController

   RabbitController里面有兩個Action,它們返回同一個視圖:    

  
@{
    Layout = null;
}

@using (Html.BeginForm("Submit", "Rabbit"))
{
    @Html.DisplayName("消息:");
    <br />
    @Html.TextArea("message", new { @class = "multieditbox" })
    <br />
    <input type="submit" class="buttoncss" value="確定" />
}
<style type="text/css">
    .buttoncss {
        font-family: "tahoma", "宋體"; /*www.52css.com*/
        font-size: 9pt;
        color: #003399;
        border: 1px #003399 solid;
        color: #006699;
        border-bottom: #93bee2 1px solid;
        border-left: #93bee2 1px solid;
        border-right: #93bee2 1px solid;
        border-top: #93bee2 1px solid;
        background-image: url(../images/bluebuttonbg.gif);
        background-color: #e8f4ff;
        font-style: normal;
        width: 60px;
        height: 22px;
    }

    .multieditbox {
        background: #f8f8f8;
        border-bottom: #b7b7b7 1px solid;
        border-left: #b7b7b7 1px solid;
        border-right: #b7b7b7 1px solid;
        border-top: #b7b7b7 1px solid;
        color: #000000;
        cursor: text;
        font-family: "arial";
        font-size: 9pt;
        padding: 1px; /*www.52css.com*/
        width: 200px;
        height: 80px;
    }
</style>
Index

  現在可以啟動項目,輸入http://localhost:5000/Rabbit就可以進入頁面測試了:

  

    點擊上面確定之后,你會發現在項目根目錄下生成了一個logs目錄,里面有一個文件,文件里面就是我們發送的消息了

    注:如果報錯,可以登錄rabbitmq后台查看賬號虛擬機權限是否存在

  

 

   Rabbitmq日志記錄

  上面是我們使用.net core集成rabbitmq的一種簡單方式,但是不建議在開發時這么使用

  可以注意到,上面的例子我直接將RabbitMQProducer注入到容器中,但開發時應該按自己的需求對RabbitMQProducer做一層封裝,然后將封裝類注入到容器中,比如我們要使用rabbitmq做日志記錄,可以記錄到數據庫,也可以記錄到文件中去,但是.net core為我們提供了一整套的日志記錄功能,因此我們只需要將rabbitmq集成進去就可以了

  首先,我們需要創建幾個類,將rabbitmq繼承到日志記錄功能中去:  

  
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace RabbitMQDemo
{
    public abstract class RabbitMQOptions
    {
        /// <summary>
        /// 服務節點
        /// </summary>
        public string[] Hosts { get; set; }
        /// <summary>
        /// 端口
        /// </summary>
        public int Port { get; set; }
        /// <summary>
        /// 賬號
        /// </summary>
        public string UserName { get; set; }
        /// <summary>
        /// 密碼
        /// </summary>
        public string Password { get; set; }
        /// <summary>
        /// 虛擬機
        /// </summary>
        public string VirtualHost { get; set; }
        /// <summary>
        /// 是否持久化
        /// </summary>
        public bool Durable { get; set; } = true;
        /// <summary>
        /// 是否自動刪除
        /// </summary>
        public bool AutoDelete { get; set; } = false;
        /// <summary>
        /// 隊列
        /// </summary>
        public string Queue { get; set; }
        /// <summary>
        /// 交換機
        /// </summary>
        public string Exchange { get; set; }
        /// <summary>
        /// 交換機類型,放空則為普通模式
        /// </summary>
        public string Type { get; set; }
        /// <summary>
        /// 參數
        /// </summary>
        public IDictionary<string, object> Arguments { get; set; } = new Dictionary<string, object>();
    }
    public class RabbitMQLoggerOptions : RabbitMQOptions
    {
        /// <summary>
        /// 最低日志記錄
        /// </summary>
        public LogLevel MinLevel { get; set; } = LogLevel.Information;
        /// <summary>
        /// 分類
        /// </summary>
        public string Category { get; set; } = "Rabbit";
    }
    public class RabbitMQConsumerOptions : RabbitMQOptions
    {
        /// <summary>
        /// 是否自動提交
        /// </summary>
        public bool AutoAck { get; set; } = false;
        /// <summary>
        /// 每次發送消息條數
        /// </summary>
        public ushort? FetchCount { get; set; }
    }
}
RabbitMQOptions
  
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace RabbitMQDemo
{
    public class RabbitLoggerProvider : ILoggerProvider
    {
        RabbitMQLoggerOptions loggerOptions;

        public RabbitLoggerProvider(IOptionsMonitor<RabbitMQLoggerOptions> options)
        {
            loggerOptions = options.CurrentValue;
        }

        /// <summary>
        /// 創建Logger對象
        /// </summary>
        /// <param name="categoryName"></param>
        /// <returns></returns>
        public ILogger CreateLogger(string categoryName)
        {
            //可緩存實例,這里略過了
            return new RabbitLogger(categoryName, loggerOptions);
        }

        /// <summary>
        /// 釋放
        /// </summary>
        public void Dispose()
        {
            
        }
    }
}
RabbitLoggerProvider
  
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace RabbitMQDemo
{
    public class RabbitLogger : ILogger, IDisposable
    {
        string category;
        RabbitMQLoggerOptions loggerOptions;
        RabbitMQProducer producer;

        public RabbitLogger(string category, RabbitMQLoggerOptions options)
        {
            this.category = category;
            this.loggerOptions = options;

            producer = new RabbitMQProducer(options.Hosts);
            producer.Password = options.Password;
            producer.UserName = options.UserName;
            producer.Port = options.Port;
            producer.VirtualHost = options.VirtualHost;
        }

        public IDisposable BeginScope<TState>(TState state)
        {
            return this;
        }
        /// <summary>
        /// 釋放
        /// </summary>
        public void Dispose()
        {
            GC.Collect();
        }
        /// <summary>
        /// 是否記錄日志
        /// </summary>
        /// <param name="logLevel"></param>
        /// <returns></returns>
        public bool IsEnabled(LogLevel logLevel)
        {
            //只記錄日志等級大於指定最小等級且屬於Rabbit分類的日志
            return logLevel >= loggerOptions.MinLevel && category.Contains(loggerOptions.Category, StringComparison.OrdinalIgnoreCase);
        }

        /// <summary>
        /// 日志記錄
        /// </summary>
        /// <typeparam name="TState"></typeparam>
        /// <param name="logLevel"></param>
        /// <param name="eventId"></param>
        /// <param name="state"></param>
        /// <param name="exception"></param>
        /// <param name="formatter"></param>
        public void Log<TState>(LogLevel logLevel, EventId eventId, TState state, Exception exception, Func<TState, Exception, string> formatter)
        {
            if (IsEnabled(logLevel))
            {
                string message = "";
                if (state != null)
                {
                    message = state.ToString();
                }
                if (exception != null)
                {
                    message += Environment.NewLine + formatter?.Invoke(state, exception);
                }
                //發送消息
                producer.Publish(loggerOptions.Queue, message, options =>
                {
                    options.Arguments = loggerOptions.Arguments;
                    options.AutoDelete = loggerOptions.AutoDelete;
                    options.Durable = loggerOptions.Durable;
                });
            }
        }
    }
}
RabbitLogger

  接着,我們修改Startup的服務對象:  

  
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.HttpsPolicy;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

namespace RabbitMQDemo
{
    public class Startup
    {
        public Startup(IConfiguration configuration)
        {
            Configuration = configuration;
        }

        public IConfiguration Configuration { get; }

        // This method gets called by the runtime. Use this method to add services to the container.
        public void ConfigureServices(IServiceCollection services)
        {
            services.Configure<CookiePolicyOptions>(options =>
            {
                // This lambda determines whether user consent for non-essential cookies is needed for a given request.
                options.CheckConsentNeeded = context => true;
                options.MinimumSameSitePolicy = SameSiteMode.None;
            });

            //配置消息發布
            services.Configure<RabbitMQLoggerOptions>(options =>
            {
                options.Category = "Rabbit";
                options.Hosts = new string[] { "192.168.187.129" };
                options.MinLevel = Microsoft.Extensions.Logging.LogLevel.Information;
                options.Password = "123456";
                options.Port = 5672;
                options.Queue = "queue1";
                options.UserName = "admin";
                options.VirtualHost = "/";
                options.Arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };
                options.AutoDelete = false;
                options.Durable = true;
            });
            //將RabbitLoggerProvider加入到容器中
            services.AddSingleton<ILoggerProvider, RabbitLoggerProvider>(); 

            //配置消息消費
            services.Configure<RabbitMQConsumerOptions>(options =>
            {
                options.Hosts = new string[] { "192.168.187.129" };
                options.Password = "123456";
                options.Port = 5672;
                options.Queue = "queue1";
                options.UserName = "admin";
                options.VirtualHost = "/";
                options.Arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };
                options.AutoDelete = false;
                options.Durable = true;
                options.AutoAck = false;
            });
            //注入消費者
            services.AddSingleton<IHostedService, RabbitHostedService>();

            services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_1);
        }

        // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
        public void Configure(IApplicationBuilder app, Microsoft.Extensions.Hosting.IHostingEnvironment env)
        {
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }
            else
            {
                app.UseExceptionHandler("/Home/Error");
                app.UseHsts();
            }

            app.UseHttpsRedirection();
            app.UseStaticFiles();
            app.UseCookiePolicy();

            app.UseMvc(routes =>
            {
                routes.MapRoute(
                    name: "default",
                    template: "{controller=Home}/{action=Index}/{id?}");
            });
        }
    }
}
Startup

  順帶提一下,這個Startup中服務最好使用拓展方法去實現,這里我是為了簡單沒有采用拓展方法,所以在開發時多注意一下吧

  另外,我們也可以調整一下RabbitHostedService:  

  
using Microsoft.Extensions.Hosting;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.IO;
using Microsoft.Extensions.Options;

namespace RabbitMQDemo
{
    public class RabbitHostedService : IHostedService
    {
        RabbitMQConsumerOptions consumerOptions;
        RabbitMQConsumer consumer;

        public RabbitHostedService(IOptions<RabbitMQConsumerOptions> options)
        {
            consumerOptions = options.Value;

            consumer = new RabbitMQConsumer(consumerOptions.Hosts);
            consumer.Password = consumerOptions.Password;
            consumer.UserName = consumerOptions.UserName;
            consumer.VirtualHost = consumerOptions.VirtualHost;
            consumer.Port = consumerOptions.Port;
        }

        /// <summary>
        /// 服務啟動
        /// </summary>
        /// <param name="cancellationToken"></param>
        /// <returns></returns>
        public async Task StartAsync(CancellationToken cancellationToken)
        {
            await Task.Run(() =>
            {
                consumer.Received += new Action<RecieveResult>(result =>
                {
                    //文件路徑
                    string path = Path.Combine(Directory.GetCurrentDirectory(), "logs");
                    if (!Directory.Exists(path))
                    {
                        Directory.CreateDirectory(path);
                    }

                    //文件
                    string fileName = Path.Combine(path, $"{DateTime.Now.ToString("yyyyMMdd")}.log");
                    string message = $"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}接收到消息:{result.Body}{Environment.NewLine}";
                    File.AppendAllText(fileName, message);

                    //提交
                    result.Commit();
                });
                
                consumer.Listen(consumerOptions.Queue, options =>
                {
                    options.AutoAck = consumerOptions.AutoAck;
                    options.Arguments = consumerOptions.Arguments;
                    options.AutoDelete = consumerOptions.AutoDelete;
                    options.Durable = consumerOptions.Durable;
                });
            });
        }
        /// <summary>
        /// 服務停止
        /// </summary>
        /// <param name="cancellationToken"></param>
        /// <returns></returns>
        public async Task StopAsync(CancellationToken cancellationToken)
        {
            var task = Task.Run(() =>
            {
                consumer.Close();
            });

            await Task.WhenAny(task, Task.Delay(-1, cancellationToken));
            cancellationToken.ThrowIfCancellationRequested();
        }
    }
}
RabbitHostedService

  到這里,rabbitmq就被繼承到.net core的日志記錄功能中去了,但是這里面為了避免記錄不要要的日志,我在其中添加了一個限制,rabbitmq只記錄categoryName包含Rabbit的日志,這樣一來,我們就可以在我們的控制器中使用:  

  
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging;
using RabbitMQDemo.Models;

namespace RabbitMQDemo.Controllers
{
    public class RabbitController : Controller
    {
        ILogger logger;
        public RabbitController(ILoggerFactory loggerFactory)
        {
            //下面的logger的categoryName=typeof(RabbitController).FullName=RabbitMQDemo.Controllers.RabbitController
            logger = loggerFactory.CreateLogger<RabbitController>();
        }

        /// <summary>
        /// 首頁
        /// </summary>
        /// <returns></returns>
        public IActionResult Index()
        {
            return View();
        }
        /// <summary>
        /// 消息提交
        /// </summary>
        /// <param name="message"></param>
        /// <returns></returns>
        public IActionResult Submit(string message)
        {
            if (!string.IsNullOrEmpty(message))
            {
                //發送消息,這里也可以檢驗日志級別的過濾
                logger.LogCritical($"Log from Critical:{message}");
                logger.LogDebug($"Log from Debug:{message}");
                logger.LogError($"Log from Error:{message}");
                logger.LogInformation($"Log from Information:{message}");
                logger.LogTrace($"Log from Trace:{message}");
                logger.LogWarning($"Log from Warning:{message}");
            }
            return View("Index");
        }
    }
}
RabbitController

  現在可以啟動項目,輸入http://localhost:5000/Rabbit就可以進入頁面測試了,可以發現功能和上面是一樣的

  細心的朋友可能會發現,本博文內使用的rabbitmq其實是它6中模式中最簡單的一種模式:hello world模式,讀者可以將上面的代碼稍作修改,即可變成其他幾種模式,但在現實開發中,具體使用哪種模式需要根據自己的業務需求定。 

  其實,有一個很好用的第三方封裝好的插件,可以讓我們很方便的操作rabbitmq,那就是EasyNetQ,可以使用一個消息總棧IBus來進行消息的操作,包含一系列消息模式:Publish/Subscribe, Request/Response和 Send/Receive,這些消息模式也是我們最常用的,所以以后有空再寫


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM