學習在.NET Core中使用RabbitMQ進行消息傳遞之持久化(二)


前言

上一節我們簡單介紹了RabbitMQ和在安裝后啟動所出現的問題,本節我們開始正式進入RabbitMQ的學習,對於基本概念請從官網或者其他前輩博客上查閱,我這里不介紹基礎性東西,只會簡單提一下,請知悉。

RabbitMQ持久化

在RabbitMQ中存在四種交換機,一是直連交換機(Direct Exchange),二是廣播交換機(Fantout Exchange),三是主題交換機(Topic Exchange),四是頭交換機(Header Exchange),每種交換機都有其對應場景,后面會一一講到。任何處理消息的開源庫都離不開兩個角色,一是生產者(Producer),二是消費者(Consumer),消息從生產者到達消費者需經過三個階段也就是我們需要知道的三個概念,一是交換機,二是隊列,三是綁定,初次見此三者概念感覺很高大上是不是,其實不過是高度抽象了而已,同時看到很多博客上對此三者解釋的還是非常深刻,我這里就不過多細說。我們先看別人博客上對整個RabbitMQ架構用圖來進行標識,然后對照我對交換機、隊列、綁定一言以蔽之進行如下概括您就明白他們具體是干什么了。

交換機是生產者發布消息的入口點,隊列是消費者獲取消息的容器,綁定是將交換機連接到隊列的規則

我們簡單過一下消息從生產者到達消費者的大概過程,生產者也就是上述圖中的ClientA和ClientB,發送消息到交換機,這里交換機可以是一個或者多個,然后通過路由鍵綁定到隊列,那么消費者如何知道綁定到了哪個隊列呢?然后消費者也就是上述Client1、Client2、Client3也要聲明交換機、隊列、綁定,這樣整個過程就串起來了,原理大概就是這樣。接下來我們通過代碼來實現生產者發送消息(請通過NuGet安裝RabbitMQ客戶端)。

    public class RabbitMQService
    {
        public IConnection GetRabbitMQConnection()
        {
            var connectionFactory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "guest",
                Password = "guest"
            };
            return connectionFactory.CreateConnection();
        }
    }

第一步當然是需要創建連接,連接到RabbitMQ服務,接下來則是在我們創建連接的基礎上創建通道,然后上述我們所說消息從生產者到達消費者的過程就是在此通道上進行。

            var rabbitMQService = new RabbitMQService();
            var connection = rabbitMQService.GetRabbitMQConnection();
            var model = connection.CreateModel();

接下來則是聲明交換機、隊列、通過路由鍵將交換機和隊列綁定在一起。

        static void InitialTopicQueue(IModel model)
        {
            model.QueueDeclare("queueDeclare", true, false, false, null);
            model.ExchangeDeclare("exchangeDeclare", ExchangeType.Topic);
            model.QueueBind("queueDeclare", "exchangeDeclare", "routeKey");
        }

如上我們聲明隊列名稱為queueDeclare且該隊列持久化,然后聲明交換機名稱為exchangeDeclare,交換機類型為主題,最后通過QueueBind方法通過routekey路由鍵將聲明的交換機和隊列綁定在一起。接下來我們開始發布消息。

            var basicProperties = model.CreateBasicProperties();
            basicProperties.DeliveryMode = 1;
            var payload = Encoding.UTF8.GetBytes("這是來自運行VS2017控制台發出的消息");
            var address = new PublicationAddress(ExchangeType.Topic, "exchangeDeclare", "routeKey");
            model.BasicPublish(address, basicProperties, payload);

然后我們在控制台中運行上述程序,然后在RabbitMQ UI上來查看聲明的交換機、隊列、綁定的路由鍵以及承載的消息。

 

 

一切如正常運行,其我們發布的消息在隊列中處於Ready狀態,接下來我們在服務中關閉RabbitMQ服務代理,然后重啟模擬宕機的情況。

 

因為在聲明隊列中第二個參數可指定該隊列是否可持久化,我們指定為True即持久化,所以即使RabbitMQ Broker重啟隊列依然還在,這個時候我們會發現在隊列中的數據被扔掉了,也就是說此時的隊列為空。那是因為我們指定消息為非持久化,如下指定傳輸模式即DeliveryModel屬性為1,1代表非持久化,2為可持久化。所以如果我們指定DeliveryMode等於2即使RabbitMQ代理宕機,重啟后消息依然還在。

            var basicProperties = model.CreateBasicProperties();
            basicProperties.DeliveryMode = 2;

 同理切換到ExchangeType界面時,此時我們聲明的交換機不存在了,如下:

因為聲明交換機時ExchangeDeclare方法有重載,第三個參數可指定是否持久化,默認為非持久化,如果我們進行如下指定為持久化,那么和在隊列中的消息一樣即使RabbitMQ代理重啟交換機依然還在,這個就不需要我再過多啰嗦了。

 model.ExchangeDeclare("exchangeDeclare", ExchangeType.Topic, true);

 

從如上演示我們可看出在RabbitMQ中關於持久化,可對交換機(ExChange)、隊列(Queue)、消息(Message)分別指定持久化,而且在大部分情況下這也是我們想要的。因為這至少可以保證消息從生產者傳遞消費者的過程中不會丟失。那么對於持久化和非持久化RabbitMQ是如何進行處理的呢?

持久化:將消息保存到磁盤上,因此即使在服務器重新啟動后它們仍然可用,只不過在讀取和保存消息時會產生一些額外的開銷罷了。

非持久化:將消息被保存在內存中,雖然它們在服務器重啟后將會消失,但提供更快的消息處理。

總結

今天我們詳細講解了RabbitMQ中的持久化,我們可從交換機、隊列、消息三個層面去設置持久化。這只是小試牛刀,下節開始討論細枝末節,學習的過程也是發現問題的過程,我們下節再會。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM