消息隊列的地位越來越重要,幾乎是面試的必問問題了,不會使用幾種消息隊列都顯得尷尬,正好本文使用C#來帶你認識rabbitmq消息隊列
首先,我們要安裝rabbitmq,當然,如果有現成的,也可以使用,不知道曾幾何時,我喜歡將數據庫等等軟件安裝在linux虛擬機,如果沒現成的rabbitmq,按照下面的來吧,嘿嘿
rabbitmq安裝:https://www.cnblogs.com/shanfeng1000/p/11951703.html
如果要實現rabbitmq集群,參考:https://www.cnblogs.com/shanfeng1000/p/12097054.html
我這里使用的是rabbitmq集群,但是沒有比較,只是已經安裝好了,就直接使用算了
虛擬機集群地址:192.168.209.133,192.168.209.134,192.168.209.135
端口使用的默認端口,都是5672,也就是AMQP協議端口
Rabbitmq的工作模式
先說說幾個概念
生產者(producer):負責生產消息,可以有多個生產者,可以理解為生成消息的那部分邏輯
消費者(consumer):從隊列中獲取消息,對消息處理的那部分邏輯
隊列(queue):用於存放消息,可以理解為先進先出的一個對象
交換機(exchange):顧名思義,就是個中介的角色,將接收到的消息按不同的規則轉發到其他交換機或者隊列中
路由(route):就是交換機分發消息的規則,交換機可以指定路由規則,生產者在發布消息時也可以指定消息路由,比如交換機中設置A路由表示將消息轉發到隊列1,B路由表示將消息轉發到隊列2,那么當交換機接收到消息時,如果消息的路由滿足A路由,則將消息轉發到隊列1,如果滿足B路由則將消息轉發到隊列2
虛擬主機(virtual host):虛擬地址,用於進行邏輯隔離,一個虛擬主機里面可以有若干個 exchange 和 queue,但是里面不能有相同名稱的 exchange 或 queue
再看看rabbitmq的幾種工作模式,具體可參考rabbitmq官網給出的Demo:https://www.rabbitmq.com/getstarted.html
其中,第6中類似我們常用的請求-響應模式,但是使用的RPC請求響應,用的比較少,這里就不過多解釋,感興趣的可以參考官網文檔:https://www.rabbitmq.com/tutorials/tutorial-six-dotnet.html。
總的來說,就是生產者將消息發布到rabbitmq上,然后消費者連接rabbitmq,獲取到消息就消費,但是有幾點說明一下
1、rabbitmq中的消息是可被多次消費的,因為rabbitmq提供了ack機制,當消費者在消費消息時,如果將自動ack設置成false,那么需要手動提交ack才能告訴rabbitmq消息已被使用,否則當通道關閉時,消息會繼續呆在隊列中等待消費
2、當存在多個消費者時,默認情況下,一個消費者獲取一個消息,處理完成后再獲取下一個,但是rabbitmq消費一次性獲取多個,當然后當這些消息消費完成后,再獲取下一批,這也就是rabbitmq的Qos機制
C#使用rabbitmq
如果感興趣的人多,到時候再單獨開一篇博文,現在就介紹其中的1-5種,也可以分類成兩種:不使用交換機和使用交換機,所以下面就分這兩種來說明
首先,我們創建了兩個Demo項目:RabbitMQ.PublishConsole和RabbitMQ.ConsumeConsole,分別使用使用nuget安裝RabbitMQ.Client:
其中RabbitMQ.PublishConsole是用來生產消息,RabbitMQ.ConsumeConsole用來消費消息
這里我們安裝的是最新版本,舊版本和新版本在使用上可能會有一些區別
不使用交換機情形
不使用交換機有兩種模式:簡單模式和工作模式
這里先貼上生產者生成消息的代碼,簡單模式和工作模式這部分測試代碼是一樣的:

using RabbitMQ.Client; using System; using System.Collections.Generic; using System.Text; namespace RabbitMQ.PublishConsole { class Program { static void Main(string[] args) { string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" }; int port = 5672; string userName = "admin"; string password = "123456"; string virtualHost = "/"; //創建一個連接工廠 var factory = new ConnectionFactory(); factory.UserName = userName; factory.Password = password; factory.Port = port; factory.VirtualHost = virtualHost; //創建一個連接,此時可以在rabbitmq后台Web管理頁面中的Connections中看到一個連接生成 //一個連接可以創建多個通道 var connection = factory.CreateConnection(hosts); string queue = "queue1";//隊列名稱 //創建一個通道 //此時可以在rabbitmq后台Web管理頁面中的Channels中看到一個新通道生成 var channel = connection.CreateModel(); //給通道綁定一個隊列,隊列如果不存在,則會創建新隊列,如果隊列已存在,那么參數一定要正確,特別是arguments參數,否則會報錯 var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } }; channel.QueueDeclare(queue: queue, durable: true, exclusive: false, autoDelete: false, arguments: arguments); //發布10條消息 for (var i = 0; i < 10; i++) { var buffer = Encoding.UTF8.GetBytes(i.ToString()); channel.BasicPublish("", queue, null, buffer); } channel.Close(); Console.ReadKey(); } } }
上述代碼執行完成后,隊列queue1中就有了10條消息,可以在rabbitmq的后台管理中看到:
代碼中提到,通道在申明隊列時,如果隊列已經存在,則申明的參數一定要對上,否則會拋出異常:The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=406, text='PRECONDITION_FAILED - inequivalent arg 'x-queue-type' for queue 'queue1' in vhost '/': received none but current is the value 'classic' of type 'longstr'', classId=50, methodId=10
比如這里,我實現在rabbitmq后台創建了隊列,那么他們的對應關系如下圖:
簡單模式
這個模式很簡單,其實就是只有一個消費者,簡單的保證操作的順序性
接着貼上消費者代碼:

using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.Text; using System.Threading; namespace RabbitMQ.ConsumeConsole { class Program { static void Main(string[] args) { string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" }; int port = 5672; string userName = "admin"; string password = "123456"; string virtualHost = "/"; //創建一個連接工廠 var factory = new ConnectionFactory(); factory.UserName = userName; factory.Password = password; factory.Port = port; factory.VirtualHost = virtualHost; //創建一個連接,此時可以在rabbitmq后台Web管理頁面中的Connections中看到一個連接生成 //一個連接可以創建多個通道 var connection = factory.CreateConnection(hosts); string queue = "queue1";//隊列名稱 //創建一個通道 //此時可以在rabbitmq后台Web管理頁面中的Channels中看到一個新通道生成 var channel = connection.CreateModel(); //給通道綁定一個隊列,隊列如果不存在,則會創建新隊列,如果隊列已存在,那么參數一定要正確,特別是arguments參數,否則會報錯 var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } }; channel.QueueDeclare(queue: queue, durable: true, exclusive: false, autoDelete: false, arguments: arguments); //channel.BasicQos(2, 2, false);//設置QOS //在通道中定義一個事件消費者 EventingBasicConsumer consumer = new EventingBasicConsumer(channel); consumer.Received += (sender, e) => { string message = Encoding.UTF8.GetString(e.Body); Console.WriteLine($"接收到消息:{message}"); Thread.Sleep(500);//暫停一下 //通知消息已被處理,如果沒有,那么消息將會被重復消費 channel.BasicAck(e.DeliveryTag, false); }; //ack設置成false,表示不自動提交,那么就需要在消息被消費后,手動調用BasicAck去提交消息 channel.BasicConsume(queue, false, consumer); Console.ReadKey(); } } }
上述代碼執行完成后,在后台管理中可以看到消息被消費掉了
工作模式
工作模式是簡單模式的拓展,如果業務簡單,對消息的消費是一個耗時的過程,這個模式是一個好的選擇。
接着調用生產者代碼生產10條消息,下面是消費者的測試代碼

using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.Text; using System.Threading; namespace RabbitMQ.ConsumeConsole { class Program { static void Main(string[] args) { string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" }; int port = 5672; string userName = "admin"; string password = "123456"; string virtualHost = "/"; //創建一個連接工廠 var factory = new ConnectionFactory(); factory.UserName = userName; factory.Password = password; factory.Port = port; factory.VirtualHost = virtualHost; //創建一個連接,此時可以在rabbitmq后台Web管理頁面中的Connections中看到一個連接生成 //一個連接可以創建多個通道 var connection = factory.CreateConnection(hosts); Consumer(connection, 1);//消費者1 Consumer(connection, 2);//消費者2 Console.ReadKey(); } static void Consumer(IConnection connection, ushort prefetch) { //使用多線程來執行,可以模擬多個消費者 new Thread(() => { int threadId = Thread.CurrentThread.ManagedThreadId;//線程Id,用於區分消費者 string queue = "queue1";//隊列名稱 //創建一個通道 //此時可以在rabbitmq后台Web管理頁面中的Channels中看到一個新通道生成 var channel = connection.CreateModel(); //給通道綁定一個隊列,隊列如果不存在,則會創建新隊列,如果隊列已存在,那么參數一定要正確,特別是arguments參數,否則會報錯 var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } }; channel.QueueDeclare(queue: queue, durable: true, exclusive: false, autoDelete: false, arguments: arguments); //設置消費者每次獲取的消息數,可以用來設置消費者消息的權重 //必須等獲取的消息都消費完成后才能重新獲取 channel.BasicQos(0, prefetch, true); //在通道中定義一個事件消費者 EventingBasicConsumer consumer = new EventingBasicConsumer(channel); consumer.Received += (sender, e) => { string message = Encoding.UTF8.GetString(e.Body); Console.WriteLine($"ThreadId:【{threadId}】 接收到消息:{message}"); Thread.Sleep(500); //通知消息已被處理,如果沒有,那么消息將會被重復消費 channel.BasicAck(e.DeliveryTag, false); }; //ack設置成false,表示不自動提交,那么就需要在消息被消費后,手動調用BasicAck去提交消息 channel.BasicConsume(queue, false, consumer); }).Start(); } } }
另外說明一下,代碼中提到rabbitmq的QOS機制,這里簡單解釋一下,當生產者將消息發布到rabbitmq之后,如果在未配置QOS的情況下,rabbitmq盡可能快速地發送隊列中的所有消息到消費者端,如果消息比較多,消費者來不及處理,就會緩存這些消息,當消息堆積過多,可能導致服務器內存不足而影響其他進程,rabbitmq的QOS可以很好的解決這類問題,QOS就是限制消費者一次性從rabbitmq中獲取消息的個數,而不是獲取所有消息。比如設置rabbitmq的QOS為10,也就是prefetch=10,就是說,哪怕rabbitmq中有100條消息,消費者也只是一次性獲取10條,然后消費者消費這10條消息,剩下的交給其他消費者,當10條消息中的unacked個數少於prefetch * 消費者數目時,會繼續從rabbitmq獲取消息,如果在工作模式中,不使用QOS,你會發現,所有的消息都被一個消費者消費了
使用交換機情形
使用交換機的情形有3種:發布訂閱模式,路由模式,主題模式
上面說了,交換機是一個中介的角色,當一個交換機創建后,可以將其他隊列或者交換機與當前交換機綁定,綁定時需要指定綁定路由規則,這個和交換機類型有關。
當我們不使用交換機時,那么生產者是直接將消息發布到隊列中去的,生產者只需要指定消息接收的隊列即可,而使用交換機做中轉時,生產者只需要將消息發布到交換機,然后交換機根據接收到的消息,按與交換機綁定的路由規則,將消息轉發到其他交換機或者隊列中,這個處理過程和交換機的類型有關,交換機一般分為4類:
direct:直連類型,就是將消息的路由和交換機的綁定路由作比較,當兩者一致時,則匹配成功,然后消息就會被轉發到這個綁定路由后的隊列或者交換機
fanout:這種類型的交換機是不需要指定路由的,當交換機接收到消息時,會將消息廣播到所有綁定到它的所有隊列或交換機中
topic:主題類型,類似direct類型,只不過在將消息的路由和綁定路由做比較時,是通過特定表達式去比較的,其中# 匹配一個或多個,* 匹配一個
headers:頭部交換機,允許使用消息頭中的信息來做匹配規則,這個用的少,基本上不用,這里也就不過多介紹了
到這里,你應該發覺,使用交換機的三種情形,無非就是使用交換機的類型不一樣,發布訂閱模式--fanout,路由模式--direct,主題模式--topic
現在我們先去rabbitmq的后台中,創建這幾種交換機:
交換機的創建及綁定都可以在代碼中實現,如IModel類的QueueBind,ExchangeBind等方法,用多了就自然熟了,這里為了方便截圖,就到后台去創建了
然后我們創建兩個隊列,並按指定類型分別綁定到這3個交換機中:
隊列:
demo.direct綁定隊列規則:
demo.fanout綁定隊列規則:
demo.topic綁定隊列規則:
上面所描述的,無非就是三種模式中發布消息方式的不一樣,消費者當然還是從隊列獲取消息消費的,這里我們就先貼出消費者的代碼:

using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.Text; using System.Threading; namespace RabbitMQ.ConsumeConsole { class Program { static void Main(string[] args) { string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" }; int port = 5672; string userName = "admin"; string password = "123456"; string virtualHost = "/"; //創建一個連接工廠 var factory = new ConnectionFactory(); factory.UserName = userName; factory.Password = password; factory.Port = port; factory.VirtualHost = virtualHost; //創建一個連接,此時可以在rabbitmq后台Web管理頁面中的Connections中看到一個連接生成 //一個連接可以創建多個通道 var connection = factory.CreateConnection(hosts); Consumer(connection, "queue1");//消費者1 Consumer(connection, "queue2");//消費者2 Console.ReadKey(); } static void Consumer(IConnection connection, string queue) { //使用多線程來執行,可以模擬多個消費者 new Thread(() => { int threadId = Thread.CurrentThread.ManagedThreadId;//線程Id,用於區分消費者 //創建一個通道 //此時可以在rabbitmq后台Web管理頁面中的Channels中看到一個新通道生成 var channel = connection.CreateModel(); //給通道綁定一個隊列,隊列如果不存在,則會創建新隊列,如果隊列已存在,那么參數一定要正確,特別是arguments參數,否則會報錯 var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } }; channel.QueueDeclare(queue: queue, durable: true, exclusive: false, autoDelete: false, arguments: arguments); //在通道中定義一個事件消費者 EventingBasicConsumer consumer = new EventingBasicConsumer(channel); consumer.Received += (sender, e) => { string message = Encoding.UTF8.GetString(e.Body); Console.WriteLine($"ThreadId:【{threadId}】 接收到消息:{message}"); Thread.Sleep(500); //通知消息已被處理,如果沒有,那么消息將會被重復消費 channel.BasicAck(e.DeliveryTag, false); }; //ack設置成false,表示不自動提交,那么就需要在消息被消費后,手動調用BasicAck去提交消息 channel.BasicConsume(queue, false, consumer); }).Start(); } } }
這里我們使用了兩個隊列,每個隊列我們這里只用了一個消費者,對於下面幾種模式,這個消費者代碼都能消費到
發布訂閱模式
發布訂閱模式使用的是fanout類型的交換機,這個類型無需指定路由,交換機會將消息廣播到每個綁定到交換機的隊列或者交換機

using RabbitMQ.Client; using System; using System.Collections.Generic; using System.Text; namespace RabbitMQ.PublishConsole { class Program { static void Main(string[] args) { string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" }; int port = 5672; string userName = "admin"; string password = "123456"; string virtualHost = "/"; //創建一個連接工廠 var factory = new ConnectionFactory(); factory.UserName = userName; factory.Password = password; factory.Port = port; factory.VirtualHost = virtualHost; //創建一個連接,此時可以在rabbitmq后台Web管理頁面中的Connections中看到一個連接生成 //一個連接可以創建多個通道 var connection = factory.CreateConnection(hosts); string exchange = "demo.fanout";//交換機名稱 string exchangeType = "fanout";//交換機類型 //創建一個通道 //此時可以在rabbitmq后台Web管理頁面中的Channels中看到一個新通道生成 var channel = connection.CreateModel(); //給通道綁定一個交換機,交換機如果不存在,則會創建新交換機,如果交換機已存在,那么參數一定要正確,特別是arguments參數,各參數類似隊列 var arguments = new Dictionary<string, object>() { }; channel.ExchangeDeclare(exchange: exchange, type: exchangeType, durable: true, autoDelete: false, arguments: arguments); //發布10條消息 for (var i = 0; i < 10; i++) { var buffer = Encoding.UTF8.GetBytes(i.ToString()); channel.BasicPublish(exchange, "", null, buffer); } channel.Close(); Console.ReadKey(); } } }
代碼中,我們往交換機發布了10條消息,交換機接收到消息后,會將消息轉發到queue1和queue2,因此,queue1和queue2都會收到10條消息:
路由模式
路由模式使用的是direct類型的交換機,也即在進行路由匹配時,需要匹配的路由一直才算匹配成功,我們把發布訂閱模式的代碼稍作修改即可,貼出生產者部分代碼:

using RabbitMQ.Client; using System; using System.Collections.Generic; using System.Text; namespace RabbitMQ.PublishConsole { class Program { static void Main(string[] args) { string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" }; int port = 5672; string userName = "admin"; string password = "123456"; string virtualHost = "/"; //創建一個連接工廠 var factory = new ConnectionFactory(); factory.UserName = userName; factory.Password = password; factory.Port = port; factory.VirtualHost = virtualHost; //創建一個連接,此時可以在rabbitmq后台Web管理頁面中的Connections中看到一個連接生成 //一個連接可以創建多個通道 var connection = factory.CreateConnection(hosts); string exchange = "demo.direct";//交換機名稱 string exchangeType = "direct";//交換機類型 //創建一個通道 //此時可以在rabbitmq后台Web管理頁面中的Channels中看到一個新通道生成 var channel = connection.CreateModel(); //給通道綁定一個交換機,交換機如果不存在,則會創建新交換機,如果交換機已存在,那么參數一定要正確,特別是arguments參數,各參數類似隊列 var arguments = new Dictionary<string, object>() { }; channel.ExchangeDeclare(exchange: exchange, type: exchangeType, durable: true, autoDelete: false, arguments: arguments); string[] routes = new string[] { "apple", "banana" }; //發布10條消息 for (var i = 0; i < 10; i++) { var buffer = Encoding.UTF8.GetBytes(i.ToString()); channel.BasicPublish(exchange, routes[i % 2], null, buffer); } channel.Close(); Console.ReadKey(); } } }
代碼中,我們往demo.direct交換機發布了10條消息,其中5條消息的路由是apple,另外5條消息的路由是banana,demo.direct交換機綁定的兩個隊列中,queue1的綁定路由是apple,queue2的綁定路由是banana,那么demo.direct交換機會將路由是apple的消息轉發到queue1,將路由是banana的消息轉發到queue2,從后台可以看每個隊列中已經有5個消息准備好了:
接下來可以使用消費者將它們消費掉
主題模式
主題模式使用的topic類型的交換機,在進行匹配時,是根據表達式去匹配,# 匹配一個或多個,* 匹配一個,我們將路由模式的代碼稍作修改:

using RabbitMQ.Client; using System; using System.Collections.Generic; using System.Text; namespace RabbitMQ.PublishConsole { class Program { static void Main(string[] args) { string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" }; int port = 5672; string userName = "admin"; string password = "123456"; string virtualHost = "/"; //創建一個連接工廠 var factory = new ConnectionFactory(); factory.UserName = userName; factory.Password = password; factory.Port = port; factory.VirtualHost = virtualHost; //創建一個連接,此時可以在rabbitmq后台Web管理頁面中的Connections中看到一個連接生成 //一個連接可以創建多個通道 var connection = factory.CreateConnection(hosts); string exchange = "demo.topic";//交換機名稱 string exchangeType = "topic";//交換機類型 //創建一個通道 //此時可以在rabbitmq后台Web管理頁面中的Channels中看到一個新通道生成 var channel = connection.CreateModel(); //給通道綁定一個交換機,交換機如果不存在,則會創建新交換機,如果交換機已存在,那么參數一定要正確,特別是arguments參數,各參數類似隊列 var arguments = new Dictionary<string, object>() { }; channel.ExchangeDeclare(exchange: exchange, type: exchangeType, durable: true, autoDelete: false, arguments: arguments); string[] routes = new string[] { "apple.", "banana." }; //發布10條消息 for (var i = 0; i < 10; i++) { var buffer = Encoding.UTF8.GetBytes(i.ToString()); channel.BasicPublish(exchange, routes[i % 2] + i, null, buffer); } channel.Close(); Console.ReadKey(); } } }
代碼中,我們往demo.topic交換機中發布了10條消息,其中5條消息的路由是以apple開頭的,另外5條消息的路由是以banana開頭的,demo.direct交換機綁定的兩個隊列中,queue1的綁定路由是apple.#,就是匹配以apple開頭的路由,queue2的綁定路由是banana.#,就是匹配以banana開頭的路由,那么demo.direct交換機會將路由是以apple開頭的的消息轉發到queue1,將路由是以banana開頭的的消息轉發到queue2,從后台可以看每個隊列中已經有5個消息准備好了:
封裝
其實rabbitmq的使用還是比較簡單的,只需要多謝謝代碼嘗試一下就能熟悉
一般的,像這種第三方插件的調用,我建議自己要做一層封裝,最好是根據自己的需求去封裝,然后項目中只需要調用自己封裝的類就行了,下面貼出我自己封裝的類:

using System; using System.Collections.Generic; using System.Text; namespace RabbitMQ.ConsoleApp { public class QueueOptions { /// <summary> /// 是否持久化 /// </summary> public bool Durable { get; set; } = true; /// <summary> /// 是否自動刪除 /// </summary> public bool AutoDelete { get; set; } = false; /// <summary> /// 參數 /// </summary> public IDictionary<string, object> Arguments { get; set; } = new Dictionary<string, object>(); } public class ConsumeQueueOptions : QueueOptions { /// <summary> /// 是否自動提交 /// </summary> public bool AutoAck { get; set; } = false; /// <summary> /// 每次發送消息條數 /// </summary> public ushort? FetchCount { get; set; } } public class ExchangeConsumeQueueOptions : ConsumeQueueOptions { /// <summary> /// 路由值 /// </summary> public string[] RoutingKeys { get; set; } /// <summary> /// 參數 /// </summary> public IDictionary<string, object> BindArguments { get; set; } = new Dictionary<string, object>(); } public class ExchangeQueueOptions : QueueOptions { /// <summary> /// 交換機類型 /// </summary> public string Type { get; set; } /// <summary> /// 隊列及路由值 /// </summary> public (string,string)[] QueueAndRoutingKey { get; set; } /// <summary> /// 參數 /// </summary> public IDictionary<string, object> BindArguments { get; set; } = new Dictionary<string, object>(); } }

using System; using System.Collections.Generic; using System.Text; namespace RabbitMQ.ConsoleApp { public static class RabbitMQExchangeType { /// <summary> /// 普通模式 /// </summary> public const string Common = ""; /// <summary> /// 路由模式 /// </summary> public const string Direct = "direct"; /// <summary> /// 發布/訂閱模式 /// </summary> public const string Fanout = "fanout"; /// <summary> /// 匹配訂閱模式 /// </summary> public const string Topic = "topic"; } }

using RabbitMQ.Client; using System; using System.Collections.Generic; using System.Linq; using System.Text; namespace RabbitMQ.ConsoleApp { public abstract class RabbitBase : IDisposable { List<AmqpTcpEndpoint> amqpList; IConnection connection; protected RabbitBase(params string[] hosts) { if (hosts == null || hosts.Length == 0) { throw new ArgumentException("invalid hosts!", nameof(hosts)); } this.amqpList = new List<AmqpTcpEndpoint>(); this.amqpList.AddRange(hosts.Select(host => new AmqpTcpEndpoint(host, Port))); } protected RabbitBase(params (string, int)[] hostAndPorts) { if (hostAndPorts == null || hostAndPorts.Length == 0) { throw new ArgumentException("invalid hosts!", nameof(hostAndPorts)); } this.amqpList = new List<AmqpTcpEndpoint>(); this.amqpList.AddRange(hostAndPorts.Select(tuple => new AmqpTcpEndpoint(tuple.Item1, tuple.Item2))); } /// <summary> /// 端口 /// </summary> public int Port { get; set; } = 5672; /// <summary> /// 賬號 /// </summary> public string UserName { get; set; } = ConnectionFactory.DefaultUser; /// <summary> /// 密碼 /// </summary> public string Password { get; set; } = ConnectionFactory.DefaultPass; /// <summary> /// 虛擬機 /// </summary> public string VirtualHost { get; set; } = ConnectionFactory.DefaultVHost; /// <summary> /// 釋放 /// </summary> public virtual void Dispose() { //connection?.Close(); //connection?.Dispose(); } /// <summary> /// 關閉連接 /// </summary> public void Close() { connection?.Close(); connection?.Dispose(); } #region Private /// <summary> /// 獲取rabbitmq的連接 /// </summary> /// <returns></returns> protected IModel GetChannel() { if (connection == null) { lock (this) { if (connection == null) { var factory = new ConnectionFactory(); factory.Port = Port; factory.UserName = UserName; factory.VirtualHost = VirtualHost; factory.Password = Password; connection = factory.CreateConnection(this.amqpList); } } } return connection.CreateModel(); } #endregion } }

using RabbitMQ.Client; using System; using System.Collections.Generic; using System.Linq; using System.Text; namespace RabbitMQ.ConsoleApp { public class RabbitMQProducer : RabbitBase { public RabbitMQProducer(params string[] hosts) : base(hosts) { } public RabbitMQProducer(params (string,int)[] hostAndPorts) : base(hostAndPorts) { } #region 普通模式、Work模式 /// <summary> /// 發布消息 /// </summary> /// <param name="queue"></param> /// <param name="message"></param> /// <param name="options"></param> public void Publish(string queue, string message, QueueOptions options = null) { options = options ?? new QueueOptions(); var channel = GetChannel(); channel.QueueDeclare(queue, options.Durable, false, options.AutoDelete, options.Arguments ?? new Dictionary<string, object>()); var buffer = Encoding.UTF8.GetBytes(message); channel.BasicPublish("", queue, null, buffer); channel.Close(); } /// <summary> /// 發布消息 /// </summary> /// <param name="queue"></param> /// <param name="message"></param> /// <param name="configure"></param> public void Publish(string queue, string message, Action<QueueOptions> configure) { QueueOptions options = new QueueOptions(); configure?.Invoke(options); Publish(queue, message, options); } #endregion #region 訂閱模式、路由模式、Topic模式 /// <summary> /// 發布消息 /// </summary> /// <param name="exchange"></param> /// <param name="routingKey"></param> /// <param name="message"></param> /// <param name="options"></param> public void Publish(string exchange, string routingKey, string message, ExchangeQueueOptions options = null) { options = options ?? new ExchangeQueueOptions(); var channel = GetChannel(); channel.ExchangeDeclare(exchange, string.IsNullOrEmpty(options.Type) ? RabbitMQExchangeType.Fanout : options.Type, options.Durable, options.AutoDelete, options.Arguments ?? new Dictionary<string, object>()); if (options.QueueAndRoutingKey != null) { foreach (var t in options.QueueAndRoutingKey) { if (!string.IsNullOrEmpty(t.Item1)) { channel.QueueBind(t.Item1, exchange, t.Item2 ?? "", options.BindArguments ?? new Dictionary<string, object>()); } } } var buffer = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange, routingKey, null, buffer); channel.Close(); } /// <summary> /// 發布消息 /// </summary> /// <param name="exchange"></param> /// <param name="routingKey"></param> /// <param name="message"></param> /// <param name="configure"></param> public void Publish(string exchange, string routingKey, string message, Action<ExchangeQueueOptions> configure) { ExchangeQueueOptions options = new ExchangeQueueOptions(); configure?.Invoke(options); Publish(exchange, routingKey, message, options); } #endregion } }

using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.Text; using System.Threading; namespace RabbitMQ.ConsoleApp { public class RabbitMQConsumer : RabbitBase { public RabbitMQConsumer(params string[] hosts) : base(hosts) { } public RabbitMQConsumer(params (string, int)[] hostAndPorts) : base(hostAndPorts) { } public event Action<RecieveResult> Received; /// <summary> /// 構造消費者 /// </summary> /// <param name="channel"></param> /// <param name="options"></param> /// <returns></returns> private IBasicConsumer ConsumeInternal(IModel channel, ConsumeQueueOptions options) { EventingBasicConsumer consumer = new EventingBasicConsumer(channel); consumer.Received += (sender, e) => { try { CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); if (!options.AutoAck) { cancellationTokenSource.Token.Register(() => { channel.BasicAck(e.DeliveryTag, false); }); } Received?.Invoke(new RecieveResult(e, cancellationTokenSource)); } catch { } }; if (options.FetchCount != null) { channel.BasicQos(0, options.FetchCount.Value, false); } return consumer; } #region 普通模式、Work模式 /// <summary> /// 消費消息 /// </summary> /// <param name="queue"></param> /// <param name="options"></param> public ListenResult Listen(string queue, ConsumeQueueOptions options = null) { options = options ?? new ConsumeQueueOptions(); var channel = GetChannel(); channel.QueueDeclare(queue, options.Durable, false, options.AutoDelete, options.Arguments ?? new Dictionary<string, object>()); var consumer = ConsumeInternal(channel, options); channel.BasicConsume(queue, options.AutoAck, consumer); ListenResult result = new ListenResult(); result.Token.Register(() => { try { channel.Close(); channel.Dispose(); } catch { } }); return result; } /// <summary> /// 消費消息 /// </summary> /// <param name="queue"></param> /// <param name="configure"></param> public ListenResult Listen(string queue, Action<ConsumeQueueOptions> configure) { ConsumeQueueOptions options = new ConsumeQueueOptions(); configure?.Invoke(options); return Listen(queue, options); } #endregion #region 訂閱模式、路由模式、Topic模式 /// <summary> /// 消費消息 /// </summary> /// <param name="exchange"></param> /// <param name="queue"></param> /// <param name="options"></param> public ListenResult Listen(string exchange, string queue, ExchangeConsumeQueueOptions options = null) { options = options ?? new ExchangeConsumeQueueOptions(); var channel = GetChannel(); channel.QueueDeclare(queue, options.Durable, false, options.AutoDelete, options.Arguments ?? new Dictionary<string, object>()); if (options.RoutingKeys != null && !string.IsNullOrEmpty(exchange)) { foreach (var key in options.RoutingKeys) { channel.QueueBind(queue, exchange, key, options.BindArguments); } } var consumer = ConsumeInternal(channel, options); channel.BasicConsume(queue, options.AutoAck, consumer); ListenResult result = new ListenResult(); result.Token.Register(() => { try { channel.Close(); channel.Dispose(); } catch { } }); return result; } /// <summary> /// 消費消息 /// </summary> /// <param name="exchange"></param> /// <param name="queue"></param> /// <param name="configure"></param> public ListenResult Listen(string exchange, string queue, Action<ExchangeConsumeQueueOptions> configure) { ExchangeConsumeQueueOptions options = new ExchangeConsumeQueueOptions(); configure?.Invoke(options); return Listen(exchange, queue, options); } #endregion } public class RecieveResult { CancellationTokenSource cancellationTokenSource; public RecieveResult(BasicDeliverEventArgs arg, CancellationTokenSource cancellationTokenSource) { this.Body = Encoding.UTF8.GetString(arg.Body); this.ConsumerTag = arg.ConsumerTag; this.DeliveryTag = arg.DeliveryTag; this.Exchange = arg.Exchange; this.Redelivered = arg.Redelivered; this.RoutingKey = arg.RoutingKey; this.cancellationTokenSource = cancellationTokenSource; } /// <summary> /// 消息體 /// </summary> public string Body { get; private set; } /// <summary> /// 消費者標簽 /// </summary> public string ConsumerTag { get; private set; } /// <summary> /// Ack標簽 /// </summary> public ulong DeliveryTag { get; private set; } /// <summary> /// 交換機 /// </summary> public string Exchange { get; private set; } /// <summary> /// 是否Ack /// </summary> public bool Redelivered { get; private set; } /// <summary> /// 路由 /// </summary> public string RoutingKey { get; private set; } public void Commit() { if (cancellationTokenSource == null || cancellationTokenSource.IsCancellationRequested) return; cancellationTokenSource.Cancel(); cancellationTokenSource.Dispose(); cancellationTokenSource = null; } } public class ListenResult { CancellationTokenSource cancellationTokenSource; /// <summary> /// CancellationToken /// </summary> public CancellationToken Token { get { return cancellationTokenSource.Token; } } /// <summary> /// 是否已停止 /// </summary> public bool Stoped { get { return cancellationTokenSource.IsCancellationRequested; } } public ListenResult() { cancellationTokenSource = new CancellationTokenSource(); } /// <summary> /// 停止監聽 /// </summary> public void Stop() { cancellationTokenSource.Cancel(); } } }
測試Demo

using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.Text; using System.Threading; namespace RabbitMQ.ConsoleApp { class Program { static void Main(string[] args) { string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" }; int port = 5672; string userName = "admin"; string password = "123456"; string virtualHost = "/"; string queue = "queue1"; var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } }; //消費者 new Thread(() => { using (RabbitMQConsumer consumer = new RabbitMQConsumer(hosts)) { consumer.UserName = userName; consumer.Password = password; consumer.Port = port; consumer.VirtualHost = virtualHost; consumer.Received += result => { Console.WriteLine($"接收到數據:{result.Body}"); result.Commit();//提交 }; consumer.Listen(queue, options => { options.AutoAck = false; options.Arguments = arguments; }); } }).Start(); //消息生產 using (RabbitMQProducer producer = new RabbitMQProducer(hosts)) { producer.UserName = userName; producer.Password = password; producer.Port = port; producer.VirtualHost = virtualHost; string message = ""; do { message = Console.ReadLine(); if (string.IsNullOrEmpty(message)) { break; } producer.Publish(queue, message, options => { options.Arguments = arguments; }); } while (true); } } } }

using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.Text; using System.Threading; namespace RabbitMQ.ConsoleApp { class Program { static void Main(string[] args) { string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" }; int port = 5672; string userName = "admin"; string password = "123456"; string virtualHost = "/"; string queue = "queue1"; var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } }; //消費者1 new Thread(() => { using (RabbitMQConsumer consumer = new RabbitMQConsumer(hosts)) { consumer.UserName = userName; consumer.Password = password; consumer.Port = port; consumer.VirtualHost = virtualHost; consumer.Received += result => { Console.WriteLine($"消費者1接收到數據:{result.Body}"); result.Commit();//提交 }; consumer.Listen(queue, options => { options.AutoAck = false; options.Arguments = arguments; options.FetchCount = 1; }); } }).Start(); //消費者2 new Thread(() => { using (RabbitMQConsumer consumer = new RabbitMQConsumer(hosts)) { consumer.UserName = userName; consumer.Password = password; consumer.Port = port; consumer.VirtualHost = virtualHost; consumer.Received += result => { Console.WriteLine($"消費者2接收到數據:{result.Body}"); result.Commit();//提交 }; consumer.Listen(queue, options => { options.AutoAck = false; options.Arguments = arguments; options.FetchCount = 2; }); } }).Start(); //消息生產 using (RabbitMQProducer producer = new RabbitMQProducer(hosts)) { producer.UserName = userName; producer.Password = password; producer.Port = port; producer.VirtualHost = virtualHost; string message = ""; do { message = Console.ReadLine(); if (string.IsNullOrEmpty(message)) { break; } producer.Publish(queue, message, options => { options.Arguments = arguments; }); } while (true); } } } }

using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.Text; using System.Threading; namespace RabbitMQ.ConsoleApp { class Program { static void Main(string[] args) { string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" }; int port = 5672; string userName = "admin"; string password = "123456"; string virtualHost = "/"; string queue1 = "queue1"; string queue2 = "queue2"; string exchange = "demo.fanout"; string exchangeType = RabbitMQExchangeType.Fanout; var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } }; //消費者1 new Thread(() => { using (RabbitMQConsumer consumer = new RabbitMQConsumer(hosts)) { consumer.UserName = userName; consumer.Password = password; consumer.Port = port; consumer.VirtualHost = virtualHost; consumer.Received += result => { Console.WriteLine($"消費者1接收到數據:{result.Body}"); result.Commit();//提交 }; consumer.Listen(queue1, options => { options.AutoAck = false; options.Arguments = arguments; }); } }).Start(); //消費者2 new Thread(() => { using (RabbitMQConsumer consumer = new RabbitMQConsumer(hosts)) { consumer.UserName = userName; consumer.Password = password; consumer.Port = port; consumer.VirtualHost = virtualHost; consumer.Received += result => { Console.WriteLine($"消費者2接收到數據:{result.Body}"); result.Commit();//提交 }; consumer.Listen(queue2, options => { options.AutoAck = false; options.Arguments = arguments; }); } }).Start(); //消息生產 using (RabbitMQProducer producer = new RabbitMQProducer(hosts)) { producer.UserName = userName; producer.Password = password; producer.Port = port; producer.VirtualHost = virtualHost; string message = ""; do { message = Console.ReadLine(); if (string.IsNullOrEmpty(message)) { break; } producer.Publish(exchange, "", message, options => { options.Type = exchangeType; }); } while (true); } } } }

using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.Text; using System.Threading; namespace RabbitMQ.ConsoleApp { class Program { static void Main(string[] args) { string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" }; int port = 5672; string userName = "admin"; string password = "123456"; string virtualHost = "/"; string queue1 = "queue1"; string queue2 = "queue2"; string exchange = "demo.direct"; string exchangeType = RabbitMQExchangeType.Direct; var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } }; //消費者1 new Thread(() => { using (RabbitMQConsumer consumer = new RabbitMQConsumer(hosts)) { consumer.UserName = userName; consumer.Password = password; consumer.Port = port; consumer.VirtualHost = virtualHost; consumer.Received += result => { Console.WriteLine($"消費者1接收到數據:{result.Body}"); result.Commit();//提交 }; consumer.Listen(queue1, options => { options.AutoAck = false; options.Arguments = arguments; }); } }).Start(); //消費者2 new Thread(() => { using (RabbitMQConsumer consumer = new RabbitMQConsumer(hosts)) { consumer.UserName = userName; consumer.Password = password; consumer.Port = port; consumer.VirtualHost = virtualHost; consumer.Received += result => { Console.WriteLine($"消費者2接收到數據:{result.Body}"); result.Commit();//提交 }; consumer.Listen(queue2, options => { options.AutoAck = false; options.Arguments = arguments; }); } }).Start(); //消息生產 using (RabbitMQProducer producer = new RabbitMQProducer(hosts)) { producer.UserName = userName; producer.Password = password; producer.Port = port; producer.VirtualHost = virtualHost; string message = ""; int index = 1; string[] routes = new string[] { "apple", "banana" }; do { message = Console.ReadLine(); if (string.IsNullOrEmpty(message)) { break; } var route = routes[index++ % 2]; producer.Publish(exchange, route, message, options => { options.Type = exchangeType; }); } while (true); } } } }

using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.Text; using System.Threading; namespace RabbitMQ.ConsoleApp { class Program { static void Main(string[] args) { string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" }; int port = 5672; string userName = "admin"; string password = "123456"; string virtualHost = "/"; string queue1 = "queue1"; string queue2 = "queue2"; string exchange = "demo.topic"; string exchangeType = RabbitMQExchangeType.Topic; var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } }; //消費者1 new Thread(() => { using (RabbitMQConsumer consumer = new RabbitMQConsumer(hosts)) { consumer.UserName = userName; consumer.Password = password; consumer.Port = port; consumer.VirtualHost = virtualHost; consumer.Received += result => { Console.WriteLine($"消費者1接收到數據:{result.Body}"); result.Commit();//提交 }; consumer.Listen(queue1, options => { options.AutoAck = false; options.Arguments = arguments; }); } }).Start(); //消費者2 new Thread(() => { using (RabbitMQConsumer consumer = new RabbitMQConsumer(hosts)) { consumer.UserName = userName; consumer.Password = password; consumer.Port = port; consumer.VirtualHost = virtualHost; consumer.Received += result => { Console.WriteLine($"消費者2接收到數據:{result.Body}"); result.Commit();//提交 }; consumer.Listen(queue2, options => { options.AutoAck = false; options.Arguments = arguments; }); } }).Start(); //消息生產 using (RabbitMQProducer producer = new RabbitMQProducer(hosts)) { producer.UserName = userName; producer.Password = password; producer.Port = port; producer.VirtualHost = virtualHost; string message = ""; int index = 1; string[] routes = new string[] { "apple.", "banana." }; do { message = Console.ReadLine(); if (string.IsNullOrEmpty(message)) { break; } var route = routes[index % 2] + index++; producer.Publish(exchange, route, message, options => { options.Type = exchangeType; }); } while (true); } } } }
上面是我自己做的封裝,因為RabbitMQ.Client功能齊全,但是使用比較麻煩,需要編寫的代碼多一些,推薦一下第三方對rabbitmq的封裝插件:EasyNetQ,它是建立在RabbitMQ.Client上的,多數時候可以直接通過EasyNetQ就可以完成消息發布與消費,感興趣的可以了解一下