RabbmitMQ-工作隊列及相關概念


工作隊列-WorkQueue

實現功能:

  將耗時的任務分發給多個工作者

設計思想:

  避免直接去做一件資源密集型的任務,並且還得等它完成。因此將任務安排后再去做。將任務封裝為一個消息,發到隊列中。一個工作進程將在后台取出任務並最終完成。如果開啟多個工作進程,任務將在這個多工作進程間共享

 

消息分發:

  一、循環分發(Message acknowledement):

    默認情況下,rabbitmq是輪流發消息給下一個消費者,平均每個cusumer接到的消息數量相等。  

   那么問題來了,當一個消息十分耗時,那么獲得到這個消息的cusumer,在執行時崩潰導致任務未完成,那么這個消息就丟失了。為了保證出現類似問題,消息不消失。rabbitmq有一個消息確認機制。

  二、消息確認機制(ack):

    rabbitmq支持消息確認,為保證消息永不丟失,cusumer會發送一個確認消息告訴rabbitmq。代表我已接收消息,並處理完成。可以隨時刪除。

    當一個cusumer在發送確認消息前死亡(連接或通道關閉,tcp連接丟失等),rabbitmq會認為該消息沒有被完全處理並將其重新加入隊列。如果此時有其他cusumer,rabbitmq很快會重新發送該消息到其他cusumer。通過這個方式保證沒有消息丟失,及時某個cusumer意外死亡。

    開啟消息確認機制,rabbitmq默認打開。代碼:autoAck = true(關掉)/false(打開)

      

  注:對於rabbitmq而言,沒有消息超時

  當我們開啟消息確認機制之后,可以保證cusumer死亡時不會丟失消息。但當rabbitmq服務關閉或崩潰后,會丟失所有的隊列和消息。

  那如何解決因服務關閉或崩潰造成的消息丟失呢?我們需要做三件事情:

  持久化exchange(交換機):聲明時指定durable = true

  持久化queue(隊列):聲明時指定durable = true

  持久化message(消息):在消息投遞時指定delivery_mode = 2(1是非持久化)或將MessageProperties的值設置為PERSISTENT_TEXT_PALIN

  注:rabbitmq不允許重新定義已經存在的隊列的持久化,如上一章中我們設置的MyQueue隊列。如果設置了該隊列不持久化,那么我們不能再聲明它持久化。不然會報錯。我們必須重新聲明一個新隊列並聲明持久化。

以上我們可以做一個小結:

  1、rabbitmq在服務端沒有聲明隊列和消息持久化時,隊列和消息存在內存中,服務端宕機后都丟失。

  2、服務端聲明持久化,客戶端想要接收消息,必須聲明queue同時聲明持久化,不然客戶端執行報錯。

  三、公平分發(Fair dispatch):

    在循環分發機制中,可能會發生一個cusumer接收的消息處理非常耗時,而另一個cusumer接收的消息非常快處理完。這會導致有的cusumer很忙有的很閑。

    rabbitmq對此一概不知。因為它只是當消息進入隊列就分發出去,並沒有查看每個cusumer未返回消息確認的數量。

    為了改變這種情況,rabbitmq提供了公平分發機制。使用basicQos()方法。將其參數prefetchCount設置為1。這樣cusumer會告訴rabbitmq,不要同時發多個消息給我。每次只發一個,當我處理完消息並給你確認信息后,再發給我下一個。這時候rabbitmq會查看cusumer返回的確認,尋找空閑的cusumer發送消息。

  注:當所有的cusumer都很忙,隊列可能會被裝滿。這個情況必須留意。要么增加更多cusumer要么采取其他策略。

  

  
    


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM