RabbitMQ消費端配置
spring: rabbitmq: host: localhost port: 5672 username: guest password: guest listener: simple: # acknowledge-mode: manual # 手動確定(默認自動確認) concurrency: 1 # 消費端的監聽個數(即@RabbitListener開啟幾個線程去處理數據。) max-concurrency: 10 # 消費端的監聽最大個數 prefetch: 10 connection-timeout: 15000 # 超時時間
在消費端,配置prefetch
和concurrency
參數便可以實現消費端MQ並發處理消息,那么這兩個參數到底有什么含義??
1. prefetch
每個customer會在MQ預取一些消息放入內存的LinkedBlockingQueue中,這個值越高,消息傳遞的越快,但非順序處理消息的風險更高。如果ack模式為none,則忽略。如有必要,將增加此值以匹配txSize或messagePerAck。從2.0開始默認為250;設置為1將還原為以前的行為。
prefetch
默認值以前是1,這可能會導致高效使用者的利用率不足。從spring-amqp 2.0版開始,默認的prefetch
值是250,這將使消費者在大多數常見場景中保持忙碌,從而提高吞吐量。
不過在有些情況下,尤其是處理速度比較慢的大消息,消息可能在內存中大量堆積,消耗大量內存;以及對於一些嚴格要求順序的消息,prefetch
的值應當設置為1。
對於低容量消息和多個消費者的情況(也包括單listener容器的concurrency配置)希望在多個使用者之間實現更均勻的消息分布,建議在手動ack下並設置prefetch=1
。
模擬:
生產者每次生產10條消息:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
publisher-confirms: true
publisher-returns: true
@RestController public class RabbitMQController { @Autowired private RabbitTemplate rabbitTemplate; //直接向隊列中發送數據 @GetMapping("send") public String send() { for (int i = 0; i < 10; i++) { String content = "Date:" + System.currentTimeMillis(); content = content + ":::" + i; rabbitTemplate.convertAndSend("kinson", content); } return "success"; } }
控制頁面:

2. concurrency
上面配置中,concurrency =1
,即每個Listener
容器將開啟一個線程去處理消息。
在2.0版本后,可以在注解中配置該參數:
@Component @Slf4j public class CustomerRev { //會覆蓋配置文件中的參數。 @RabbitListener(queues = {"kinson"},concurrency = "2") public void receiver(Message msg, Channel channel) throws InterruptedException { // Thread.sleep(10000); byte[] messageBytes = msg.getBody(); if (messageBytes != null && messageBytes.length > 0) { //打印數據 String message = new String(msg.getBody(), StandardCharsets.UTF_8); log.info("【消3】:{}", message); } } }
啟動服務:

即該Listener容器產生了兩個線程去消費queue。如果在Listener配置了exclusive
參數,即確定此容器中的單個customer是否具有對隊列的獨占訪問權限。如果為true,則容器的並發性必須為1。
