一. Direct-Exchange模式
1. 含義
交換機類型設置為:ExchangeType.Direct
交換機和隊列通過routingKey(路由key)進行綁定,發消息的時候每條消息也要指定routingKey(路由key),然后交換機根據該路由key進行匹配,該key綁定了幾個Queue,那么該條消息就同時發送到幾個隊列中。
2. 使用場景
通過消息隊列來寫日志;
Info debug error warn :記錄下來
error: 除了記錄下來,還需要特殊處理,可能需要發送一個信息,發送一個郵件;
解決方案:通過路由key匹配不同的隊列
隊列1:專門用來記錄日志
隊列2:專門用來發郵件,發信息
3. 代碼分享
生產者

/// <summary> /// DirectExchange路由 /// </summary> public class DirectExchange { public static void Show() { Console.ForegroundColor = ConsoleColor.Red; var factory = new ConnectionFactory(); factory.HostName = "localhost";//RabbitMQ服務在本地運行 factory.UserName = "guest";//用戶名 factory.Password = "guest";//密碼 using (var connection = factory.CreateConnection()) { using (IModel channel = connection.CreateModel()) { //聲明兩個隊列 channel.QueueDeclare(queue: "DirectExchangeLogAllQueue", durable: true, exclusive: false, autoDelete: false, arguments: null); channel.QueueDeclare(queue: "DirectExchangeErrorQueue", durable: true, exclusive: false, autoDelete: false, arguments: null); //1個路由 channel.ExchangeDeclare(exchange: "DirectExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null); //4種路由key統一綁定DirectExchangeLogAllQueue隊列, string[] logtypes = new string[] { "debug", "info", "warn", "error" }; foreach (string logtype in logtypes) { channel.QueueBind(queue: "DirectExchangeLogAllQueue", exchange: "DirectExChange", routingKey: logtype); } //路由key“error”再次綁定DirectExchangeErrorQueue隊列 channel.QueueBind(queue: "DirectExchangeErrorQueue", exchange: "DirectExChange", routingKey: "error"); List<LogMsgModel> logList = new List<LogMsgModel>(); for (int i = 1; i <=20; i++) { if (i % 4 == 0) { logList.Add(new LogMsgModel() { LogType = "info", Msg = Encoding.UTF8.GetBytes($"info第{i}條信息") }); } if (i % 4 == 1) { logList.Add(new LogMsgModel() { LogType = "debug", Msg = Encoding.UTF8.GetBytes($"debug第{i}條信息") }); } if (i % 4 == 2) { logList.Add(new LogMsgModel() { LogType = "warn", Msg = Encoding.UTF8.GetBytes($"warn第{i}條信息") }); } if (i % 4 == 3) { logList.Add(new LogMsgModel() { LogType = "error", Msg = Encoding.UTF8.GetBytes($"error第{i}條信息") }); } } Console.WriteLine("生產者發送20條日志信息"); //發送日志信息 foreach (var log in logList) { channel.BasicPublish(exchange: "DirectExChange", routingKey: log.LogType, basicProperties: null, body: log.Msg); Console.WriteLine($"{Encoding.UTF8.GetString(log.Msg)} 已發送~~"); } } } } public class LogMsgModel { public string LogType { get; set; } public byte[] Msg { get; set; } } }
DirectExchange.Show();
消費者

1 public class DirectExchange 2 { 3 /// <summary> 4 /// 隊列1--用於各種類型日志信息 5 /// </summary> 6 public static void Show1() 7 { 8 Console.ForegroundColor = ConsoleColor.Green; 9 10 var factory = new ConnectionFactory(); 11 factory.HostName = "localhost";//RabbitMQ服務在本地運行 12 factory.UserName = "guest";//用戶名 13 factory.Password = "guest";//密碼 14 using (var connection = factory.CreateConnection()) 15 { 16 using (IModel channel = connection.CreateModel()) 17 { 18 //channel.QueueDeclare(queue: "DirectExchangeLogAllQueue", durable: true, exclusive: false, autoDelete: false, arguments: null); 19 //channel.ExchangeDeclare(exchange: "DirectExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null); 20 //string[] logtypes = new string[] { "debug", "info", "warn", "error" }; 21 //foreach (string logtype in logtypes) 22 //{ 23 // channel.QueueBind(queue: "DirectExchangeLogAllQueue", 24 // exchange: "DirectExChange", 25 // routingKey: logtype); 26 //} 27 //消費隊列中的所有消息; 28 var consumer = new EventingBasicConsumer(channel); 29 consumer.Received += (model, ea) => 30 { 31 var body = ea.Body; 32 var message = Encoding.UTF8.GetString(body.ToArray()); 33 Console.WriteLine($"【{message}】,寫入文本~~"); 34 }; 35 //處理消息 36 channel.BasicConsume(queue: "DirectExchangeLogAllQueue", autoAck: true, consumer: consumer); 37 Console.ReadLine(); 38 } 39 } 40 } 41 42 43 /// <summary> 44 /// 隊列2--用於error類型日志進行單獨處理 45 /// </summary> 46 public static void Show2() 47 { 48 Console.ForegroundColor = ConsoleColor.Green; 49 50 var factory = new ConnectionFactory(); 51 factory.HostName = "localhost";//RabbitMQ服務在本地運行 52 factory.UserName = "guest";//用戶名 53 factory.Password = "guest";//密碼 54 using (var connection = factory.CreateConnection()) 55 { 56 using (IModel channel = connection.CreateModel()) 57 { 58 //channel.QueueDeclare(queue: "DirectExchangeErrorQueue", durable: true, exclusive: false, autoDelete: false, arguments: null); 59 //channel.ExchangeDeclare(exchange: "DirectExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null); 60 //路由key“error”再次綁定DirectExchangeErrorQueue隊列 61 //channel.QueueBind(queue: "DirectExchangeErrorQueue", 62 // exchange: "DirectExChange", 63 // routingKey: "error"); 64 65 66 //消費隊列中的所有消息; 67 var consumer = new EventingBasicConsumer(channel); 68 consumer.Received += (model, ea) => 69 { 70 var body = ea.Body; 71 var message = Encoding.UTF8.GetString(body.ToArray()); 72 Console.WriteLine($"【{message}】,發送郵件~~"); 73 }; 74 //處理消息 75 channel.BasicConsume(queue: "DirectExchangeErrorQueue", autoAck: true, consumer: consumer); 76 Console.ReadLine(); 77 } 78 } 79 } 80 81 }
{ Thread.Sleep(2000); Task.Run(() => { DirectExchange.Show1(); }); Task.Run(() => { DirectExchange.Show2(); }); }
運行結果
二. Fanout-Exchange模式
1.含義
交換機類型設置為:ExchangeType.Fanout
這種模式忽略routingKey,消息從客戶端發出,只要queue與exchange有綁定,那么不管你的Routingkey是什么,都會將消息分發給所有與該exchang綁定的隊列中。
2. 使用場景
典型的發布訂閱模式,也可以叫做觀察者模式. 比如博主有很多粉絲,博主每發一條消息,所有關注的粉絲都能收到推送(每個粉絲對應一個隊列)
3. 代碼分享
生產者

public class FanoutExchange { /// <summary> /// 博主發博客 /// </summary> public static void Show() { Console.ForegroundColor = ConsoleColor.Red; var factory = new ConnectionFactory(); factory.HostName = "localhost";//RabbitMQ服務在本地運行 factory.UserName = "guest";//用戶名 factory.Password = "guest";//密碼 using (var connection = factory.CreateConnection()) { using (IModel channel = connection.CreateModel()) { channel.QueueDeclare(queue: "fansQueue1", durable: true, exclusive: false, autoDelete: false, arguments: null); channel.QueueDeclare(queue: "fansQueue2", durable: true, exclusive: false, autoDelete: false, arguments: null); channel.ExchangeDeclare(exchange: "FanoutExchange", type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null); channel.QueueBind(queue: "fansQueue1", exchange: "FanoutExchange", routingKey: string.Empty, arguments: null); channel.QueueBind(queue: "fansQueue2", exchange: "FanoutExchange", routingKey: string.Empty, arguments: null); int i = 1; while (true) { var message = $"博客{i}"; var body = Encoding.UTF8.GetBytes(message); //基本發布 channel.BasicPublish(exchange: "FanoutExchange", routingKey: string.Empty, //這里忽略路由key設置什么,都會向所有的隊列發送 basicProperties: null, body: body); Console.WriteLine($"{message}已發送到隊列"); i++; Thread.Sleep(1000); } } } } }
FanoutExchange.Show();
消費者

public class FanoutExchange { /// <summary> /// 粉絲1的隊列 /// </summary> public static void Show1() { Console.ForegroundColor = ConsoleColor.Green; var factory = new ConnectionFactory(); factory.HostName = "localhost";//RabbitMQ服務在本地運行 factory.UserName = "guest";//用戶名 factory.Password = "guest";//密碼 using (var connection = factory.CreateConnection()) { //創建通道channel using (var channel = connection.CreateModel()) { //channel.QueueDeclare(queue: "fansQueue1", durable: true, exclusive: false, autoDelete: false, arguments: null); //channel.ExchangeDeclare(exchange: "FanoutExchange", type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null); //channel.QueueBind(queue: "fansQueue1", exchange: "FanoutExchange", routingKey: string.Empty, arguments: null); //定義消費者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body.ToArray()); Console.WriteLine($"粉絲1收到推送:{message}"); }; Console.WriteLine("粉絲1在瀏覽博客中.........."); //處理消息 channel.BasicConsume(queue: "fansQueue1", autoAck: true, consumer: consumer); Console.ReadLine(); } } } /// <summary> /// 粉絲2的隊列 /// </summary> public static void Show2() { Console.ForegroundColor = ConsoleColor.Yellow; var factory = new ConnectionFactory(); factory.HostName = "localhost";//RabbitMQ服務在本地運行 factory.UserName = "guest";//用戶名 factory.Password = "guest";//密碼 using (var connection = factory.CreateConnection()) { //創建通道channel using (var channel = connection.CreateModel()) { //channel.QueueDeclare(queue: "fansQueue2", durable: true, exclusive: false, autoDelete: false, arguments: null); //channel.ExchangeDeclare(exchange: "FanoutExchange", type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null); //channel.QueueBind(queue: "fansQueue2", exchange: "FanoutExchange", routingKey: string.Empty, arguments: null); //定義消費者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body.ToArray()); Console.WriteLine($"粉絲2收到推送:{message}"); }; Console.WriteLine("粉絲2在瀏覽博客中.........."); //處理消息 channel.BasicConsume(queue: "fansQueue2", autoAck: true, consumer: consumer); Console.ReadLine(); } } } }
{ Thread.Sleep(2000); Task.Run(() => { FanoutExchange.Show1(); }); Task.Run(() => { FanoutExchange.Show2(); }); }
運行結果
三. Topic-Exchange模式
1. 含義
交換機類型設置為:ExchangeType.Topic
這種模式可以定制key,相當於在DirectExchange的基礎上增加了對key的模糊搜索,規則如下,主要是兩個關鍵符號
*,代表任意的一個詞。例如topic.zlh.*,他能夠匹配到,topic.zlh.one ,topic.zlh.two ,topic.zlh.abc, ....
#,代表任意多個詞。例如topic.#,他能夠匹配到,topic.zlh.one ,topic.zlh.two ,topic.zlh.abc, ....
2. 使用場景
分組
3. 代碼分享
生產者

public class TopicExchange { public static void Show() { Console.ForegroundColor = ConsoleColor.Red; var factory = new ConnectionFactory(); factory.HostName = "localhost";//RabbitMQ服務在本地運行 factory.UserName = "guest";//用戶名 factory.Password = "guest";//密碼 using (var connection = factory.CreateConnection()) { using (IModel channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "TopicExchange", type: ExchangeType.Topic, durable: true, autoDelete: false, arguments: null); channel.QueueDeclare(queue: "ChinaQueue", durable: true, exclusive: false, autoDelete: false, arguments: null); channel.QueueDeclare(queue: "newsQueue", durable: true, exclusive: false, autoDelete: false, arguments: null); channel.QueueBind(queue: "ChinaQueue", exchange: "TopicExchange", routingKey: "China.#", arguments: null); //多個詞匹配 channel.QueueBind(queue: "newsQueue", exchange: "TopicExchange", routingKey: "*.news", arguments: null); //1個詞匹配 { string message = "來自中國的新聞消息1"; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "TopicExchange", routingKey: "China.news", basicProperties: null, body: body); //同時匹配ChineQueue和newQueue Console.WriteLine($"消息【{message}】已發送到隊列"); } { string message = "來自中國的天氣消息1"; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "TopicExchange", routingKey: "China.weather.news", basicProperties: null, body: body); //僅匹配ChinaQueue Console.WriteLine($"消息【{message}】已發送到隊列"); } { string message = "來自中國的新聞消息2"; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "TopicExchange", routingKey: "msg.news", basicProperties: null, body: body); //僅匹配newsQueue Console.WriteLine($"消息【{message}】已發送到隊列"); } { string message = "來自美國的天氣消息2"; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "TopicExchange", routingKey: "usa.weather", basicProperties: null, body: body); //誰都不匹配 Console.WriteLine($"消息【{message}】已發送到隊列"); } } } } }
TopicExchange.Show();
消費者

public class TopicExchange { /// <summary> /// 讀取ChinaQueue隊列消息 /// </summary> public static void Show1() { Console.ForegroundColor = ConsoleColor.Green; var factory = new ConnectionFactory(); factory.HostName = "localhost";//RabbitMQ服務在本地運行 factory.UserName = "guest";//用戶名 factory.Password = "guest";//密碼 using (var connection = factory.CreateConnection()) { using (IModel channel = connection.CreateModel()) { //channel.ExchangeDeclare(exchange: "TopicExchange", type: ExchangeType.Topic, durable: true, autoDelete: false, arguments: null); //channel.QueueDeclare(queue: "ChinaQueue", durable: true, exclusive: false, autoDelete: false, arguments: null); //channel.QueueBind(queue: "ChinaQueue", exchange: "TopicExchange", routingKey: "China.#", arguments: null); //定義消費者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body.ToArray()); Console.WriteLine($"ChinaQueue隊列中消費成功:{message}"); }; //處理消息 channel.BasicConsume(queue: "ChinaQueue", autoAck: true, consumer: consumer); } } } /// <summary> /// 讀取newsQueue隊列消息 /// </summary> public static void Show2() { Console.ForegroundColor = ConsoleColor.Green; var factory = new ConnectionFactory(); factory.HostName = "localhost";//RabbitMQ服務在本地運行 factory.UserName = "guest";//用戶名 factory.Password = "guest";//密碼 using (var connection = factory.CreateConnection()) { using (IModel channel = connection.CreateModel()) { //定義消費者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body.ToArray()); Console.WriteLine($"newsQueue隊列中消費成功:{message}"); }; //處理消息 channel.BasicConsume(queue: "newsQueue", autoAck: true, consumer: consumer); } } } }
{ Thread.Sleep(2000); Task.Run(() => { TopicExchange.Show1(); }); Task.Run(() => { TopicExchange.Show2(); }); }
運行結果
四. Header-Exchange模式
1. 含義
交換機類型設置為:ExchangeType.Headers
headers類型的Exchange不依賴於routing key與binding key的匹配規則來路由消息,而是根據發送的消息內容中的headers屬性進行匹配。在綁定Queue與Exchange時指定一組鍵值對以及x-match參數,x-match參數是字符串類型,可以設置為any或者all。
A. 如果設置為any,只要匹配到了headers表中的任何一對鍵值即可
B. 如果設置為all,則代表需要全部匹配
2.使用場景
如下案例,All隊列中存儲了1條消息test1,Any隊列中存儲了2條消息,test3和test4
3. 代碼分享
生產者

public class HeaderExchange { public static void Show() { Console.ForegroundColor = ConsoleColor.Red; var factory = new ConnectionFactory(); factory.HostName = "localhost";//RabbitMQ服務在本地運行 factory.UserName = "guest";//用戶名 factory.Password = "guest";//密碼 using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "HeaderExchange", type: ExchangeType.Headers, durable: false, autoDelete: false, arguments: null); channel.QueueDeclare(queue: "HeaderExchangeAllqueue", durable: false, exclusive: false, autoDelete: false, arguments: null); channel.QueueDeclare(queue: "HeaderExchangeAnyqueue", durable: false, exclusive: false, autoDelete: false, arguments: null); channel.QueueBind(queue: "HeaderExchangeAllqueue", exchange: "HeaderExchange", routingKey: string.Empty, arguments: new Dictionary<string, object> { { "x-match","all"}, { "teacher","ypf"}, { "pass","123"}}); Console.WriteLine("生產者准備就緒...."); { string message = "x-match=all,teacher和pass都相同時發送的消息,test1"; var props = channel.CreateBasicProperties(); props.Headers = new Dictionary<string, object>() {{ "teacher","ypf"}, { "pass","123"}}; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "HeaderExchange", routingKey: string.Empty, basicProperties: props, body: body); Console.WriteLine($"消息【{message}】已發送"); //存入HeaderExchangeAllqueue隊列成功 } { string message = "x-match=all,teacher和pass有一個不相同時發送的消息,test2"; var props = channel.CreateBasicProperties(); props.Headers = new Dictionary<string, object>() { { "teacher","ypf"}, { "pass","456"} }; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "HeaderExchange", routingKey: string.Empty, basicProperties: props, body: body); Console.WriteLine($"消息【{message}】已發送"); //存入HeaderExchangeAllqueue隊列失敗 } Console.WriteLine("*****************************888888888*********************************"); { channel.QueueBind(queue: "HeaderExchangeAnyqueue", exchange: "HeaderExchange", routingKey: string.Empty, arguments: new Dictionary<string, object> { { "x-match","any"}, { "teacher","lmr"}, { "pass","123456"},}); string msg = "x-match=any,teacher和pass完全相同時發送的消息,test3"; var props = channel.CreateBasicProperties(); props.Headers = new Dictionary<string, object>() { { "teacher","lmr"}, { "pass","123456"} }; var body = Encoding.UTF8.GetBytes(msg); channel.BasicPublish(exchange: "HeaderExchange", routingKey: string.Empty, basicProperties: props, body: body); Console.WriteLine($"消息【{msg}】已發送"); //存入HeaderExchangeAnyqueue隊列成功 } { string msg = "x-match=any,teacher和pass有一個不相同時發送的消息,test4"; var props = channel.CreateBasicProperties(); props.Headers = new Dictionary<string, object>() { { "teacher","lmr"}, { "pass","456"} }; var body = Encoding.UTF8.GetBytes(msg); channel.BasicPublish(exchange: "HeaderExchange", routingKey: string.Empty, basicProperties: props, body: body); Console.WriteLine($"消息【{msg}】已發送"); //存入HeaderExchangeAnyqueue隊列成功 } } } Console.ReadKey(); } }
HeaderExchange.Show();
消費者

public class HeaderExchange { /// <summary> /// 讀取HeaderExchangeAllqueue隊列消息 /// </summary> public static void Show1() { Console.ForegroundColor = ConsoleColor.Green; var factory = new ConnectionFactory(); factory.HostName = "localhost";//RabbitMQ服務在本地運行 factory.UserName = "guest";//用戶名 factory.Password = "guest";//密碼 using (var connection = factory.CreateConnection()) { using (IModel channel = connection.CreateModel()) { //定義消費者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body.ToArray()); Console.WriteLine($"HeaderExchangeAllqueue隊列中消費成功:{message}"); }; //處理消息 channel.BasicConsume(queue: "HeaderExchangeAllqueue", autoAck: true, consumer: consumer); } } } /// <summary> /// 讀取HeaderExchangeAnyqueue隊列消息 /// </summary> public static void Show2() { Console.ForegroundColor = ConsoleColor.Green; var factory = new ConnectionFactory(); factory.HostName = "localhost";//RabbitMQ服務在本地運行 factory.UserName = "guest";//用戶名 factory.Password = "guest";//密碼 using (var connection = factory.CreateConnection()) { using (IModel channel = connection.CreateModel()) { //定義消費者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body.ToArray()); Console.WriteLine($"HeaderExchangeAnyqueue隊列中消費成功:{message}"); }; //處理消息 channel.BasicConsume(queue: "HeaderExchangeAnyqueue", autoAck: true, consumer: consumer); } } } }
{ Thread.Sleep(2000); Task.Run(() => { HeaderExchange.Show1(); }); Task.Run(() => { HeaderExchange.Show2(); }); }
運行結果
五. 剖析持久化機制
1. 觸發條件
需要設置交換機、隊列、消息均為持久化。
2. 現象
(1). 下面路徑存放的是virtual,有幾個virtual,就有幾個文件夾
C:\Users\DELL\AppData\Roaming\RabbitMQ\db\rabbit@DESKTOP-DR3FU9S-mnesia\msg_stores\vhosts\
(2). 下面的msg_store_persistent文件夾存放的是持久化數據,
關閉RabbitMQ服務,會有很多文件,打開后,又沒了,說明啟動服務,磁盤上的數據又被加載到了硬盤
!
- 作 者 : Yaopengfei(姚鵬飛)
- 博客地址 : http://www.cnblogs.com/yaopengfei/
- 聲 明1 : 如有錯誤,歡迎討論,請勿謾罵^_^。
- 聲 明2 : 原創博客請在轉載時保留原文鏈接或在文章開頭加上本人博客地址,否則保留追究法律責任的權利。