一.消息目的地策略
在節點destinationPolicy配置策略,可以對單個或者所有的主題和隊列進行設置,使用流量監控,當消息達到memoryLimit的時候,ActiveMQ會減慢消息的產生甚至阻塞,destinationPolicy的配置如下:
<!-- Destination specific policies using destination names or wildcards --> <!-- wildcards意義見http://activemq.apache.org/wildcards.html --> <destinationPolicy> <policyMap> <policyEntries> <!-- 這里使用了wildcards,表示所有以Msg開頭的topic --> <policyEntry topic="Msg.>" producerFlowControl="false" memoryLimit="10mb"> <!-- 分發策略 --> <dispatchPolicy> <!-- 按順序分發 --> <strictOrderDispatchPolicy/> </dispatchPolicy> <!-- 恢復策略--> <subscriptionRecoveryPolicy> <!-- 只恢復最后一個message --> <lastImageSubscriptionRecoveryPolicy/> </subscriptionRecoveryPolicy> </policyEntry> </policyEntries> </policyMap> </destinationPolicy>
producerFlowControl表示是否監控流量,默認為true,如果設置為false,消息就會存在磁盤中以防止內存溢 出;memoryLimit表示在producerFlowControl=”true”的情況下,消息存儲在內存中最大量,當消息達到這個值 時,ActiveMQ會減慢消息的產生甚至阻塞。
隊列分發策略如下:
property | default | description |
---|---|---|
useConsumerPriority | true | use the priority of a consumer when dispatching messages from a Queue |
strictOrderDispatch | false | if true queue will not round robin consumers, but it'll use a single one until its prefetch buffer is full |
optimizedDispatch | false | don't use a separate thread for dispatching from a Queue |
lazyDispatch | false | only page in from store the number of messages that can be dispatched at time |
consumersBeforeDispatchStarts | 0 | when the first consumer connects, wait for specified number of consumers before message dispatching starts |
timeBeforeDispatchStarts | 0 | when the first consumer connects, wait for specified time (in ms) before message dispatching starts |
queuePrefetch | n/a | sets the prefetch for consumers that are using the default value |
expireMessagesPeriod | 30000 | the period (in ms) of checks for message expiry on queued messages, value of 0 disables |
簡單翻譯如下:
useConsumerPriority:默認策略,按照用戶優先級設置發送消息
strictOrderDispatchPolicy:保證每個topic consumer會以相同的順序接收消息,代價是性能上的損失
policyEntry的屬性參考:http://activemq.apache.org/per-destination-policies.html
當producer發送的持久化消息到達broker之后,broker首先會把它保存在持久存儲中。接下來,如果發現當前有活躍的 consumer,如果這個consumer消費消息的速度能跟上producer生產消息的速度,那么ActiveMQ會直接把消息傳遞給broker 內部跟這個consumer關聯的dispatch queue;如果當前沒有活躍的consumer或者consumer消費消息的速度跟不上producer生產消息的速度,那么ActiveMQ會使用 Pending Message Cursors保存對消息的引用。在需要的時候,Pending Message Cursors把消息引用傳遞給broker內部跟這個consumer關聯的dispatch queue。以下是兩種Pending Message Cursors:
VM Cursor:在內存中保存消息的引用。
File Cursor:首先在內存中保存消息的引用,如果內存使用量達到上限,那么會把消息引用保存到臨時文件中。
在缺省情況下,ActiveMQ 會根據使用的Message Store來決定使用何種類型的Message Cursors,但是你可以根據destination來配置Message Cursors。
對於topic,可以使用的pendingSubscriberPolicy 有vmCursor和fileCursor。可以使用的PendingDurableSubscriberMessageStoragePolicy有
vmDurableCursor 和 fileDurableSubscriberCursor;對於queue,可以使用的pendingQueuePolicy有vmQueueCursor 和 fileQueueCursor。
Message Cursors的使用參考:http://activemq.apache.org/message-cursors.html
二.存儲配置
設置消息在內存、磁盤中存儲的大小,配置如下:
<systemUsage> <systemUsage> <memoryUsage> <memoryUsage limit="20 mb"/> </memoryUsage> <storeUsage> <storeUsage limit="1 gb"/> </storeUsage> <tempUsage> <tempUsage limit="100 mb"/> </tempUsage> </systemUsage> </systemUsage>
memoryUsage表示ActiveMQ使用的內存,這個值要大於等於destinationPolicy中設置的所有隊列的內存之和。
storeUsage表示持久化存儲文件的大小。
tempUsage表示非持久化消息存儲的臨時內存大小。
三.主備配置
ActiveMQ的主備有三種方式:純Master/Slave、文件共享方式、數據庫共享方式。
1、純Master/Slave
這種方式的主備不需要對Master Broker做特殊的配置,只要在Slave Broker中指定他的Master就可以了,指定Master有兩種方式,最簡單的配置就是在broker節點中添加 masterConnectorURI=”tcp://localhost:61616″即可,還有一種方式就是添加一個services節點,可以指定 連接的用戶名和密碼,配置如下:
<services> <masterConnector remoteURI= "tcp://localhost:61616" userName="system" password="manager"/> </services>
純Master/Slave只允許一個Slave連接到Master上面,也就是說只能有2台MQ做集群,同時當Master掛了之后需要停止Slave來恢復負載。
2、數據庫共享方式
這種方式的主備采用數據庫做消息的持久化,支持多個Slave,所有broker持久化數據源配置成同一個數據源,當一個broker獲取的數據庫鎖之 后,其他的broker都成為slave並且等待獲取鎖,當master掛了之后,其中的一個slave將會立刻獲得數據庫鎖成為master,重啟之前 掛掉的master之后,這個master也就成了slave,不需要停止slave來恢復。由於采用的是數據庫做為持久化,它的性能是有限的。
3、文件共享方式
這種方式的主備具有和數據庫共享方式的負載一樣的特性,不同的是broker的持久化采用的是文件(我這里用KahaDB),slave等待獲取的鎖是文件鎖,它具有更高的性能,但是需要文件共享系統的支持。
Window下共享KahaDB持久化的目錄,配置如下:
<persistenceAdapter> <kahaDB directory="//172.16.1.202/mqdata/kahadb"/> </persistenceAdapter>
Linux下需要開啟NFS服務,具體操作如下:
創建共享目錄(192.168.0.1):
1、 修改etc/exports,添加需要共享的目錄:/opt/mq/data *(rw,no_root_squash)
2、 啟動NFS服務 service nfs start/restart
3、 查看共享 showmount –e
4、 NFS服務自啟動 chkconfig –level 35 nfs on
掛載共享目錄(192.168.0.2):
1、 掛載:mount –t nfs 192.168.0.1:/opt/mq/data /opt/mq/data
2、 啟動自動掛載:在etc/fstab文件添加10.175.40.244:/opt/mq/data /opt/mq/data nfs defaults 0 0
然后指定KahaDB的持久化目錄為/opt/mq/data即可。
AIX系統的文件共享和Linux類似,也是啟動NFS服務。
注意:如果Master服務器宕機了,Slave是不會獲得文件鎖而啟動,直到Master服務器重啟。
Window下Master上有Slave連接時如圖:
客戶端連接的brokerURL為failover:(tcp://localhost:61616,tcp://localhost:61617)。用 第三部分的代碼測試,先向Master Broker發送一個消息,然后關閉master,運行獲取消息的方法,即可獲取之前發送的消息。
四.負載均衡配置
ActiveMQ可以實現多個mq之間進行路由,假設有兩個mq,分別為brokerA和brokerB,當有一條消息發送到brokerA的隊列 test中,有一個客戶端連接到brokerB上,並且要求獲取test隊列的消息時,brokerA中隊列test的消息就會路由到brokerB上, 反之brokerB的消息也會路由到brokerA。
靜態路由配置,brokerA不需要特別的配置,brokerB需要配置networkConnectors節點,具體配置如下:
<networkConnectors> <networkConnector uri="static:(tcp://localhost:61616)" duplex="true"/> </networkConnectors>
靜態路由支持failover,如:static:failover://(tcp://host1:61616,tcp://host2:61616)。
動態路由配置,每個mq都需要配置如下:
<networkConnectors> <networkConnector uri="multicast://default" /> </networkConnectors> <transportConnectors> <transportConnector name="openwire" uri="tcp://0.0.0.0:61618" discoveryUri="multicast://default" /> </transportConnectors>
注意:networkConnectors需要配置在persistenceAdapter之前。
重啟ActiveMQ,可以看到brokerA的日志如圖:
1、AMQ
AMQ是一種文件存儲形式,它具有寫入速度快和容易恢復的特點。消息存儲在一個個文件中,文件的默認大小為32兆,如果一條消息的大小超過了32 兆,那么這個值必須設置大點。當一個存儲文件中的消息已經全部被消費,那么這個文件將被標識為可刪除,在下一個清除階段,這個文件被刪除。默認配置如下:
<persistenceAdapter> <amqPersistenceAdapter directory="activemq-data" maxFileLength="32mb"/> </persistenceAdapter>
AMQ的屬性:
屬性名稱 | 默認值 | 描述 |
directory | activemq-data | 消息文件和日志的存儲目錄 |
useNIO | true | 使用NIO協議存儲消息 |
syncOnWrite | false | 同步寫到磁盤,這個選項對性能影響非常大 |
maxFileLength | 32mb | 一個消息文件的大小 |
persistentIndex | true | 消息索引的持久化,如果為false,那么索引保存在內存中 |
maxCheckpointMessageAddSize | 4kb | 一個事務允許的最大消息量 |
cleanupInterval | 30000 | 清除操作周期,單位ms |
indexBinSize | 1024 | 索引文件緩存頁面數,缺省為1024,當amq擴充或者縮減存儲時,會鎖定整個broker,導致一定時間的阻塞,所以這個值應該調整到比較大,但是代碼中實現會動態伸縮,調整效果並不理想。 |
indexKeySize | 96 | 索引key的大小,key是消息ID |
indexPageSize | 16kb | 索引的頁大小 |
directoryArchive | archive | 存儲被歸檔的消息文件目錄 |
archiveDataLogs | false | 當為true時,歸檔的消息文件被移到directoryArchive,而不是直接刪除 |
2、KahaDB
KahaDB是基於文件的本地數據庫儲存形式,雖然沒有AMQ的速度快,但是它具有強擴展性,恢復的時間比AMQ短,從5.4版本之后KahaDB做為默認的持久化方式。默認配置如下:
<persistenceAdapter> <kahaDB directory="activemq-data" journalMaxFileLength="32mb"/> </persistenceAdapter>
KahaDB的屬性:
property name | default value | Comments |
directory | activemq-data | 消息文件和日志的存儲目錄 |
indexWriteBatchSize | 1000 | 一批索引的大小,當要更新的索引量到達這個值時,更新到消息文件中 |
indexCacheSize | 10000 | 內存中,索引的頁大小 |
enableIndexWriteAsync | false | 索引是否異步寫到消息文件中 |
journalMaxFileLength | 32mb | 一個消息文件的大小 |
enableJournalDiskSyncs | true | 是否講非事務的消息同步寫入到磁盤 |
cleanupInterval | 30000 | 清除操作周期,單位ms |
checkpointInterval | 5000 | 索引寫入到消息文件的周期,單位ms |
ignoreMissingJournalfiles | false | 忽略丟失的消息文件,false,當丟失了消息文件,啟動異常 |
checkForCorruptJournalFiles | false | 檢查消息文件是否損壞,true,檢查發現損壞會嘗試修復 |
checksumJournalFiles | false | 產生一個checksum,以便能夠檢測journal文件是否損壞。 |
5.4版本之后有效的屬性: | ||
archiveDataLogs | false | 當為true時,歸檔的消息文件被移到directoryArchive,而不是直接刪除 |
directoryArchive | null | 存儲被歸檔的消息文件目錄 |
databaseLockedWaitDelay | 10000 | 在使用負載時,等待獲得文件鎖的延遲時間,單位ms |
maxAsyncJobs | 10000 | 同個生產者產生等待寫入的異步消息最大量 |
concurrentStoreAndDispatchTopics | false | 當寫入消息的時候,是否轉發主題消息 |
concurrentStoreAndDispatchQueues | true | 當寫入消息的時候,是否轉發隊列消息 |
5.6版本之后有效的屬性: | ||
archiveCorruptedIndex | false | 是否歸檔錯誤的索引 |
從5.6版本之后,有可能發布通過多個kahadb持久適配器來實現分布式目標隊列存儲。什么時候用呢?如果有一個快速的生產者和消費者,當某一個 時刻生產者發生了不規范的消費,那么有可能產生一條消息被存儲在兩個消息文件中,同時,有些目標隊列是危險的並且要求訪問磁盤。在這種情況下,你應該用通 配符來使用mKahaDB。如果目標隊列是分布的,事務是可以跨越多個消息文件的。
每個KahaDB的實例都可以配置單獨的適配器,如果沒有目標隊列提交給filteredKahaDB,那么意味着對所有的隊列有效。如果一個隊列沒有對應的適配器,那么將會拋出一個異常。配置如下:
<persistenceAdapter> <mKahaDB directory="${activemq.base}/data/kahadb"> <filteredPersistenceAdapters> <!-- match all queues --> <filteredKahaDB queue=">"> <persistenceAdapter> <kahaDB journalMaxFileLength="32mb"/> </persistenceAdapter> </filteredKahaDB> <!-- match all destinations --> <filteredKahaDB> <persistenceAdapter> <kahaDB enableJournalDiskSyncs="false"/> </persistenceAdapter> </filteredKahaDB> </filteredPersistenceAdapters> </mKahaDB> </persistenceAdapter>
如果filteredKahaDB的perDestination屬性設置為true,那么匹配的目標隊列將會得到自己對應的KahaDB實例。配置如下:
<persistenceAdapter> <mKahaDB directory="${activemq.base}/data/kahadb"> <filteredPersistenceAdapters> <!-- kahaDB per destinations --> <filteredKahaDB perDestination="true" > <persistenceAdapter> <kahaDB journalMaxFileLength="32mb" /> </persistenceAdapter> </filteredKahaDB> </filteredPersistenceAdapters> </mKahaDB> </persistenceAdapter>
3、JDBC
配置JDBC適配器:
<persistenceAdapter> <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="false" /> </persistenceAdapter>
dataSource指定持久化數據庫的bean,createTablesOnStartup是否在啟動的時候創建數據表,默認值是true,這樣每次啟動都會去創建數據表了,一般是第一次啟動的時候設置為true,之后改成false。
MYSQL持久化bean
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/> <property name="username" value="activemq"/> <property name="password" value="activemq"/> <property name="poolPreparedStatements" value="true"/> </bean>
SQL Server持久化bean
<bean id="mssql-ds" class="net.sourceforge.jtds.jdbcx.JtdsDataSource" destroy-method="close"> <property name="serverName" value="SERVERNAME"/> <property name="portNumber" value="PORTNUMBER"/> <property name="databaseName" value="DATABASENAME"/> <property name="user" value="USER"/> <property name="password" value="PASSWORD"/> </bean>
Oracle持久化bean
<bean id="oracle-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="oracle.jdbc.driver.OracleDriver"/> <property name="url" value="jdbc:oracle:thin:@10.53.132.47:1521:activemq"/> <property name="username" value="activemq"/> <property name="password" value="activemq"/> <property name="maxActive" value="200"/> <property name="poolPreparedStatements" value="true"/> </bean>
DB2持久化bean
<bean id="db2-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.ibm.db2.jcc.DB2Driver"/> <property name="url" value="jdbc:db2://hndb02.bf.ctc.com:50002/activemq"/> <property name="username" value="activemq"/> <property name="password" value="activemq"/> <property name="maxActive" value="200"/> <property name="poolPreparedStatements" value="true"/> </bean>