RabbitMQ系列
RabbitMQ(五)——發布訂閱模式
前言
上一章的工作隊列模式中,生產者發布的一堆消息進入隊列,消費者接收隊列中的消息,每條消息只能發給一個消費者。
本章要做的是吧一條消息發送給多個消費者,這種模式就是Fanout Exchange(扇形交換機)“發布/訂閱模式”,它會將消息路由給綁定到它身上的所有隊列。
注意:該模式定義隊列時durable:false沒有存儲消息功能,如果消息發送到沒有綁定消費隊列的交換器,消息丟失,也就是說,當消息生產者發送消息時,消費者還沒有綁定此交換器,則沒有接收到消息,且接收不到了;durable:true只要沒被消費就一直存在隊列中。
交換器:
在RabbitMQ中完整的消息傳遞並不是生產者直接將消息發送到隊列,生產者甚至不不知道消息是否會進入隊列。正確的模型是生產者把消息發送給交換器(Exchange),交換器會將接受到的消息轉發到隊列中,交換器必須確定將消息發送到哪些隊列。
交換器類型(詳情):
-
-
- Fanout
- Direct
- Topic
- Header
-
fanout類型即發布訂閱模式,它會把收到的消息廣播到它綁定的隊列中。
channel.ExchangeDeclare("exchange", "fanout");
之前我們發布的做法是這樣的:
channel.BasicPublish("", "simple", null, Encoding.UTF8.GetBytes(msg));
前面我們並沒有聲明交換器,之所以還能把消息發送到隊列是因為用了“”空的字符串標識了默認或匿名交換器。BasicPublish方法第一個參數就是交換器,第二個參數是routekey,消息通過本交換器進入到routekey隊列。
發送消息到指定交換器:
channel.BasicPublish("exchange", "simple", null, Encoding.UTF8.GetBytes(msg));
創建好fanout交換器后,定義多個隊列,然后將交換器和隊列綁定:
channel.QueueBind("simple", "exchange", "");
代碼
生產者:

static void Main(string[] args) { Console.WriteLine("FanoutServer發布服務器啟動..."); //1.創建連接工廠 ConnectionFactory factory = new ConnectionFactory() { HostName = "127.0.0.1", UserName = "guest", Password = "guest" }; //2.創建連接 using (var connection = factory.CreateConnection()) //3.創建通道 using (var channel = connection.CreateModel()) { //4.創建交換器 channel.ExchangeDeclare("exchange", "fanout"); string msg = ""; for (int i = 0; i < 100; i++) { msg = $"發布消息{i}"; var body = Encoding.UTF8.GetBytes(msg); channel.BasicPublish("exchange", "", null, body); Console.WriteLine($"發布成功:{msg}"); Thread.Sleep(600); } Console.ReadKey(); } }
消費者:

static void Main(string[] args) { Console.WriteLine("FanoutClient接收客戶端啟動..."); //1.創建連接工廠 ConnectionFactory factory = new ConnectionFactory() { HostName = "127.0.0.1", UserName = "guest", Password = "guest" }; //2.創建連接 using (var connection = factory.CreateConnection()) { //3.創建通道 using (var channel = connection.CreateModel()) { //4.定義交換器 channel.ExchangeDeclare("exchange", "fanout"); //5.創建匿名隊列,綁定交換器 //var queueName = channel.QueueDeclare("simple"); var queueName = channel.QueueDeclare().QueueName; channel.QueueBind(queueName, "exchange", ""); //6.創建消費者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { //接收消息 var body = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine($"接收消息:{body.ToString()}"); }; //7.消費消息 channel.BasicConsume(queueName, true, consumer); Console.ReadKey(); } } }
生產者創建完通道后,創建一個交換器,給它命名“exchange”以及設定“fanout”類型,最后發布消息到此交換器,與前面的模式不同的是不用設定隊列參數。
消費者創建完通道后,創建一個交換器,與生產者的交換器一致,定義一個隊列(可以指定,或者使用匿名),然后將此隊列與交換器綁定,消息會在此隊列中消費,最后創建消費者進行消費。
效果
先啟動2個消費者,再啟動生產者,消息完全接受到。
先啟動生產者,再分別啟動消費者,只能接受到消費者啟動之后生產發送的消息
發布訂閱模式就先到這,有什么不對的地方望斧正~
附上Demo地址:https://github.com/1164887865/RabbitMQDemo