ActiveMQ第六彈:設置多個並行的消費者


消息隊列本來就是一種經典的生產者與消費者模式。生產者向消息隊列中發送消息,消費者從消息隊列中獲取消息來消費。

消息的傳送一般由一個代理來實現的,那就是Message broker(即消息代理)。Message broker有兩大職責,一是消息路由,二是數據轉換。這就好比A給B寄信,如果不使用郵局的話,就要自己想辦法送達,費時費力,而通過郵局的話,只要B的地址在郵局中注冊過,那么天涯海角也能送達。這里的郵局扮演的角色就像消息系統中的Message broker。

眾所周知,消息隊列是典型的’send and forget’原則的體現,生產者只管發送,不管消息的后續處理。為了最大效率的完成對消息隊列中的消息的消費,一般可以同時起多個一模一樣的消費者,以並行的方式來拉取消息隊列中的消息。這樣的好處有多個:

  1. 加快處理消息隊列中的消息。

  2. 增強穩定性,如果一個消費者出現問題,不會影響對消息隊列中消息的處理。

使用Spring JMS來配置多個Listener實例其實也相當簡單,只需要配置下MessageListenerContainer就行。

1
2
3
4
5
6
<bean class="org.springframework.jms.listener.SimpleMessageListenerContainer">  <property name="connectionFactory" ref="connectionFactory"/>  <property name="destinationName" value="${jms.queue.name}"/>  <property name="messageListener" ref="messageReceiver"/>  <property name="concurrentConsumers" value="4"/> </bean> 

多配置一個屬性concurrentConsumers,設置值為4,就是同時啟動4個Listener實例來消費消息。

使用MessageSender來發送100條消息,可以檢查消息處理的順序會發生變化。

1
2
3
 for (int i = 0; i < 100; i++) {  messageSender.send(String.format("message %d",i));  } 
1
2
3
4
5
6
7
8
9
... Received: message 4 Received: message 7 Received: message 6 Received: message 5 Received: message 8 Received: message 10 Received: message 9 

除了設置一個固定的Listener數量,也可以設置一個Listener區間,這樣MessageListenerContainer可以根據消息隊列中的消息規模自動調整並行數量。

1
2
3
4
5
6
 <bean class="org.springframework.jms.listener.SimpleMessageListenerContainer">  <property name="connectionFactory" ref="connectionFactory"/>  <property name="destinationName" value="${jms.queue.name}"/>  <property name="messageListener" ref="messageReceiver"/>  <property name="concurrency" value="4-8"/>  </bean> 

這次使用的是concurrency屬性,4-8表示最小並發數是4,最大並發數為8,當然也可以給一個固定值,比如5,這樣就相當於concurrentConsumers屬性了。

本章中的完整源碼可從完整代碼可從https://github.com/huangbowen521/SpringJMSSample下載。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM