一. RabbitMq基本使用
1. 條件准備
(1).通過指令【net start rabbitmq】啟動服務
(2).准備1個生產者程序Producer, 1個消費者程序Consumer01
(3).通過Nuget給三個程序安裝 【RabbitMQ.Client 6.2.1】
(4).通過地址:http://127.0.0.1:15672 訪問RabbitMq的管理系統,進行監控,賬號和密碼都是guest
(5).設置程序的啟動順序,先啟動Producer,然后延遲2s啟動Consumer01
2. 核心代碼剖析
(1). 創建連接工廠ConnectionFactory,指定HostName、UserName、Password(連接地址、賬號、密碼),也可以指定VirtualHost。
PS:默認情況向,RabbitMq的信息都是在“/”這一虛擬機中,比如我可以指定Virtual為“/ypf”,當然需要先去可視化界面中創建/ypf,否則程序會報錯
(關於 RabbitMq、Queue、Exchange、Virtual之間的關系,詳見第一節:xxxxx)
(2).創建連接 factory.CreateConnection() 和 創建傳輸信道 connection.CreateModel()
(3).創建隊列: QueueDeclare
channel.QueueDeclare(queue: "SimpleProducerQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);
A. queue: 隊列名稱
B. durable:是否持久化到硬盤, true 則設置隊列為持久化,持久化的隊列會存磁盤,在服務器重啟的時候可以保證不丟失相關信息。
C.exclusive:設置隊列是否排他。為 true 則設置隊列為排他的。如果一個隊列被聲明為排他隊列,該隊列僅對首次聲明它的連接可見,並在連接斷開時自動刪除。
這里需要注意 3點:
① 排他隊列是基於連接( Connection) 可見的,同 個連接的不同信道 (Channel) 是可以同時訪問同一連接創建的排他隊列;
② "首次"是指如果1個連接己經聲明了 排他隊列,其他連接是不允許建立同名的排他隊列的,這個與普通隊列不同:
③ 即使該隊列是持久化的,一旦連接關閉或者客戶端退出,該排他隊列都會被自動刪除,這種隊列適用於一個客戶端同時發送和讀取消息的應用場景
D. autoDelete:至少有一個消費者連接到這個隊列,之后所有與這個隊列連接的消費者都斷開時,才會 自動刪除
E. arguments:設置隊列的其他一些參數,如 x-rnessage-ttl 、x-expires 、x-rnax-length 、x-rnax-length-bytes 等等。
(4).創建交換機(交換機):ExchangeDeclare
channel.ExchangeDeclare(exchange: "SimpleProducerExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
A. exchange:交換機名稱。
B. type:交換機類型,主要有(Direct、Fanout、Topic、Header)
C. durable:是否持久化到磁盤
D. autoDelete:自動刪除 ,至少有一個隊列與這個交換機綁定,之后,所有與這個交換機綁定的隊列都與此解綁,才會觸發刪除
E. arguments:設置交換機的一些參數。
(5).隊列和交換機綁定 :QueueBind
channel.QueueBind(queue: "SimpleProducerQueue", exchange: "SimpleProducerExChange", routingKey: string.Empty, arguments: null);
A. queue:需要綁定的隊列名稱
B. exchange:需要綁定的交換機名稱
C. routingKey:路由key,用於指定發送到隊列的規則。
D. arguments:設置一些參數
(6).發送消息:BasicPublish
IBasicProperties basicProperties = channel.CreateBasicProperties(); basicProperties.Persistent = true; //配置消息持久化 //basicProperties.DeliveryMode = 2; string message = $"ypf{i}"; byte[] body = Encoding.UTF8.GetBytes(message); //發消息(不指定路由key) channel.BasicPublish(exchange: "SimpleProducerExChange", routingKey: string.Empty, basicProperties: basicProperties,body: body);
A. queue:需要綁定的隊列名稱
B. exchange:需要綁定的交換機名稱
C. routingKey:綁定路由key,用於指定發送的規則。
D. arguments:設置一些參數
(7).接收消息:事件模式,BasicConsume+Received
//channel.QueueDeclare(queue: "SimpleProducerQueue", durable: true, exclusive: false, autoDelete: false, arguments: null); //channel.ExchangeDeclare(exchange: "SimpleProducerExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null); //channel.QueueBind(queue: "SimpleProducerQueue", exchange: "SimpleProducerExChange", routingKey: string.Empty, arguments: null); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body.ToArray()); Console.WriteLine($"消費者01 接收消息: {message}"); }; channel.BasicConsume(queue: "SimpleProducerQueue", autoAck: true, consumer: consumer); //進行消費
A. queue:消費的隊列(這里只能指定一個隊列哦)
B. autoAck:true 接收到傳遞過來的消息后acknowledged(應答服務器),false 接收到消息后不應答服務器.
C. consumer:指定消費者。
注意:消費者可以不用再次聲明 路由、交換機、綁定,前提是生產者已經執行,該消費的隊列在RabbitMq中已經存在了。
二. 幾個場景
1. 生產者-消費者
(1). 1個生產者-1個消費者
模擬:生產者生產的同時,消費者進行消費。
剖析:這里采用的是ExchangeType.Direct,但是綁定的時候不指定路由key

生產者代碼
{ //設置控制台的顏色 Console.ForegroundColor = ConsoleColor.Red; ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "localhost";//RabbitMQ服務在本地運行 factory.UserName = "guest";//用戶名 factory.Password = "guest";//密碼 //factory.VirtualHost = "/ypf"; using (IConnection connection = factory.CreateConnection()) { using (IModel channel = connection.CreateModel()) { //創建隊列 channel.QueueDeclare(queue: "SimpleProducerQueue", durable: true, exclusive: false, autoDelete: false, arguments: null); //創建交換機(Direct路由) channel.ExchangeDeclare(exchange: "SimpleProducerExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null); //隊列和路由綁定 channel.QueueBind(queue: "SimpleProducerQueue", exchange: "SimpleProducerExChange", routingKey: string.Empty, arguments: null); Console.WriteLine("------------------------------下面是生產者開始生產消息(2s后開始)------------------------------------------"); Thread.Sleep(2000); for (int i = 1; i <= 100; i++) { IBasicProperties basicProperties = channel.CreateBasicProperties(); basicProperties.Persistent = true; //basicProperties.DeliveryMode = 2; string message = $"ypf{i}"; byte[] body = Encoding.UTF8.GetBytes(message); //發消息(不指定路由key) channel.BasicPublish(exchange: "SimpleProducerExChange", routingKey: string.Empty, basicProperties: basicProperties, body: body); Console.WriteLine($"消息:{message} 已發送~"); Thread.Sleep(500); } } } }
消費者代碼
{ Thread.Sleep(2000); //休眠兩秒,等待生產者 Console.ForegroundColor = ConsoleColor.Green; var factory = new ConnectionFactory(); factory.HostName = "localhost";//RabbitMQ服務在本地運行 factory.UserName = "guest";//用戶名 factory.Password = "guest";//密碼 using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { try { //channel.QueueDeclare(queue: "SimpleProducerQueue", durable: true, exclusive: false, autoDelete: false, arguments: null); //channel.ExchangeDeclare(exchange: "SimpleProducerExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null); //channel.QueueBind(queue: "SimpleProducerQueue", exchange: "SimpleProducerExChange", routingKey: string.Empty, arguments: null); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body.ToArray()); Console.WriteLine($"消費者01 接收消息: {message}"); }; channel.BasicConsume(queue: "SimpleProducerQueue", autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } catch (Exception ex) { Console.WriteLine(ex.Message); } } } }
運行效果

(2). 多個生產者-多個消費者
模擬:1個隊列,利用多線程開始多個生產者生產的同時,多個消費者進行消費。
剖析:這里采用的是ExchangeType.Direct,但是綁定的時候不指定路由key

生產者代碼:
/// <summary> /// 模擬多個生產者 /// </summary> public class ManyProducer { /// <summary> /// 生產者 /// </summary> /// <param name="producerName">生產者名稱</param> /// <param name="num">模擬消息內容</param> public static void Show(string producerName, int num) { ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "localhost";//RabbitMQ服務在本地運行 factory.UserName = "guest";//用戶名 factory.Password = "guest";//密碼 using (IConnection connection = factory.CreateConnection()) { using (IModel channel = connection.CreateModel()) { channel.QueueDeclare(queue: "ManyProducerConsumerQueue", durable: true, exclusive: false, autoDelete: false, arguments: null); channel.ExchangeDeclare(exchange: "ManyProducerConsumerExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null); channel.QueueBind(queue: "ManyProducerConsumerQueue", exchange: "ManyProducerConsumerExChange", routingKey: string.Empty, arguments: null); Console.ForegroundColor = ConsoleColor.Red; Console.WriteLine($"生產者{producerName}已准備就緒~~~"); for (int i = num; i <= num + 100; i++) { string message = $"生產者{producerName}:消息{i}"; byte[] body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "ManyProducerConsumerExChange", routingKey: string.Empty, basicProperties: null, body: body); Console.WriteLine($"消息:{message} 已發送~"); Thread.Sleep(800); } } } } }
{ //模擬多個生產者,向同一個隊列里生產消息(這里使用的一定一個隊列, 路由可以1個或多個) Task.Run(() => { //生產者ypfProducer1從10開始生產消息 ManyProducer.Show("ypfProducer1", 10); }); Task.Run(() => { //生產者ypfProducer2從500開始生產消息 ManyProducer.Show("ypfProducer2", 500); }); }
消費者代碼
/// <summary> /// 模擬多個消費者 /// </summary> public class ManyConsumer { /// <summary> /// 消費者01 /// </summary> public static void Show01() { var factory = new ConnectionFactory(); factory.HostName = "localhost";//RabbitMQ服務在本地運行 factory.UserName = "guest";//用戶名 factory.Password = "guest";//密碼 using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { Console.ForegroundColor = ConsoleColor.Green; try { //channel.QueueDeclare(queue: "ManyProducerConsumerQueue", durable: true, exclusive: false, autoDelete: false, arguments: null); //channel.ExchangeDeclare(exchange: "ManyProducerConsumerExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null); //channel.QueueBind(queue: "ManyProducerConsumerQueue", exchange: "ManyProducerConsumerExChange", routingKey: string.Empty, arguments: null); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body.ToArray()); Console.WriteLine($"消費者001 接受消息: {message}"); }; channel.BasicConsume(queue: "ManyProducerConsumerQueue", autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit. 001"); Console.ReadLine(); } catch (Exception ex) { Console.WriteLine(ex.Message); } } } } /// <summary> /// 消費者02 /// </summary> public static void Show02() { var factory = new ConnectionFactory(); factory.HostName = "localhost";//RabbitMQ服務在本地運行 factory.UserName = "guest";//用戶名 factory.Password = "guest";//密碼 using var connection = factory.CreateConnection(); using var channel = connection.CreateModel(); Console.ForegroundColor = ConsoleColor.Green; try { //channel.QueueDeclare(queue: "ManyProducerConsumerQueue", durable: true, exclusive: false, autoDelete: false, arguments: null); //channel.ExchangeDeclare(exchange: "ManyProducerConsumerExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null); //channel.QueueBind(queue: "ManyProducerConsumerQueue", exchange: "ManyProducerConsumerExChange", routingKey: string.Empty, arguments: null); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body.ToArray()); Console.WriteLine($"消費者002 接受消息: {message}"); }; channel.BasicConsume(queue: "ManyProducerConsumerQueue", autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit. 002"); Console.ReadLine(); } catch (Exception ex) { Console.WriteLine(ex.Message); } } /// <summary> /// 消費者03 /// </summary> public static void Show03() { var factory = new ConnectionFactory(); factory.HostName = "localhost";//RabbitMQ服務在本地運行 factory.UserName = "guest";//用戶名 factory.Password = "guest";//密碼 using var connection = factory.CreateConnection(); using var channel = connection.CreateModel(); Console.ForegroundColor = ConsoleColor.Green; try { //channel.QueueDeclare(queue: "ManyProducerConsumerQueue", durable: true, exclusive: false, autoDelete: false, arguments: null); //channel.ExchangeDeclare(exchange: "ManyProducerConsumerExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null); //channel.QueueBind(queue: "ManyProducerConsumerQueue", exchange: "ManyProducerConsumerExChange", routingKey: string.Empty, arguments: null); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body.ToArray()); Console.WriteLine($"消費者003 接受消息: {message}"); }; channel.BasicConsume(queue: "ManyProducerConsumerQueue", autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit 003."); Console.ReadLine(); } catch (Exception ex) { Console.WriteLine(ex.Message); } } }
{ Thread.Sleep(2000); //休眠2秒,等待生產者 Task.Run(() => { ManyConsumer.Show01(); }); Task.Run(() => { ManyConsumer.Show02(); }); Task.Run(() => { ManyConsumer.Show03(); }); }
運行效果

2. 優先級隊列
模擬:用戶購買東西,依次下單,進行排隊,但是svip級別最高,可以先拿到東西,vip級別次之,普通用戶最后。
剖析:這里采用的是ExchangeType.Direct,指定路由key,通過對了的arguments參數配置支持優先級隊列,然后發送消息的時候,通過Priority設置級別,數值越大級別越高。
生產者代碼:
/// <summary> /// 優先級隊列 /// </summary> public class PriorityQueue { public static void Show() { ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "localhost";//RabbitMQ服務在本地運行 factory.UserName = "guest";//用戶名 factory.Password = "guest";//密碼 using (IConnection connection = factory.CreateConnection()) { using (IModel channel = connection.CreateModel()) { channel.QueueDeclare(queue: "PriorityQueue", durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary<string, object>() { {"x-max-priority",10 } //指定隊列要支持優先級設置; }); channel.ExchangeDeclare(exchange: "PriorityQueueExchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null); channel.QueueBind(queue: "PriorityQueue", exchange: "PriorityQueueExchange", routingKey: "PriorityKey"); Console.ForegroundColor = ConsoleColor.Red; //下面發送消息,並設置消息的優先級 { string[] questionList = { "普通用戶A購買東西", "vip用戶B購買東西", "普通用戶C購買東西", "普通用戶D購買東西", "svip用戶F購買東西", "vip用戶G購買東西" }; //設置消息優先級 IBasicProperties props = channel.CreateBasicProperties(); foreach (string questionMsg in questionList) { //包含vip的優先級設置的高一些 if (questionMsg.StartsWith("svip")) { props.Priority = 9; //svip設置級別最高 channel.BasicPublish(exchange: "PriorityQueueExchange", routingKey: "PriorityKey", basicProperties: props, body: Encoding.UTF8.GetBytes(questionMsg)); } else if (questionMsg.StartsWith("vip")) { props.Priority = 5; //vip級別次之 channel.BasicPublish(exchange: "PriorityQueueExchange", routingKey: "PriorityKey", basicProperties: props, body: Encoding.UTF8.GetBytes(questionMsg)); } else { props.Priority = 1; //普通用戶最后 channel.BasicPublish(exchange: "PriorityQueueExchange", routingKey: "PriorityKey", basicProperties: props, body: Encoding.UTF8.GetBytes(questionMsg)); } Console.WriteLine($"{questionMsg} 已發送~~"); } } Console.Read(); } } } }
{
PriorityQueue.Show();
}
消費者代碼:
/// <summary> /// 優先級隊列,消費者 /// </summary> public class PriorityQueue { public static void Show() { Console.ForegroundColor = ConsoleColor.Green; var factory = new ConnectionFactory(); factory.HostName = "localhost";//RabbitMQ服務在本地運行 factory.UserName = "guest";//用戶名 factory.Password = "guest";//密碼 using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { //channel.QueueDeclare(queue: "PriorityQueue", durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary<string, object>() { // {"x-max-priority",10 } //指定隊列要支持優先級設置; // }); //channel.ExchangeDeclare(exchange: "PriorityQueueExchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null); //channel.QueueBind(queue: "PriorityQueue", exchange: "PriorityQueueExchange", routingKey: "PriorityKey"); //定義消費者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { string msg = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine(msg); Thread.Sleep(300); }; Console.WriteLine("消費者准備就緒...."); //處理消息 channel.BasicConsume(queue: "PriorityQueue", autoAck: true, consumer: consumer); Console.ReadKey(); } } } }
{ Thread.Sleep(3000); PriorityQueue.Show(); }
運行結果:svip雖然是第5個下單的,但是消費的時候是第1個消費的,然后vip次之,普通用戶最后。

3. 發布訂閱模式
也可以叫做觀察者模式,實質上就是一個交換機綁定多個隊列,每個隊列就是一個訂閱者,發布者每發布一條消息,同時向多個訂閱者的隊列中發送消息,然后每個訂閱者分別去自己的隊列中消費即可。
剖析:這里采用Fanout交換機的模式處理這個場景最為恰當(下一節會詳細介紹)
發布者代碼
/// <summary> /// 發布訂閱模式-發布者 /// </summary> public class PublishSubscribeConsumer { public static void Show() { var factory = new ConnectionFactory(); factory.HostName = "localhost";//RabbitMQ服務在本地運行 factory.UserName = "guest";//用戶名 factory.Password = "guest";//密碼 using (var connection = factory.CreateConnection()) { using (IModel channel = connection.CreateModel()) { channel.QueueDeclare(queue: "PublishSubscrib01", durable: true, exclusive: false, autoDelete: false, arguments: null); channel.QueueDeclare(queue: "PublishSubscrib02", durable: true, exclusive: false, autoDelete: false, arguments: null); channel.ExchangeDeclare(exchange: "PublishSubscribExChange", type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null); channel.QueueBind(queue: "PublishSubscrib01", exchange: "PublishSubscribExChange", routingKey: string.Empty, arguments: null); channel.QueueBind(queue: "PublishSubscrib02", exchange: "PublishSubscribExChange", routingKey: string.Empty, arguments: null); Console.ForegroundColor = ConsoleColor.Red; Console.WriteLine("開始發布消息~~~~"); for (int i = 1; i <= 20; i++) { string message = $"發布第{i}條消息..."; byte[] body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "PublishSubscribExChange", routingKey: string.Empty, basicProperties: null, body: body); Console.WriteLine(message); Thread.Sleep(200); } } } } }
{
PublishSubscribeConsumer.Show();
}
訂閱者代碼
/// <summary> /// 發布訂閱-訂閱者 /// </summary> public class PublishSubscribeConsumer { /// <summary> /// 訂閱者1 /// </summary> public static void Show1() { var factory = new ConnectionFactory(); factory.HostName = "localhost";//RabbitMQ服務在本地運行 factory.UserName = "guest";//用戶名 factory.Password = "guest";//密碼 using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { Console.ForegroundColor = ConsoleColor.Green; //channel.QueueDeclare(queue: "PublishSubscrib01", durable: true, exclusive: false, autoDelete: false, arguments: null); //channel.ExchangeDeclare(exchange: "PublishSubscribExChange", type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null); //channel.QueueBind(queue: "PublishSubscrib01", exchange: "PublishSubscribExChange", routingKey: string.Empty, arguments: null); Console.WriteLine("訂閱者01 已經准備就緒~~"); try { var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body.ToArray()); Console.WriteLine($"訂閱者01收到消息:{message} ~"); }; channel.BasicConsume(queue: "PublishSubscrib01", autoAck: true, consumer: consumer); Console.ReadLine(); } catch (Exception ex) { Console.WriteLine(ex.Message); } } } } /// <summary> /// 訂閱者2 /// </summary> public static void Show2() { var factory = new ConnectionFactory(); factory.HostName = "localhost";//RabbitMQ服務在本地運行 factory.UserName = "guest";//用戶名 factory.Password = "guest";//密碼 using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { Console.ForegroundColor = ConsoleColor.Green; //channel.QueueDeclare(queue: "PublishSubscrib02", durable: true, exclusive: false, autoDelete: false, arguments: null); //channel.ExchangeDeclare(exchange: "PublishSubscribExChange", type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null); //channel.QueueBind(queue: "PublishSubscrib02", exchange: "PublishSubscribExChange", routingKey: string.Empty, arguments: null); Console.WriteLine("訂閱者02 已經准備就緒~~"); try { var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body.ToArray()); Console.WriteLine($"訂閱者02收到消息:{message} ~"); }; channel.BasicConsume(queue: "PublishSubscrib02", autoAck: true, consumer: consumer); Console.ReadLine(); } catch (Exception ex) { Console.WriteLine(ex.Message); } } } } }
{ Thread.Sleep(2000); Task.Run(() => { PublishSubscribeConsumer.Show1(); }); Task.Run(() => { PublishSubscribeConsumer.Show2(); }); }
運行結果 (訂閱者1和訂閱者2分別拿到自己的消息)

!
- 作 者 : Yaopengfei(姚鵬飛)
- 博客地址 : http://www.cnblogs.com/yaopengfei/
- 聲 明1 : 如有錯誤,歡迎討論,請勿謾罵^_^。
- 聲 明2 : 原創博客請在轉載時保留原文鏈接或在文章開頭加上本人博客地址,否則保留追究法律責任的權利。
