根據RabbitMQ官方文檔描述,可以通過“預取數量”來限制未被確認的消息個數,本質上這也是一種對消費者進行流控的方法。
詳見:https://www.rabbitmq.com/consumer-prefetch.html#independent-consumers 。
由RabbitMQ的機制可知,當多個消費者訂閱同一個Queue時,這時Queue中的消息會被平均分攤給多個消費者進行處理,因此一定要對該參數設置合理的值。
需要針對具體的應用場景,適當增大或減小該參數值(默認值為0表示不限制),以提高消費者吞吐量和充分利用資源,參考策略如下:
1.針對訂單類消息,因為處理耗時很短,可以適當增大該參數值,這樣Broker在一次網絡通信中會盡可能多地推送一些數據給消費者,以提高消費吞吐量;
2.對於依賴CPU計算型的耗時任務,該參數值則不能設置過大,否則會出現消息被分配后因為耗時等待一直無法確認而產生堆積,此時即使有別的消費者已經空閑也無法再被分配這些已經堆積的消息,導致資源浪費。
RabbitMQ客戶端提供了相應設置方法:
// 設置預取消息數量,默認值為0,不限流
channel.basicQos(10);
在Spring Boot框架中可以直接通過如下配置參數進行設定:
// listener類型為direct,設置預取消息數量為10,默認值為250(在AbstractMessageListenerContainer中定義的常量:DEFAULT_PREFETCH_COUNT)
spring.rabbitmq.listener.direct.prefetch=10
落實到本項目中,線上曾出現過這樣的現象:K8S管理的Docker集群中,當RabbitMQ中出現消息堆積時,卻只有1個Docker實例的負載持續很高,而其他Docker實例都非常閑。這顯然不符合預期,應該大家都很忙才對。
經排查分析后得知:本項目的特點是每一個任務消息都是CPU耗時型,如果消費者每次都獲取到多個任務消息到本地,那么就會出現即使其他消費者已經空閑了也無法為自己分擔任務的情形。
解決辦法:限制每次給每個消費者只分派一個任務消息(prefetch=1),這樣如果某個消費者在處理任務時被“卡住”了,則不再分配新的任務給它,而是把剩下的任務消息分配給那些已經空閑的消費者執行。