Exclusive Consumer:
獨有消費者:Queue中的消息是按照順序被分發到consumer的,然而,當你有多個consumers同時從相同的queue中提取消息時,你將失去這個保證。因為這些消息是被多個線程並發的處理。有的時候,保證消息按照順序處理是很重要的。例如:你可能不希望在插入訂單操作結束之前執行更新這個訂單的操作。
ActiveMQ從4.x版本開始支持Exclusive Consumer。Broker會從多個Consumers中挑選一個consumer來處理queue中所有的消息,從而保證了消息的有序處理。如果這個consumer失效,那么broker會自動切換到其他的consumer。可以通過destination options來創建一個Exclusive Consumer,如下:
queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true");
consumer = session.createConsumer(queue);
還可以給consumer設置優先級,以便針對網絡情況進行優化,如下:
queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true&consumer.priority=10");
Consumer Dispatche Async:
在activemq4.0以后,你可以選擇broker同步或異步的把消息分發給消費者。可以設置dispatchAsync屬性,默認是true,通常情況下這是最佳的。
你也可以通過如下幾種方式修改:
- 在ConnectionFactory層設置
ActiveMQConnectionFactory.setDispatchAsync(false);
- 在Conection上設置,這個設置將會覆蓋ConnectionFactory上的設置
ActiveMQConnetion.setDispatchAsync(false);
-
在Consumer上設置
queue = new ActiveMQQueue("TEST.QUEUE?consumer.dispatchAsync=false"); consumer = session.createConsumer(queue);
Consumer Priority:
JMS JMSPriority定義了十個消息優先級值,0是最低優先級,9是最高優先級,另外,客戶端應當將0-4看作普通優先級,5-9看作加急優先級。
自定義Consumer Priority優先級。配置如下:
queue = new ActiveMQQueue("TEST.QUEUE?consumer.priority=10");
consumer = session.createConsumer(queue);
Consumer的Priority的划分為0~127個級別,127是最高的級別,0是最低的也是ActiveMQ默認的。這種配置可以讓Broker根據consumer的優先級來發送消息到較高的優先級的Consumer上,如果某個較高的Consumer的消息轉載慢,則Broker會把消息發送到僅次於它優先級的Consumer上。
Manage Durable Subscribers:
消息持久化,保證了消費者離線之后,再次進入系統,不會錯過消息,但是這也會消耗很多的資源,從5.6開始,可以對持久化進行如下管理:
Removing inactive subscribers:我們還希望可以刪除那些不活動的訂閱者,如下:
<broker name="localhost" offlineDurableSubscriberTimeout="86400000" offlineDurableSubscriberTaskSchedule="3600000">
- offlineDurableSubscriberTimeout:離線多長時間就過期刪除,缺省是-1,就是不刪除。
- offlineDurableSubscriberTaskSchedule: 多長時間檢查一次,缺省300000,單位毫秒。
Message Groups:
Message Goups就是對消息分組,它是Exclusive Consumer功能的增強。
邏輯上Message Groups可以看成是一種並發的Exclusive Consumer。跟所有的消息都由唯一的consumer處理不同,JMS消息屬性的JMSXGroupID用來區分message group.
Message Group特性保證所有具有相同JMSXGroupID的消息 都會被分發到相同的consumer(只要這個consumer保持在線)。
另一方面,Message Groups特性也是一種負載均衡的機制。在一個消息被分發到consumer之前,broker首先檢查消息JMSXGroupID屬性。如果存在,那么broker會檢查是否有某個consumer擁有這個message group.如果沒有,那么broker會選擇一個consumer,並將它關聯到這個message group.此后,這個consumer會接收到這個message group的所有消息,直到:
- consumer被關閉
- Message group被關閉,通過發送一個消息,並設置這個消息的JMSXGroupSeq為-1
創建一個Message Groups,只需要在message對象上設置屬性即可,如下:
message.setStringProperty("JMSXGroupID","GroupA");
關閉一個Message Groups,只需要在message對象上設置屬性即可,如下:
message.setStringProperty("JMSXGroupID","GroupA");
message.setIntProperty("JMSXGroupSeq",-1);
Message Selectors:
JMS Selectors 用在獲取消息的時候,可以基於消息屬性和Xpath語法對消息進行過濾。JMS Selectors有SQL92語義定義。以下是個Selectors的例子:
consumer = session.createConsumer(destination, "JMSType='car' AND weight > 2500");
-
JMS Selectors表達式中,可以使用IN, NOT IN, LIKE等
-
需要注意的是,JMS Selectors表達式中的日期和時間需要使用標准的Long型毫秒值。
-
表達式中的屬性不會自動進行類型轉換,例如:
myMessage.setStringProperty("NumberOfOrders","2"); 那么此時“NumberOfOrders > 1” 的結果就是會false
-
Message Groups雖然可以保證具有相同的message group的消息會被唯一的consumer順序處理,但是卻不能確定被哪個consumer處理,在某些情況下,Message Groups可以和JMS Selector一起工作。
例如:設想三個consumers分別是A,B,C,你可以在producer中為消息設置三個message groups分別為“A","B","C"。然后令Consumer A使用JMSXGroupID='A'作為selector,c和b也同理,這樣就保證了message group A的消息只會被A處理,需要注意的是,這種做法有以下缺點:
(1) producer必須直到當前正在運行的consumers,也就是說producer和consumer被耦合到一起。
(2) 如果某個consumer失效,那么應該被這個consumer消費的消息將會一直被積壓在broker上。
Redelivery Policy:
ActiveMQ在接收消息的Client有以下幾種操作的時候,需要重新傳遞消息:
- Client用了transactions,且在Session中調用了rollback();
- Client用了transactions,且在調用commit()之前關閉。
- Client在CLIENT_ACKNOWLEDGE的傳遞模式下,在session中調用了recover();
可以通過設置ActiveMQConnectionFactory和ActiveMQConnection來定制想要的再次傳送策略,可用的Redelivery屬性如下:
1). collisionAvoidanceFactor:設置防止沖突范圍的正負百分比,只有啟用了useCollisionAvoidance參數時才生效。也就是在延遲時間上再加一個時間波動范圍。默認值是0.15
2). maximumRedeliveries:最大重傳次數,達到最大重傳次數后拋出異常。為-1時不限制次數,為0時表示不進行重傳。默認值為
3) . maximumRedeliveryDelay:傳送延遲,旨在useExpoentialBackOff為true時有效(5.5之后),假設首次重間隔為10ms,倍數為2,那么第二次重連時間間隔為20ms,第三次重連時間間隔為40ms,當重連時間間隔大於最大重連時間間隔時,以后每次重連時間間隔都為最大重連時間間隔。默認為-1.
4). initialRedeliveryDelay:初始重發延遲時間,默認1000L
5). redeliveryDelay:重發延遲時間,當initialRedeliveryDelay=0時生效,默認1000L
6). useCollisionAvoidance:啟用防止沖突功能,默認false
7). useExponentialBackOff:啟用指數倍數遞增的方式增加延遲時間,默認false
8). backOffMultiplier:重連時間間隔遞增倍數,只有值大於1和啟用useExponentialBackOff參數時才生效,默認是5;
在接收的client可以如下設置:
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616,tcp://localhost:61617)?randowize=false");
RedeliveryPolicy policy = new RedeliveryPolicy();
policy.setMaximumRedeliveries(3);
cf.setRedeliveryPolicy(policy);
當消息試圖被傳遞的次數超多配置中的maximumRedeliveries屬性的值時,那么,broker會認定該消息是一個死消息,並會把該消息發送到死隊列中。默認activeMQ中死隊列被聲明為”ActiveMQ.DLQ",所有不能消費的消息都被傳遞到該死隊列中。你可以在activemq.xml中配置individualDeadLetterStrategy屬性,示例如下:
<policyEntry queue=">">
<dealLetterStrategy>
<individualDeadLetterStategy queuePrefix="DLQ."
useQueueForQueueMessage="true"/>
</dealLetterStrategy>
</policyEntry>
自動刪除過期消息:有時需要直接刪除過期的消息而不需要發送到死隊列中,可以使用屬性processExpired=false來設置,示例如下:
<policyEntry queue=">">
<dealLetterStrategy>
<sharedDeadLetterStategy processExpired="false"/>
</dealLetterStrategy>
</policyEntry>
存放非持久消息到死隊列中:默認情況下,ActiveMQ不會把非持久的死消息發送到死隊列中。如果你想非持久的消息 發送到死隊列中,需要設置屬性processNonPersistent="true",示例如下:
<policyEntry queue=">">
<dealLetterStrategy>
<sharedDeadLetterStategy processNonPersistent="true"/>
</dealLetterStrategy>
</policyEntry>
RedeliveryPolicy per Destination:在5.7之后,你可以為每一個Destination配置一個Redelivery Policy,示例如:
ActiveMQConnection connection ... // Create a connection
RedeliveryPolicy queuePolicy = new RedeliveryPolicy();
queuePolicy.setInitialRedeliveryDelay(0);
queuePolicy.setRedeliveryDelay(1000);
queuePolicy.setUseExponentialBackOff(false);
queuePolicy.setMaximumRedeliveries(2);
RedeliveryPolicy topicPolicy = new RedeliveryPolicy();
topicPolicy.setInitialRedeliveryDelay(0);
topicPolicy.setRedeliveryDelay(1000);
topicPolicy.setUseExponentialBackOff(false);
topicPolicy.setMaximumRedeliveries(3);
// Receive a message with the JMS API
RedeliveryPolicyMap map = connection.getRedeliveryPolicyMap();
map.put(new ActiveMQTopic(">"), topicPolicy);
map.put(new ActiveMQQueue(">"), queuePolicy);
Slow Consumer Handling:
Prefetch機制:ActiveMQ通過Prefetch機制來提供性能,方式是在客戶端得內存里可能緩存一定數量得消息。緩存消息得數量由prefetch limit來控制。當某個consumer的prefetch buffer已經達到上限,那么broker不會再向consumer分發消息,知道consumer像broker發送消息的確認,確認后的消息將會從緩存中去掉。
可以通過在ActiveMQConnectionFactory或者ActiveMQConnection上設置ActiveMQPrefetchPolicy對象來配置prefetch policy。也可以通過connection options或destination options來配置。例如:
tcp://localhost:61616?jms.prefetchPolicy.all=50
tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1
或
queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10");
consumer = session.createConsumer(queue)
等方式配置
prefetch size缺省的值如下:
- persistent queues (default value: 1000)
- non-persistent queues (default value: 1000)
- persistent topics (default value: 100)
- non-persistent topics (default value: Short.MAX_VALUE - 1)
慢Consumer處理
慢消費者會在非持久的topics上導致問題,一旦消息積壓起來,會導致broker把大量消息保存到內存中,broker也會因此而變慢,目前,ActiveMQ使用Pending Message Limit Strategy來解決這個問題。除了prefetch buffer之外,你還要配置緩存消息的上限,超過這個上限之后,新消息到來時會丟棄舊的消息。
通過在配置文件的destination map中配置pendingMessageLimitStrategy,可以為不同的topic message配置不同的策略。
Pending Message Limit Strategy(等待消息限制策略),目前有以下兩種“
- Constant Pending Message Limit Strategy
Limit 可以設置0, > 0, -1三種方式:0表示:不額外的增加其預存大小,> 0表示:在額外的增加其預存大小,-1表示:不增加預存也不丟棄舊的消息,這個策略使用常量限制,配置如下:
<constantPendingMessageLimitStrategy limit="50"/>
- Prefetch Rate Pending Message LimitStrategy
這種策略是利用Consumer的之前的預存的大小乘以其倍數等於現在的預存大小。比如:
<prefetchRatePendingMessageLimitStrategy multiplier="2.5"/>
- 說明:在以上兩種方式中,如果設置了0,意味着除了prefetch之外不再緩存消息,如果設置了-1意味着禁止丟棄消息。
配置消息的丟棄策略,目前有三種方式:
oldestMessageEvictionStrategy:這個策略丟棄最舊的消息。
oldestMessageWithLowestPriorityEvictionStrategy: 這個策略丟棄最舊的,而且具有最低優先級的消息。
uniquePropertyMessageEvictionStrategy:從5.6開始,可以根據自定義的屬性來進行拋棄,比如<uniquePropertyMessageEvictionStrategy propertyName="STOCK"/>
表示要拋棄屬性名稱為Stock的消息。
配置方式: