工作隊列-WorkQueue
實現功能:
將耗時的任務分發給多個工作者
設計思想:
避免直接去做一件資源密集型的任務,並且還得等它完成。因此將任務安排后再去做。將任務封裝為一個消息,發到隊列中。一個工作進程將在后台取出任務並最終完成。如果開啟多個工作進程,任務將在這個多工作進程間共享
消息分發:
一、循環分發(Message acknowledement):
默認情況下,rabbitmq是輪流發消息給下一個消費者,平均每個cusumer接到的消息數量相等。
那么問題來了,當一個消息十分耗時,那么獲得到這個消息的cusumer,在執行時崩潰導致任務未完成,那么這個消息就丟失了。為了保證出現類似問題,消息不消失。rabbitmq有一個消息確認機制。
二、消息確認機制(ack):
rabbitmq支持消息確認,為保證消息永不丟失,cusumer會發送一個確認消息告訴rabbitmq。代表我已接收消息,並處理完成。可以隨時刪除。
當一個cusumer在發送確認消息前死亡(連接或通道關閉,tcp連接丟失等),rabbitmq會認為該消息沒有被完全處理並將其重新加入隊列。如果此時有其他cusumer,rabbitmq很快會重新發送該消息到其他cusumer。通過這個方式保證沒有消息丟失,及時某個cusumer意外死亡。
開啟消息確認機制,rabbitmq默認打開。代碼:autoAck = true(關掉)/false(打開)
注:對於rabbitmq而言,沒有消息超時
當我們開啟消息確認機制之后,可以保證cusumer死亡時不會丟失消息。但當rabbitmq服務關閉或崩潰后,會丟失所有的隊列和消息。
那如何解決因服務關閉或崩潰造成的消息丟失呢?我們需要做三件事情:
持久化exchange(交換機):聲明時指定durable = true
持久化queue(隊列):聲明時指定durable = true
持久化message(消息):在消息投遞時指定delivery_mode = 2(1是非持久化)或將MessageProperties的值設置為PERSISTENT_TEXT_PALIN
注:rabbitmq不允許重新定義已經存在的隊列的持久化,如上一章中我們設置的MyQueue隊列。如果設置了該隊列不持久化,那么我們不能再聲明它持久化。不然會報錯。我們必須重新聲明一個新隊列並聲明持久化。
以上我們可以做一個小結:
1、rabbitmq在服務端沒有聲明隊列和消息持久化時,隊列和消息存在內存中,服務端宕機后都丟失。
2、服務端聲明持久化,客戶端想要接收消息,必須聲明queue同時聲明持久化,不然客戶端執行報錯。
三、公平分發(Fair dispatch):
在循環分發機制中,可能會發生一個cusumer接收的消息處理非常耗時,而另一個cusumer接收的消息非常快處理完。這會導致有的cusumer很忙有的很閑。
rabbitmq對此一概不知。因為它只是當消息進入隊列就分發出去,並沒有查看每個cusumer未返回消息確認的數量。
為了改變這種情況,rabbitmq提供了公平分發機制。使用basicQos()方法。將其參數prefetchCount設置為1。這樣cusumer會告訴rabbitmq,不要同時發多個消息給我。每次只發一個,當我處理完消息並給你確認信息后,再發給我下一個。這時候rabbitmq會查看cusumer返回的確認,尋找空閑的cusumer發送消息。
注:當所有的cusumer都很忙,隊列可能會被裝滿。這個情況必須留意。要么增加更多cusumer要么采取其他策略。