ActiveMq配置解析


一.消息目的地策略

在節點destinationPolicy配置策略,可以對單個或者所有的主題和隊列進行設置,使用流量監控,當消息達到memoryLimit的時候,ActiveMQ會減慢消息的產生甚至阻塞,destinationPolicy的配置如下:

 

  <!-- Destination specific policies using destination names or wildcards -->
    <!-- wildcards意義見http://activemq.apache.org/wildcards.html -->
    <destinationPolicy>
      <policyMap>
        <policyEntries>
       <!-- 這里使用了wildcards,表示所有以Msg開頭的topic -->
          <policyEntry topic="Msg.>" producerFlowControl="false" memoryLimit="10mb">
            <!-- 分發策略 -->
        <dispatchPolicy>
          <!-- 按順序分發 -->
              <strictOrderDispatchPolicy/>
            </dispatchPolicy>
        <!--  恢復策略-->
            <subscriptionRecoveryPolicy>
          <!-- 只恢復最后一個message -->
              <lastImageSubscriptionRecoveryPolicy/>
            </subscriptionRecoveryPolicy>
          </policyEntry>
        </policyEntries>
      </policyMap>
    </destinationPolicy>

 

producerFlowControl表示是否監控流量,默認為true,如果設置為false,消息就會存在磁盤中以防止內存溢 出;memoryLimit表示在producerFlowControl=”true”的情況下,消息存儲在內存中最大量,當消息達到這個值 時,ActiveMQ會減慢消息的產生甚至阻塞。

隊列分發策略如下:

 

property default description
useConsumerPriority true use the priority of a consumer when dispatching messages from a Queue
strictOrderDispatch false if true queue will not round robin consumers, but it'll use a single one until its prefetch buffer is full
optimizedDispatch false don't use a separate thread for dispatching from a Queue
lazyDispatch false only page in from store the number of messages that can be dispatched at time
consumersBeforeDispatchStarts 0 when the first consumer connects, wait for specified number of consumers before message dispatching starts
timeBeforeDispatchStarts 0 when the first consumer connects, wait for specified time (in ms) before message dispatching starts
queuePrefetch n/a sets the prefetch for consumers that are using the default value
expireMessagesPeriod 30000 the period (in ms) of checks for message expiry on queued messages, value of 0 disables

 

簡單翻譯如下:

useConsumerPriority:默認策略,按照用戶優先級設置發送消息

 

strictOrderDispatchPolicy:保證每個topic consumer會以相同的順序接收消息,代價是性能上的損失

policyEntry的屬性參考:http://activemq.apache.org/per-destination-policies.html

當producer發送的持久化消息到達broker之后,broker首先會把它保存在持久存儲中。接下來,如果發現當前有活躍的 consumer,如果這個consumer消費消息的速度能跟上producer生產消息的速度,那么ActiveMQ會直接把消息傳遞給broker 內部跟這個consumer關聯的dispatch queue;如果當前沒有活躍的consumer或者consumer消費消息的速度跟不上producer生產消息的速度,那么ActiveMQ會使用 Pending Message Cursors保存對消息的引用。在需要的時候,Pending Message Cursors把消息引用傳遞給broker內部跟這個consumer關聯的dispatch queue。以下是兩種Pending Message Cursors:

VM Cursor:在內存中保存消息的引用。
File Cursor:首先在內存中保存消息的引用,如果內存使用量達到上限,那么會把消息引用保存到臨時文件中。
在缺省情況下,ActiveMQ 會根據使用的Message Store來決定使用何種類型的Message Cursors,但是你可以根據destination來配置Message Cursors。
對於topic,可以使用的pendingSubscriberPolicy 有vmCursor和fileCursor。可以使用的PendingDurableSubscriberMessageStoragePolicy有
vmDurableCursor 和 fileDurableSubscriberCursor;對於queue,可以使用的pendingQueuePolicy有vmQueueCursor 和 fileQueueCursor。
Message Cursors的使用參考:http://activemq.apache.org/message-cursors.html

二.存儲配置

設置消息在內存、磁盤中存儲的大小,配置如下:

 

<systemUsage>
  <systemUsage>
      <memoryUsage>
          <memoryUsage limit="20 mb"/>
      </memoryUsage>
      <storeUsage>
          <storeUsage limit="1 gb"/>
      </storeUsage>
      <tempUsage>
          <tempUsage limit="100 mb"/>
      </tempUsage>
  </systemUsage>
</systemUsage>

 

memoryUsage表示ActiveMQ使用的內存,這個值要大於等於destinationPolicy中設置的所有隊列的內存之和。

storeUsage表示持久化存儲文件的大小。

tempUsage表示非持久化消息存儲的臨時內存大小。

三.主備配置

ActiveMQ的主備有三種方式:純Master/Slave、文件共享方式、數據庫共享方式。
1、純Master/Slave
這種方式的主備不需要對Master Broker做特殊的配置,只要在Slave Broker中指定他的Master就可以了,指定Master有兩種方式,最簡單的配置就是在broker節點中添加 masterConnectorURI=”tcp://localhost:61616″即可,還有一種方式就是添加一個services節點,可以指定 連接的用戶名和密碼,配置如下:

 

<services>
  <masterConnector remoteURI= "tcp://localhost:61616" userName="system" password="manager"/>
</services>

 

純Master/Slave只允許一個Slave連接到Master上面,也就是說只能有2台MQ做集群,同時當Master掛了之后需要停止Slave來恢復負載。

2、數據庫共享方式
這種方式的主備采用數據庫做消息的持久化,支持多個Slave,所有broker持久化數據源配置成同一個數據源,當一個broker獲取的數據庫鎖之 后,其他的broker都成為slave並且等待獲取鎖,當master掛了之后,其中的一個slave將會立刻獲得數據庫鎖成為master,重啟之前 掛掉的master之后,這個master也就成了slave,不需要停止slave來恢復。由於采用的是數據庫做為持久化,它的性能是有限的。

3、文件共享方式
這種方式的主備具有和數據庫共享方式的負載一樣的特性,不同的是broker的持久化采用的是文件(我這里用KahaDB),slave等待獲取的鎖是文件鎖,它具有更高的性能,但是需要文件共享系統的支持。
Window下共享KahaDB持久化的目錄,配置如下:

<persistenceAdapter>
    <kahaDB directory="//172.16.1.202/mqdata/kahadb"/>
</persistenceAdapter>

Linux下需要開啟NFS服務,具體操作如下:
創建共享目錄(192.168.0.1):
1、 修改etc/exports,添加需要共享的目錄:/opt/mq/data *(rw,no_root_squash)
2、 啟動NFS服務 service nfs start/restart
3、 查看共享 showmount –e
4、 NFS服務自啟動 chkconfig –level 35 nfs on

掛載共享目錄(192.168.0.2):
1、 掛載:mount –t nfs 192.168.0.1:/opt/mq/data /opt/mq/data
2、 啟動自動掛載:在etc/fstab文件添加10.175.40.244:/opt/mq/data /opt/mq/data nfs defaults 0 0
然后指定KahaDB的持久化目錄為/opt/mq/data即可。

AIX系統的文件共享和Linux類似,也是啟動NFS服務。
注意:如果Master服務器宕機了,Slave是不會獲得文件鎖而啟動,直到Master服務器重啟。
Window下Master上有Slave連接時如圖:

客戶端連接的brokerURL為failover:(tcp://localhost:61616,tcp://localhost:61617)。用 第三部分的代碼測試,先向Master Broker發送一個消息,然后關閉master,運行獲取消息的方法,即可獲取之前發送的消息。

四.負載均衡配置

ActiveMQ可以實現多個mq之間進行路由,假設有兩個mq,分別為brokerA和brokerB,當有一條消息發送到brokerA的隊列 test中,有一個客戶端連接到brokerB上,並且要求獲取test隊列的消息時,brokerA中隊列test的消息就會路由到brokerB上, 反之brokerB的消息也會路由到brokerA。
靜態路由配置,brokerA不需要特別的配置,brokerB需要配置networkConnectors節點,具體配置如下:

 

     <networkConnectors>
            <networkConnector uri="static:(tcp://localhost:61616)" duplex="true"/>
        </networkConnectors>

 

靜態路由支持failover,如:static:failover://(tcp://host1:61616,tcp://host2:61616)。
動態路由配置,每個mq都需要配置如下:

<networkConnectors>
    <networkConnector uri="multicast://default" />
</networkConnectors>
 
<transportConnectors>
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61618" discoveryUri="multicast://default" />
</transportConnectors>

注意:networkConnectors需要配置在persistenceAdapter之前。
重啟ActiveMQ,可以看到brokerA的日志如圖:

 

 

 

 

networkConnector的屬性請參照:http://activemq.apache.org/networks-of-brokers.html
五.持久化配置
在broker中設置屬性persistent=”true”(默認是true),同時發送的消息也應該是persitent類型的。ActiveMQ消息持久化有三種方式:AMQ、KahaDB、JDBC。

1、AMQ

AMQ是一種文件存儲形式,它具有寫入速度快和容易恢復的特點。消息存儲在一個個文件中,文件的默認大小為32兆,如果一條消息的大小超過了32 兆,那么這個值必須設置大點。當一個存儲文件中的消息已經全部被消費,那么這個文件將被標識為可刪除,在下一個清除階段,這個文件被刪除。默認配置如下:

 <persistenceAdapter>
      <amqPersistenceAdapter directory="activemq-data" maxFileLength="32mb"/>
    </persistenceAdapter>

AMQ的屬性:

屬性名稱 默認值 描述
directory activemq-data 消息文件和日志的存儲目錄
useNIO true 使用NIO協議存儲消息
syncOnWrite false 同步寫到磁盤,這個選項對性能影響非常大
maxFileLength 32mb 一個消息文件的大小
persistentIndex true 消息索引的持久化,如果為false,那么索引保存在內存中
maxCheckpointMessageAddSize 4kb 一個事務允許的最大消息量
cleanupInterval 30000 清除操作周期,單位ms
indexBinSize 1024 索引文件緩存頁面數,缺省為1024,當amq擴充或者縮減存儲時,會鎖定整個broker,導致一定時間的阻塞,所以這個值應該調整到比較大,但是代碼中實現會動態伸縮,調整效果並不理想。
indexKeySize 96 索引key的大小,key是消息ID
indexPageSize 16kb 索引的頁大小
directoryArchive archive 存儲被歸檔的消息文件目錄
archiveDataLogs false 當為true時,歸檔的消息文件被移到directoryArchive,而不是直接刪除                    

 

2、KahaDB

KahaDB是基於文件的本地數據庫儲存形式,雖然沒有AMQ的速度快,但是它具有強擴展性,恢復的時間比AMQ短,從5.4版本之后KahaDB做為默認的持久化方式。默認配置如下:

<persistenceAdapter>
        <kahaDB directory="activemq-data" journalMaxFileLength="32mb"/>
    </persistenceAdapter>

KahaDB的屬性:

property name default value Comments
directory activemq-data 消息文件和日志的存儲目錄
indexWriteBatchSize 1000 一批索引的大小,當要更新的索引量到達這個值時,更新到消息文件中
indexCacheSize 10000 內存中,索引的頁大小
enableIndexWriteAsync false 索引是否異步寫到消息文件中
journalMaxFileLength 32mb 一個消息文件的大小
enableJournalDiskSyncs true 是否講非事務的消息同步寫入到磁盤
cleanupInterval 30000 清除操作周期,單位ms
checkpointInterval 5000 索引寫入到消息文件的周期,單位ms
ignoreMissingJournalfiles false 忽略丟失的消息文件,false,當丟失了消息文件,啟動異常
checkForCorruptJournalFiles false 檢查消息文件是否損壞,true,檢查發現損壞會嘗試修復
checksumJournalFiles false 產生一個checksum,以便能夠檢測journal文件是否損壞。
5.4版本之后有效的屬性:    
archiveDataLogs false 當為true時,歸檔的消息文件被移到directoryArchive,而不是直接刪除
directoryArchive null 存儲被歸檔的消息文件目錄
databaseLockedWaitDelay 10000 在使用負載時,等待獲得文件鎖的延遲時間,單位ms
maxAsyncJobs 10000 同個生產者產生等待寫入的異步消息最大量
concurrentStoreAndDispatchTopics false 當寫入消息的時候,是否轉發主題消息
concurrentStoreAndDispatchQueues true 當寫入消息的時候,是否轉發隊列消息
5.6版本之后有效的屬性:    
archiveCorruptedIndex false 是否歸檔錯誤的索引

從5.6版本之后,有可能發布通過多個kahadb持久適配器來實現分布式目標隊列存儲。什么時候用呢?如果有一個快速的生產者和消費者,當某一個 時刻生產者發生了不規范的消費,那么有可能產生一條消息被存儲在兩個消息文件中,同時,有些目標隊列是危險的並且要求訪問磁盤。在這種情況下,你應該用通 配符來使用mKahaDB。如果目標隊列是分布的,事務是可以跨越多個消息文件的。

每個KahaDB的實例都可以配置單獨的適配器,如果沒有目標隊列提交給filteredKahaDB,那么意味着對所有的隊列有效。如果一個隊列沒有對應的適配器,那么將會拋出一個異常。配置如下:

<persistenceAdapter>
  <mKahaDB directory="${activemq.base}/data/kahadb">
    <filteredPersistenceAdapters>
      <!-- match all queues -->
      <filteredKahaDB queue=">">
        <persistenceAdapter>
          <kahaDB journalMaxFileLength="32mb"/>
        </persistenceAdapter>
      </filteredKahaDB>
      
      <!-- match all destinations -->
      <filteredKahaDB>
        <persistenceAdapter>
          <kahaDB enableJournalDiskSyncs="false"/>
        </persistenceAdapter>
      </filteredKahaDB>
    </filteredPersistenceAdapters>
  </mKahaDB>
</persistenceAdapter>

如果filteredKahaDB的perDestination屬性設置為true,那么匹配的目標隊列將會得到自己對應的KahaDB實例。配置如下:

<persistenceAdapter>
  <mKahaDB directory="${activemq.base}/data/kahadb">
    <filteredPersistenceAdapters>
      <!-- kahaDB per destinations -->
      <filteredKahaDB perDestination="true" >
        <persistenceAdapter>
          <kahaDB journalMaxFileLength="32mb" />
        </persistenceAdapter>
      </filteredKahaDB>
    </filteredPersistenceAdapters>
  </mKahaDB>
</persistenceAdapter>

3、JDBC

配置JDBC適配器:

 <persistenceAdapter>
        <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="false" />
    </persistenceAdapter>

dataSource指定持久化數據庫的bean,createTablesOnStartup是否在啟動的時候創建數據表,默認值是true,這樣每次啟動都會去創建數據表了,一般是第一次啟動的時候設置為true,之后改成false。

MYSQL持久化bean

<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
    <property name="username" value="activemq"/>
    <property name="password" value="activemq"/>
    <property name="poolPreparedStatements" value="true"/>
  </bean>

SQL Server持久化bean

 <bean id="mssql-ds" class="net.sourceforge.jtds.jdbcx.JtdsDataSource" destroy-method="close">
   <property name="serverName" value="SERVERNAME"/>
   <property name="portNumber" value="PORTNUMBER"/>
   <property name="databaseName" value="DATABASENAME"/>
   <property name="user" value="USER"/>
   <property name="password" value="PASSWORD"/>
</bean>

Oracle持久化bean

<bean id="oracle-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="oracle.jdbc.driver.OracleDriver"/>
    <property name="url" value="jdbc:oracle:thin:@10.53.132.47:1521:activemq"/>
    <property name="username" value="activemq"/>
    <property name="password" value="activemq"/>
    <property name="maxActive" value="200"/>
    <property name="poolPreparedStatements" value="true"/>
  </bean>

DB2持久化bean

<bean id="db2-ds" class="org.apache.commons.dbcp.BasicDataSource"  destroy-method="close">
      <property name="driverClassName" value="com.ibm.db2.jcc.DB2Driver"/>
      <property name="url" value="jdbc:db2://hndb02.bf.ctc.com:50002/activemq"/>
      <property name="username" value="activemq"/>
      <property name="password" value="activemq"/>
      <property name="maxActive" value="200"/>
      <property name="poolPreparedStatements" value="true"/>
  </bean>

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM