QOS:服務質量保證功能
Prefetch count (預取數目)
prefetch是指單一消費者最多能消費的unacked messages數目。
mq為每一個 consumer設置一個緩沖區,大小就是prefetch。每次收到一條消息,MQ會把消息推送到緩存區中,然后再推送給客戶端。當收到一個ack消息時(consumer 發出baseack指令),mq會從緩沖區中空出一個位置,然后加入新的消息。但是這時候如果緩沖區是滿的,MQ將進入堵塞狀態。
更具體點描述,假設prefetch值設為10,共有兩個consumer。也就是說每個consumer每次會從queue中預抓取 10 條消息到本地緩存着等待消費。同時該channel的unacked數變為20。而Rabbit投遞的順序是,先為consumer1投遞滿10個message,再往consumer2投遞10個message。如果這時有新message需要投遞,先判斷channel的unacked數是否等於20,如果是則不會將消息投遞到consumer中,message繼續呆在queue中。之后其中consumer對一條消息進行ack,unacked此時等於19,Rabbit就判斷哪個consumer的unacked少於10,就投遞到哪個consumer中。
前面我們提到了如果有多個消費者同時訂閱同一個Quque中的消息,Quque中的消息會被平攤給多個消費者。這時如果每個消息的處理時間不同,就有可能導致某些消費者一直很忙,而另一些消費者很快處理完手頭上工作,並一直空閑的情況下。我們可以通過設置prefetch count=1,則Quque每次給每個消費者發送一條消息;消費者處理完這條消息后Quque會再給消費者發送一條消息。
試想一下,如果我們單個消費者1分鍾最多處理60條消息,但是生產者1分鍾可能會發送300條消息,如果我們一台消費者客戶端,1分鍾同時要接收到300條消息,已經超過我們最大的負載,這時,就可能導致,服務器資源被耗盡,消費者客戶端卡死等情況。
RabbitMQ提供了一種qos(服務質量保證)功能,即在非自動確認消息的前提下,如果一定數目的消息(通過基於consume或者channel設置Qos的值)未被確認前,不進行消費新的消息。
PrefetchCount:預取數量,在接收到該Consumer的ack前,他它不會將新的Message分發給它
啟用Qos和手動Ack可能導致隊列阻塞:
如果consumer接收到消息后沒有ack或者unack,並且使用Qos限制了prefetch數量,擋unacked消息太多就會造成堵塞
應答方式:
basicAck:成功消費,消息從隊列中刪除
basicNack:拒絕消息,requeue=true,消息重新進入隊列,false被刪除
basicReject:等同於basicNack
basicRecover:消息重入隊列,requeue=true,發送給新的consumer,false發送給相同的consumer
basicQoc:
設置服務端每次發送給消費者的消息數量
/**
* prefetchSize:服務器傳送最大內容量(以八位字節計算),如果沒有限制,則為0
* prefetchCount:服務器每次傳遞的最大消息數,如果沒有限制,則為0;
* global:如果為true,則當前設置將會應用於整個Channel(頻道)
**/
void basicQos(int prefetchSize, int prefetchCount, boolean global)
消息回復:
Ack(確認)收到一個或者多個消息
/**
* 消費者確認收到一個或者多個消息
* deliveryTag:服務器端向消費者推送消息,消息會攜帶一個deliveryTag參數,也可以成此參數為消息的唯一標識,是一個遞增的正整數
* multiple:true表示確認所有消息,包括消息唯一標識小於等於deliveryTag的消息,false只確認deliveryTag指定的消息
**/
void basicAck(long deliveryTag, boolean multiple)
basicRecover:重新發送
/**
* 要求代理重新發送未確認的消息
* requeue:如果為true,消息將會重新入隊,可能會被發送給其它的消費者;如果為false,消息將會發送給相同的消費者
**/
Basic.basicRecover(boolean requeue)
basicNack:拒絕消息
/**
* 拒絕接收到的一個或者多個消息
* deliveryTag:接收到消息的唯一標識
* multiple: true表示拒絕所有的消息,包括提供的deliveryTag;false表示僅拒絕提供的deliveryTag
* requeue:true 表示拒絕的消息應重新入隊,而不是否丟棄
*/
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
basicReject:拒絕消息
參考:https://blog.csdn.net/james_searcher/article/details/70308565