1.消息發送
1.異步發送
消息生產者使用持久(persistent)傳遞模式發送消息的時候,Producer.send() 方法會被阻塞,直到 broker 發送一個確認消息給生產者,這個確認消息暗示生產者 broker已經成功地將它發送的消息路由到目標目的並把消息保存到二級存儲中。這個過程通常稱為同步發送。但有一個例外,當發送方法在一個事物上下文中時,被阻塞的是 commit 方法而不是 send 方法。commit 方法成功返回意味着所有的持久消息都以被寫到二級存儲中。
同步發送持久消息能夠提供更好的可靠性,但這潛在地影響了程序的相應速度,因為在接受到 broker 的確認消息之前應用程序或線程會被阻塞。如果應用程序能夠容忍一些消息的丟失,那么可以使用異步發送。異步發送不會在受到 broker 的確認之前一直阻塞 Producer.send 方法。如果想啟動異步傳送可以把 connector uri 的 jms.useAsyncSend 選項設為 true,如下所示:
tcp://localhost:61616?jms.useAsyncSend=true
從 ActiveMQ 5 開始可以控制異步發送流。也就是說,在受到 broker 的確認應答之前,生產者能夠傳送消息給 broker 的最大信息量。即使是異步發送消息,生產者也是在收到 broker 的確認應答后才把下一條消息傳送給 broker。當使用異步傳送的時候,可以設置jms.producerWindowSize(單位為字節)屬性,當生產者中等待發送的信息量到達設置的值時,即使沒有收到 broker 的應答消息,生產者同樣會把這些消息發給 broker。如下面的示例設置:
tcp://localhost:61616?jms.useAsyncSend=true&jms.producerWindowSize=1024000
org.apache.activemq.ActiveMQConnectionFactory中引入: <property name="useAsyncSend" value="true" /> <!-- false:同步 true:異步 --> |
2. ActiveMQ訊息策略
只對Topic有效 |
||||
DispatchPolcicy: 轉發策略 |
||||
RoundRobinDispatchPolicy |
輪詢 |
|||
StrictOrderDispatchPolicy |
嚴格有序 |
|||
PriorityDispatchPolicy |
基於“property”權重對“訂閱者”排序 |
|||
SimpleDispatchPolicy(默認值) |
按照當前“訂閱者”列表的順序 |
|||
SubscriptionRecoveryPolicy: 恢復策略 |
||||
FixedSizedSubscriptionRecoveryPolicy |
保存一定size的消息 |
|||
FixedCountSubscriptionRecoveryPolicy |
保存一定條數的消息 |
|||
LastImageSubscriptionRecoveryPolicy |
只保留最新的一條數據 |
|||
QueryBasedSubscriptionRecoveryPolicy |
符合置頂selector的消息都將被保存 |
|||
TimedSubscriptionRecoveryPolicy |
保留最近一段時間的消息 |
|||
NoSubscriptionRecoveryPolicy(默認值) |
關閉“恢復機制” |
|||
PendingMessageLimitStrategy: 消息限制策略(面向Slow Consumer) |
||||
ConstantPendingMessageLimitStrategy |
保留固定條數的消息 |
|||
PrefetchRatePendingMessageLimitStrategy |
保留prefetchSize倍數條消息 |
|||
MessageEvictionStrategy: 消息剔除策略(面向Slow Consumer) |
||||
OldestMessageEvictionStrategy(默認值) |
移除舊消息 |
|||
OldestMessageWithLowestPriorityEvictionStrategy |
舊數據中權重較低的消息,將會被移除 |
|||
UniquePropertyMessageEvictionStrategy |
移除具有指定property的舊消息 |
|||
PendingSubscriberMessageStoragePolicy: 待消息轉存策略(針對非耐久) |
||||
vmCursor (默認值) |
將消息轉存到基於內存 |
|||
storeCursor |
將消息轉存到storeEngine中 |
|||
fileCursor |
將消息轉存到臨時文件中 |
|||
PendingSubscriberMessageStoragePolicy: 待消息轉存策略(針對耐久) |
||||
vmDurableCursor |
將消息轉存到基於內存 |
|||
storeDurableSubscriberCursor |
將消息轉存到storeEngine中 |
|||
fileDurableSubscriberCursor |
將消息轉存到臨時文件中 |
|||
只對Queue有效 |
||||
PendingQueueMessageStoragePolicy:待消息轉存策略 |
||||
vmQueueCursor(默認值) |
將消息轉存到基於內存 |
|||
storeCursor |
將消息轉存到storeEngine中 |
|||
fileQueueCursor |
將消息轉存到臨時文件中 |
|||
對Topic和Queue都有效 |
||||
DeadLetterStrategy:“死信”策略 |
||||
IndividualDeadLetterStrategy |
把DeadLetter放入各自的死信通道中 |
|||
SharedDeadLetterStrategy(默認值) |
將所有的DeadLetter保存在一個共享的隊列中 |
|||
DiscardingDeadLetterStrategy |
broker將直接拋棄DeadLeatter |
|||
SlowConsumerStrategy:慢速消費者策略 |
||||
AbortSlowConsumerStrategy |
中斷慢速消費者 |
|||
AbortSlowConsumerStrategy |
如果慢速消費者最后一個ACK距離現在的時間間隔超過閥值,則中斷慢速消費者。 |
|||
3. 消息重發策略
1.在使用事務的Session中,調用rollback()方法;
2.在使用事務的Session中,調用commit()方法之前就關閉了Session;
3.在Session中使用CLIENT_ACKNOWLEDGE簽收模式,並且調用了recover()方法。
<!-- 重發策略 --> <bean id="activeMQRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy"> <!--重發次數,默認為6次 這里設置為1次 --> <property name="maximumRedeliveries" value="4"></property> <!--重發時間間隔,默認為1秒 --> <property name="initialRedeliveryDelay" value="2000"></property> </bean> class="org.apache.activemq.ActiveMQConnectionFactory"中引入: <property name="redeliveryPolicy" ref="activeMQRedeliveryPolicy" /> org.springframework.jms.listener.DefaultMessageListenerContainer中引入: <property name="sessionTransacted" value="true" /> |
4.消息持久化與非持久化
采用持久傳輸時,傳輸的消息會保存到磁盤中(messages are persisted to disk/database),即“存儲轉發”方式。先把消息存儲到磁盤中,然后再將消息“轉發”給訂閱者。
采用非持久傳輸時,發送的消息不會存儲到磁盤中。
采用持久傳輸時,當Borker宕機 恢復后,消息還在。采用非持久傳輸,Borker宕機重啟后,消息丟失。比如,當生產者將消息投遞給Broker后,Broker將該消息存儲到磁盤中,在Broker將消息發送給Subscriber之前,Broker宕機了,如果采用持久傳輸,Broker重啟后,從磁盤中讀出消息再傳遞給Subscriber;如果采用非持久傳輸,這條消息就丟失了。
org.springframework.jms.core.JmsTemplate中引入: <property name="explicitQosEnabled" value="true" /> <!-- 是否啟用下列配置 --> <property name="deliveryMode" value="1"/> <!-- 1:非持久化2:持久化 --> <property name="deliveryPersistent" value="false" /> <!-- true:持久化false:非持久化 --> 其中deliveryPersistent寫不寫均可 |
5.死信隊列
DLQ-死信隊列(Dead Letter Queue)用來保存處理失敗或者過期的消息.缺省的死信隊列是ActiveMQ.DLQ,如果沒有特別指定,死信都會被發送到這個隊列。默認情況下持久消息過期,會被送到DLQ,非持久消息不會送到DLQ
消息進入死信隊列條件:
1.給ActiveMQConnectionFactory配上重發機制;
2.給DefaultMessageListenerContainer配置事務;
在activemq.xml的policyEntries節點中增加如下策略配置。 <policyEntry queue=">"> <deadLetterStrategy> <individualDeadLetterStrategy useQueueForQueueMessages="true" processNonPersistent="true" queuePrefix="DLQ." processExpired="true"/> </deadLetterStrategy> </policyEntry> useQueueForQueueMessages:true隊列;false主題 processNonPersistent:true非持久化的消息放入死信隊列 processExpired:true過期消息放入死信隊列 queuePrefix:隊列名稱前綴, DLQ.+隊列名稱 |
6. Consumer特性詳解與優化
asyncDispatch(默認為true) |
broker端是否允許使用異步轉發 |
alwaysSessionAsync(默認為true) |
客戶端session是否使用異步轉發 |
maxThreadPoolSize (默認Integer.MAX_VALUE) |
Session異步轉發線程池大小,Connection下所有session共享 |
useDedicatedTaskRunner(默認為false) |
為每個session設置單獨的線程池 |
messagePrioritySupported(默認為true) |
Consumer是否開啟消息權重,即是否支持消息優先級的屬性設置 |
Priority(默認為0) |
消費者權重,broker會優先級把消息轉發給權重更大的Consumer |
prefetchSize(默認為1000) |
Broker批量發送prefetchSize條消息給consumer |
noLocal(默認為false) |
consumer是否接受本地消息,本地消息,就是為同一個connection創建的producer發送的消息 |
exclusiveConsumer (僅對Queue有效, 默認值為false) |
如果Queue中,有任意一個Consumer是“排他的”,那么消息只會轉發給“exclusiveConsumer=true”的消費者;如果全部的消費者都是“排他的”,那么最新創建的consumer將會獲取消息 |
allConsumersExclusiveByDefault |
如果此參數為true,那么當前Queue中的所有消費者默認為exclusive |
Retroactive (僅對Topic有效, 默認值為false) |
retroactive類型的訂閱者可以獲取這些原本不屬於自己但broker上還保存的舊消息, 如果此訂閱者不是durable(耐久的),它可以獲取最近發布的一些消息;如果是durable,它可以獲取存儲器中尚未刪除的所有的舊消息 |
selector (選擇器) |
Broker通過selector在篩選滿足條件的Consumer, 如果你使用了selector,你一定要讓全局中所有的selector覆蓋所有的消息,或者至少有一個沒有selector的consumer |
Group(消息分組)(僅對Queue有效) |
Producer在發送消息時為多條消息設定group,它們將會被同一個consumer消費 |
Duplicate(重復消息) |
consumer在將消息消費之前,都會檢測消息是否重復,對於重復消息,將不會消費而是直接發送poisonAck(毒丸),broker端會將消息直接刪除。 |
7.慢速消費者(Slow Consumer)
慢速,是相對於producer而言;簡單來說,producer不斷產生新的消息,broker端在內存中已經積壓的足夠多(比如cacheLimit已滿),但是在轉發給某個consumer時,發現此consumer仍然有大量的消息尚沒有消費(ACK),broker會認為此consumer是慢速的。
在Queue中,如果已發送(dispatched)但沒有消費(unAck)的消息條數 > prefetchSize時,此consumer被標記為Slow。
在Topic中,如果cacheLimit已滿,但是需要向此訂閱者發送的消息量 > prefetchSize時,此訂閱者被標記為Slow。
簡單描述為: 快速的producer生產的消息,不能被消費者及時的消費,而導致在broker端積壓。
解決方案:
1) 關閉Slow Consumer
brorker端一旦發現slow consumer,就將它注冊到慢速消費者列表中,此后將有額外的線程掃描並關閉它們,其中abortConnection表示,是否關閉底層的transport,默認為false,此時將會通過transport向client端發送一個指令(其中包括consumerId),當client端(Session)接收之后,將會調用consumer.close()方法;如果此值為true,將會導致底層的transport鏈接被關閉,這是很粗暴的辦法,不過如果client端多個consumer共享一個connection的話,會導致所有的consumer被關閉,還是那句話:豬一樣的隊友,害了整個團隊。
<policyEntry queue=">" producerFlowControl="true" memoryLimit="512mb"> <slowConsumerStrategy> <abortSlowConsumerStrategy abortConnection="false"/> </slowConsumerStrategy> </policyEntry> |
2) 拋棄舊消息(僅對Topic有效,僅對nondurable訂閱者有效)
topic支持支持3種移除策略:
(1) oldestMessageEvictionStrategy表示移除最舊的消息
(2) uniquePropertyMessageEvictionStrategy表示移除根據屬性值篩選消息並移除最舊的
(3) OldestMessageWithLowestPriorityEvictionStrategy表示在舊消息中移除權重最低的。
3) 寫入臨時文件
對於Queue而言,支持storeCursor,vmQueueCursor , fileQueueCursor。其中storeCursor是一個“綜合”策略,持久化消息使用fileQueueCurosr支持,非持久化消息使用vmQueueCursor支持。vmQueueCursor基於內存,fileQueueCursor表示將數據寫入本地臨時文件(由tempDataStore決定)。
對於Topic而言,我們也可以根據訂閱者的類型,來決定如果處理那些滯留的非持久化消息。
4) offlineDurableSubscriberTimeout
對於durable訂閱者,如果訂閱者“離線”,那么Broker將會一直保存屬於它的消息,因此消息也會以為它而不能刪除,導致積壓。通常,訂閱者離線的時間是無法預估的,有可能此訂閱者永遠都不會再上線(可能因為durable訂閱者本來應該cancel,但是開發者卻忘記了),這對broker來說是致命的。我們需要限制durable訂閱者離線的時間,如果超過時間,那么訂閱者將會被移除,消息也會因此而刪除。
<broker name="localhost" offlineDurableSubscriberTimeout="86400000" offlineDurableSubscriberTaskSchedule="3600000"> |
8. Activemq消息確認機制
optimizeAck表示是否開啟“優化ACK”, 只有在為true的情況下,prefetchSize(下文中將會簡寫成prefetch)以及optimizeAcknowledgeTimeout參數才會有意義, "optimizeAcknowledgeTimeout"選項只能在brokerUrl中配置, 在destinationUri中指定prefetchSize(預獲取)選項, 其中brokerUrl參數選項是全局的,如果同時指定,brokerUrl中的參數選項值將會被覆蓋;
當consumer端使用MessageListener異步獲取消息時: prefetch>=1
當consumer端使用receive()方法同步獲取消息時: prefetch>=0
prefetch=0, receive()方法將會首先發送一個PULL指令並阻塞,直到broker端返回消息為止;
prefetch>0, broker端將會批量push給client 一定數量的消息(<= prefetch),client端會把這些消息(unconsumedMessage)放入到本地的隊列中;