ActiveMQ.xml 配置詳解



<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <property name="locations"> <value>file:${activemq.conf}/credentials.properties</value> </property> </bean> <!-- Allows log searching in hawtio console --> <bean id="logQuery" class="org.fusesource.insight.log.log4j.Log4jLogQuery" lazy-init="false" scope="singleton" init-method="start" destroy-method="stop"> </bean>
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="127.0.0.1" dataDirectory="${activemq.data}">
<destinationPolicy> <policyMap> <policyEntries> <!-- 訂閱/發布--> <policyEntry topic=">" producerFlowControl="true" optimizedDispatch="true" memoryLimit="16mb"> <!-- 消息限制策略,面向Slow Consumer的 此策略只對Topic有效,只對nondurable訂閱者有效,當通道中有大量的消息積壓時,broker可以保留的消息量。 為了防止Topic中有慢速消費者,導致整個通道消息積壓。(對於Topic而言,一條消息只有所有的訂閱者都消費才 會被刪除) --> <pendingMessageLimitStrategy> <!-- ConstantPendingMessageLimitStrategy: 保留固定條數的消息,如果消息量超過limit,將使用 “MessageEvictionStrategy”移除消息 PrefetchRatePendingMessageLimitStrategy: 保留prefetchSize倍數條消息。 --> <!-- 如果prefetchSize為100,則保留10 * 100條消息 --> <prefetchRatePendingMessageLimitStrategy multiplier="10"/> </pendingMessageLimitStrategy> <!-- 消息剔除策略 面向Slow Consumer的 配合PendingMessageLimitStrategy,只對Topic有效,只對nondurable訂閱者有效。當PendingMessage的數量超過 限制時,broker該如何剔除多余的消息。當Topic接收到信息消息后,會將消息“Copy”給每個訂閱者,在保存這 個消息時(保存策略"PendingSubscriberMessageStoragePolicy"),將會檢測pendingMessages的數量是否超過限 制(由"PendingMessageLimitStrategy"來檢測),如果超過限制,將會在pendingMessages中使用 MessageEvicationStrategy移除多余的消息,此后將新消息保存在PendingMessages中。 --> <messageEvictionStrategy> <!-- OldestMessageEvictionStrategy: 移除舊消息,默認策略。 OldestMessageWithLowestPriorityEvictionStrategy: 舊數據中權重較低的消息,將會被移除。 UniquePropertyMessageEvictionStrategy: 移除具有指定property的舊消息。開發者可以指定property的名稱 ,從此屬性值相同的消息列表中移除較舊的(根據消息的創建時間)。 --> <OldestMessageWithLowestPriorityEvictionStrategy /> </messageEvictionStrategy> <!-- 慢速消費者策略 Broker將如何處理慢消費者。Broker將會啟動一個后台線程用來檢測所有的慢速消費者,並定期關閉關閉它們。 --> <slowConsumerStrategy> <!-- AbortSlowConsumerStrategy: 中斷慢速消費者,慢速消費將會被關閉。abortConnection是否關閉連接 AbortSlowConsumerStrategy: 如果慢速消費者最后一個ACK距離現在的時間間隔超過閥maxTimeSinceLastAck, 則中斷慢速消費者。 --> <abortSlowConsumerStrategy abortConnection="false"/><!-- 不關閉底層鏈接 --> </slowConsumerStrategy> <!--轉發策略 將消息轉發給消費者的方式--> <dispatchPolicy> <!-- RoundRobinDispatchPolicy: “輪詢”,消息將依次發送給每個“訂閱者”。“訂閱者”列表默認按照訂閱的先后 順序排列,在轉發消息時,對於匹配消息的第一個訂閱者,將會被移動到“訂閱者 ”列表的尾部,這也意味着“下一條”消息,將會較晚的轉發給它。 StrictOrderDispatchPolicy: 嚴格有序,消息依次發送給每個訂閱者,按照“訂閱者”訂閱的時間先后。它和 RoundRobin最大的區別是,沒有移動“訂閱者”順序的操作。 PriorityDispatchPolicy: 基於“property”權重對“訂閱者”排序。它要求開發者首先需要對每個訂閱者指定 priority,默認每個consumer的權重都一樣。 SimpleDispatchPolicy: 默認值,按照當前“訂閱者”列表的順序。其中PriorityDispatchPolicy是其子類。 --> <strictOrderDispatchPolicy/> </dispatchPolicy> <!--恢復策略 ActiveMQ重啟如何恢復數據--> <subscriptionRecoveryPolicy> <!-- FixedSizedSubscriptionRecoveryPolicy: 保存一定size的消息,broker將為此Topic開辟定額的RAM用來保存 最新的消息。使用maximumSize屬性指定保存的size數量 FixedCountSubscriptionRecoveryPolicy: 保存一定條數的消息。 使用maximumSize屬性指定保存的size數量 LastImageSubscriptionRecoveryPolicy: 只保留最新的一條數據 QueryBasedSubscriptionRecoveryPolicy: 符合置頂selector的消息都將被保存,具體能夠“恢復”多少消息 ,由底層存儲機制決定;比如對於非持久化消息,只要內存中還 存在,則都可以恢復。 TimedSubscriptionRecoveryPolicy: 保留最近一段時間的消息。使用recoverDuration屬性指定保存時間 單位 毫秒 NoSubscriptionRecoveryPolicy: 關閉“恢復機制”。默認值。 --> <!--恢復最近30分鍾內的信息--> <timedSubscriptionRecoveryPolicy recoverDuration="1800000"/> </subscriptionRecoveryPolicy> <!--"死信"策略 如何處理過去消息 缺省死信隊列(Dead Letter Queue)叫做ActiveMQ.DLQ;所有的未送達消息都會被發送到這個隊列,以致會非常 難於管理。 默認情況下,無論是Topic還是Queue,broker將使用Queue來保存DeadLeader,即死信通道通常為 Queue;不過開發者也可以指定為Topic。 --> <deadLetterStrategy> <!-- IndividualDeadLetterStrategy: 把DeadLetter放入各自的死信通道中,queuePrefix自定義死信前綴 ,useQueueForQueueMessages使用隊列保存死信,還有一個屬性為“useQueueForTopicMessages”,此值表示是否 將Topic的DeadLetter保存在Queue中,默認為true。 <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true"/> SharedDeadLetterStrategy: 將所有的DeadLetter保存在一個共享的隊列中,這是ActiveMQ broker端默認的策略 。共享隊列默認為“ActiveMQ.DLQ”,可以通過“deadLetterQueue”屬性來設定。還有2個很重要的可選參數 ,“processExpired”表示是否將過期消息放入死信隊列,默認為true;“processNonPersistent”表示是否將“ 非持久化”消息放入死信隊列,默認為false。 <sharedDeadLetterStrategy deadLetterQueue="DLQ-QUEUE"/> DiscardingDeadLetterStrategy: broker將直接拋棄DeadLeatter。如果開發者不需要關心DeadLetter,可以使用 此策略。AcitveMQ提供了一個便捷的插件:DiscardingDLQBrokerPlugin,來拋棄DeadLetter。 下面這個必須配置plugins節點中才對, 丟棄所有死信 <discardingDLQBrokerPlugin dropAll="true" dropTemporaryTopics="true" dropTemporaryQueues="true" /> 丟棄指定死信 <discardingDLQBrokerPlugin dropOnly="MY.EXAMPLE.TOPIC.29 MY.EXAMPLE.QUEUE.87" reportInterval="1000" /> 使用丟棄正則匹配到死信 <discardingDLQBrokerPlugin dropOnly="MY.EXAMPLE.TOPIC.[0-9]{3} MY.EXAMPLE.QUEUE.[0-9]{3}" reportInterval="3000" /> --> <individualDeadLetterStrategy queuePrefix="DLQ.TOPIC." useQueueForQueueMessages="true"/> </deadLetterStrategy> <!--非耐久待處理消息處理策略 類似於:pendingQueuePolicy(在下面自己找找)--> <pendingSubscriberPolicy> <!--支持三種策略:storeCursor, vmCursor和fileCursor。--> <fileCursor/> </pendingSubscriberPolicy> <!--耐久待處理消息處理策略 類似於:pendingQueuePolicy(在下面自己找找)--> <pendingDurableSubscriberPolicy> <!--支持三種策略:storeDurableSubscriberCursor, vmDurableCursor和 fileDurableSubscriberCursor。--> <storeDurableSubscriberCursor/> </pendingDurableSubscriberPolicy> </policyEntry> <!--消息隊列--> <policyEntry queue=">" producerFlowControl="true" optimizedDispatch="true" memoryLimit="16mb"> <pendingMessageLimitStrategy> <prefetchRatePendingMessageLimitStrategy multiplier="10"/> </pendingMessageLimitStrategy> <messageEvictionStrategy> <OldestMessageWithLowestPriorityEvictionStrategy /> </messageEvictionStrategy> <slowConsumerStrategy> <abortSlowConsumerStrategy abortConnection="false"/> </slowConsumerStrategy> <dispatchPolicy> <strictOrderDispatchPolicy/> </dispatchPolicy> <subscriptionRecoveryPolicy> <timedSubscriptionRecoveryPolicy recoverDuration="1800000"/> </subscriptionRecoveryPolicy> <deadLetterStrategy> <individualDeadLetterStrategy queuePrefix="DLQ.QUEUE." useQueueForQueueMessages="true"/> </deadLetterStrategy> <!-- pendingQueuePolicy 待消費消息策略 通道中有大量Slow Consumer時,Broker該如何優化消息的轉發,以及在此情況下,“非持久化”消息達到內存 限制時該如何處理。 當Broker接受到消息后,通常將最新的消息寫入內存以提高消息轉發的效率,提高消息ACK的效率,減少對對底 層Store的操作;如果Consumer非常快速,那么消息將會立即轉發給Consumer,不需要額外的操作;但當遇到 Slow Consumer時,情況似乎並沒有那么美好。 持久化消息,通常為:寫入Store->線程輪詢,從Store中pageIn數據到PendingStorage->轉發給Consumer->從 PendingStorage中移除->消息ACK后從Store中移除。 對於非持久化數據,通常為:寫入內存->如果內存足夠,則PendingStorage直接以內存中的消息轉發->如果內 存不足,則將內存中的消息swap到臨時文件中->從臨時文件中pageIn到內存,轉發給Consumer。 AcitveMQ提供了幾個的Cursor機制,它就是用來保存Pending Messages。 1) vmQueueCursor: 將待轉發消息保存在額外的內存(JVM linkeList)的存儲結構中。是“非持久化消息”的默 認設置,如果Broker不支持Persistent,它是任何類型消息的默認設置。有OOM風險。 2) fileQueueCursor: 將消息保存到臨時文件中。文件存儲方式有broker的tempDataStore屬性決定。是“持久 化消息”的默認設置。 3) storeCursor: “綜合”設置,對於非持久化消息,將采用vmQueueCursor存儲,對於持久化消息采用 fileQueueCursor。這是強烈推薦的策略,也是效率最好的策略。 --> <pendingQueuePolicy> <storeCursor> <nonPersistent> <fileQueueCursor/> </nonPersistent> </storeCursor> </pendingQueuePolicy> </policyEntry> </policyEntries> </policyMap> </destinationPolicy> <!-- ActiveMQ的特性之一是很好的支持JMX。通過JMX MBeans可以很方便的監聽和控制ActiveMQ的broker。

官方網站提供的JMX特性說明對於遠程訪問的配置流程坑爹,如果想使用jconsole對ActiveMQ進行監控, 無密碼訪問> 需要在borker節點設置useJmx屬性為true,且managementContext節點的createConnector屬性為true。 通過jconsole訪問地址service:jmx:rmi:///jndi/rmi://ip:1099/jmxrmi進行連接, 默認端口為1099,可以通過connectorPort屬性修改連接端口,遠程訪問需要設置connectorHost屬性 為本機ip以供遠程訪問 有密碼訪問> 需要在borker節點設置useJmx屬性為true,且managementContext節點的createConnector屬性為false。 然后在${actviemq.base}/conf目錄下的jmx.access和jmx.password中添加用戶權限和密碼, 最后修改${activemq.base}/bin/activemq文件,找到下面的內容然后去掉注釋,保存退出,重啟activemq即可 # ACTIVEMQ  SUNJMXSTART="-Dcom.sun.management.jmxremote.port=11099" # ACTIVEMQ  SUNJMX START="$ACTIVEMQ  SUNJMX START-Dcom.sun.management.jmxremote.password.file=${ACTIVEMQ  CONF}/ jmx.password" # ACTIVEMQ SUNJMX  START="$ACTIVEMQ SUNJMX  START-Dcom.sun.management.jmxremote.access.file=${ACTIVEMQ CONF}/ jmx.access" # ACTIVEMQ  SUNJMX START="$ACTIVEMQ  SUNJMX START -Dcom.sun.management.jmxremote.ssl=false" 

-->
    <managementContext> <managementContext createConnector="false"/> </managementContext> <!--持久化存儲--> <persistenceAdapter> <!-- 官方默認的持久化方案 AMQ Message Store 是 ActiveMQ5.0 缺省的持久化存儲。Message commands 被保存到 transactional journal(由 rolling data logs 組成)。Messages 被保存到 data logs 中,同時被 reference store 進行索引以提高存取速度。 Date logs由一些單獨的 data log 文件組成,缺省的文件大小是 32M,如果某個消息的大小超過了 data log 文件的大 小,那么可以修改配置以增加 data log 文件的大小。 如果某個 data log 文件中所有的消息都被成功消費了,那么這個 data log 文件將會被標記,以便在下一輪的清理中 被刪除或者歸檔。 --> <amqPersistenceAdapter directory="${activemq.base}/data" maxFileLength="32mb"/> <!--Kaha Persistence 是一個專門針對消息持久化的解決方案。它對典型的消息使用模式進行了優化。在 Kaha 中,數據被 追加到 data logs 中。當不再需要 log文件中的數據的時候,log 文件會被丟棄。--> <!-- <kahaDB directory="${activemq.data}/kahadb"/>--> <!-- 支持的數據庫有Apache Derby,Axion,DB2,HSQL,Informix,MaxDB,MySQL,Oracle,Postgresql,SQLServer,Sybase。 如果你使用的數據庫不被支持,那么可以調整 StatementProvider 來保證使 用正確的 SQL 方言(flavour of SQL)。通常絕大多數數據庫支持以下 adaptor:
 1、org.activemq.store.jdbc.adapter.BlobJDBCAdapter  2、org.activemq.store.jdbc.adapter.BytesJDBCAdapter  3、org.activemq.store.jdbc.adapter.DefaultJDBCAdapter  4、org.activemq.store.jdbc.adapter.ImageJDBCAdapter 也可以在配置文件中直接指定 JDBC adaptor  參考下面的的“jdbc持久化配置” -->
 
2、總結
  
   想要使用異步,在brokerURL中增加 jms.alwaysSyncSend=false&jms.useAsyncSend=true, 如果設置了alwaysSyncSend=true系統將會忽略useAsyncSend設置的值都采用同步:
     1) 當alwaysSyncSend=false時,“NON_PERSISTENT”(非持久化)、事務中的消息將使用“異步發送”
     2) 當alwaysSyncSend=false時,如果指定了useAsyncSend=true,“PERSISTENT”持久化類型的消息使用異步發送。如果useAsyncSend=false,“PERSISTENT”類型的消息使用同步發送。
  默認情況(alwaysSyncSend=false,useAsyncSend=false),非持久化消息、事務內的消息均采用異步發送;對於持久化消息采用同步發送。
 
  轉自:http://www.tuicool.com/articles/rmmAbyu


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM