C# .net 環境下使用rabbitmq消息隊列


  消息隊列的地位越來越重要,幾乎是面試的必問問題了,不會使用幾種消息隊列都顯得尷尬,正好本文使用C#來帶你認識rabbitmq消息隊列

  首先,我們要安裝rabbitmq,當然,如果有現成的,也可以使用,不知道曾幾何時,我喜歡將數據庫等等軟件安裝在linux虛擬機,如果沒現成的rabbitmq,按照下面的來吧,嘿嘿

  rabbitmq安裝:https://www.cnblogs.com/shanfeng1000/p/11951703.html

  如果要實現rabbitmq集群,參考:https://www.cnblogs.com/shanfeng1000/p/12097054.html

  我這里使用的是rabbitmq集群,但是沒有比較,只是已經安裝好了,就直接使用算了

  虛擬機集群地址:192.168.209.133,192.168.209.134,192.168.209.135

  端口使用的默認端口,都是5672,也就是AMQP協議端口

  Rabbitmq的工作模式

  先說說幾個概念

  生產者(producer):負責生產消息,可以有多個生產者,可以理解為生成消息的那部分邏輯

  消費者(consumer):從隊列中獲取消息,對消息處理的那部分邏輯

  隊列(queue):用於存放消息,可以理解為先進先出的一個對象

  交換機(exchange):顧名思義,就是個中介的角色,將接收到的消息按不同的規則轉發到其他交換機或者隊列中

  路由(route):就是交換機分發消息的規則,交換機可以指定路由規則,生產者在發布消息時也可以指定消息路由,比如交換機中設置A路由表示將消息轉發到隊列1,B路由表示將消息轉發到隊列2,那么當交換機接收到消息時,如果消息的路由滿足A路由,則將消息轉發到隊列1,如果滿足B路由則將消息轉發到隊列2

  虛擬主機(virtual host):虛擬地址,用於進行邏輯隔離,一個虛擬主機里面可以有若干個 exchange 和 queue,但是里面不能有相同名稱的 exchange 或 queue

  再看看rabbitmq的幾種工作模式,具體可參考rabbitmq官網給出的Demo:https://www.rabbitmq.com/getstarted.html

    

  其中,第6中類似我們常用的請求-響應模式,但是使用的RPC請求響應,用的比較少,這里就不過多解釋,感興趣的可以參考官網文檔:https://www.rabbitmq.com/tutorials/tutorial-six-dotnet.html

  總的來說,就是生產者將消息發布到rabbitmq上,然后消費者連接rabbitmq,獲取到消息就消費,但是有幾點說明一下

  1、rabbitmq中的消息是可被多次消費的,因為rabbitmq提供了ack機制,當消費者在消費消息時,如果將自動ack設置成false,那么需要手動提交ack才能告訴rabbitmq消息已被使用,否則當通道關閉時,消息會繼續呆在隊列中等待消費

  2、當存在多個消費者時,默認情況下,一個消費者獲取一個消息,處理完成后再獲取下一個,但是rabbitmq消費一次性獲取多個,當然后當這些消息消費完成后,再獲取下一批,這也就是rabbitmq的Qos機制

  

  C#使用rabbitmq

  如果感興趣的人多,到時候再單獨開一篇博文,現在就介紹其中的1-5種,也可以分類成兩種:不使用交換機和使用交換機,所以下面就分這兩種來說明

  首先,我們創建了兩個Demo項目:RabbitMQ.PublishConsole和RabbitMQ.ConsumeConsole,分別使用使用nuget安裝RabbitMQ.Client:

  

  其中RabbitMQ.PublishConsole是用來生產消息,RabbitMQ.ConsumeConsole用來消費消息  

  這里我們安裝的是最新版本,舊版本和新版本在使用上可能會有一些區別


 

  不使用交換機情形

  不使用交換機有兩種模式:簡單模式和工作模式

  這里先貼上生產者生成消息的代碼,簡單模式和工作模式這部分測試代碼是一樣的:  

  
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Text;

namespace RabbitMQ.PublishConsole
{
    class Program
    {
        static void Main(string[] args)
        {
            string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" };
            int port = 5672;
            string userName = "admin";
            string password = "123456";
            string virtualHost = "/";

            //創建一個連接工廠
            var factory = new ConnectionFactory();
            factory.UserName = userName;
            factory.Password = password;
            factory.Port = port;
            factory.VirtualHost = virtualHost;
            //創建一個連接,此時可以在rabbitmq后台Web管理頁面中的Connections中看到一個連接生成
            //一個連接可以創建多個通道
            var connection = factory.CreateConnection(hosts);

            string queue = "queue1";//隊列名稱

            //創建一個通道
            //此時可以在rabbitmq后台Web管理頁面中的Channels中看到一個新通道生成
            var channel = connection.CreateModel();
            //給通道綁定一個隊列,隊列如果不存在,則會創建新隊列,如果隊列已存在,那么參數一定要正確,特別是arguments參數,否則會報錯
            var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };
            channel.QueueDeclare(queue: queue, durable: true, exclusive: false, autoDelete: false, arguments: arguments);

            //發布10條消息
            for (var i = 0; i < 10; i++)
            {
                var buffer = Encoding.UTF8.GetBytes(i.ToString());
                channel.BasicPublish("", queue, null, buffer);
            }
            channel.Close();

            Console.ReadKey();
        }
    }
}
RabbitMQ.PublishConsole

  上述代碼執行完成后,隊列queue1中就有了10條消息,可以在rabbitmq的后台管理中看到:  

  

  代碼中提到,通道在申明隊列時,如果隊列已經存在,則申明的參數一定要對上,否則會拋出異常:The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=406, text='PRECONDITION_FAILED - inequivalent arg 'x-queue-type' for queue 'queue1' in vhost '/': received none but current is the value 'classic' of type 'longstr'', classId=50, methodId=10

  比如這里,我實現在rabbitmq后台創建了隊列,那么他們的對應關系如下圖: 

   

  

   簡單模式

  這個模式很簡單,其實就是只有一個消費者,簡單的保證操作的順序性

  

   接着貼上消費者代碼:

  
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace RabbitMQ.ConsumeConsole
{
    class Program
    {
        static void Main(string[] args)
        {
            string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" };
            int port = 5672;
            string userName = "admin";
            string password = "123456";
            string virtualHost = "/";

            //創建一個連接工廠
            var factory = new ConnectionFactory();
            factory.UserName = userName;
            factory.Password = password;
            factory.Port = port;
            factory.VirtualHost = virtualHost;
            //創建一個連接,此時可以在rabbitmq后台Web管理頁面中的Connections中看到一個連接生成
            //一個連接可以創建多個通道
            var connection = factory.CreateConnection(hosts);

            string queue = "queue1";//隊列名稱

            //創建一個通道
            //此時可以在rabbitmq后台Web管理頁面中的Channels中看到一個新通道生成
            var channel = connection.CreateModel();
            //給通道綁定一個隊列,隊列如果不存在,則會創建新隊列,如果隊列已存在,那么參數一定要正確,特別是arguments參數,否則會報錯
            var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };
            channel.QueueDeclare(queue: queue, durable: true, exclusive: false, autoDelete: false, arguments: arguments);
            //channel.BasicQos(2, 2, false);//設置QOS

            //在通道中定義一個事件消費者
            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
            consumer.Received += (sender, e) =>
            {
                string message = Encoding.UTF8.GetString(e.Body);
                Console.WriteLine($"接收到消息:{message}");

                Thread.Sleep(500);//暫停一下

                //通知消息已被處理,如果沒有,那么消息將會被重復消費
                channel.BasicAck(e.DeliveryTag, false);
            };
            //ack設置成false,表示不自動提交,那么就需要在消息被消費后,手動調用BasicAck去提交消息
            channel.BasicConsume(queue, false, consumer);

            Console.ReadKey();
        }

        
    }
}
RabbitMQ.ConsumeConsole

  上述代碼執行完成后,在后台管理中可以看到消息被消費掉了

  

 

  工作模式

   工作模式是簡單模式的拓展,如果業務簡單,對消息的消費是一個耗時的過程,這個模式是一個好的選擇。

   

   接着調用生產者代碼生產10條消息,下面是消費者的測試代碼  

  
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace RabbitMQ.ConsumeConsole
{
    class Program
    {
        static void Main(string[] args)
        {
            string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" };
            int port = 5672;
            string userName = "admin";
            string password = "123456";
            string virtualHost = "/";

            //創建一個連接工廠
            var factory = new ConnectionFactory();
            factory.UserName = userName;
            factory.Password = password;
            factory.Port = port;
            factory.VirtualHost = virtualHost;
            //創建一個連接,此時可以在rabbitmq后台Web管理頁面中的Connections中看到一個連接生成
            //一個連接可以創建多個通道
            var connection = factory.CreateConnection(hosts);

            Consumer(connection, 1);//消費者1
            Consumer(connection, 2);//消費者2

            Console.ReadKey();
        }

        static void Consumer(IConnection connection, ushort prefetch)
        {
            //使用多線程來執行,可以模擬多個消費者
            new Thread(() =>
            {
                int threadId = Thread.CurrentThread.ManagedThreadId;//線程Id,用於區分消費者
                string queue = "queue1";//隊列名稱

                //創建一個通道
                //此時可以在rabbitmq后台Web管理頁面中的Channels中看到一個新通道生成
                var channel = connection.CreateModel();
                //給通道綁定一個隊列,隊列如果不存在,則會創建新隊列,如果隊列已存在,那么參數一定要正確,特別是arguments參數,否則會報錯
                var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };
                channel.QueueDeclare(queue: queue, durable: true, exclusive: false, autoDelete: false, arguments: arguments);
                //設置消費者每次獲取的消息數,可以用來設置消費者消息的權重
                //必須等獲取的消息都消費完成后才能重新獲取
                channel.BasicQos(0, prefetch, true);

                //在通道中定義一個事件消費者
                EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                consumer.Received += (sender, e) =>
                {
                    string message = Encoding.UTF8.GetString(e.Body);
                    Console.WriteLine($"ThreadId:【{threadId}】 接收到消息:{message}");

                    Thread.Sleep(500);

                    //通知消息已被處理,如果沒有,那么消息將會被重復消費
                    channel.BasicAck(e.DeliveryTag, false);

                };
                //ack設置成false,表示不自動提交,那么就需要在消息被消費后,手動調用BasicAck去提交消息
                channel.BasicConsume(queue, false, consumer);
            }).Start();
        }
    }
}
RabbitMQ.ConsumeConsole

  另外說明一下,代碼中提到rabbitmq的QOS機制,這里簡單解釋一下,當生產者將消息發布到rabbitmq之后,如果在未配置QOS的情況下,rabbitmq盡可能快速地發送隊列中的所有消息到消費者端,如果消息比較多,消費者來不及處理,就會緩存這些消息,當消息堆積過多,可能導致服務器內存不足而影響其他進程,rabbitmq的QOS可以很好的解決這類問題,QOS就是限制消費者一次性從rabbitmq中獲取消息的個數,而不是獲取所有消息。比如設置rabbitmq的QOS為10,也就是prefetch=10,就是說,哪怕rabbitmq中有100條消息,消費者也只是一次性獲取10條,然后消費者消費這10條消息,剩下的交給其他消費者,當10條消息中的unacked個數少於prefetch * 消費者數目時,會繼續從rabbitmq獲取消息,如果在工作模式中,不使用QOS,你會發現,所有的消息都被一個消費者消費了

  


 

  使用交換機情形

  使用交換機的情形有3種:發布訂閱模式,路由模式,主題模式

  上面說了,交換機是一個中介的角色,當一個交換機創建后,可以將其他隊列或者交換機與當前交換機綁定,綁定時需要指定綁定路由規則,這個和交換機類型有關。

  當我們不使用交換機時,那么生產者是直接將消息發布到隊列中去的,生產者只需要指定消息接收的隊列即可,而使用交換機做中轉時,生產者只需要將消息發布到交換機,然后交換機根據接收到的消息,按與交換機綁定的路由規則,將消息轉發到其他交換機或者隊列中,這個處理過程和交換機的類型有關,交換機一般分為4類:

  direct:直連類型,就是將消息的路由和交換機的綁定路由作比較,當兩者一致時,則匹配成功,然后消息就會被轉發到這個綁定路由后的隊列或者交換機

  fanout:這種類型的交換機是不需要指定路由的,當交換機接收到消息時,會將消息廣播到所有綁定到它的所有隊列或交換機中

  topic:主題類型,類似direct類型,只不過在將消息的路由和綁定路由做比較時,是通過特定表達式去比較的,其中# 匹配一個或多個,* 匹配一個

  headers:頭部交換機,允許使用消息頭中的信息來做匹配規則,這個用的少,基本上不用,這里也就不過多介紹了

  到這里,你應該發覺,使用交換機的三種情形,無非就是使用交換機的類型不一樣,發布訂閱模式--fanout,路由模式--direct,主題模式--topic

  現在我們先去rabbitmq的后台中,創建這幾種交換機:

  交換機的創建及綁定都可以在代碼中實現,如IModel類的QueueBind,ExchangeBind等方法,用多了就自然熟了,這里為了方便截圖,就到后台去創建了

  

    然后我們創建兩個隊列,並按指定類型分別綁定到這3個交換機中:

   隊列:

  

    demo.direct綁定隊列規則:

   

   demo.fanout綁定隊列規則:

  

   demo.topic綁定隊列規則:

    

   上面所描述的,無非就是三種模式中發布消息方式的不一樣,消費者當然還是從隊列獲取消息消費的,這里我們就先貼出消費者的代碼:

  
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace RabbitMQ.ConsumeConsole
{
    class Program
    {
        static void Main(string[] args)
        {
            string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" };
            int port = 5672;
            string userName = "admin";
            string password = "123456";
            string virtualHost = "/";

            //創建一個連接工廠
            var factory = new ConnectionFactory();
            factory.UserName = userName;
            factory.Password = password;
            factory.Port = port;
            factory.VirtualHost = virtualHost;
            //創建一個連接,此時可以在rabbitmq后台Web管理頁面中的Connections中看到一個連接生成
            //一個連接可以創建多個通道
            var connection = factory.CreateConnection(hosts);

            Consumer(connection, "queue1");//消費者1
            Consumer(connection, "queue2");//消費者2

            Console.ReadKey();
        }

        static void Consumer(IConnection connection, string queue)
        {
            //使用多線程來執行,可以模擬多個消費者
            new Thread(() =>
            {
                int threadId = Thread.CurrentThread.ManagedThreadId;//線程Id,用於區分消費者

                //創建一個通道
                //此時可以在rabbitmq后台Web管理頁面中的Channels中看到一個新通道生成
                var channel = connection.CreateModel();
                //給通道綁定一個隊列,隊列如果不存在,則會創建新隊列,如果隊列已存在,那么參數一定要正確,特別是arguments參數,否則會報錯
                var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };
                channel.QueueDeclare(queue: queue, durable: true, exclusive: false, autoDelete: false, arguments: arguments);

                //在通道中定義一個事件消費者
                EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                consumer.Received += (sender, e) =>
                {
                    string message = Encoding.UTF8.GetString(e.Body);
                    Console.WriteLine($"ThreadId:【{threadId}】 接收到消息:{message}");

                    Thread.Sleep(500);

                    //通知消息已被處理,如果沒有,那么消息將會被重復消費
                    channel.BasicAck(e.DeliveryTag, false);
                };
                //ack設置成false,表示不自動提交,那么就需要在消息被消費后,手動調用BasicAck去提交消息
                channel.BasicConsume(queue, false, consumer);
            }).Start();
        }
    }
}
RabbitMQ.ConsumeConsole

  這里我們使用了兩個隊列,每個隊列我們這里只用了一個消費者,對於下面幾種模式,這個消費者代碼都能消費到

  發布訂閱模式

  發布訂閱模式使用的是fanout類型的交換機,這個類型無需指定路由,交換機會將消息廣播到每個綁定到交換機的隊列或者交換機  

  

  
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Text;

namespace RabbitMQ.PublishConsole
{
    class Program
    {
        static void Main(string[] args)
        {
            string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" };
            int port = 5672;
            string userName = "admin";
            string password = "123456";
            string virtualHost = "/";

            //創建一個連接工廠
            var factory = new ConnectionFactory();
            factory.UserName = userName;
            factory.Password = password;
            factory.Port = port;
            factory.VirtualHost = virtualHost;
            //創建一個連接,此時可以在rabbitmq后台Web管理頁面中的Connections中看到一個連接生成
            //一個連接可以創建多個通道
            var connection = factory.CreateConnection(hosts);

            string exchange = "demo.fanout";//交換機名稱
            string exchangeType = "fanout";//交換機類型

            //創建一個通道
            //此時可以在rabbitmq后台Web管理頁面中的Channels中看到一個新通道生成
            var channel = connection.CreateModel();
            //給通道綁定一個交換機,交換機如果不存在,則會創建新交換機,如果交換機已存在,那么參數一定要正確,特別是arguments參數,各參數類似隊列
            var arguments = new Dictionary<string, object>() { };
            channel.ExchangeDeclare(exchange: exchange, type: exchangeType, durable: true, autoDelete: false, arguments: arguments);
            
            //發布10條消息
            for (var i = 0; i < 10; i++)
            {
                var buffer = Encoding.UTF8.GetBytes(i.ToString());
                channel.BasicPublish(exchange, "", null, buffer);
            }
            channel.Close();

            Console.ReadKey();
        }
    }
}
RabbitMQ.PublishConsole

  代碼中,我們往交換機發布了10條消息,交換機接收到消息后,會將消息轉發到queue1和queue2,因此,queue1和queue2都會收到10條消息:

  

  路由模式

  路由模式使用的是direct類型的交換機,也即在進行路由匹配時,需要匹配的路由一直才算匹配成功,我們把發布訂閱模式的代碼稍作修改即可,貼出生產者部分代碼:  

   

  
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Text;

namespace RabbitMQ.PublishConsole
{
    class Program
    {
        static void Main(string[] args)
        {
            string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" };
            int port = 5672;
            string userName = "admin";
            string password = "123456";
            string virtualHost = "/";

            //創建一個連接工廠
            var factory = new ConnectionFactory();
            factory.UserName = userName;
            factory.Password = password;
            factory.Port = port;
            factory.VirtualHost = virtualHost;
            //創建一個連接,此時可以在rabbitmq后台Web管理頁面中的Connections中看到一個連接生成
            //一個連接可以創建多個通道
            var connection = factory.CreateConnection(hosts);

            string exchange = "demo.direct";//交換機名稱
            string exchangeType = "direct";//交換機類型

            //創建一個通道
            //此時可以在rabbitmq后台Web管理頁面中的Channels中看到一個新通道生成
            var channel = connection.CreateModel();
            //給通道綁定一個交換機,交換機如果不存在,則會創建新交換機,如果交換機已存在,那么參數一定要正確,特別是arguments參數,各參數類似隊列
            var arguments = new Dictionary<string, object>() { };
            channel.ExchangeDeclare(exchange: exchange, type: exchangeType, durable: true, autoDelete: false, arguments: arguments);

            string[] routes = new string[] { "apple", "banana" };

            //發布10條消息
            for (var i = 0; i < 10; i++)
            {
                var buffer = Encoding.UTF8.GetBytes(i.ToString());
                channel.BasicPublish(exchange, routes[i % 2], null, buffer);
            }
            channel.Close();

            Console.ReadKey();
        }
    }
}
RabbitMQ.PublishConsole

  代碼中,我們往demo.direct交換機發布了10條消息,其中5條消息的路由是apple,另外5條消息的路由是banana,demo.direct交換機綁定的兩個隊列中,queue1的綁定路由是apple,queue2的綁定路由是banana,那么demo.direct交換機會將路由是apple的消息轉發到queue1,將路由是banana的消息轉發到queue2,從后台可以看每個隊列中已經有5個消息准備好了:

  

    接下來可以使用消費者將它們消費掉

  主題模式

   主題模式使用的topic類型的交換機,在進行匹配時,是根據表達式去匹配,# 匹配一個或多個,* 匹配一個,我們將路由模式的代碼稍作修改:    

  

  
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Text;

namespace RabbitMQ.PublishConsole
{
    class Program
    {
        static void Main(string[] args)
        {
            string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" };
            int port = 5672;
            string userName = "admin";
            string password = "123456";
            string virtualHost = "/";

            //創建一個連接工廠
            var factory = new ConnectionFactory();
            factory.UserName = userName;
            factory.Password = password;
            factory.Port = port;
            factory.VirtualHost = virtualHost;
            //創建一個連接,此時可以在rabbitmq后台Web管理頁面中的Connections中看到一個連接生成
            //一個連接可以創建多個通道
            var connection = factory.CreateConnection(hosts);

            string exchange = "demo.topic";//交換機名稱
            string exchangeType = "topic";//交換機類型

            //創建一個通道
            //此時可以在rabbitmq后台Web管理頁面中的Channels中看到一個新通道生成
            var channel = connection.CreateModel();
            //給通道綁定一個交換機,交換機如果不存在,則會創建新交換機,如果交換機已存在,那么參數一定要正確,特別是arguments參數,各參數類似隊列
            var arguments = new Dictionary<string, object>() { };
            channel.ExchangeDeclare(exchange: exchange, type: exchangeType, durable: true, autoDelete: false, arguments: arguments);

            string[] routes = new string[] { "apple.", "banana." };

            //發布10條消息
            for (var i = 0; i < 10; i++)
            {
                var buffer = Encoding.UTF8.GetBytes(i.ToString());
                channel.BasicPublish(exchange, routes[i % 2] + i, null, buffer);
            }
            channel.Close();

            Console.ReadKey();
        }
    }
}
RabbitMQ.PublishConsole

  代碼中,我們往demo.topic交換機中發布了10條消息,其中5條消息的路由是以apple開頭的,另外5條消息的路由是以banana開頭的,demo.direct交換機綁定的兩個隊列中,queue1的綁定路由是apple.#,就是匹配以apple開頭的路由,queue2的綁定路由是banana.#,就是匹配以banana開頭的路由,那么demo.direct交換機會將路由是以apple開頭的的消息轉發到queue1,將路由是以banana開頭的的消息轉發到queue2,從后台可以看每個隊列中已經有5個消息准備好了:

  

    


 

  封裝

  其實rabbitmq的使用還是比較簡單的,只需要多謝謝代碼嘗試一下就能熟悉

  一般的,像這種第三方插件的調用,我建議自己要做一層封裝,最好是根據自己的需求去封裝,然后項目中只需要調用自己封裝的類就行了,下面貼出我自己封裝的類:  

  
using System;
using System.Collections.Generic;
using System.Text;

namespace RabbitMQ.ConsoleApp
{
    public class QueueOptions
    {
        /// <summary>
        /// 是否持久化
        /// </summary>
        public bool Durable { get; set; } = true;
        /// <summary>
        /// 是否自動刪除
        /// </summary>
        public bool AutoDelete { get; set; } = false;
        /// <summary>
        /// 參數
        /// </summary>
        public IDictionary<string, object> Arguments { get; set; } = new Dictionary<string, object>();
    }
    public class ConsumeQueueOptions : QueueOptions
    {
        /// <summary>
        /// 是否自動提交
        /// </summary>
        public bool AutoAck { get; set; } = false;
        /// <summary>
        /// 每次發送消息條數
        /// </summary>
        public ushort? FetchCount { get; set; }
    }
    public class ExchangeConsumeQueueOptions : ConsumeQueueOptions
    {
        /// <summary>
        /// 路由值
        /// </summary>
        public string[] RoutingKeys { get; set; }
        /// <summary>
        /// 參數
        /// </summary>
        public IDictionary<string, object> BindArguments { get; set; } = new Dictionary<string, object>();
    }
    public class ExchangeQueueOptions : QueueOptions
    {
        /// <summary>
        /// 交換機類型
        /// </summary>
        public string Type { get; set; }
        /// <summary>
        /// 隊列及路由值
        /// </summary>
        public (string,string)[] QueueAndRoutingKey { get; set; } 
        /// <summary>
        /// 參數
        /// </summary>
        public IDictionary<string, object> BindArguments { get; set; } = new Dictionary<string, object>();
    }
}
QueueOptions
  
using System;
using System.Collections.Generic;
using System.Text;

namespace RabbitMQ.ConsoleApp
{
    public static class RabbitMQExchangeType
    {
        /// <summary>
        /// 普通模式
        /// </summary>
        public const string Common = "";
        /// <summary>
        /// 路由模式
        /// </summary>
        public const string Direct = "direct";
        /// <summary>
        /// 發布/訂閱模式
        /// </summary>
        public const string Fanout = "fanout";
        /// <summary>
        /// 匹配訂閱模式
        /// </summary>
        public const string Topic = "topic";
    }
}
RabbitMQExchangeType
  
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace RabbitMQ.ConsoleApp
{
    public abstract class RabbitBase : IDisposable
    {
        List<AmqpTcpEndpoint> amqpList;
        IConnection connection;

        protected RabbitBase(params string[] hosts)
        {
            if (hosts == null || hosts.Length == 0)
            {
                throw new ArgumentException("invalid hosts!", nameof(hosts));
            }

            this.amqpList = new List<AmqpTcpEndpoint>();
            this.amqpList.AddRange(hosts.Select(host => new AmqpTcpEndpoint(host, Port)));
        }
        protected RabbitBase(params (string, int)[] hostAndPorts)
        {
            if (hostAndPorts == null || hostAndPorts.Length == 0)
            {
                throw new ArgumentException("invalid hosts!", nameof(hostAndPorts));
            }

            this.amqpList = new List<AmqpTcpEndpoint>();
            this.amqpList.AddRange(hostAndPorts.Select(tuple => new AmqpTcpEndpoint(tuple.Item1, tuple.Item2)));
        }

        /// <summary>
        /// 端口
        /// </summary>
        public int Port { get; set; } = 5672;
        /// <summary>
        /// 賬號
        /// </summary>
        public string UserName { get; set; } = ConnectionFactory.DefaultUser;
        /// <summary>
        /// 密碼
        /// </summary>
        public string Password { get; set; } = ConnectionFactory.DefaultPass;
        /// <summary>
        /// 虛擬機
        /// </summary>
        public string VirtualHost { get; set; } = ConnectionFactory.DefaultVHost;

        /// <summary>
        /// 釋放
        /// </summary>
        public virtual void Dispose()
        {
            //connection?.Close();
            //connection?.Dispose();
        }
        /// <summary>
        /// 關閉連接
        /// </summary>
        public void Close()
        {
            connection?.Close();
            connection?.Dispose();
        }

        #region Private
        /// <summary>
        /// 獲取rabbitmq的連接
        /// </summary>
        /// <returns></returns>
        protected IModel GetChannel()
        {
            if (connection == null)
            {
                lock (this)
                {
                    if (connection == null)
                    {
                        var factory = new ConnectionFactory();
                        factory.Port = Port;
                        factory.UserName = UserName;
                        factory.VirtualHost = VirtualHost;
                        factory.Password = Password;
                        connection = factory.CreateConnection(this.amqpList);
                    }
                }
            }
            return connection.CreateModel();
        }

        #endregion
    }
}
RabbitBase
  
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace RabbitMQ.ConsoleApp
{
    public class RabbitMQProducer : RabbitBase
    {
        public RabbitMQProducer(params string[] hosts) : base(hosts)
        {

        }
        public RabbitMQProducer(params (string,int)[] hostAndPorts) : base(hostAndPorts)
        {

        }

        #region 普通模式、Work模式
        /// <summary>
        /// 發布消息
        /// </summary>
        /// <param name="queue"></param>
        /// <param name="message"></param>
        /// <param name="options"></param>
        public void Publish(string queue, string message, QueueOptions options = null)
        {
            options = options ?? new QueueOptions();
            var channel = GetChannel();
            channel.QueueDeclare(queue, options.Durable, false, options.AutoDelete, options.Arguments ?? new Dictionary<string, object>());
            var buffer = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish("", queue, null, buffer);
            channel.Close();
        }
        /// <summary>
        /// 發布消息
        /// </summary>
        /// <param name="queue"></param>
        /// <param name="message"></param>
        /// <param name="configure"></param>
        public void Publish(string queue, string message, Action<QueueOptions> configure)
        {
            QueueOptions options = new QueueOptions();
            configure?.Invoke(options);
            Publish(queue, message, options);
        }
        #endregion
        #region 訂閱模式、路由模式、Topic模式
        /// <summary>
        /// 發布消息
        /// </summary>
        /// <param name="exchange"></param>
        /// <param name="routingKey"></param>
        /// <param name="message"></param>
        /// <param name="options"></param>
        public void Publish(string exchange, string routingKey, string message, ExchangeQueueOptions options = null)
        {
            options = options ?? new ExchangeQueueOptions();
            var channel = GetChannel();
            channel.ExchangeDeclare(exchange, string.IsNullOrEmpty(options.Type) ? RabbitMQExchangeType.Fanout : options.Type, options.Durable, options.AutoDelete, options.Arguments ?? new Dictionary<string, object>());
            if (options.QueueAndRoutingKey != null)
            {
                foreach (var t in options.QueueAndRoutingKey)
                {
                    if (!string.IsNullOrEmpty(t.Item1))
                    {
                        channel.QueueBind(t.Item1, exchange, t.Item2 ?? "", options.BindArguments ?? new Dictionary<string, object>());
                    }
                }
            }
            var buffer = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange, routingKey, null, buffer);
            channel.Close();
        }
        /// <summary>
        /// 發布消息
        /// </summary>
        /// <param name="exchange"></param>
        /// <param name="routingKey"></param>
        /// <param name="message"></param>
        /// <param name="configure"></param>
        public void Publish(string exchange, string routingKey, string message, Action<ExchangeQueueOptions> configure)
        {
            ExchangeQueueOptions options = new ExchangeQueueOptions();
            configure?.Invoke(options);
            Publish(exchange, routingKey, message, options);
        }
        #endregion
    }
}
RabbitMQProducer
  
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace RabbitMQ.ConsoleApp
{
    public class RabbitMQConsumer : RabbitBase
    {
        public RabbitMQConsumer(params string[] hosts) : base(hosts)
        {

        }
        public RabbitMQConsumer(params (string, int)[] hostAndPorts) : base(hostAndPorts)
        {

        }

        public event Action<RecieveResult> Received;

        /// <summary>
        /// 構造消費者
        /// </summary>
        /// <param name="channel"></param>
        /// <param name="options"></param>
        /// <returns></returns>
        private IBasicConsumer ConsumeInternal(IModel channel, ConsumeQueueOptions options)
        {
            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
            consumer.Received += (sender, e) =>
            {
                try
                {
                    CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
                    if (!options.AutoAck)
                    {
                        cancellationTokenSource.Token.Register(() =>
                        {
                            channel.BasicAck(e.DeliveryTag, false);
                        }); 
                    }
                    Received?.Invoke(new RecieveResult(e, cancellationTokenSource));
                }
                catch { }
            };
            if (options.FetchCount != null)
            {
                channel.BasicQos(0, options.FetchCount.Value, false);
            }
            return consumer;
        }

        #region 普通模式、Work模式
        /// <summary>
        /// 消費消息
        /// </summary>
        /// <param name="queue"></param>
        /// <param name="options"></param>
        public ListenResult Listen(string queue, ConsumeQueueOptions options = null)
        {
            options = options ?? new ConsumeQueueOptions();
            var channel = GetChannel();
            channel.QueueDeclare(queue, options.Durable, false, options.AutoDelete, options.Arguments ?? new Dictionary<string, object>());
            var consumer = ConsumeInternal(channel, options);
            channel.BasicConsume(queue, options.AutoAck, consumer);
            ListenResult result = new ListenResult();
            result.Token.Register(() =>
            {
                try
                {
                    channel.Close();
                    channel.Dispose();
                }
                catch { }
            });
            return result;
        }
        /// <summary>
        /// 消費消息
        /// </summary>
        /// <param name="queue"></param>
        /// <param name="configure"></param>
        public ListenResult Listen(string queue, Action<ConsumeQueueOptions> configure)
        {
            ConsumeQueueOptions options = new ConsumeQueueOptions();
            configure?.Invoke(options);
            return Listen(queue, options);
        }
        #endregion
        #region 訂閱模式、路由模式、Topic模式
        /// <summary>
        /// 消費消息
        /// </summary>
        /// <param name="exchange"></param>
        /// <param name="queue"></param>
        /// <param name="options"></param>
        public ListenResult Listen(string exchange, string queue, ExchangeConsumeQueueOptions options = null)
        {
            options = options ?? new ExchangeConsumeQueueOptions();
            var channel = GetChannel();
            channel.QueueDeclare(queue, options.Durable, false, options.AutoDelete, options.Arguments ?? new Dictionary<string, object>());
            if (options.RoutingKeys != null && !string.IsNullOrEmpty(exchange))
            {
                foreach (var key in options.RoutingKeys)
                {
                    channel.QueueBind(queue, exchange, key, options.BindArguments);
                }
            }
            var consumer = ConsumeInternal(channel, options);
            channel.BasicConsume(queue, options.AutoAck, consumer);
            ListenResult result = new ListenResult();
            result.Token.Register(() =>
            {
                try
                {
                    channel.Close();
                    channel.Dispose();
                }
                catch { }
            });
            return result;
        }
        /// <summary>
        /// 消費消息
        /// </summary>
        /// <param name="exchange"></param>
        /// <param name="queue"></param>
        /// <param name="configure"></param>
        public ListenResult Listen(string exchange, string queue, Action<ExchangeConsumeQueueOptions> configure)
        {
            ExchangeConsumeQueueOptions options = new ExchangeConsumeQueueOptions();
            configure?.Invoke(options);
            return Listen(exchange, queue, options);
        }
        #endregion
    }
    public class RecieveResult
    {
        CancellationTokenSource cancellationTokenSource;
        public RecieveResult(BasicDeliverEventArgs arg, CancellationTokenSource cancellationTokenSource)
        {
            this.Body = Encoding.UTF8.GetString(arg.Body);
            this.ConsumerTag = arg.ConsumerTag;
            this.DeliveryTag = arg.DeliveryTag;
            this.Exchange = arg.Exchange;
            this.Redelivered = arg.Redelivered;
            this.RoutingKey = arg.RoutingKey;
            this.cancellationTokenSource = cancellationTokenSource;
        }

        /// <summary>
        /// 消息體
        /// </summary>
        public string Body { get; private set; }
        /// <summary>
        /// 消費者標簽
        /// </summary>
        public string ConsumerTag { get; private set; }
        /// <summary>
        /// Ack標簽
        /// </summary>
        public ulong DeliveryTag { get; private set; }
        /// <summary>
        /// 交換機
        /// </summary>
        public string Exchange { get; private set; }
        /// <summary>
        /// 是否Ack
        /// </summary>
        public bool Redelivered { get; private set; }
        /// <summary>
        /// 路由
        /// </summary>
        public string RoutingKey { get; private set; }

        public void Commit()
        {
            if (cancellationTokenSource == null || cancellationTokenSource.IsCancellationRequested) return;

            cancellationTokenSource.Cancel();
            cancellationTokenSource.Dispose();
            cancellationTokenSource = null;
        }
    }
    public class ListenResult
    {
        CancellationTokenSource cancellationTokenSource;

        /// <summary>
        /// CancellationToken
        /// </summary>
        public CancellationToken Token { get { return cancellationTokenSource.Token; } }
        /// <summary>
        /// 是否已停止
        /// </summary>
        public bool Stoped { get { return cancellationTokenSource.IsCancellationRequested; } }

        public ListenResult()
        {
            cancellationTokenSource = new CancellationTokenSource();
        }

        /// <summary>
        /// 停止監聽
        /// </summary>
        public void Stop()
        {
            cancellationTokenSource.Cancel();
        }
    }
}
RabbitMQConsumer

  測試Demo  

  
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace RabbitMQ.ConsoleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" };
            int port = 5672;
            string userName = "admin";
            string password = "123456";
            string virtualHost = "/";
            string queue = "queue1";
            var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };

            //消費者
            new Thread(() =>
            {
                using (RabbitMQConsumer consumer = new RabbitMQConsumer(hosts))
                {
                    consumer.UserName = userName;
                    consumer.Password = password;
                    consumer.Port = port;
                    consumer.VirtualHost = virtualHost;

                    consumer.Received += result =>
                    {
                        Console.WriteLine($"接收到數據:{result.Body}");
                        result.Commit();//提交
                    };
                    consumer.Listen(queue, options =>
                    {
                        options.AutoAck = false;
                        options.Arguments = arguments;
                    });
                }
            }).Start();

            //消息生產
            using (RabbitMQProducer producer = new RabbitMQProducer(hosts))
            {
                producer.UserName = userName;
                producer.Password = password;
                producer.Port = port;
                producer.VirtualHost = virtualHost;

                string message = "";
                do
                {
                    message = Console.ReadLine();
                    if (string.IsNullOrEmpty(message))
                    {
                        break;
                    }
                    producer.Publish(queue, message, options => { options.Arguments = arguments; });

                } while (true);
            }
        }
    }
}
普通模式
  
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace RabbitMQ.ConsoleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" };
            int port = 5672;
            string userName = "admin";
            string password = "123456";
            string virtualHost = "/";
            string queue = "queue1";
            var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };

            //消費者1
            new Thread(() =>
            {
                using (RabbitMQConsumer consumer = new RabbitMQConsumer(hosts))
                {
                    consumer.UserName = userName;
                    consumer.Password = password;
                    consumer.Port = port;
                    consumer.VirtualHost = virtualHost;

                    consumer.Received += result =>
                    {
                        Console.WriteLine($"消費者1接收到數據:{result.Body}");
                        result.Commit();//提交
                    };
                    consumer.Listen(queue, options =>
                    {
                        options.AutoAck = false;
                        options.Arguments = arguments;
                        options.FetchCount = 1;
                    });
                }
            }).Start();

            //消費者2
            new Thread(() =>
            {
                using (RabbitMQConsumer consumer = new RabbitMQConsumer(hosts))
                {
                    consumer.UserName = userName;
                    consumer.Password = password;
                    consumer.Port = port;
                    consumer.VirtualHost = virtualHost;

                    consumer.Received += result =>
                    {
                        Console.WriteLine($"消費者2接收到數據:{result.Body}");
                        result.Commit();//提交
                    };
                    consumer.Listen(queue, options =>
                    {
                        options.AutoAck = false;
                        options.Arguments = arguments;
                        options.FetchCount = 2;
                    });
                }
            }).Start();

            //消息生產
            using (RabbitMQProducer producer = new RabbitMQProducer(hosts))
            {
                producer.UserName = userName;
                producer.Password = password;
                producer.Port = port;
                producer.VirtualHost = virtualHost;

                string message = "";
                do
                {
                    message = Console.ReadLine();
                    if (string.IsNullOrEmpty(message))
                    {
                        break;
                    }
                    producer.Publish(queue, message, options => { options.Arguments = arguments; });

                } while (true);
            }
        }
    }
}
Work模式
  
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace RabbitMQ.ConsoleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" };
            int port = 5672;
            string userName = "admin";
            string password = "123456";
            string virtualHost = "/";
            string queue1 = "queue1";
            string queue2 = "queue2";
            string exchange = "demo.fanout";
            string exchangeType = RabbitMQExchangeType.Fanout;
            var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };

            //消費者1
            new Thread(() =>
            {
                using (RabbitMQConsumer consumer = new RabbitMQConsumer(hosts))
                {
                    consumer.UserName = userName;
                    consumer.Password = password;
                    consumer.Port = port;
                    consumer.VirtualHost = virtualHost;

                    consumer.Received += result =>
                    {
                        Console.WriteLine($"消費者1接收到數據:{result.Body}");
                        result.Commit();//提交
                    };
                    consumer.Listen(queue1, options =>
                    {
                        options.AutoAck = false;
                        options.Arguments = arguments;
                    });
                }
            }).Start();

            //消費者2
            new Thread(() =>
            {
                using (RabbitMQConsumer consumer = new RabbitMQConsumer(hosts))
                {
                    consumer.UserName = userName;
                    consumer.Password = password;
                    consumer.Port = port;
                    consumer.VirtualHost = virtualHost;

                    consumer.Received += result =>
                    {
                        Console.WriteLine($"消費者2接收到數據:{result.Body}");
                        result.Commit();//提交
                    };
                    consumer.Listen(queue2, options =>
                    {
                        options.AutoAck = false;
                        options.Arguments = arguments;
                    });
                }
            }).Start();

            //消息生產
            using (RabbitMQProducer producer = new RabbitMQProducer(hosts))
            {
                producer.UserName = userName;
                producer.Password = password;
                producer.Port = port;
                producer.VirtualHost = virtualHost;

                string message = "";
                do
                {
                    message = Console.ReadLine();
                    if (string.IsNullOrEmpty(message))
                    {
                        break;
                    }
                    producer.Publish(exchange, "", message, options => { options.Type = exchangeType; });

                } while (true);
            }
        }
    }
}
發布訂閱模式
  
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace RabbitMQ.ConsoleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" };
            int port = 5672;
            string userName = "admin";
            string password = "123456";
            string virtualHost = "/";
            string queue1 = "queue1";
            string queue2 = "queue2";
            string exchange = "demo.direct";
            string exchangeType = RabbitMQExchangeType.Direct;
            var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };

            //消費者1
            new Thread(() =>
            {
                using (RabbitMQConsumer consumer = new RabbitMQConsumer(hosts))
                {
                    consumer.UserName = userName;
                    consumer.Password = password;
                    consumer.Port = port;
                    consumer.VirtualHost = virtualHost;

                    consumer.Received += result =>
                    {
                        Console.WriteLine($"消費者1接收到數據:{result.Body}");
                        result.Commit();//提交
                    };
                    consumer.Listen(queue1, options =>
                    {
                        options.AutoAck = false;
                        options.Arguments = arguments;
                    });
                }
            }).Start();

            //消費者2
            new Thread(() =>
            {
                using (RabbitMQConsumer consumer = new RabbitMQConsumer(hosts))
                {
                    consumer.UserName = userName;
                    consumer.Password = password;
                    consumer.Port = port;
                    consumer.VirtualHost = virtualHost;

                    consumer.Received += result =>
                    {
                        Console.WriteLine($"消費者2接收到數據:{result.Body}");
                        result.Commit();//提交
                    };
                    consumer.Listen(queue2, options =>
                    {
                        options.AutoAck = false;
                        options.Arguments = arguments;
                    });
                }
            }).Start();

            //消息生產
            using (RabbitMQProducer producer = new RabbitMQProducer(hosts))
            {
                producer.UserName = userName;
                producer.Password = password;
                producer.Port = port;
                producer.VirtualHost = virtualHost;

                string message = "";
                int index = 1;
                string[] routes = new string[] { "apple", "banana" };
                do
                {
                    message = Console.ReadLine();
                    if (string.IsNullOrEmpty(message))
                    {
                        break;
                    }
                    var route = routes[index++ % 2];
                    producer.Publish(exchange, route, message, options => { options.Type = exchangeType; });

                } while (true);
            }
        }
    }
}
路由模式
  
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace RabbitMQ.ConsoleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" };
            int port = 5672;
            string userName = "admin";
            string password = "123456";
            string virtualHost = "/";
            string queue1 = "queue1";
            string queue2 = "queue2";
            string exchange = "demo.topic";
            string exchangeType = RabbitMQExchangeType.Topic;
            var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };

            //消費者1
            new Thread(() =>
            {
                using (RabbitMQConsumer consumer = new RabbitMQConsumer(hosts))
                {
                    consumer.UserName = userName;
                    consumer.Password = password;
                    consumer.Port = port;
                    consumer.VirtualHost = virtualHost;

                    consumer.Received += result =>
                    {
                        Console.WriteLine($"消費者1接收到數據:{result.Body}");
                        result.Commit();//提交
                    };
                    consumer.Listen(queue1, options =>
                    {
                        options.AutoAck = false;
                        options.Arguments = arguments;
                    });
                }
            }).Start();

            //消費者2
            new Thread(() =>
            {
                using (RabbitMQConsumer consumer = new RabbitMQConsumer(hosts))
                {
                    consumer.UserName = userName;
                    consumer.Password = password;
                    consumer.Port = port;
                    consumer.VirtualHost = virtualHost;

                    consumer.Received += result =>
                    {
                        Console.WriteLine($"消費者2接收到數據:{result.Body}");
                        result.Commit();//提交
                    };
                    consumer.Listen(queue2, options =>
                    {
                        options.AutoAck = false;
                        options.Arguments = arguments;
                    });
                }
            }).Start();

            //消息生產
            using (RabbitMQProducer producer = new RabbitMQProducer(hosts))
            {
                producer.UserName = userName;
                producer.Password = password;
                producer.Port = port;
                producer.VirtualHost = virtualHost;

                string message = "";
                int index = 1;
                string[] routes = new string[] { "apple.", "banana." };
                do
                {
                    message = Console.ReadLine();
                    if (string.IsNullOrEmpty(message))
                    {
                        break;
                    }
                    var route = routes[index % 2] + index++;
                    producer.Publish(exchange, route, message, options => { options.Type = exchangeType; });

                } while (true);
            }
        }
    }
}
主題模式

  上面是我自己做的封裝,因為RabbitMQ.Client功能齊全,但是使用比較麻煩,需要編寫的代碼多一些,推薦一下第三方對rabbitmq的封裝插件:EasyNetQ,它是建立在RabbitMQ.Client上的,多數時候可以直接通過EasyNetQ就可以完成消息發布與消費,感興趣的可以了解一下


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM