第三節:RabbitMq四種路由模式詳解和剖析持久化機制


一. Direct-Exchange模式

1. 含義

 交換機類型設置為:ExchangeType.Direct

 交換機和隊列通過routingKey(路由key)進行綁定,發消息的時候每條消息也要指定routingKey(路由key),然后交換機根據該路由key進行匹配,該key綁定了幾個Queue那么該條消息就同時發送到幾個隊列中

2. 使用場景

 通過消息隊列來寫日志;

 Info debug error warn :記錄下來

 error: 除了記錄下來,還需要特殊處理,可能需要發送一個信息,發送一個郵件;

解決方案:通過路由key匹配不同的隊列

 隊列1:專門用來記錄日志

 隊列2:專門用來發郵件,發信息

3. 代碼分享

生產者

 /// <summary>
    /// DirectExchange路由
    /// </summary>
    public class DirectExchange
    {
        public static void Show()
        {
            Console.ForegroundColor = ConsoleColor.Red;

            var factory = new ConnectionFactory();
            factory.HostName = "localhost";//RabbitMQ服務在本地運行
            factory.UserName = "guest";//用戶名
            factory.Password = "guest";//密碼 
            using (var connection = factory.CreateConnection())
            {
                using (IModel channel = connection.CreateModel())
                {
                    //聲明兩個隊列
                    channel.QueueDeclare(queue: "DirectExchangeLogAllQueue", durable: true, exclusive: false, autoDelete: false, arguments: null); 
                    channel.QueueDeclare(queue: "DirectExchangeErrorQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);
                    //1個路由
                    channel.ExchangeDeclare(exchange: "DirectExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
                    //4種路由key統一綁定DirectExchangeLogAllQueue隊列,
                    string[] logtypes = new string[] { "debug", "info", "warn", "error" };
                    foreach (string logtype in logtypes)
                    {
                        channel.QueueBind(queue: "DirectExchangeLogAllQueue",
                                exchange: "DirectExChange",
                                routingKey: logtype);
                    }
                    //路由key“error”再次綁定DirectExchangeErrorQueue隊列
                    channel.QueueBind(queue: "DirectExchangeErrorQueue",
                              exchange: "DirectExChange",
                              routingKey: "error");
                    List<LogMsgModel> logList = new List<LogMsgModel>();
                    for (int i = 1; i <=20; i++)
                    {
                        if (i % 4 == 0)
                        {
                            logList.Add(new LogMsgModel() { LogType = "info", Msg = Encoding.UTF8.GetBytes($"info第{i}條信息") });
                        }
                        if (i % 4 == 1)
                        {
                            logList.Add(new LogMsgModel() { LogType = "debug", Msg = Encoding.UTF8.GetBytes($"debug第{i}條信息") });
                        }
                        if (i % 4 == 2)
                        {
                            logList.Add(new LogMsgModel() { LogType = "warn", Msg = Encoding.UTF8.GetBytes($"warn第{i}條信息") });
                        }
                        if (i % 4 == 3)
                        {
                            logList.Add(new LogMsgModel() { LogType = "error", Msg = Encoding.UTF8.GetBytes($"error第{i}條信息") });
                        }
                    }
                     
                    Console.WriteLine("生產者發送20條日志信息");
                    //發送日志信息
                    foreach (var log in logList)
                    {
                        channel.BasicPublish(exchange: "DirectExChange",
                                            routingKey: log.LogType,
                                            basicProperties: null,
                                            body: log.Msg);
                        Console.WriteLine($"{Encoding.UTF8.GetString(log.Msg)}  已發送~~");
                    }
                     
                }
            }
        }


        public class LogMsgModel
        {
            public string LogType { get; set; }
            public byte[] Msg { get; set; }
        }
    }
View Code
DirectExchange.Show();

消費者

 1  public class DirectExchange
 2     {
 3         /// <summary>
 4         /// 隊列1--用於各種類型日志信息
 5         /// </summary>
 6         public static void Show1()
 7         {
 8             Console.ForegroundColor = ConsoleColor.Green;
 9 
10             var factory = new ConnectionFactory();
11             factory.HostName = "localhost";//RabbitMQ服務在本地運行
12             factory.UserName = "guest";//用戶名
13             factory.Password = "guest";//密碼 
14             using (var connection = factory.CreateConnection())
15             {
16                 using (IModel channel = connection.CreateModel())
17                 {
18                     //channel.QueueDeclare(queue: "DirectExchangeLogAllQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);
19                     //channel.ExchangeDeclare(exchange: "DirectExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
20                     //string[] logtypes = new string[] { "debug", "info", "warn", "error" };
21                     //foreach (string logtype in logtypes)
22                     //{
23                     //    channel.QueueBind(queue: "DirectExchangeLogAllQueue",
24                     //            exchange: "DirectExChange",
25                     //            routingKey: logtype);
26                     //}
27                     //消費隊列中的所有消息;                                   
28                     var consumer = new EventingBasicConsumer(channel);
29                     consumer.Received += (model, ea) =>
30                     {
31                         var body = ea.Body;
32                         var message = Encoding.UTF8.GetString(body.ToArray());
33                         Console.WriteLine($"【{message}】,寫入文本~~");
34                     };
35                     //處理消息
36                     channel.BasicConsume(queue: "DirectExchangeLogAllQueue", autoAck: true, consumer: consumer);
37                     Console.ReadLine();
38                 }
39             }
40         }
41 
42 
43         /// <summary>
44         /// 隊列2--用於error類型日志進行單獨處理
45         /// </summary>
46         public static void Show2()
47         {
48             Console.ForegroundColor = ConsoleColor.Green;
49 
50             var factory = new ConnectionFactory();
51             factory.HostName = "localhost";//RabbitMQ服務在本地運行
52             factory.UserName = "guest";//用戶名
53             factory.Password = "guest";//密碼 
54             using (var connection = factory.CreateConnection())
55             {
56                 using (IModel channel = connection.CreateModel())
57                 {
58                     //channel.QueueDeclare(queue: "DirectExchangeErrorQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);
59                     //channel.ExchangeDeclare(exchange: "DirectExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
60                     //路由key“error”再次綁定DirectExchangeErrorQueue隊列
61                     //channel.QueueBind(queue: "DirectExchangeErrorQueue",
62                     //          exchange: "DirectExChange",
63                     //          routingKey: "error");
64 
65 
66                     //消費隊列中的所有消息;                                   
67                     var consumer = new EventingBasicConsumer(channel);
68                     consumer.Received += (model, ea) =>
69                     {
70                         var body = ea.Body;
71                         var message = Encoding.UTF8.GetString(body.ToArray());
72                         Console.WriteLine($"【{message}】,發送郵件~~");
73                     };
74                     //處理消息
75                     channel.BasicConsume(queue: "DirectExchangeErrorQueue", autoAck: true, consumer: consumer);
76                     Console.ReadLine();
77                 }
78             }
79         }
80 
81     }
View Code
            {
                Thread.Sleep(2000);

                Task.Run(() =>
                {
                    DirectExchange.Show1();
                });
                Task.Run(() =>
                {
                    DirectExchange.Show2();
                });
            }

運行結果

 

二. Fanout-Exchange模式

1.含義

 交換機類型設置為:ExchangeType.Fanout

 這種模式忽略routingKey,消息從客戶端發出,只要queue與exchange有綁定,那么不管你的Routingkey是什么,都會將消息分發給所有與該exchang綁定的隊列中。

2. 使用場景

 典型的發布訂閱模式,也可以叫做觀察者模式. 比如博主有很多粉絲,博主每發一條消息,所有關注的粉絲都能收到推送(每個粉絲對應一個隊列)

3. 代碼分享

生產者

  public class FanoutExchange
    {
        /// <summary>
        /// 博主發博客
        /// </summary>
        public static void Show()
        {
            Console.ForegroundColor = ConsoleColor.Red;
            var factory = new ConnectionFactory();
            factory.HostName = "localhost";//RabbitMQ服務在本地運行
            factory.UserName = "guest";//用戶名
            factory.Password = "guest";//密碼 
            using (var connection = factory.CreateConnection())
            {
                using (IModel channel = connection.CreateModel())
                {
                    channel.QueueDeclare(queue: "fansQueue1", durable: true, exclusive: false, autoDelete: false, arguments: null);
                    channel.QueueDeclare(queue: "fansQueue2", durable: true, exclusive: false, autoDelete: false, arguments: null);
                    channel.ExchangeDeclare(exchange: "FanoutExchange", type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null);
                    channel.QueueBind(queue: "fansQueue1", exchange: "FanoutExchange", routingKey: string.Empty, arguments: null);
                    channel.QueueBind(queue: "fansQueue2", exchange: "FanoutExchange", routingKey: string.Empty, arguments: null);
                    int i = 1;
                    while (true)
                    {
                        var message = $"博客{i}";
                        var body = Encoding.UTF8.GetBytes(message);
                        //基本發布
                        channel.BasicPublish(exchange: "FanoutExchange",
                                             routingKey: string.Empty,    //這里忽略路由key設置什么,都會向所有的隊列發送
                                             basicProperties: null,
                                             body: body);
                        Console.WriteLine($"{message}已發送到隊列");
                        i++;
                        Thread.Sleep(1000);
                    }

                }
            }
        }
    }
View Code
 FanoutExchange.Show();

消費者

 public class FanoutExchange
    {
        /// <summary>
        /// 粉絲1的隊列
        /// </summary>
        public static void Show1()
        {
            Console.ForegroundColor = ConsoleColor.Green;
            var factory = new ConnectionFactory();
            factory.HostName = "localhost";//RabbitMQ服務在本地運行
            factory.UserName = "guest";//用戶名
            factory.Password = "guest";//密碼 
            using (var connection = factory.CreateConnection())
            {
                //創建通道channel
                using (var channel = connection.CreateModel())
                {
                    //channel.QueueDeclare(queue: "fansQueue1", durable: true, exclusive: false, autoDelete: false, arguments: null);
                    //channel.ExchangeDeclare(exchange: "FanoutExchange", type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null);
                    //channel.QueueBind(queue: "fansQueue1", exchange: "FanoutExchange", routingKey: string.Empty, arguments: null);

                    //定義消費者                                      
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body.ToArray());
                        Console.WriteLine($"粉絲1收到推送:{message}");
                    };
                    Console.WriteLine("粉絲1在瀏覽博客中..........");
                    //處理消息
                    channel.BasicConsume(queue: "fansQueue1", autoAck: true, consumer: consumer);
                    Console.ReadLine();
                }
            }
        }

        /// <summary>
        /// 粉絲2的隊列
        /// </summary>
        public static void Show2()
        {
            Console.ForegroundColor = ConsoleColor.Yellow;

            var factory = new ConnectionFactory();
            factory.HostName = "localhost";//RabbitMQ服務在本地運行
            factory.UserName = "guest";//用戶名
            factory.Password = "guest";//密碼 
            using (var connection = factory.CreateConnection())
            {
                //創建通道channel
                using (var channel = connection.CreateModel())
                {
                    //channel.QueueDeclare(queue: "fansQueue2", durable: true, exclusive: false, autoDelete: false, arguments: null);
                    //channel.ExchangeDeclare(exchange: "FanoutExchange", type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null);
                    //channel.QueueBind(queue: "fansQueue2", exchange: "FanoutExchange", routingKey: string.Empty, arguments: null);

                    //定義消費者                                      
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body.ToArray());
                        Console.WriteLine($"粉絲2收到推送:{message}");
                    };
                    Console.WriteLine("粉絲2在瀏覽博客中..........");
                    //處理消息
                    channel.BasicConsume(queue: "fansQueue2", autoAck: true, consumer: consumer);
                    Console.ReadLine();
                }
            }
        }




    }
View Code
            {
                Thread.Sleep(2000);

                Task.Run(() =>
                {
                    FanoutExchange.Show1();
                });
                Task.Run(() =>
                {
                    FanoutExchange.Show2();
                });
            }

運行結果

 

三. Topic-Exchange模式

1. 含義

 交換機類型設置為:ExchangeType.Topic

 這種模式可以定制key,相當於在DirectExchange的基礎上增加了對key的模糊搜索,規則如下,主要是兩個關鍵符號

 *,代表任意的一個詞。例如topic.zlh.*,他能夠匹配到,topic.zlh.one ,topic.zlh.two ,topic.zlh.abc, ....

 #,代表任意多個詞。例如topic.#,他能夠匹配到,topic.zlh.one ,topic.zlh.two ,topic.zlh.abc, ....

2. 使用場景

 分組

3. 代碼分享

生產者

 public class TopicExchange
    {
        public static void Show()
        {
            Console.ForegroundColor = ConsoleColor.Red;
            var factory = new ConnectionFactory();
            factory.HostName = "localhost";//RabbitMQ服務在本地運行
            factory.UserName = "guest";//用戶名
            factory.Password = "guest";//密碼 
            using (var connection = factory.CreateConnection())
            {
                using (IModel channel = connection.CreateModel())
                { 
                    channel.ExchangeDeclare(exchange: "TopicExchange", type: ExchangeType.Topic, durable: true, autoDelete: false, arguments: null); 
                    channel.QueueDeclare(queue: "ChinaQueue", durable: true, exclusive: false, autoDelete: false, arguments: null); 
                    channel.QueueDeclare(queue: "newsQueue", durable: true, exclusive: false, autoDelete: false,  arguments: null); 

                    channel.QueueBind(queue: "ChinaQueue", exchange: "TopicExchange", routingKey: "China.#", arguments: null);   //多個詞匹配
                    channel.QueueBind(queue: "newsQueue", exchange: "TopicExchange", routingKey: "*.news", arguments: null);     //1個詞匹配

                    {
                        string message = "來自中國的新聞消息1";
                        var body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish(exchange: "TopicExchange", routingKey: "China.news", basicProperties: null, body: body);  //同時匹配ChineQueue和newQueue
                        Console.WriteLine($"消息【{message}】已發送到隊列");
                    }

                    {
                        string message = "來自中國的天氣消息1";
                        var body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish(exchange: "TopicExchange", routingKey: "China.weather.news", basicProperties: null, body: body); //僅匹配ChinaQueue
                        Console.WriteLine($"消息【{message}】已發送到隊列");
                    }
                    {
                        string message = "來自中國的新聞消息2";
                        var body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish(exchange: "TopicExchange", routingKey: "msg.news", basicProperties: null, body: body);   //僅匹配newsQueue
                        Console.WriteLine($"消息【{message}】已發送到隊列");
                    } 
                    {
                        string message = "來自美國的天氣消息2";
                        var body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish(exchange: "TopicExchange", routingKey: "usa.weather", basicProperties: null, body: body);  //誰都不匹配
                        Console.WriteLine($"消息【{message}】已發送到隊列");
                    } 
                }
            }
        }
    }
View Code
TopicExchange.Show();

消費者

 public class TopicExchange
    {
        /// <summary>
        /// 讀取ChinaQueue隊列消息
        /// </summary>
        public static void Show1()
        {
            Console.ForegroundColor = ConsoleColor.Green;
            var factory = new ConnectionFactory();
            factory.HostName = "localhost";//RabbitMQ服務在本地運行
            factory.UserName = "guest";//用戶名
            factory.Password = "guest";//密碼 
            using (var connection = factory.CreateConnection())
            {
                using (IModel channel = connection.CreateModel())
                {
                    //channel.ExchangeDeclare(exchange: "TopicExchange", type: ExchangeType.Topic, durable: true, autoDelete: false, arguments: null);
                    //channel.QueueDeclare(queue: "ChinaQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);
                    //channel.QueueBind(queue: "ChinaQueue", exchange: "TopicExchange", routingKey: "China.#", arguments: null);
                    //定義消費者                                      
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body.ToArray());
                        Console.WriteLine($"ChinaQueue隊列中消費成功:{message}");
                    };
                    //處理消息
                    channel.BasicConsume(queue: "ChinaQueue", autoAck: true, consumer: consumer);
                }
            }
        }

        /// <summary>
        /// 讀取newsQueue隊列消息
        /// </summary>
        public static void Show2()
        {
            Console.ForegroundColor = ConsoleColor.Green;
            var factory = new ConnectionFactory();
            factory.HostName = "localhost";//RabbitMQ服務在本地運行
            factory.UserName = "guest";//用戶名
            factory.Password = "guest";//密碼 
            using (var connection = factory.CreateConnection())
            {
                using (IModel channel = connection.CreateModel())
                {
                    //定義消費者                                      
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body.ToArray());
                        Console.WriteLine($"newsQueue隊列中消費成功:{message}");
                    };
                    //處理消息
                    channel.BasicConsume(queue: "newsQueue", autoAck: true, consumer: consumer);
                }
            }
        }

    }
View Code
{
                Thread.Sleep(2000);

                Task.Run(() =>
                {
                    TopicExchange.Show1();
                });
                Task.Run(() =>
                {
                    TopicExchange.Show2();
                });
 }

運行結果

 

 

四. Header-Exchange模式

1. 含義

 交換機類型設置為:ExchangeType.Headers

 headers類型的Exchange不依賴於routing key與binding key的匹配規則來路由消息,而是根據發送的消息內容中的headers屬性進行匹配。在綁定Queue與Exchange時指定一組鍵值對以及x-match參數,x-match參數是字符串類型,可以設置為any或者all。

 A. 如果設置為any,只要匹配到了headers表中的任何一對鍵值即可

 B. 如果設置為all,則代表需要全部匹配

2.使用場景

 如下案例,All隊列中存儲了1條消息test1,Any隊列中存儲了2條消息,test3和test4

3. 代碼分享

生產者

  public class HeaderExchange
    {
        public static void Show()
        {
            Console.ForegroundColor = ConsoleColor.Red;

            var factory = new ConnectionFactory();
            factory.HostName = "localhost";//RabbitMQ服務在本地運行
            factory.UserName = "guest";//用戶名
            factory.Password = "guest";//密碼 
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange: "HeaderExchange", type: ExchangeType.Headers, durable: false, autoDelete: false, arguments: null);
                    channel.QueueDeclare(queue: "HeaderExchangeAllqueue", durable: false, exclusive: false, autoDelete: false, arguments: null);
                    channel.QueueDeclare(queue: "HeaderExchangeAnyqueue", durable: false, exclusive: false, autoDelete: false, arguments: null);

                    channel.QueueBind(queue: "HeaderExchangeAllqueue", exchange: "HeaderExchange", routingKey: string.Empty,
                                      arguments: new Dictionary<string, object> {
                                                                    { "x-match","all"},
                                                                    { "teacher","ypf"},
                                                                    { "pass","123"}});
                    Console.WriteLine("生產者准備就緒....");
                    {
                        string message = "x-match=all,teacher和pass都相同時發送的消息,test1";
                        var props = channel.CreateBasicProperties();
                        props.Headers = new Dictionary<string, object>() {{ "teacher","ypf"},
                                                                           { "pass","123"}};
                        var body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish(exchange: "HeaderExchange",
                                             routingKey: string.Empty,
                                             basicProperties: props,
                                             body: body);
                        Console.WriteLine($"消息【{message}】已發送");        //存入HeaderExchangeAllqueue隊列成功
                    }
                    {
                        string message = "x-match=all,teacher和pass有一個不相同時發送的消息,test2";
                        var props = channel.CreateBasicProperties();
                        props.Headers = new Dictionary<string, object>() {
                                                                           { "teacher","ypf"},
                                                                           { "pass","456"}
                                                                          };
                        var body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish(exchange: "HeaderExchange",
                                             routingKey: string.Empty,
                                             basicProperties: props,
                                             body: body);
                        Console.WriteLine($"消息【{message}】已發送");     //存入HeaderExchangeAllqueue隊列失敗
                    }


                    Console.WriteLine("*****************************888888888*********************************");
                    {
                        channel.QueueBind(queue: "HeaderExchangeAnyqueue", exchange: "HeaderExchange", routingKey: string.Empty,
                        arguments: new Dictionary<string, object> {
                                            { "x-match","any"},
                                            { "teacher","lmr"},
                                            { "pass","123456"},});

                        string msg = "x-match=any,teacher和pass完全相同時發送的消息,test3";
                        var props = channel.CreateBasicProperties();
                        props.Headers = new Dictionary<string, object>() {
                                                 { "teacher","lmr"},
                                                 { "pass","123456"}
                                            };
                        var body = Encoding.UTF8.GetBytes(msg);
                        channel.BasicPublish(exchange: "HeaderExchange",
                                             routingKey: string.Empty,
                                             basicProperties: props,
                                             body: body);
                        Console.WriteLine($"消息【{msg}】已發送");      //存入HeaderExchangeAnyqueue隊列成功
                    }

                    {
                        string msg = "x-match=any,teacher和pass有一個不相同時發送的消息,test4";
                        var props = channel.CreateBasicProperties();
                        props.Headers = new Dictionary<string, object>() {
                                                 { "teacher","lmr"},
                                                 { "pass","456"}
                                            };
                        var body = Encoding.UTF8.GetBytes(msg);
                        channel.BasicPublish(exchange: "HeaderExchange",
                                             routingKey: string.Empty,
                                             basicProperties: props,
                                             body: body);
                        Console.WriteLine($"消息【{msg}】已發送");    //存入HeaderExchangeAnyqueue隊列成功
                    }

                }
            }
            Console.ReadKey();
        }
    }
View Code
 HeaderExchange.Show();

消費者

public class HeaderExchange
    {
        /// <summary>
        /// 讀取HeaderExchangeAllqueue隊列消息
        /// </summary>
        public static void Show1()
        {
            Console.ForegroundColor = ConsoleColor.Green;
            var factory = new ConnectionFactory();
            factory.HostName = "localhost";//RabbitMQ服務在本地運行
            factory.UserName = "guest";//用戶名
            factory.Password = "guest";//密碼 
            using (var connection = factory.CreateConnection())
            {
                using (IModel channel = connection.CreateModel())
                {
                    //定義消費者                                      
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body.ToArray());
                        Console.WriteLine($"HeaderExchangeAllqueue隊列中消費成功:{message}");
                    };
                    //處理消息
                    channel.BasicConsume(queue: "HeaderExchangeAllqueue", autoAck: true, consumer: consumer);
                }
            }
        }

        /// <summary>
        /// 讀取HeaderExchangeAnyqueue隊列消息
        /// </summary>
        public static void Show2()
        {
            Console.ForegroundColor = ConsoleColor.Green;
            var factory = new ConnectionFactory();
            factory.HostName = "localhost";//RabbitMQ服務在本地運行
            factory.UserName = "guest";//用戶名
            factory.Password = "guest";//密碼 
            using (var connection = factory.CreateConnection())
            {
                using (IModel channel = connection.CreateModel())
                {
                    //定義消費者                                      
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body.ToArray());
                        Console.WriteLine($"HeaderExchangeAnyqueue隊列中消費成功:{message}");
                    };
                    //處理消息
                    channel.BasicConsume(queue: "HeaderExchangeAnyqueue", autoAck: true, consumer: consumer);
                }
            }
        }



    }
View Code
            {
                Thread.Sleep(2000);

                Task.Run(() =>
                {
                    HeaderExchange.Show1();
                });
                Task.Run(() =>
                {
                    HeaderExchange.Show2();
                });
            }

運行結果

 

 

 

五. 剖析持久化機制

1. 觸發條件

 需要設置交換機、隊列、消息均為持久化。

 

 

 

 

2. 現象

(1). 下面路徑存放的是virtual,有幾個virtual,就有幾個文件夾

 C:\Users\DELL\AppData\Roaming\RabbitMQ\db\rabbit@DESKTOP-DR3FU9S-mnesia\msg_stores\vhosts\

(2). 下面的msg_store_persistent文件夾存放的是持久化數據,

 關閉RabbitMQ服務,會有很多文件,打開后,又沒了,說明啟動服務,磁盤上的數據又被加載到了硬盤

 

 

 

 

 

 

 

 

!

  • 作       者 : Yaopengfei(姚鵬飛)
  • 博客地址 : http://www.cnblogs.com/yaopengfei/
  • 聲     明1 : 如有錯誤,歡迎討論,請勿謾罵^_^。
  • 聲     明2 : 原創博客請在轉載時保留原文鏈接或在文章開頭加上本人博客地址,否則保留追究法律責任的權利。
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM