SpringBoot2.x下RabbitMQ的並發參數(concurrency和prefetch)


 

RabbitMQ消費端配置

spring: rabbitmq: host: localhost port: 5672 username: guest password: guest listener: simple: # acknowledge-mode: manual # 手動確定(默認自動確認) concurrency: 1 # 消費端的監聽個數(即@RabbitListener開啟幾個線程去處理數據。) max-concurrency: 10 # 消費端的監聽最大個數 prefetch: 10 connection-timeout: 15000 # 超時時間 

在消費端,配置prefetchconcurrency參數便可以實現消費端MQ並發處理消息,那么這兩個參數到底有什么含義??

1. prefetch

每個customer會在MQ預取一些消息放入內存的LinkedBlockingQueue中,這個值越高,消息傳遞的越快,但非順序處理消息的風險更高。如果ack模式為none,則忽略。如有必要,將增加此值以匹配txSize或messagePerAck。從2.0開始默認為250;設置為1將還原為以前的行為。

prefetch默認值以前是1,這可能會導致高效使用者的利用率不足。從spring-amqp 2.0版開始,默認的prefetch值是250,這將使消費者在大多數常見場景中保持忙碌,從而提高吞吐量。

不過在有些情況下,尤其是處理速度比較慢的大消息,消息可能在內存中大量堆積,消耗大量內存;以及對於一些嚴格要求順序的消息,prefetch的值應當設置為1。

對於低容量消息和多個消費者的情況(也包括單listener容器的concurrency配置)希望在多個使用者之間實現更均勻的消息分布,建議在手動ack下並設置prefetch=1

模擬:
生產者每次生產10條消息:

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    publisher-confirms: true
    publisher-returns: true
@RestController public class RabbitMQController { @Autowired private RabbitTemplate rabbitTemplate; //直接向隊列中發送數據 @GetMapping("send") public String send() { for (int i = 0; i < 10; i++) { String content = "Date:" + System.currentTimeMillis(); content = content + ":::" + i; rabbitTemplate.convertAndSend("kinson", content); } return "success"; } } 

控制頁面:


 
消費端直接預取了10條消息.png

2. concurrency

上面配置中,concurrency =1,即每個Listener容器將開啟一個線程去處理消息。

在2.0版本后,可以在注解中配置該參數:

@Component @Slf4j public class CustomerRev { //會覆蓋配置文件中的參數。 @RabbitListener(queues = {"kinson"},concurrency = "2") public void receiver(Message msg, Channel channel) throws InterruptedException { // Thread.sleep(10000); byte[] messageBytes = msg.getBody(); if (messageBytes != null && messageBytes.length > 0) { //打印數據 String message = new String(msg.getBody(), StandardCharsets.UTF_8); log.info("【消3】:{}", message); } } } 

啟動服務:

 
可以看到MQ有兩個消費者.png

即該Listener容器產生了兩個線程去消費queue。如果在Listener配置了exclusive參數,即確定此容器中的單個customer是否具有對隊列的獨占訪問權限。如果為true,則容器的並發性必須為1。

 





免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM