ActiveMQ簡介
1. 什么是ActiveMQ
ActiveMQ是一種開源的,實現了JMS1.1規范的,面向消息(MOM)的中間件,為應用程序提供高效的、可擴展的、穩定的和安全的企業級消息通信。ActiveMQ使用Apache提供的授權,任何人都可以對其實現代碼進行修改。
ActiveMQ的設計目標是提供標准的,面向消息的,能夠跨越多語言和多系統的應用集成消息通信中間件。ActiveMQ實現了JMS標准並提供了很多附加的特性。這些附加的特性包括,JMX管理(java Management Extensions,即java管理擴展),主從管理(master/salve,這是集群模式的一種,主要體現在可靠性方面,當主中介(代理)出現故障,那么從代理會替代主代理的位置,不至於使消息系統癱瘓)、消息組通信(同一組的消息,僅會提交給一個客戶進行處理)、有序消息管理(確保消息能夠按照發送的次序被接受者接收)。消息優先級(優先級高的消息先被投遞和處理)、訂閱消息的延遲接收(訂閱消息在發布時,如果訂閱者沒有開啟連接,那么當訂閱者開啟連接時,消息中介將會向其提交之前的,其未處理的消息)、接收者處理過慢(可以使用動態負載平衡,將多數消息提交到處理快的接收者,這主要是對PTP消息所說)、虛擬接收者(降低與中介的連接數目)、成熟的消息持久化技術(部分消息需要持久化到數據庫或文件系統中,當中介崩潰時,信息不會丟失)、支持游標操作(可以處理大消息)、支持消息的轉換、通過使用Apache的Camel可以支持EIP、使用鏡像隊列的形式輕松的對消息隊列進行監控等。
2. ActiveMQ 特性
支持JMS規范:ActiveMQ完全實現了JMS1.1規范。
JMS規范提供了同步消息和異步消息投遞方式、有且僅有一次投遞語義(指消息的接收者對一條消息必須接收到一次,並且僅有一次)、訂閱消息持久接收等。如果僅使用JMS規范,表明無論您使用的是哪家廠商的消息代理,都不會影響到您的程序。
連接方式的多樣化:ActiveMQ提供了廣泛的連接模式,包括HTTP/S、JGroups、JXTA、muticast、SSL、TCP、UDP、XMPP等。提供了如此多的連接模式表明了ActiveMQ具有較高的靈活性。
可插入式的持久化和安全:ActiveMQ提供了多種持久化方案,您可以根據實際需要進行選擇。同時,也提供了完整的客戶授權模式。
使用Java創建消息應用程序:最常見的使用ActiveMQ的方式就是使用Java程序來發送和接收消息。
與其他的Java容器緊密集成:ActiveMQ提供了和其它流行的Java容器的結合,包括Apache Geronimo、Apache Tomcat、JBoss、Jetty等。
客戶端API:ActiveMQ提供了多種客戶端可訪問的API,包括Java、C/C++,.NET,Perl、PHP、Python、Ruby等。當然,ActiveMQ中介必須運行在Java虛擬機中,但是使用它的客戶端可以使用其他的語言來實現。
中介集群:多個ActiveMQ中介可以一起協同工作,來完成某項復雜的工作,這被稱為網絡型中介(network of brokers),這種類型的中介將會支持多種拓撲類型。
3. 為什么使用ActiveMQ
在設計分布式應用程序時,應用程序間的耦合(或稱集成)方式很重要。耦合意味着兩個或者多個應用程序或系統的相互依賴關系。一種簡單的方式是在所有的應用程序中從架構上設計他們與其他應用程序間的交叉實現。這樣必然導致,一個應用程序的改變,直接導致另一個應用程序的改變。按照這種方式集成的應用是一種緊耦合的應用。一個應用的改變不會影響到其他應用的集成方式被稱為是松耦合的集成方式。簡單的說,松耦合應用程序集成能夠更容易的處理不可預見的應用變化。
像COM、CORBA、DCE和EJB等應用技術使用RPC(Remote Procedural Calls,即遠程過程調用)屬於緊耦合技術。使用RPC,一個應用程序調用另一個應用程序,調用者必須阻塞,直到被調用者執行結束返回結果信息為止。下圖給出了這種緊耦合技術的描述:
許多系統架構使用RPC,並且獲得了巨大的成功,但是,緊耦合的架構有着天生的缺陷。首先,這種架構將會造成系統維護管理上的巨大消費,因為,即使是很小的改動,很可能會波及到整個系統。其次,由於調用者必須阻塞式的等待被調用者返回,如果被調用者處理過程復雜,將會嚴重影響調用者的執行效率和資源使用率。此外,如果調用失敗,整個架構即失敗。
下圖給出一種松耦合的方式,進行架構設計:
應用程序1向消息中介(MOM)發送一條消息,很可能一段時間之后,應用程序2調用MOM來收取消息。任何一個應用程序都不知道對方是否存在也不需要阻塞等待。這種通信方式大大縮減了維護開銷,因為對於一個應用程序的修改,會對其他應用程序影響極小。
ActiveMQ就是采用了上面提到的松耦合方式,因此,我們經常說應用程序發送消息僅僅是觸發后忘卻。應用程序將消息發送給ActiveMQ而並不關心什么時間以何種方式消息投遞給接收者。同樣的,消息接收者也不會關心消息來源於哪里和消息是怎樣投遞給ActiveMQ的。對於多語言編寫的復雜應用環境中,允許客戶端使用不同的編程語言甚至不同的消息包裝協議。ActiveMQ作為消息的中間件,允許復雜的多語言應用程序以一種一步的方式集成和交互。所以說,ActiveMQ是一種好的,提供松散耦合的,能夠為多語言交叉應用提供集成的中間件。
4. 什么情況下使用ActiveMQ
正如前面提到的,緊耦合應用系統存在許多問題,但是,要將緊耦合系統重構成松耦合系統是一件值得但比較繁瑣的事情。使用松耦合的主要優勢體現在將同步改為異步。使用異步通信,應用程序將從接收者反饋的等待中解放出來,其他的任務可以得到執行,這樣提高了應用程序的效率。
只要是兩個應用程序間需要通信的情況,都可以考慮使用JMS,不論這種通信是在本地的(就是通信的兩個應用程序在同一台主機上),還是分布在不同機器上。盡管是在同一個主機上的兩個應用程序需要通信也可以使用ActiveMQ。ActiveMQ可以確保消息投遞成功並采用異步方式通信。
多個需要通信的應用程序在同一個機器上的情況下,您可以考慮在執行機上獨立運行ActiveMQ或者將ActiveMQ嵌入到Java應用服務中。無論采用哪種方式,都可以確保應用程序能夠發送和接收消息。您可以選擇訂閱模式(pub/sub)或者采用PTP(point to point)模式,這兩種模式都無需等待執行反饋信息。每一個應用程序都可以簡單的將消息發送給ActiveMQ,然后繼續做其他的工作;應用程序無需阻塞式等待消息的返回。
對於分布在多台主機上的應用程序來說,可以使用多種布置策略。主要包括單一ActiveMQ實例和多ActiveMQ實例。單一ActiveMQ實例是一個簡單解決方案。所有的應用程序都向同一個ActiveMQ中介發送和接收消息,這與上面提到的單機多服務雷同。單一的ActiveMQ可以布置到一台單獨的主機上,也可以和其中的一些服務布置在一起。重要的是,所有的應用必須能夠直接與ActiveMQ中介進行交互,所以,你必須考慮到你的網絡設計。
第二種情況比較復雜,但是有ActiveMQ來負責遠程通信,而不是應用程序自身。在這種場景下,每一個應用程序都會實例化一個ActiveMQ(無論是嵌入式的還是獨立式的),應用程序從其本地的ActiveMQ發送和接收消息。之后這些ActiveMQ實例將會以一種聯合的方式協同工作。消息將會基於每一個應用的要求在多個ActiveMQ中介間傳遞到遠程的處理者。在ActiveMQ中,這種模式被稱為netWork of brokers。采用這種模式對於處理大量的ActiveMQ消息是可行的,但是,我們往往需要減輕網絡拓撲的復雜性,這樣直接將消息投遞到遠程接收者的ActiveMQ是不可行的。在后一種情況下,不同的協議使用可以使ActiveMQ更輕松的傳遞消息。
5. ActiveMQ傳輸效率
計算機環境 |
CPU:Intel(R) Cpu G530 @ 2.40GHz 2.40 |
發送10萬條長度為25的消息耗時6~7秒鍾, cpu占用量很大。
如果持續發送不接受的話,服務器承受到30萬時容易卡住發送達到26秒之多
一次性接收所有的消息50萬,cpu占用100%占用時間50s左右,可以全部接收。
接收完成后在繼續發送10萬消息占用時間6~7s
在一次性發送50萬消息時出現問題
INFO | Usage Manager Memory Limit (524288000) reached on queue://FirstQueue. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See
http://activemq.apache.org/producer-flow-control.html for more info
在網上找原因說是配置了發送限制,修改xml 運行時間是53s 的樣子
繼續發送50萬,一分50秒
在發送50萬
在發送50萬2分多鍾的樣子
試着接收這200萬消息可以成功,但時間很長。
修改xml后在接受消息的同時發送10萬消息,
一次發送100萬耗時4分左右的樣子
接收方也可以全部接收。
模擬服務器斷電,非持久性模式時沒有被消費的消息不會繼續消費,全部丟失
程序會報一個連接關閉異常停止運行,繼續啟動服務器運行程序,不會接收任何消息。
模擬服務器斷電,持久性模式時沒有被消費的消息會繼續消費
定義了消息的持久性模式后,即使關閉了服務器,程序也會報連接關閉異常,但再次啟動服務器和程序后,接收方還能繼續原來的消息再次接收。
總結
總體看來,在配置好xml的情況下,activemq對消息傳輸上還是沒有問題的,發送的消息都可以全部接收,發送多少條就接收多少條,准確度上還是有保證的,持久模式支持斷電續傳功能。雖然功能上沒有什么問題但對cpu的占用率就比較大了,發送或接受消息的時候都達到了100%,內存到不會很大。這跟自己使用機子有關系,配置好點的機子可能情況會好些。
6. ActiveMQ配置傳輸連接
ActiveMQ提供了廣泛的連接模式,包括HTTP/S、JGroups、JXTA、muticast、SSL、TCP、UDP、XMPP等。提供了如此多的連接模式表明了ActiveMQ具有較高的靈活性。
配置格式如下:
<transportConnectors> <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --> <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireformat.maxFrameSize=104857600"/> <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireformat.maxFrameSize=104857600"/> <transportConnector name="ssl" uri="ssl://0.0.0.0:61617?maximumConnections=1000&wireformat.maxFrameSize=104857600"/> <transportConnector name="stomp" uri="stomp://0.0.0.0:61618?maximumConnections=1000&wireformat.maxFrameSize=104857600"/> <transportConnector name="xmpp" uri="xmpp://0.0.0.0:61619?maximumConnections=1000&wireformat.maxFrameSize=104857600"/> </transportConnectors> |
生產者和消費着可以使用不同的傳輸協議來傳輸信息。比如生產者用nio協議生產消息,消費者用tcp協議接收消息。
7. ActiveMQ配置網絡連接
當應用到Broker的集群時,Borker與Broker的通信就用到了網絡連接。
配置格式如下:
<networkConnectors> <!-- 動態連接方式 <networkConnector name="default-nc" uri="multicast://default" dynamicOnly="true" networkTTL="3" prefetchSize="1" decreaseNetworkConsumerPriority="true" /> --> <!-- 靜態連接方式 <networkConnector name="host1 and host2" uri="static://(tcp://host1:61616,tcp://host2:61616)"/> --> </networkConnectors> |
8. ActiveMQ持久化存儲模式
ActiveMq主要實現了如下幾種存儲:
1.4.1. AMQ消息存儲—默認的消息存儲
它是一種基於文件存儲的消息數據庫並且不依賴第三方數據庫。配置如下
<amqPersistenceAdapter directory="${activemq.base}/data" maxFileLength="32mb"/> |
1.4.2. KahaDB 消息存儲—提供容量的提升和恢復能力
它是一種新的消息存儲機制,配置如下
<kahaDB directory="${activemq.data}/kahadb" /> |
1.4.3. JDBC 消息存儲—消息基於JDBC存儲
<persistenceAdapter> <jdbcPersistenceAdapter dataSource="#mysql-ds"/> </persistenceAdapter> <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/> <property name="username" value="activemq"/> <property name="password" value="activemq"/> <property name="maxActive" value="200"/> <property name="poolPreparedStatements" value="true"/> </bean> |
1.4.4. Memory 消息存儲—基於內容的消息存儲
ActiveMQ支持將消息保存到內存中,這種情況沒有動態的緩存存在。
這種情況的配置很簡單,只要將Broker的“prsistent” 屬性設置為“false”即可。
1. ActiveMQ負載均衡
ActiveMQ可以實現多個mq之間進行路由,假設有兩個mq,分別為brokerA和brokerB,當有一條消息發送到brokerA的隊列test中,有一個客戶端連接到brokerB上,並且要求獲取test隊列的消息時,brokerA中隊列test的消息就會路由到brokerB上,反之brokerB的消息也會路由到brokerA。分靜態和動態兩種配置方法,見《6 activemq的網絡連接》。下面給出動態配置:
<networkConnectors> <networkConnector uri="multicast://default" /> </networkConnectors> <transportConnectors> <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --> <transportConnector name="openwire" uri="tcp://0.0.0.0:61618" discoveryUri="multicast://default" /> </transportConnectors> |
2. ActiveMQ主從配置
Master-Slave模式分為三類:Pure Master Slave、Shared File System Master Slave和JDBC Master Slave。以上三種方式的集群都不支持負載均衡,但可以解決單點故障的問題,以保證消息服務的可靠性。
2.1. PureMaster Slave
需要兩個Broker,一個作為Master,另一個作為Slave,運行時,Slave通過網絡實時從Master處復制數據,同時,如果Slave和Master失去連接,Slave就會自動升級為Master,繼續為客戶端提供消息服務,這種方式的Slave只能有一個。模型如圖所示:
這種方式的主備不需要對Master Broker做特殊的配置,只要在Slave Broker中指定他的Master就可以了,指定Master有兩種方式,最簡單的配置就是在broker節點中添加masterConnectorURI=”tcp://localhost:61616″即可,還有一種方式就是添加一個services節點,可以指定連接的用戶名和密碼,配置如下:
<services> <masterConnector remoteURI= "tcp://localhost:61616" userName="system" password="manager"/> </services> |
2.2. SharedFile System Master Slave
SharedFile System Master Slave就是利用共享文件系統做ActiveMQ集群,是基於ActiveMQ的默認數據庫kahaDB完成的,kahaDB的底層是文件系統。這種方式的集群,Slave的個數沒有限制,哪個ActiveMQ實例先獲取共享文件的鎖,那個實例就是Master,其它的ActiveMQ實例就是Slave,當當前的Master失效,其它的Slave就會去競爭共享文件鎖,誰競爭到了誰就是Master。這種模式的好處就是當Master失效時不用手動去配置,只要有足夠多的Slave。如果各個ActiveMQ實例需要運行在不同的機器,就需要用到分布式文件系統了。模式如圖所示:
2.3. JDBCMaster Slave
JDBCMaster Slave模式和Shared File Sysytem Master Slave模式的原理是一樣的,只是把共享文件系統換成了共享數據庫。我們只需在所有的ActiveMQ的主配置文件中添加數據源,所有的數據源都指向同一個數據庫,然后修改持久化適配器。這種方式的集群相對Shared File System Master Slave更加簡單,更加容易地進行分布式部署,但是如果數據庫失效,那么所有的ActiveMQ實例都將失效。
3. ActiveMQ攔截器使用
在ActiveMQ中使用攔截器和過濾器的使用多采用插件的形式實現,繼承BrokerFilter實現BrokerPlugin接口類。BrokerFilter實質一個實現Broker接口的類。
3.1. 日志攔截
日志攔截器是Broker的一個攔截器,默認的日志級別為INFO。你如你想改變日志的級別。這個日志攔截器支持Commons-log和Log4j兩種日志。
<plugins> <loggingBrokerPlugin logAll="true" logConnectionEvents="false"/> </plugins> |
部分參數如下:
屬性名稱 |
默認值 |
描述 |
logAll |
false |
記錄所有事件的日志 |
logMessageEvents |
false |
記錄消息事件日志 |
logConnectionEvents |
True |
記錄連接事件日志 |
logTransactionEvents |
false |
記錄消息事務事件日志 |
logConsumerEvents |
false |
記錄消息消費者事件日志 |
logProducerEvents |
false |
記錄消息生產者事件日志 |
logInternalEvents |
false |
|
3.2. 統計攔截器
StatisticsPlugin插件被用作檢測Broker中統計的插件。注意消息必須包含replyTo的消息頭,如果是在JMS那么需要采用jmsReplyTo消息頭,否則消息將被統計忽略。ReplyTo消息頭包含了你想檢查統計的消息。統計消息是一個MapMessage.
檢查Broker的信息,僅僅需要一個名稱為ActiveMQ.Statistics.Broker並且有一個replyTo的消息頭的Destination。為了檢測所有destination,你需要一個名稱為ActiveMQ.Statistics.Destination.<destination-name>或者ActiveMQ.Statistics.Destination.<wildcard-expression>並且有一個replyTo的消息頭。如果許多Destination匹配相關的模糊匹配表達式,那么統計的消息將被發送到replyTo的Destination.
<plugins> <statisticsBrokerPlugin/> </plugins>
|
4. ActiveMQ安全配置
ActiveMQ也可以對各個主題和隊列設置用戶名和密碼,配置如下:
<plugins> <!-- Configure authentication; Username, passwords and groups --> <simpleAuthenticationPlugin> <users> <authenticationUser username="system" password="manager" groups="users,admins"/> <authenticationUser username="user" password="password" groups="users"/> <authenticationUser username="guest" password="password" groups="guests"/> <authenticationUser username="testUser" password="123456" groups="testGroup"/> </users> </simpleAuthenticationPlugin> <!-- Lets configure a destination based authorization mechanism --> <authorizationPlugin> <map> <authorizationMap> <authorizationEntries> <authorizationEntry queue="queue.group.uum" read="users" write="users" admin="users" /> <authorizationEntry queue=">" read="admins" write="admins" admin="admins" /> <authorizationEntry queue="USERS.>" read="users" write="users" admin="users" /> <authorizationEntry queue="GUEST.>" read="guests" write="guests,users" admin="guests,users" /> <authorizationEntry queue="TEST.Q" read="guests" write="guests" /> <authorizationEntry queue="test" read=" testGroup " write=" testGroup " /> <authorizationEntry topic=">" read="admins" write="admins" admin="admins" /> <authorizationEntry topic="USERS.>" read="users" write="users" admin="users" /> <authorizationEntry topic="GUEST.>" read="guests" write="guests,users" admin="guests,users" /> <authorizationEntry topic="ActiveMQ.Advisory.>" read="guests,users ,testGroup" write="guests,users ,testGroup " admin="guests,users ,testGroup "/> </authorizationEntries> </authorizationMap> </map> </authorizationPlugin> </plugins> |
5. ActiveMQ中NetWorkConnctor屬性
<networkConnector name="bridge" uri="static://(tcp://host1:61616)" duplex="true" conduitSubscriptions="true" decreaseNetworkConsumerPriority="false" > </networkConnector> |
屬性名稱 |
默認值 |
描述 |
Duplex |
True |
表示雙向可以通信 |
ConduitSubscriptions |
False |
表示每個Consumer 上都會收到所有的發送的消息 |
DynamicOnly |
false |
消息將被動態的轉接的在其他Broker的consumer上 |
PrefetchSize |
1000 |
指定消息的數量 |
ConduitSubscriptons |
true |
|
excludedDestinations |
|
指定排除的地址 |
DynamiccallyincludedDestinations |
|
包括的地址 |
StaticcallyincludedDestinations |
|
靜態的包括消息地址 |
DecreaseNetwordConsumerPriority |
false |
消費者優先權 |
NetworkTTl |
1 |
|
6. ActiveMQ消息游標
當producer 發送的持久化消息到達broker 之后,broker 首先會把它保存在持
久存儲中。接下來,如果發現當前有活躍的 consumer,而且這個consumer 消費消息的速度能跟上producer 生產消息的速度,那么ActiveMQ 會直接把消息傳遞給 broker 內部跟這個consumer關聯的 dispatch(派遣、調度) queue;如果當前沒有活躍的 consumer或者 consumer 消費消息的速度跟不上producer 生產消息的速度,那么 ActiveMQ 會使用Pending Message Cursors 保存對消息的引用。在需要的時候,Pending Message Cursors 把消息引用傳遞給broker 內部跟這個consumer關聯的 dispatch queue。以下是兩種Pending (未解決)Message Cursors:
VM Cursor。在內存中保存消息的引用。
File Cursor。首先在內存中保存消息的引用,如果內存使用量達到上限, 那么會把消息引用保存到臨時文件中。
對於 topic,可以使用的pendingSubscriberPolicy 有vmCursor 和 fileCursor。可以使用的PendingDurableSubscriberMessageStoragePolicy 有 vmDurableCursor 和 fileDurableSubscriberCursor。以下是ActiveMQ 配置文件 的一個例子:
Xml 代碼
<destinationPolicy> <policyMap> <policyEntries> <policyEntry topic="org.apache.>"> <pendingSubscriberPolicy> <vmCursor /> </pendingSubscriberPolicy> <PendingDurableSubscriberMessageStoragePolicy>
<vmDurableCursor/> </PendingDurableSubscriberMessageStoragePolicy>
</policyEntry> </policyEntries> </policyMap> </destinationPolicy> |
對於 queue,可以使用的pendingQueuePolicy 有vmQueueCursor 和 fileQueueCursor。以下是ActiveMQ 配置文件的一個例子:
<destinationPolicy> <policyMap> <policyEntries> <policyEntry queue="org.apache.>"> <pendingQueuePolicy> <vmQueueCursor /> </pendingQueuePolicy> </policyEntry> </policyEntries> </policyMap> </destinationPolicy> |
7. ActiveMQ嚴格調度策略
有時候需要保證不同的 topic consumer 以相同的順序接收消息。通常
ActiveMQ 會保證 topic consumer 以相同的順序接收來自同一個producer 的消息。
然而,由於多線程和異步處理,不同的 topic consumer可能會以不同的順序接收
來自不同producer 的消息。例如有兩個producer,分別是P 和Q。差不多是同一
時間內,P 發送了P1、P2 和P3 三個消息;Q 發送了Q1 和Q2 兩個消息。兩個不同
的consumer可能會以以下順序接收到消息:
consumer1: P1 P2 Q1 P3 Q2
consumer2: P1 Q1 Q2 P2 P3
Strict order dispatch policy 會保證每個 topic consumer會以相同的
順序接收消息,代價是性能上的損失。以下是采用了strict order dispatch policy
后,兩個不同的 consumer可能以以下的順序接收消息:
consumer1: P1 P2 Q1 P3 Q2
consumer2: P1 P2 Q1 P3 Q2
以下是ActiveMQ 配置文件的一個例子:
<destinationPolicy> <policyMap> <policyEntries> <policyEntry topic="FOO.>"> <dispatchPolicy> <strictOrderDispatchPolicy /> </dispatchPolicy> </policyEntry> </policyEntries> </policyMap> </destinationPolicy> |
8. ActiveMQ輪轉調度策略
介紹過 ActiveMQ 的prefetch(預讀取)機制,ActiveMQ 的缺省參數是針對處理大量消息時的高性能和高吞吐量而設置的。所以缺省的 prefetch參數比較大,而且缺省
的dispatch policies 會嘗試盡可能快的填滿 prefetch緩沖。然而在有些情況下,
例如只有少量的消息而且單個消息的處理時間比較長,那么在缺省的 prefetch和
dispatch policies下,這些少量的消息總是傾向於被分發到個別的consumer 上。
這樣就會因為負載的不均衡分配而導致處理時間的增加。
Round robin dispatch policy 會嘗試平均分發消息,以下是 ActiveMQ配
置文件的一個例子:
<destinationPolicy> <policyMap> <policyEntries> <policyEntry topic="FOO.>"> <dispatchPolicy> <roundRobinDispatchPolicy /> </dispatchPolicy> </policyEntry> </policyEntries> </policyMap> </destinationPolicy> |
9. ActiveMQ Async Sends
Acivemq 支持異步和同步發送消息。在 ActiveMQ4.0 以上,所有的異步或同步對
於 Consumer 來說是變得可配置了。默認是在 ConnectionFactory、Connection、
Connection URI等方面配置對於一個基於 Destination 的Consumer來說。
眾所周之,如果你想傳遞給 Slow Consumer 那么你可能使用異步的消息傳遞,但是對於 Fast Consumer 你可能使用同步發送消息。(這樣可以避免同步和上下文切換額外的增加Queue 堵塞花費。如果對於一個 Slow Consumer,你使用同步發送消息可能出現Producer 堵塞等顯現。
ActiveMQ默認設置 dispatcheAsync=true是最好的性能設置。如果你處理的是
Slow Consumer 則使用 dispatcheAsync=true,反之,那你使用的是 Fast Consumer則使用dispatcheAsync=false。
用Connection URI 來配置Async如下:
ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true"); |
用ConnectionFactory 配置Async如下:
((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true); |
用Connection 配置Async 如下:
((ActiveMQConnection)connection).setUseAsyncSend(true); |
10. ActiveMQ缺省支持批量確認消息
ActiveMQ缺省支持批量確認消息。由於批量確認會提高性能,因此這是缺省的確認方式。如果希望在應用程序中禁止經過優化的確認方式,那么可以采用如下方法:
cf = new ActiveMQConnectionFactory ("tcp://locahost:61616?jms.optimizeAcknowledge=false"); ((ActiveMQConnectionFactory)connectionFactory).setOptimizeAcknowledge(false); ((ActiveMQConnection)connection).setOptimizeAcknowledge(false); |
11. ActiveMQ消息類型
(一)Blob Message
(二)Advisory Message
(三)ActiveMQ Stream
(四)Transformer message
(五)TextMessage
(六)MapMessage
(七)BytesMessage
(八)StreamMessage
(九)ObjectMessage
(十)Message
12. ActiveMQ destination
12.1. ActiveMQ的混合發送模式
允許一個虛擬的destination代表多個destinations,多個destination之間用“,”分割。
Java代碼:
Queue queue = new ActiveMQQueue("USERS.First,USERS.Sconder"); |
如果需要不同類型的destination,需要加上前綴queue:// 或topic://
Queue queue = new ActiveMQQueue("USERS.First,USERS.Sconder,topic://USERS.topic1"); |
配置如下:
<amq:destinationInterceptors> <amq:virtualDestinationInterceptor> <amq:virtualDestinations> <amq:compositeQueue name="MY.QUEUE.A"> <amq:forwardTo> <amq:queue physicalName="MY.QUEUE.B"></amq:queue> <amq:topic physicalName="MY.TOPIC.A"></amq:topic> </amq:forwardTo> </amq:compositeQueue> <amq:virtualTopic/> </amq:virtualDestinations> </amq:virtualDestinationInterceptor> </amq:destinationInterceptors> |
12.2. ActiveMQ的接收Mirrored模式
每個 queue 中的消息只能被一個consumer 消費。然而,有時候你可能希望能
夠監視生產者和消費者之間的消息流。你可以通過使用Virtual Destinations 來
建立一個virtual queue 來把消息轉發到多個 queues 中。但是 為系統中每個
queue 都進行如此的配置可能會很麻煩。
ActiveMQ 支持Mirrored Queues。Broker 會把發送到某個 queue 的所有消
息轉發到一個名稱類似的topic,因此監控程序可以訂閱這個mirrored queue
topic。為了啟用Mirrored Queues,首先要將BrokerService 的useMirroredQueues
屬性設置成 true,然后可以通過destinationInterceptors 設置其它屬性,如
mirror topic 的前綴,缺省是"VirtualTopic.Mirror."。以下是ActiveMQ 配置文
件的一個例子:
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" useMirroredQueues="true"> <amq:destinationInterceptors> <amq:mirroredQueue copyMessage="true" postfix="Mirror.Topic"> </amq:mirroredQueue> </amq:destinationInterceptors> |
12.3. ActiveMQ的接收Wildcards模式
Wildcards 用來支持聯合的名字分層體系(federated name hierarchies)。它不是JMS 規范的一部分,而是ActiveMQ 的擴展。ActiveMQ 支持以下三種wildcards:
"." 用於作為路徑上名字間的分隔符。
"*" 用於匹配路徑上的任何名字。
">" 用於遞歸地匹配任何以這個名字開始的 destination。
12.4. ActiveMQ虛擬主題
13. ActiveMQ 消費者特性
13.1. 獨有消費者或者獨有隊列
Queue 中的消息是按照順序被分發到 consumers 的。然而,當你有多個consumers同時從相同的 queue中提取消息時,你將失去這個保證。因為這些消息是被多個線程並發的處理。有的時候,保證消息按照順序處理是很重要的。Broker會從多個 consumers中挑選一個 consumer來處理 queue中所有的消息,從而保證了消息的有序處理。如果這個 consumer失效,那么 broker會自動切換到其它的 consumer。 可以通過 Destination Options 來創建一個 Exclusive Consumer:
queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true"); consumer = session.createConsumer(queue); |
如果存在 Exclusive Consumer 和普通的Consumer,那么 Broker會首先把消息發送給Exclusive Consumer。除非該獨有消費者死亡。
13.2. Message Group
Message Groups 可以看成是一種並發的 Exclusive Consumer。跟所有的消息都由唯一的 consumer 處理不同,JMS 消息屬性JMSXGroupID 被用來區分 message group。Message Groups 特性保證所有具有相同JMSXGroupID 的消息會被分發到相同的 consumer(只要這個consumer 保持active)。
在一個消息被分發到 consumer之前,broker首先檢查消息JMSXGroupID屬性。如果存在,那么 broker 會檢查是否有某個 consumer擁有這個 message group。如果沒有,那么 broker會選擇一個consumer,並將它關聯到這個 message group。此后,這個 consumer 會接收這個message group 的所有消息,直到:
Consumer 被關閉;
Message group 被關閉。通過發送一個消息,並設置這個消息的JMSXGroupSeq 為-1。
開啟Message Group:
TextMessage message = session.createTextMessage("ActiveMq 發送的消息"); message.setStringProperty("JMSXFroupID", "TEST_GROUP_A"); |
關閉Message Group:
TextMessage message = session.createTextMessage("ActiveMq 發送的消息"); message.setStringProperty("JMSXFroupID", "TEST_GROUP_A"); message.setIntProperty("JMSXGroupSeq", -1); |
13.3. Message Slelectors
JMS Selectors 用於在訂閱中,基於消息屬性和 Xpath 語法對進行消息的過濾。
consumer = session.createConsumer(destination, "JMSType = 'car' AND weight > 2500"); |
在JMS Selectors 表達式中,可以使用 IN、NOT IN、LIKE 等,例如:
LIKE '12%3' ('123' true,'12993' true,'1234' false)
LIKE 'l_se' ('lose' true,'loose' false)
LIKE '\_%' ESCAPE '\' ('_foo' true,'foo' false)
需要注意的是,JMS Selectors表達式中的日期和時間需要使用標准的long型毫秒值。另外表達式中的屬性不會自動進行類型轉換,例如:
myMessage.setStringProperty("NumberOfOrders", "2");
"NumberOfOrders > 1" 求值結果是false。
13.4. 消息的重新傳遞和死信隊列
ActiveMQ需要重新傳遞消息需要 Client 有以下幾種操作:
1. Client 用了transactions 和調用了rollback()在session 中。
2. Client 用了transactiions 和在調用commit()之前關閉。
3. Client 在 CLIENT_ACKNOWLEDGE 的傳遞模式下在 session 中調用了
recover()。
只有最后一個事物提交后,消息才能發送到 broker 上,事物沒有提交前,整
個傳遞消息仍處於事物中。一旦回滾,恢復以前情況。在 broker 端不知道消息是
否處於重新傳遞狀態,這將會造成消息分發開銷。
默認,aciaveMQ 中死隊列被聲明為“ActivemMQ.DLQ”,所有不能消費的消
息被傳遞到該死隊列中。你可以在 acivemq.xml中配置individualDeadLetterStrategy屬性
<destinationPolicy> <policyMap> <policyEntries> <policyEntry queue= "> " > <deadLetterStrategy> <individualDeadLetterStrategy queuePrefix= "DLQ." useQueueForQueueMessages= "true" /> </deadLetterStrategy> </policyEntry> </policyEntries> </policyMap> </destinationPolicy> |
有時需要直接刪除過期的消息而不需要發送到死隊列中,xml 可以使用屬性
processExpired=false 來設置
<destinationPolicy> <policyMap> <policyEntries> <policyEntry queue= "> " > <deadLetterStrategy> <sharedDeadLetterStrategy processExpired= "false" /> </deadLetterStrategy> </policyEntry> </policyEntries> </policyMap> </destinationPolicy> |
存放非持久消息到死隊列中
默認情況下,Activemq 不會把非持久的死消息發送到死隊列中。
非持久性如果你想把非持久的消息發送到死隊列中,需要設置屬性
processNonPersistent=“true”
<destinationPolicy> <policyMap> <policyEntries> <policyEntry queue= "> " > <deadLetterStrategy> <sharedDeadLetterStrategy processNonPersistent= "true" /> </deadLetterStrategy> </policyEntry> </policyEntries> </policyMap> </destinationPolicy> |
13.5. 消息優先級值
JMS JMSPriority 定義了十個消息優先級值, 0 是最低的優先級, 9 是最高的優先級。另外,客戶端應當將0‐4 看作普通優先級,5‐9 看作加急優先級.
配置如下:
queue = new ActiveMQQueue("TEST.QUEUE?consumer.priority=10"); consumer = session.createConsumer(queue); |
13.6. 慢消息處理機制
目前ActiveMQ 使用 Pending Message Limit Strategy來解決慢消息帶來的性能問題。除了prefetch buffer之外,你還要配置緩存消息的上限,超過這個上限后,新消息到來時會丟棄舊消息。通過在配置文件的 destination map 中配置PendingMessageLimitStrategy,可以為不用的 topic namespace 配置不同的策略。
A:Pending Message Limit Strategy(等待消息限制策略)目前有以下兩種:
- ConstantPendingMessageLimitStrategy
Limit 可以設置 0、>0、-1三種方式:
0表示:不額外的增加其預存大小。
>0表示:在額外的增加其預存大小。
-1表示:不增加預存也不丟棄舊的消息。
這個策略使用常量限制:
<constantPendingMessageLimitStrategy limit="50"/> |
- PrefetchRatePendingMessageLimitStrategy
這種策略是利用Consumer 的之前的預存的大小乘以其倍數等於現在的預存大小。
<prefetchRatePendingMessageLimitStrategy multiplier="2.5"/> |
在以上兩種方式中,如果設置 0 意味着除了 prefetch 之外不再緩存消息;如果設置-1
意味着禁止丟棄消息。
此外,你還可以配置消息的丟棄策略,目前有以下兩種:
oldestMessageEvictionStrategy。這個策略丟棄最舊的消息。
oldestMessageWithLowestPriorityEvictionStrategy。這個策略丟棄最舊的,
而且具有最低優先級的消息。
以下是個ActiveMQ配置文件的例子:
<destinationPolicy> <policyMap> <policyEntries> <policyEntry topic="PRICES.>"> <subscriptionRecoveryPolicy> <timedSubscriptionRecoveryPolicy recoverDuration="10000" /> </subscriptionRecoveryPolicy> <pendingMessageLimitStrategy> <constantPendingMessageLimitStrategy limit="10"/> </pendingMessageLimitStrategy> </policyEntry> </policyEntries> </policyMap> </destinationPolicy> |
13.7. 消費者追溯消息
ActiveMQ支持6種恢復策略,可以自行選擇使用不同的策略
(一) <fixedCountSubscriptionRecoveryPolicy>
這種策略限制在基於一個靜態的計數中對於主題(Topic)消息緩存的數量。
(二) <fixedSizedSubscriptionRecoveryPolicy>
這種策略限制在內存使用量中對於主題(Topic)消息緩存的數量。這是
ActiveMQ 的默認持久恢復策略。你可以選擇設置 cache的大小來應用與所
有的主題[Topic]。
(三) <lastImageSubscriptionRecoveryPolicy>
這種策略僅僅保持發送到主題(Topic)的最后一個消息。
(四) <noSubscriptionRecoveryPolicy>
這種策略是不保存主題消息,不需要任何配置
(五) <queryBasedSubscriptionRecoveryPolicy>
這種策略基於一個 JMS屬性選擇器應用到所有的消息來設置其消息緩存的
大小
(六) <timedSubscriptionRecoveryPolicy>
這種策略是基於應用到每個消息的過期時間來限制其消息緩存數量。提示
這種消息的生命周期時間來源於消息發送者設置其 timeToLive 參數