1 JMS
在介紹ActiveMQ之前,首先簡要介紹一下JMS規范。
1.1 JMS的基本構件
1.1.1 連接工廠
連接工廠是客戶用來創建連接的對象,例如ActiveMQ提供的ActiveMQConnectionFactory。
1.1.2 連接
JMS Connection封裝了客戶與JMS提供者之間的一個虛擬的連接。
1.1.3 會話
JMS Session是生產和消費消息的一個單線程上下文。會話用於創建消息生產者(producer)、消息消費者(consumer)和消息(message)等。會話提供了一個事務性的上下文,在這個上下文中,一組發送和接收被組合到了一個原子操作中。
1.1.4 目的地
目的地是客戶用來指定它生產的消息的目標和它消費的消息的來源的對象。JMS1.0.2規范中定義了兩種消息傳遞域:點對點(PTP)消息傳遞域和發布/訂閱消息傳遞域。 點對點消息傳遞域的特點如下:
• 每個消息只能有一個消費者。
• 消息的生產者和消費者之間沒有時間上的相關性。無論消費者在生產者發送消息的時候是否處於運行狀態,它都可以提取消息。
發布/訂閱消息傳遞域的特點如下:
• 每個消息可以有多個消費者。
• 生產者和消費者之間有時間上的相關性。訂閱一個主題的消費者只能消費自它訂閱之后發布的消息。JMS規范允許客戶創建持久訂閱,這在一定程度上放松了時間上的相關性要求。持久訂閱允許消費者消費它在未處於激活狀態時發送的消息。
在點對點消息傳遞域中,目的地被成為隊列(queue);在發布/訂閱消息傳遞域中,目的地被成為主題(topic)。
1.1.5 消息生產者
消息生產者是由會話創建的一個對象,用於把消息發送到一個目的地。
1.1.6 消息消費者
消息消費者是由會話創建的一個對象,它用於接收發送到目的地的消息。消息的消費可以采用以下兩種方法之一:
• 同步消費。通過調用消費者的receive方法從目的地中顯式提取消息。receive方法可以一直阻塞到消息到達。
• 異步消費。客戶可以為消費者注冊一個消息監聽器,以定義在消息到達時所采取的動作。
1.1.7 消息
JMS消息由以下三部分組成的:
• 消息頭。每個消息頭字段都有相應的getter和setter方法。
• 消息屬性。如果需要除消息頭字段以外的值,那么可以使用消息屬性。
• 消息體。JMS定義的消息類型有TextMessage、MapMessage、BytesMessage、StreamMessage和ObjectMessage。
1.2 JMS的可靠性機制
1.2.1 確認 JMS消息
只有在被確認之后,才認為已經被成功地消費了。消息的成功消費通常包含三個階段:客戶接收消息、客戶處理消息和消息被確認。 在事務性會話中,當一個事務被提交的時候,確認自動發生。在非事務性會話中,消息何時被確認取決於創建會話時的應答模式(acknowledgement mode)。該參數有以下三個可選值:
• Session.AUTO_ACKNOWLEDGE。當客戶成功的從receive方法返回的時候,或者從MessageListener.onMessage方法成功返回的時候,會話自動確認客戶收到的消息。
• Session.CLIENT_ACKNOWLEDGE。客戶通過消息的acknowledge方法確認消息。需要注意的是,在這種模式中,確認是在會話層上進行:確認一個被消費的消息將自動確認所有已被會話消費的消息。例如,如果一個消息消費者消費了10個消息,然后確認第5個消息,那么所有10個消息都被確認。
• Session.DUPS_ACKNOWLEDGE。該選擇只是會話遲鈍的確認消息的提交。如果JMS Provider失敗,那么可能會導致一些重復的消息。如果是重復的消息,那么JMS Provider必須把消息頭的JMSRedelivered字段設置為true。
1.2.2 持久性
JMS 支持以下兩種消息提交模式:
• PERSISTENT。指示JMS Provider持久保存消息,以保證消息不會因為JMS Provider的失敗而丟失。
• NON_PERSISTENT。不要求JMS Provider持久保存消息。
1.2.3 優先級
可以使用消息優先級來指示JMS Provider首先提交緊急的消息。優先級分10個級別,從0(最低)到9(最高)。如果不指定優先級,默認級別是4。需要注意的是,JMS Provider並不一定保證按照優先級的順序提交消息。
1.2.4 消息過期
可以設置消息在一定時間后過期,默認是永不過期。
1.2.5 臨時目的地
可以通過會話上的createTemporaryQueue方法和createTemporaryTopic方法來創建臨時目的地。它們的存在時間只限於創建它們的連接所保持的時間。只有創建該臨時目的地的連接上的消息消費者才能夠從臨時目的地中提取消息。
1.2.6 持久訂閱
首先消息生產者必須使用PERSISTENT提交消息。客戶可以通過會話上的createDurableSubscriber方法來創建一個持久訂閱,該方法的第一個參數必須是一個topic,第二個參數是訂閱的名稱。 JMS Provider會存儲發布到持久訂閱對應的topic上的消息。如果最初創建持久訂閱的客戶或者任何其它客戶使用相同的連接工廠和連接的客戶ID、相同的主題和相同的訂閱名再次調用會話上的createDurableSubscriber方法,那么該持久訂閱就會被激活。JMS Provider會象客戶發送客戶處於非激活狀態時所發布的消息。 持久訂閱在某個時刻只能有一個激活的訂閱者。持久訂閱在創建之后會一直保留,直到應用程序調用會話上的unsubscribe方法。
1.2.7 本地事務
在一個JMS客戶端,可以使用本地事務來組合消息的發送和接收。JMS Session接口提供了commit和rollback方法。事務提交意味着生產的所有消息被發送,消費的所有消息被確認;事務回滾意味着生產的所有消息被銷毀,消費的所有消息被恢復並重新提交,除非它們已經過期。 事務性的會話總是牽涉到事務處理中,commit或rollback方法一旦被調用,一個事務就結束了,而另一個事務被開始。關閉事務性會話將回滾其中的事務。 需要注意的是,如果使用請求/回復機制,即發送一個消息,同時希望在同一個事務中等待接收該消息的回復,那么程序將被掛起,因為知道事務提交,發送操作才會真正執行。 需要注意的還有一個,消息的生產和消費不能包含在同一個事務中。
1.3 JMS 規范的變遷
JMS的最新版本的是1.1。它和同1.0.2版本之間最大的差別是,JMS1.1通過統一的消息傳遞域簡化了消息傳遞。這不僅簡化了JMS API,也有利於開發人員靈活選擇消息傳遞域,同時也有助於程序的重用和維護。 以下是不同消息傳遞域的相應接口:
JMS公共 點對點域 發布/訂閱域
ConnectionFactory QueueConnectionFactory TopicConnectionFactory
Connection QueueConnection TopicConnection
Destination Queue Topic
Session QueueSession TopicSession
MessageProducer QueueSender TopicPublisher
MessageConsumer QueueReceiver TopicSubscriber
2.ActiveMQ
2.1 Broker
2.1.1 運行Broker
ActiveMQ 5.0 的二進制發布包中bin目錄中包含一個名為activemq的腳本,直接運行這個腳本就可以啟動一個broker。 此外也可以通過Broker Configuration URI或Broker XBean URI對broker進行配置,以下是一些命令行參數的例子:
Example |
Description |
activemq |
Runs a broker using the default 'xbean:activemq.xml' as the broker configuration file. |
activemq xbean:myconfig.xml |
Runs a broker using the file myconfig.xml as the broker configuration file that is located in the classpath. |
activemq xbean:file:./conf/broker1.xml |
Runs a broker using the file broker1.xml as the broker configuration file that is located in the relative file path ./conf/broker1.xml |
activemq xbean:file:C:/ActiveMQ/conf/broker2.xml |
Runs a broker using the file broker2.xml as the broker configuration file that is located in the absolute file path C:/ActiveMQ/conf/broker2.xml |
activemq broker:(tcp://localhost:61616, tcp://localhost:5000)?useJmx=true |
Runs a broker with two transport connectors and JMX enabled. |
activemq broker:(tcp://localhost:61616, network:tcp://localhost:5000)?persistent=false |
Runs a broker with 1 transport connector and 1 network connector with persistence disabled. |
2.1.2 嵌入式Broker
可以通過在應用程序中以編碼的方式啟動broker,例如:
Java代碼
1. BrokerService broker = new BrokerService();
2. broker.addConnector("tcp://localhost:61616");
3. broker.start();
如果需要啟動多個broker,那么需要為broker設置一個名字。例如:
Java代碼
1. BrokerService broker = new BrokerService();
2. broker.setName("fred");
3. broker.addConnector("tcp://localhost:61616");
4. broker.start();
如果希望在同一個JVM內訪問這個broker,那么可以使用VM Transport,URI是:vm://brokerName。關於更多的broker屬性,可以參考Apache的官方文檔。 此外,也可以通過BrokerFactory來創建broker,例如:
Java代碼
1. BrokerService broker = BrokerFactory.createBroker(new URI(someURI));
someURI的可選值如下:
URI scheme |
Example |
Description |
xbean: |
xbean:activemq.xml |
Searches the classpath for an XML document with the given URI (activemq.xml in this case) which will then be used as the Xml Configuration |
file: |
file:foo/bar/activemq.xml |
Loads the given file (in this example foo/bar/activemq.xml) as the Xml Configuration |
broker: |
broker:tcp://localhost:61616 |
Uses the Broker Configuration URI to configure the broker |
當使用XBean的配置方式的時候,需要指定一個xml配置文件,例如:
Java代碼
1. BrokerService broker = BrokerFactory.createBroker(new URI("xbean:com/test/activemq.xml"));
使用Spring的配置方式如下:
Xml代碼
1. <bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean">
2. <property name="config" value="classpath:org/apache/activemq/xbean/activemq.xml" />
3. <property name="start" value="true" />
4. </bean>
2.1.3 監控Broker
2.1.3.1 JMX
在使用JMX監控broker之前,首先要啟用broker的JMX監控功能,例如在配置文件中設置useJmx="true",如下:
Xml代碼
1. <broker useJmx="true" brokerName="broker1”>
2. <managementContext>
3. <managementContext createConnector="true"/>
4. </managementContext>
5. ...
6. </broker>
接下來運行JDK自帶的jconsole。在運行了jconsole后,它會彈出對話框來選擇需要連接到的agent。如果是在啟動broker的主機上運行jconsole,那么ActiveMQ broker會出現在jconsole的Local 標簽中。如果要連接到遠程的broker,那么可以在Advanced標簽中指定JMX URL,以下是一個連接到本機的JMX URL: service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi
在jconsole的MBeans標簽中,可以查看詳細信息,也可以執行相應的operation。需要注意的是,在jconsole連接到broker的時候,並不需要輸入用戶名和密碼,如果這存在潛在的安全問題,那么就需要為JMX Connector配置密碼保護(需要使用1.5以上版本的JDK)。
首先要禁止ActiveMQ創建自己的connector,例如:
Xml代碼
1. <broker xmlns="http://activemq.org/config/1.0" brokerName="localhost"useJmx="true">
2. <managementContext>
3. <managementContext createConnector="false"/>
4. </managementContext>
5. </broker>
然后在ActiveMQ的conf目錄下創建一個訪問控制文件和密碼文件,如下:
conf/jmx.access:
# The "monitorRole" role has readonly access.
# The "controlRole" role has readwrite access.
monitorRole readonly
controlRole readwrite
conf/jmx.password:
# The "monitorRole" role has password "abc123".
# The "controlRole" role has password "abcd1234".
monitorRole abc123
controlRole abcd1234
然后修改ActiveMQ的bin目錄下activemq的啟動腳本,查找包含"SUNJMX="的一行如下:
REM set SUNJMX=-Dcom.sun.management.jmxremote.port=1616
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false
把它替換成
set SUNJMX=-Dcom.sun.management.jmxremote.port=1616
-Dcom.sun.management.jmxremote.authenticate=true
-Dcom.sun.management.jmxremote.ssl=false
-Dcom.sun.management.jmxremote.password.file=%ACTIVEMQ_BASE%/conf/jmx.password
-Dcom.sun.management.jmxremote.access.file=%ACTIVEMQ_BASE%/conf/jmx.access
最后重啟ActiveMQ和jconsole,這時候需要強制login。如果在啟動activemq的過程中出現以下錯誤,那么需要為這個文件增加訪問控制。Windows平台上的具體解決方法請參考如下網址:
http://java.sun.com/j2se/1.5.0/docs/guide/management/security-windows.html
Error: Password file read access must be restricted:
D:\apache-activemq-5.0.0\bin\../conf/jmx.password
2.1.3.2 Web Console
Web Console被集成到了ActiveMQ的二進制發布包中,因此缺省訪問http://localhost:8161/admin即可訪問Web Console。 在配置文件中,可以通過修改nioConnector的port屬性來修改Web console的缺省端口:
Xml代碼
1. <jetty xmlns="http://mortbay.com/schemas/jetty/1.0">
2. <connectors>
3. <nioConnector port="8161" />
4. </connectors>
5. ...
6. </jetty>
出於安全性或者可靠性的考慮,Web Console 可以被部署到不同於ActiveMQ的進程中。例如把activemq-web-console.war部署到一個單獨的web容器中(Tomcat,Jetty等)。在ActiveMQ5.0的二進制發布包中不包含activemq-web-console.war,因此需要下載ActiveMQ的源碼,然后進入到${activemq.base}/src/activemq-web-console目錄中執行mvn instanll。如果一切正常,那么缺省會在${activemq.base}/src/activemq-web-console/target目錄中生成activemq-web-console-5.0.0.war。然后將activemq-web-console-5.0.0.war拷貝到Tomcat的webapps目錄中,並重命名成activemq-web-console.war。
需要注意的是,要將activemq-all-5.0.0.jar拷貝到WEB-INF\lib目錄中(可能還需要拷貝jms.jar)。還要為Tomcat設置以下五個系統屬性(修改catalina.bat文件):
set JAVA_OPTS=%JAVA_OPTS% -Dwebconsole.type="properties"
set JAVA_OPTS=%JAVA_OPTS% -Dwebconsole.jms.url="tcp://localhost:61616"
set JAVA_OPTS=%JAVA_OPTS% -Dwebconsole.jmx.url="service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi"
set JAVA_OPTS=%JAVA_OPTS% -Dwebconsole.jmx.role=""
set JAVA_OPTS=%JAVA_OPTS% -Dwebconsole.jmx.password=""
如果JMX沒有配置密碼保護,那么webconsole.jmx.role和webconsole.jmx.password設置成""即可。如果broker被配置成了Master/Slave模式,那么可以配置成使用failover transport,例如:
-Dwebconsole.jms.url=failover:(tcp://serverA:61616,tcp://serverB:61616)
順便說一下,由於webconsole.type 屬性是properties,因此實際上起作用的Web Console的配置文件是WEB-INF/ webconsole-properties.xml。最后啟動被監控的ActiveMQ,訪問http://localhost:8080/activemq-web-console/,查看顯示是否正常。
2.1.3.3 Advisory Message
ActiveMQ 支持Advisory Messages,它允許你通過標准的JMS 消息來監控系統。目前的Advisory Messages支持:
• consumers, producers and connections starting and stopping
• temporary destinations being created and destroyed
• messages expiring on topics and queues
• brokers sending messages to destinations with no consumers.
• connections starting and stopping
Advisory Messages可以被想象成某種的管理通道,通過它你可以得到關於JMS Provider、producers、consumers和destinations的信息。Advisory topics都使用ActiveMQ.Advisory.這個前綴,以下是目前支持的topics:
Client based advisories
Advisory Topics |
Description |
ActiveMQ.Advisory.Connection |
Connection start & stop messages |
ActiveMQ.Advisory.Producer.Queue |
Producer start & stop messages on a Queue |
ActiveMQ.Advisory.Producer.Topic |
Producer start & stop messages on a Topic |
ActiveMQ.Advisory.Consumer.Queue |
Consumer start & stop messages on a Queue |
ActiveMQ.Advisory.Consumer.Topic |
Consumer start & stop messages on a Topic |
在消費者啟動/停止的Advisory Messages的消息頭中有個consumerCount屬性,他用來指明目前desination上活躍的consumer的數量。
Destination and Message based advisories
Advisory Topics |
Description |
ActiveMQ.Advisory.Queue |
Queue create & destroy |
ActiveMQ.Advisory.Topic |
Topic create & destroy |
ActiveMQ.Advisory.TempQueue |
Temporary Queue create & destroy |
ActiveMQ.Advisory.TempTopic |
Temporary Topic create & destroy |
ActiveMQ.Advisory.Expired.Queue |
Expired messages on a Queue |
ActiveMQ.Advisory.Expired.Topic |
Expired messages on a Topic |
ActiveMQ.Advisory.NoConsumer.Queue |
No consumer is available to process messages being sent on a Queue |
ActiveMQ.Advisory.NoConsumer.Topic |
No consumer is available to process messages being sent on a Topic |
以上的這些destnations都可以用來作為前綴,在其后面追加其它的重要信息,例如topic、queue、clientID、producderID和consumerID等。這令你可以利用Wildcards 和 Selectors 來過濾Advisory Messages(關於Wildcard和Selector會在稍后介紹)。
例如,如果你希望訂閱FOO.BAR這個queue上Consumer的start/stop的消息,那么可以訂閱ActiveMQ.Advisory.Consumer.Queue.FOO.BAR;如果希望訂閱所有queue上的start/stop消息,那么可以訂閱ActiveMQ.Advisory.Consumer.Queue.>;如果希望訂閱所有queue或者topic上的start/stop消息,那么可以訂閱ActiveMQ.Advisory.Consumer. >。
org.apache.activemq.advisory.AdvisorySupport類上有如下的helper methods,用來在程序中得到advisory destination objects。
Java代碼
1. AdvisorySupport.getConsumerAdvisoryTopic()
2. AdvisorySupport.getProducerAdvisoryTopic()
3. AdvisorySupport.getDestinationAdvisoryTopic()
4. AdvisorySupport.getExpiredTopicMessageAdvisoryTopic()
5. AdvisorySupport.getExpiredQueueMessageAdvisoryTopic()
6. AdvisorySupport.getNoTopicConsumersAdvisoryTopic()
7. AdvisorySupport.getNoQueueConsumersAdvisoryTopic()
以下是段使用Advisory Messages的程序代碼:
Java代碼
1. Destination advisoryDestination = AdvisorySupport.getProducerAdvisoryTopic(destination)
2. MessageConsumer consumer = session.createConsumer(advisoryDestination);
3. consumer.setMessageListener(this);
4. ...
5. public void onMessage(Message msg){
6. if (msg instanceof ActiveMQMessage){
7. try {
8. ActiveMQMessage aMsg = (ActiveMQMessage)msg;
9. ProducerInfo prod = (ProducerInfo) aMsg.getDataStructure();
10. } catch (JMSException e) {
11. log.error("Failed to process message: " + msg);
12. }
13. }
14.}
2.1.3.4 Command Agent
在介紹Command Agent前首先簡要介紹一下XMPP(Jabber)協議,XMPP是一種基於XML的即時通信協議,它由Jabber軟件基金會開發。在配置文件中通過增加transportConnector來支持XMPP協議:
Xml代碼
1. <broker xmlns="http://activemq.org/config/1.0">
2. <transportConnectors>
3. ...
4. <transportConnector name="xmpp" uri="xmpp://localhost:61222"/>
5. </transportConnectors>
6. </broker>
ActiveMQ提供了ActiveMQ messages和XMPP之間的雙向橋接:
• 如果客戶加入了一個聊天室,那么這個聊天室的名字會被映射到一個JMS topic。
• 嘗試在聊天室內發送消息會導致一個JMS消息被發送到這個topic。
• 呆在一個聊天室中意味着這將保持一個對相應JMS topic的訂閱。因此發送到這個topic的JMS消息也會被發送到聊天室。
推薦XMPP客戶端Spark(http://www.igniterealtime.org/)。
從4.2版本起,ActiveMQ支持Command Agent。在配置文件中,通過設置commandAgent來啟用Command Agent:
Xml代碼
1. <beans>
2. <broker useJmx="true" xmlns="http://activemq.org/config/1.0">
3. ...
4. </broker>
5. <commandAgent xmlns="http://activemq.org/config/1.0"/>
6. </beans>
啟用了Command Agent的broker上會有一個來自Command Agent的連接,它同時訂閱topic: ActiveMQ.Agent。在你啟動XMPP客戶端,加入到ActiveMQ.Agent聊天室后,就可以同broker進行交談了。通過在XMPP客戶端中鍵入help,可以得到幫助信息。 需要注意的是,ActiveMQ5.0版本有個小bug,如果broker沒有采用缺省的用戶名和密碼,那么Command Agent便無法正常啟動。Apache官方文檔說,此bug已經被修正,預定在5.2.0版本上體現。修改方式如下:
Xml代碼
1. <commandAgent xmlns="http://activemq.org/config/1.0" brokerUser="user" brokerPassword="passward"/>
2.1.3.5 Visualization Plugin
ActiveMQ支持以broker插件的形式生成DOT文件(可以用agrviewer來查看),以圖表的方式描述connections、sessions、producers、consumers、destinations等信息。配置方式如下:
Xml代碼
1. <broker xmlns="http://activemq.org/config/1.0" brokerName="localhost" useJmx="true">
2. ...
3. <plugins>
4. <connectionDotFilePlugin file="connection.dot"/>
5. <destinationDotFilePlugin file="destination.dot"/>
6. </plugins>
7. </broker>
需要注意的是,筆者認為ActiveMQ5.0版本的Visualization Plugin尚不穩定,存在諸多問題。例如:如果使用connectionDotFilePlugin,那么brokerName必須是localhost;如果使用destinationDotFilePlugin可能會導致ArrayStoreException。
2.2 Transport
ActiveMQ目前支持的transport有:VM Transport、TCP Transport、SSL Transport、Peer Transport、UDP Transport、Multicast Transport、HTTP and HTTPS Transport、Failover Transport、Fanout Transport、Discovery Transport、ZeroConf Transport等。以下簡單介紹其中的幾種,更多請參考Apache官方文檔。
2.2.1 VM Transport
VM transport允許在VM內部通信,從而避免了網絡傳輸的開銷。這時候采用的連接不是socket連接,而是直接地方法調用。 第一個創建VM 連接的客戶會啟動一個embed VM broker,接下來所有使用相同的broker名稱的VM連接都會使用這個broker。當這個broker上所有的連接都關閉的時候,這個broker也會自動關閉。 以下是配置語法:
vm://brokerName?transportOptions
例如:vm://broker1?marshal=false&broker.persistent=false
Transport Options的可選值如下:
Option Name |
Default Value |
Description |
Marshal |
false |
If true, forces each command sent over the transport to be marshlled and unmarshlled using a WireFormat |
wireFormat |
default |
The name of the WireFormat to use |
wireFormat.* |
|
All the properties with this prefix are used to configure the wireFormat |
create |
true |
If the broker should be created on demand if it does not allready exist. Only supported in ActiveMQ 4.1 |
broker.* |
|
All the properties with this prefix are used to configure the broker. See Configuring Wire Formats for more information |
以下是高級配置語法:
vm:(broker:(tcp://localhost)?brokerOptions)?transportOptions
vm:broker:(tcp://localhost)?brokerOptions
例如:vm:(broker:(tcp://localhost:6000)?persistent=false)?marshal=false
Transport Options的可選值如下:
Option Name |
Default Value |
Description |
marshal |
false |
If true, forces each command sent over the transport to be marshlled and unmarshlled using a WireFormat |
wireFormat |
default |
The name of the WireFormat to use |
wireFormat.* |
|
All the propertieswith this prefix are used to configure the wireFormat |
使用配置文件的配置語法: vm://localhost?brokerConfig=xbean:activemq.xml
例如:vm:// localhost?brokerConfig=xbean:com/test/activemq.xml
使用Spring的配置:
Xml代碼
1. <bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean">
2. <property name="config" value="classpath:org/apache/activemq/xbean/activemq.xml" />
3. <property name="start" value="true" />
4. </bean>
5.
6. <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" depends-on="broker">
7. <property name="brokerURL" value="vm://localhost"/>
8. </bean>
如果persistent是true,那么ActiveMQ會在當前目錄下創建一個缺省值是activemq-data的目錄用於持久化保存數據。需要注意的是,如果程序中啟動了多個不同名字的VM broker,那么可能會有如下警告:Failed to start jmx connector: Cannot bind to URL [rmi://localhost:1099/jmxrmi]: javax.naming.NameAlreadyBoundException…可以通過在transportOptions中追加broker.useJmx=false來禁用JMX來避免這個警告。
2.2.2 TCP Transport
TCP transport 允許客戶端通過TCP socket連接到遠程的broker。以下是配置語法: tcp://hostname:port?transportOptions Transport Options的可選值如下:
Option Name |
Default Value |
Description |
minmumWireFormatVersion |
0 |
The minimum version wireformat that is allowed |
trace |
false |
Causes all commands that are sent over the transport to be logged |
useLocalHost |
true |
When true, it causes the local machines name to resolve to "localhost". |
socketBufferSize |
64 * 1024 |
Sets the socket buffer size in bytes |
soTimeout |
0 |
sets the socket timeout in milliseconds |
connectionTimeout |
30000 |
A non-zero value specifies the connection timeout in milliseconds. A zero value means wait forever for the connection to be established. Negative values are ignored. |
wireFormat |
default |
The name of the WireFormat to use |
wireFormat.* |
|
All the properties with this prefix are used to configure the wireFormat. See Configuring Wire Formats for more information |
例如:tcp://localhost:61616?trace=false
2.2.3 Failover Transport
Failover Transport是一種重新連接的機制,它工作於其它transport的上層,用於建立可靠的傳輸。它的配置語法允許制定任意多個復合的URI。Failover transport會自動選擇其中的一個URI來嘗試建立連接。如果沒有成功,那么會選擇一個其它的URI來建立一個新的連接。以下是配置語法:
failover:(uri1,...,uriN)?transportOptions
failover:uri1,...,uriN
Transport Options的可選值如下:
Option Name |
Default Value |
Description |
initialReconnectDelay |
10 |
How long to wait before the first reconnect attempt (in ms) |
maxReconnectDelay |
30000 |
The maximum amount of time we ever wait between reconnect attempts (in ms) |
useExponentialBackOff |
true |
Should an exponential backoff be used between reconnect attempts |
backOffMultiplier |
2 |
The exponent used in the exponential backoff attempts |
maxReconnectAttempts |
0 |
If not 0, then this is the maximum number of reconnect attempts before an error is sent back to the client |
randomize |
true |
use a random algorithm to choose the URI to use for reconnect from the list provided |
backup |
false |
initialize and hold a second transport connection - to enable fast failover |
例如:failover:(tcp://localhost:61616,tcp://remotehost:61616)?initialReconnectDelay=100
2.2.4 Discovery transport
Discovery transport是可靠的tranport。它使用Discovery transport來定位用來連接的URI列表。以下是配置語法:
discovery:(discoveryAgentURI)?transportOptions
discovery:discoveryAgentURI
Transport Options的可選值如下:
Option Name |
Default Value |
Description |
initialReconnectDelay |
10 |
How long to wait before the first reconnect attempt |
maxReconnectDelay |
30000 |
The maximum amount of time we ever wait between reconnect attempts |
useExponentialBackOff |
true |
Should an exponential backoff be used btween reconnect attempts |
backOffMultiplier |
2 |
The exponent used in the exponential backoff attempts |
maxReconnectAttempts |
0 |
If not 0, then this is the maximum number of reconnect attempts before an error is sent back to the client |
例:discovery:(multicast://default)?initialReconnectDelay=100 為了使用Discovery來發現broker,需要為broker啟用discovery agent。 以下是XML配置文件中的一個例子:
Xml代碼
1. <broker name="foo">
2. <transportConnectors>
3. <transportConnector uri="tcp://localhost:0" discoveryUri="multicast://default"/>
4. </transportConnectors>
5. ...
6. </broker>
在使用Failover Transport或Discovery transport等能夠自動重連的transport的時候,需要注意的是:設想有兩個broker,它們都啟用AMQ Message
Store作為持久化存儲,有一個producer和一個consumer連接到某個queue。當因其中一個broker失效時而切換到另一個broker的時候,如果失效的broker的queue中還有未被consumer消費的消息,那么這個queue里的消息仍然滯留在失效broker的中,直到失效的broker被修復並重新切換回這個被修復的broker后,之前被保留的消息才會被consumer消費掉。如果被處理的消息有時序限制,那么應用程序就需要處理這個問題。另外也可以通過ActiveMQ集群來解決這個問題。
在transport重連的時候,可以在connection上注冊TransportListener來獲得回調,例如:
Java代碼
1. (ActiveMQConnection)connection).addTransportListener(new TransportListener() {
2. public void onCommand(Object cmd) {
3. }
4.
5. public void onException(IOException exp) {
6. }
7.
8. public void transportInterupted() {
9. // The transport has suffered an interruption from which it hopes to recover.
10. }
11.
12. public void transportResumed() {
13. // The transport has resumed after an interruption.
14. }
15.});
2.3 持久化
2.3.1 AMQ Message Store
AMQ Message Store是ActiveMQ5.0缺省的持久化存儲。Message commands被保存到transactional journal(由rolling data logs組成)。Messages被保存到data logs中,同時被reference store進行索引以提高存取速度。Date logs由一些單獨的data log文件組成,缺省的文件大小是32M,如果某個消息的大小超過了data log文件的大小,那么可以修改配置以增加data log文件的大小。如果某個data log文件中所有的消息都被成功消費了,那么這個data log文件將會被標記,以便在下一輪的清理中被刪除或者歸檔。以下是其配置的一個例子:
Xml代碼
1. <broker brokerName="broker" persistent="true" useShutdownHook="false">
2. <persistenceAdapter>
3. <amqPersistenceAdapter directory="${activemq.base}/data" maxFileLength="32mb"/>
4. </persistenceAdapter>
5. </broker>
Property Name |
Default Value |
Comments |
directory |
activemq-data |
the path to the directory to use to store the message store data and log files |
useNIO |
true |
use NIO to write messages to the data logs |
syncOnWrite |
false |
sync every write to disk |
maxFileLength |
32mb |
a hint to set the maximum size of the message data logs |
persistentIndex |
true |
use a persistent index for the message logs. If this is false, an in-memory structure is maintained |
maxCheckpointMessageAddSize |
4kb |
the maximum number of messages to keep in a transaction before automatically committing |
cleanupInterval |
30000 |
time (ms) before checking for a discarding/moving message data logs that are no longer used |
indexBinSize |
1024 |
default number of bins used by the index. The bigger the bin size -the better the relative performance of the index |
indexKeySize |
96 |
the size of the index key - the key is the message id |
indexPageSize |
16kb |
the size of the index page -the bigger the page - the better the write performance of the index |
directoryArchive |
archive |
the path to the directory to use to store discarded data logs |
archiveDataLogs |
false |
if true data logs are moved to the archive directory instead of being deleted |
2.3.2 Kaha Persistence
Kaha Persistence 是一個專門針對消息持久化的解決方案。它對典型的消息使用模式進行了優化。在Kaha中,數據被追加到data logs中。當不再需要log文件中的數據的時候,log文件會被丟棄。以下是其配置的一個例子:
Xml代碼
1. <broker brokerName="broker" persistent="true" useShutdownHook="false">
2. <persistenceAdapter>
3. <kahaPersistenceAdapter directory="activemq-data" maxDataFileLength="33554432"/>
4. </persistenceAdapter>
5. </broker>
2.3.3 JDBC Persistence
目前支持的數據庫有Apache Derby, Axion, DB2, HSQL, Informix, MaxDB, MySQL, Oracle, Postgresql, SQLServer, Sybase。 如果你使用的數據庫不被支持,那么可以調整StatementProvider 來保證使用正確的SQL方言(flavour of SQL)。通常絕大多數數據庫支持以下adaptor:
• org.activemq.store.jdbc.adapter.BlobJDBCAdapter
• org.activemq.store.jdbc.adapter.BytesJDBCAdapter
• org.activemq.store.jdbc.adapter.DefaultJDBCAdapter
• org.activemq.store.jdbc.adapter.ImageJDBCAdapter
也可以在配置文件中直接指定JDBC adaptor,例如:
Xml代碼
1. <jdbcPersistenceAdapter adapterClass="org.apache.activemq.store.jdbc.adapter.ImageBasedJDBCAdaptor"/>以下是其配置的一個例子:
Xml代碼
1. <persistence>
2. <jdbcPersistence dataSourceRef=" mysql-ds"/>
3. </persistence>
4.
5. <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
6. <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
7. <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
8. <property name="username" value="activemq"/>
9. <property name="password" value="activemq"/>
10. <property name="poolPreparedStatements" value="true"/>
11.</bean>
需要注意的是,如果使用MySQL,那么需要設置relaxAutoCommit 標志為true。
2.3.4 Disable Persistence
以下是其配置的一個例子:
Xml代碼
1. <broker persistent="false">
2. </broker>
2.4 安全機制
ActiveMQ支持可插拔的安全機制,用以在不同的provider之間切換。
2.4.1 Simple Authentication Plugin
Simple Authentication Plugin適用於簡單的認證需求,或者用於建立測試環境。它允許在XML配置文件中指定用戶、用戶組和密碼等信息。以下是ActiveMQ配置的一個例子:
Xml代碼
1. <plugins>
2. ...
3. <simpleAuthenticationPlugin>
4. <users>
5. <authenticationUser username="system" password="manager" groups="users,admins"/>
6. <authenticationUser username="user" password="password" groups="users"/>
7. <authenticationUser username="guest" password="password" groups="guests"/>
8. </users>
9. </simpleAuthenticationPlugin>
10.</plugins>
2.4.2 JAAS Authentication Plugin
JAAS Authentication Plugin依賴標准的JAAS機制來實現認證。通常情況下,你需要通過設置java.security.auth.login.config系統屬性來配置login modules的配置文件。如果沒有指定這個系統屬性,那么JAAS Authentication Plugin會缺省使用login.config作為文件名。以下是一個login.config文件的例子:
activemq-domain {
org.apache.activemq.jaas.PropertiesLoginModule required debug=true
org.apache.activemq.jaas.properties.user="users.properties"
org.apache.activemq.jaas.properties.group="groups.properties";
};
這個login.config文件中設置了兩個屬性:org.apache.activemq.jaas.properties.user和org.apache.activemq.jaas.properties.group分別用來指向user.properties和group.properties文件。需要注意的是,PropertiesLoginModule使用本地文件的查找方式,而且查找時采用的base directory是login.config文件所在的目錄。因此這個login.config說明user.properties和group.properties文件存放在跟login.config文件相同的目錄里。 以下是ActiveMQ配置的一個例子:
Xml代碼
1. <plugins>
2. ...
3. <jaasAuthenticationPlugin configuration="activemq-domain" />
4. </plugins>
基於以上的配置,在JAAS的LoginContext中會使用activemq-domain中配置的PropertiesLoginModule來進行登陸。 ActiveMQ JAAS還支持LDAPLoginModule、CertificateLoginModule、TextFileCertificateLoginModule等login module。
2.4.3 Custom Authentication Implementation
可以通過編碼的方式為ActiveMQ增加認證功能。例如編寫一個類繼承自XBeanBrokerService。
Java代碼
1. package com.yourpackage;
2.
3. import java.net.URI;
4. import java.util.HashMap;
5. import java.util.Map;
6.
7. import org.apache.activemq.broker.Broker;
8. import org.apache.activemq.broker.BrokerFactory;
9. import org.apache.activemq.broker.BrokerService;
10.import org.apache.activemq.security.SimpleAuthenticationBroker;
11.import org.apache.activemq.xbean.XBeanBrokerService;
12.
13.public class SimpleAuthBroker extends XBeanBrokerService {
14. //
15. private String user;
16. private String password;
17.
18. @SuppressWarnings("unchecked")
19. protected Broker addInterceptors(Broker broker) throws Exception {
20. broker = super.addInterceptors(broker);
21. Map passwords = new HashMap();
22. passwords.put(getUser(), getPassword());
23. broker = new SimpleAuthenticationBroker(broker, passwords, new HashMap());
24. return broker;
25. }
26.
27. public String getUser() {
28. return user;
29. }
30.
31. public void setUser(String user) {
32. this.user = user;
33. }
34.
35. public String getPassword() {
36. return password;
37. }
38.
39. public void setPassword(String password) {
40. this.password = password;
41. }
42.}
以下是ActiveMQ配置文件的一個例子:
Xml代碼
1. <beans>
2. …
3. <auth:SimpleAuthBroker
4. xmlns:auth="java://com.yourpackage"
5. xmlns="http://activemq.org/config/1.0" brokerName="SimpleAuthBroker1" user="user" password="password" useJmx="true">
6.
7. <transportConnectors>
8. <transportConnector uri="tcp://localhost:61616"/>
9. </transportConnectors>
10. </auth:SimpleAuthBroker>
11. …
12.</beans>
在這個配置文件中增加了一個namespace auth,用於指向之前編寫的哪個類。同時為SimpleAuthBroker注入了兩個屬性值user和password,因此在被SimpleAuthBroker改寫的addInterceptors方法里,可以使用這兩個屬性進行認證了。ActiveMQ提供的SimpleAuthenticationBroker類繼承自BrokerFilter(可以簡單的看成是Broker的Adaptor),它的構造函數中的兩個Map分別是userPasswords和userGroups。 SimpleAuthenticationBroker在 addConnection方法中使用userPasswords進行認證,同時會把userGroups的信息保存到ConnectionContext中 。
2.4.4 Authorization Plugin
可以通過Authorization Plugin為認證后的用戶授權,以下ActiveMQ配置文件的一個例子:
Xml代碼
1. <plugins>
2. <jaasAuthenticationPlugin configuration="activemq-domain"/>
3.
4. <authorizationPlugin>
5. <map>
6. <authorizationMap>
7. <authorizationEntries>
8. <authorizationEntry queue=">" read="admins" write="admins" admin="admins" />
9. <authorizationEntry queue="USERS.>" read="users" write="users" admin="users" />
10. <authorizationEntry queue="GUEST.>" read="guests" write="guests,users" admin="guests,users" />
11.
12. <authorizationEntry topic=">" read="admins" write="admins" admin="admins" />
13. <authorizationEntry topic="USERS.>" read="users" write="users" admin="users" />
14. <authorizationEntry topic="GUEST.>" read="guests" write="guests,users" admin="guests,users" />
15.
16. <authorizationEntry topic="ActiveMQ.Advisory.>" read="guests,users" write="guests,users" admin="guests,users"/>
17. </authorizationEntries>
18. </authorizationMap>
19. </map>
20. </authorizationPlugin>
21.</plugins>
2.5 Clustering
ActiveMQ從多種不同的方面提供了集群的支持。
2.5.1 Queue consumer clusters
ActiveMQ支持訂閱同一個queue的consumers上的集群。如果一個consumer失效,那么所有未被確認(unacknowledged)的消息都會被發送到這個queue上其它的consumers。如果某個consumer的處理速度比其它consumers更快,那么這個consumer就會消費更多的消息。 需要注意的是, 筆者發現AcitveMQ5.0版本的Queue consumer clusters存在一個bug: 采用AMQ Message Store,運行一個producer,兩個consumer,並采用如下的配置文件:
Xml代碼
1. <beans>
2. <broker xmlns="http://activemq.org/config/1.0" brokerName="BugBroker1" useJmx="true">
3.
4. <transportConnectors>
5. <transportConnector uri="tcp://localhost:61616"/>
6. </transportConnectors>
7.
8. <persistenceAdapter>
9. <amqPersistenceAdapter directory="activemq-data/BugBroker1" maxFileLength="32mb"/>
10. </persistenceAdapter>
11.
12. </broker>
13.</beans>
那么經過一段時間后可能會報出如下錯誤:
ERROR [ActiveMQ Transport: tcp:///127.0.0.1:1843 - RecoveryListenerAdapter.java:58 - RecoveryListenerAdapter]
Message id ID:versus-1837-1203915536609-0:2:1:1:419 could not be recovered from the data store! Apache官方文檔說,此bug已經被修正,預定在5.1.0版本上體現。
2.5.2 Broker clusters
一個常見的場景是有多個JMS broker,有一個客戶連接到其中一個broker。如果這個broker失效,那么客戶會自動重新連接到其它的broker。在ActiveMQ中使用failover:// 協議來實現這個功能。ActiveMQ3.x版本的reliable://協議已經變更為failover://。 如果某個網絡上有多個brokers而且客戶使用靜態發現(使用Static Transport或Failover Transport)或動態發現(使用Discovery Transport),那么客戶可以容易地在某個broker失效的情況下切換到其它的brokers。然而,stand alone brokers並不了解其它brokers上的consumers,也就是說如果某個broker上沒有consumers,那么這個broker上的消息可能會因得不到處理而積壓起來。目前的解決方案是使用Network of brokers,以便在broker之間存儲轉發消息。ActiveMQ在未來會有更好的特性,用來在客戶端處理這個問題。 從ActiveMQ1.1版本起,ActiveMQ支持networks of brokers。它支持分布式的queues和topics。一個broker會相同對待所有的訂閱(subscription):不管他們是來自本地的客戶連接,還是來自遠程broker,它都會遞送有關的消息拷貝到每個訂閱。遠程broker得到這個消息拷貝后,會依次把它遞送到其內部的本地連接上。有兩種方式配置Network of brokers,一種是使用static transport,如下:
Xml代碼
1. <broker brokerName="receiver" persistent="false" useJmx="false">
2. <transportConnectors>
3. <transportConnector uri="tcp://localhost:62002"/>
4. </transportConnectors>
5. <networkConnectors>
6. <networkConnector uri="static:( tcp://localhost:61616,tcp://remotehost:61616)"/>
7. </networkConnectors>
8. …
9. </broker>
另外一種是使用multicast discovery,如下:
Xml代碼
1. <broker name="sender" persistent="false" useJmx="false">
2. <transportConnectors>
3. <transportConnector uri="tcp://localhost:0" discoveryUri="multicast://default"/>
4. </transportConnectors>
5. <networkConnectors>
6. <networkConnector uri="multicast://default"/>
7. </networkConnectors>
8. ...
9. </broker>
Network Connector有以下屬性:
Property |
Default Value |
Description |
name |
bridge |
name of the network - for more than one network connector between the same two brokers -use different names |
dynamicOnly |
false |
if true, only forward messages if a consumer is active on the connected broker |
decreaseNetworkConsumerPriority |
false |
decrease the priority for dispatching to a Queue consumer the further away it is (in network hops) from the producer |
networkTTL |
1 |
the number of brokers in the network that messages and subscriptions can pass through |
conduitSubscriptions |
true |
multiple consumers subscribing to the same destination are treated as one consumer by the network |
excludedDestinations |
empty |
destinations matching this list won't be forwarded across the network |
dynamicallyIncludedDestinations |
empty |
destinations that match this list will be forwarded across the network n.b. an empty list means all destinations not in the excluded list will be forwarded |
staticallyIncludedDestinations |
empty |
destinations that match will always be passed across the network -even if no consumers have ever registered an interest |
duplex |
false |
if true, a network connection will be used to both produce AND Consume messages. This is useful for hub and spoke scenarios when the hub is behind a firewall etc. |
關於conduitSubscriptions屬性,這里稍稍說明一下。設想有兩個brokers,分別是brokerA和brokerB,它們之間用forwarding bridge連接。有一個consumer連接到brokerA並訂閱queue:Q.TEST。有兩個consumers連接到brokerB,也是訂閱queue:Q.TEST。這三個consumers有相同的優先級。然后啟動一個producer,它發送了30條消息到brokerA。如果conduitSubscriptions=true,那么brokerA上的consumer會得到15條消息, 另外15條消息會發送給brokerB。此時負載並不均衡,因為此時brokerA將brokerB上的兩個consumers視為一個;如果conduitSubscriptions=false,那么每個consumer上都會收到10條消息。以下是關於NetworkConnector屬性的一個例子:
Xml代碼
1. <networkConnectors>
2. <networkConnector uri="static://(tcp://localhost:61617)"
3. name="bridge" dynamicOnly="false" conduitSubscriptions="true"
4. decreaseNetworkConsumerPriority="false">
5. <excludedDestinations>
6. <queue physicalName="exclude.test.foo"/>
7. <topic physicalName="exclude.test.bar"/>
8. </excludedDestinations>
9. <dynamicallyIncludedDestinations>
10. <queue physicalName="include.test.foo"/>
11. <topic physicalName="include.test.bar"/>
12. </dynamicallyIncludedDestinations>
13. <staticallyIncludedDestinations>
14. <queue physicalName="always.include.queue"/>
15. <topic physicalName="always.include.topic"/>
16. </staticallyIncludedDestinations>
17. </networkConnector>
18.</networkConnectors>
2.5.3 Master Slave
在一個網絡內運行多個brokers或者stand alone brokers時存在一個問題,這就是消息在物理上只被一個broker持有,因此當某個broker失效,那么你只能等待直到它重啟后,這個broker上的消息才能夠被繼續發送(如果沒有設置持久化,那么在這種情況下,消息將會丟失)。Master Slave 背后的想法是,消息被復制到slave broker,因此即使master broker遇到了像硬件故障之類的錯誤,你也可以立即切換到slave broker而不丟失任何消息。 Master Slave是目前ActiveMQ推薦的高可靠性和容錯的解決方案。以下是幾種不同的類型:
Master Slave Type |
Requirements |
Pros |
Cons |
Pure Master Slave |
None |
No central point of failure |
Requires manual restart to bring back a failed master and can only support 1 slave |
Shared File System Master Slave |
A Shared File system such as a SAN |
Run as many slaves as required. Automatic recovery of old masters |
Requires shared file system |
JDBC Master Slave |
A Shared database |
Run as many slaves as required. Automatic recovery of old masters |
Requires a shared database. Also relatively slow as it cannot use the high performance journal |
2.5.3.1 Pure Master Slave的工作方式
• Slave broker消費master broker上所有的消息狀態,例如消息、確認和事務狀態等。只要slave broker連接到了master broker,它不會(也不被允許)啟動任何network connectors或者transport connectors,所以唯一的目的就是復制master broker的狀態。
• Master broker只有在消息成功被復制到slave broker之后才會響應客戶。例如,客戶的commit請求只有在master broker和slave broker都處理完畢commit請求之后才會結束。
• 當master broker失效的時候,slave broker有兩種選擇,一種是slave broker啟動所有的network connectors和transport connectors,這允許客戶端切換到slave broker;另外一種是slave broker停止。這種情況下,slave broker只是復制了master broker的狀態。
• 客戶應該使用failover transport並且應該首先嘗試連接master broker。例如: failover://(tcp://masterhost:61616,tcp://slavehost:61616)?randomize=false 設置randomize為false就可以讓客戶總是首先嘗試連接master broker(slave broker並不會接受任何連接,直到它成為了master broker)。
Pure Master Slave具有以下限制:
• 只能有一個slave broker連接到master broker。
• 在因master broker失效而導致slave broker成為master之后,之前的master broker只有在當前的master broker(原slave broker)停止后才能重新生效。
• Master broker失效后而切換到slave broker后,最安全的恢復master broker的方式是人工處理。首先要停止slave broker(這意味着所有的客戶也要停止)。然后把slave broker的數據目錄中所有的數據拷貝到master broker的數據目錄中。然后重啟master broker和slave broker。
Master broker不需要特殊的配置。Slave broker需要進行以下配置
Xml代碼
1. <broker masterConnectorURI="tcp://masterhost:62001" shutdownOnMasterFailure="false">
2. ...
3. <transportConnectors>
4. <transportConnector uri="tcp://slavehost:61616"/>
5. </transportConnectors>
6. </broker>
其中的masterConnectorURI用於指向master broker,shutdownOnMasterFailure用於指定slave broker在master broker失效的時候是否需要停止。此外,也可以使用如下配置:
Xml代碼
1. <broker brokerName="slave" useJmx="false" deleteAllMessagesOnStartup="true" xmlns="http://activemq.org/config/1.0">
2. ...
3. <services>
4. <masterConnector remoteURI= "tcp://localhost:62001" userName="user" password="password"/>
5. </services>
6. </broker>
需要注意的是,筆者認為ActiveMQ5.0版本的Pure Master Slave仍然不夠穩定。
2.5.3.2 Shared File System Master Slave
如果你使用SAN或者共享文件系統,那么你可以使用Shared File System Master Slave。基本上,你可以運行多個broker,這些broker共享數據目錄。當第一個broker得到文件上的排他鎖之后,其它的broker便會在循環中等待獲得這把鎖。客戶端使用failover transport來連接到可用的broker。當master broker失效的時候會釋放這把鎖,這時候其中一個slave broker會得到這把鎖從而成為master broker。以下是ActiveMQ配置的一個例子:
Xml代碼
1. <broker useJmx="false" xmlns="http://activemq.org/config/1.0">
2. <persistenceAdapter>
3. <journaledJDBC dataDirectory="/sharedFileSystem/broker"/>
4. </persistenceAdapter>
5. …
6. </broker>
2.5.3.3 JDBC Master Slave
JDBC Master Slave工作原理跟Shared File System Master Slave類似,只是采用了數據庫作為持久化存儲。以下是ActiveMQ配置的一個例子:
Xml代碼
1. <beans>
2. <broker xmlns="http://activemq.org/config/1.0" brokerName="JdbcMasterBroker">
3. ...
4. <persistenceAdapter>
5. <jdbcPersistenceAdapter dataSource="#mysql-ds"/>
6. </persistenceAdapter>
7.
8. </broker>
9.
10. <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
11. <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
12. <property name="url" value="jdbc:mysql://localhost:3306/test?relaxAutoCommit=true"/>
13. <property name="username" value="username"/>
14. <property name="password" value="passward"/>
15. <property name="poolPreparedStatements" value="true"/>
16. </bean>
17.</beans>
需要注意的是,如果你使用MySQL數據庫,需要首先執行以下三條語句:(Apache官方文檔說,此bug已經被修正,預定在5.1.0版本上體現)
Sql代碼
1. ALTER TABLE activemq_acks ENGINE = InnoDB;
2. ALTER TABLE activemq_lock ENGINE = InnoDB;
3. ALTER TABLE activemq_msgs ENGINE = InnoDB;
2.6 Features
ActiveMQ包含了很多功能強大的特性,下面簡要介紹其中的幾個。
2.6.1 Exclusive Consumer
Queue中的消息是按照順序被分發到consumers的。然而,當你有多個consumers同時從相同的queue中提取消息時,你將失去這個保證。因為這些消息是被多個線程並發的處理。有的時候,保證消息按照順序處理是很重要的。例如,你可能不希望在插入訂單操作結束之前執行更新這個訂單的操作。 ActiveMQ從4.x版本起開始支持Exclusive Consumer (或者說Exclusive Queues)。 Broker會從多個consumers中挑選一個consumer來處理queue中所有的消息,從而保證了消息的有序處理。如果這個consumer失效,那么broker會自動切換到其它的consumer。 可以通過Destination Options 來創建一個Exclusive Consumer,如下:
Java代碼
1. queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true");
2. consumer = session.createConsumer(queue);
順便說一下,可以給consumer設置優先級,以便針對網絡情況(如network hops)進行優化,如下:
Java代碼
1. queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true &consumer.priority=10");
2.6.2 Message Groups
用Apache官方文檔的話說,Message Groups rock!它是Exclusive Consumer功能的增強。邏輯上,Message Groups 可以看成是一種並發的Exclusive Consumer。跟所有的消息都由唯一的consumer處理不同,JMS 消息屬性JMSXGroupID 被用來區分message group。Message Groups特性保證所有具有相同JMSXGroupID 的消息會被分發到相同的consumer(只要這個consumer保持active)。另外一方面,Message Groups特性也是一種負載均衡的機制。 在一個消息被分發到consumer之前,broker首先檢查消息JMSXGroupID屬性。如果存在,那么broker 會檢查是否有某個consumer擁有這個message group。如果沒有,那么broker會選擇一個consumer,並將它關聯到這個message group。此后,這個consumer會接收這個message group的所有消息,直到:
• Consumer被關閉。
• Message group被關閉。通過發送一個消息,並設置這個消息的JMSXGroupSeq為0。
從4.1版本開始,ActiveMQ支持一個布爾字段JMSXGroupFirstForConsumer 。當某個message group的第一個消息被發送到consumer的時候,這個字段被設置。如果客戶使用failover transport連接到broker。在由於網絡問題等造成客戶重新連接到broker的時候,相同message group的消息可能會被分發到不同與之前的consumer,因此JMSXGroupFirstForConsumer字段也會被重新設置。
以下是使用message groups的例子:
Java代碼
1. Mesasge message = session.createTextMessage("<foo>hey</foo>");
2. message.setStringProperty("JMSXGroupID", "IBM_NASDAQ_20/4/05");
3. ...
4. producer.send(message);
2.6.3 JMS Selectors
JMS Selectors用於在訂閱中,基於消息屬性對消息進行過濾。JMS Selectors由SQL92語法定義。以下是個Selectors的例子:
Java代碼
consumer = session.createConsumer(destination, "JMSType = 'car' AND weight > 2500");
在JMS Selectors表達式中,可以使用IN、NOT IN、LIKE等,例如: LIKE '12%3' ('123' true,'12993' true,'1234' false) LIKE 'l_se' ('lose' true,'loose' false) LIKE '\_%' ESCAPE '\' ('_foo' true,'foo' false) 需要注意的是,JMS Selectors表達式中的日期和時間需要使用標准的long型毫秒值。另外表達式中的屬性不會自動進行類型轉換,例如:
Java代碼
myMessage.setStringProperty("NumberOfOrders", "2");
"NumberOfOrders > 1" 求值結果是false。關於JMS Selectors的詳細文檔請參考javax.jms.Message的javadoc。 上一小節介紹的Message Groups雖然可以保證具有相同message group的消息被唯一的consumer順序處理,但是卻不能確定被哪個consumer處理。在某些情況下,Message Groups可以和JMS Selector一起工作,例如: 設想有三個consumers分別是A、B和C。你可以在producer中為消息設置三個message groups分別是"A"、"B"和"C"。然后令consumer A使用"JMXGroupID = 'A'"作為selector。B和C也同理。這樣就可以保證message group A的消息只被consumer A處理。需要注意的是,這種做法有以下缺點:
• producer必須知道當前正在運行的consumers,也就是說producer和consumer被耦合到一起。
• 如果某個consumer失效,那么應該被這個consumer消費的消息將會一直被積壓在broker上。
2.6.4 Pending Message Limit Strategy
首先簡要介紹一下prefetch機制。ActiveMQ通過prefetch機制來提高性能,這意味這客戶端的內存里可能會緩存一定數量的消息。緩存消息的數量由prefetch limit來控制。當某個consumer的prefetch buffer已經達到上限,那么broker不會再向consumer分發消息,直到consumer向broker發送消息的確認。可以通過在ActiveMQConnectionFactory或者ActiveMQConnection上設置ActiveMQPrefetchPolicy對象來配置prefetch policy。也可以通過connection options或者destination options來配置。例如:
tcp://localhost:61616?jms.prefetchPolicy.all=50
tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1
queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10"); prefetch size的缺省值如下:
• persistent queues (default value: 1000)
• non-persistent queues (default value: 1000)
• persistent topics (default value: 100)
• non-persistent topics (default value: Short.MAX_VALUE -1)
慢消費者會在非持久的topics上導致問題:一旦消息積壓起來,會導致broker把大量消息保存在內存中,broker也會因此而變慢。未來ActiveMQ可能會實現磁盤緩存,但是這也還是會存在性能問題。目前ActiveMQ使用Pending Message Limit Strategy來解決這個問題。除了prefetch buffer之外,你還要配置緩存消息的上限,超過這個上限后,新消息到來時會丟棄舊消息。通過在配置文件的destination map中配置PendingMessageLimitStrategy,可以為不用的topic namespace配置不同的策略。目前有以下兩種:
• ConstantPendingMessageLimitStrategy。這個策略使用常量限制。 例如:<constantPendingMessageLimitStrategy limit="50"/>
• PrefetchRatePendingMessageLimitStrategy。這個策略使用prefetch size的倍數限制。 例如:<prefetchRatePendingMessageLimitStrategy multiplier="2.5"/>
在以上兩種方式中,如果設置0意味着除了prefetch之外不再緩存消息;如果設置-1意味着禁止丟棄消息。 此外,你還可以配置消息的丟棄策略,目前有以下兩種:
• oldestMessageEvictionStrategy。這個策略丟棄最舊的消息。
• oldestMessageWithLowestPriorityEvictionStrategy。這個策略丟棄最舊的,而且具有最低優先級的消息。
以下是個ActiveMQ配置文件的例子:
Xml代碼
1. <broker persistent="false" brokerName="${brokername}" xmlns="http://activemq.org/config/1.0">
2. <destinationPolicy>
3. <policyMap>
4. <policyEntries>
5. <policyEntry topic="PRICES.>">
6.
7. <subscriptionRecoveryPolicy>
8. <timedSubscriptionRecoveryPolicy recoverDuration="10000" />
9. </subscriptionRecoveryPolicy>
10.
11.
12. <pendingMessageLimitStrategy>
13. <constantPendingMessageLimitStrategy limit="10"/>
14. </pendingMessageLimitStrategy>
15. </policyEntry>
16. </policyEntries>
17. </policyMap>
18. </destinationPolicy>
19. ...
20.</broker>
2.6.5 Composite Destinations
從1.1版本起, ActiveMQ支持composite destinations。它允許用一個虛擬的destination 代表多個destinations。例如你可以通過composite destinations在一個操作中同時向12個queue發送消息。在composite destinations中,多個destination之間采用","分割。例如:
Java代碼
1. Queue queue = new ActiveMQQueue("FOO.A,FOO.B,FOO.C");
如果你希望使用不同類型的destination,那么需要加上前綴如queue:// 或topic://,例如:
Java代碼
1. Queue queue = new ActiveMQQueue("FOO.A,topic://NOTIFY.FOO.A");
以下是ActiveMQ配置文件進行配置的一個例子:
Xml代碼
1. <destinationInterceptors>
2. <virtualDestinationInterceptor>
3. <virtualDestinations>
4. <compositeQueue name="MY.QUEUE">
5. <forwardTo>
6. <queue physicalName="FOO" />
7. <topic physicalName="BAR" />
8. </forwardTo>
9. </compositeQueue>
10. </virtualDestinations>
11. </virtualDestinationInterceptor>
12.</destinationInterceptors>
可以在轉發前,先通過JMS Selector判斷一個消息是否需要轉發,例如:
Xml代碼
1. <destinationInterceptors>
2. <virtualDestinationInterceptor>
3. <virtualDestinations>
4. <compositeQueue name="MY.QUEUE">
5. <forwardTo>
6. <filteredDestination selector="odd = 'yes'" queue="FOO"/>
7. <filteredDestination selector="i = 5" topic="BAR"/>
8. </forwardTo>
9. </compositeQueue>
10. </virtualDestinations>
11. </virtualDestinationInterceptor>
12.</destinationInterceptors>
2.6.6 Mirrored Queues
每個queue中的消息只能被一個consumer消費。然而,有時候你可能希望能夠監視生產者和消費者之間的消息流。你可以通過使用Virtual Destinations 來建立一個virtual queue 來把消息轉發到多個queues中。但是 為系統中每個queue都進行如此的配置可能會很麻煩。 ActiveMQ支持Mirrored Queues。Broker會把發送到某個queue的所有消息轉發到一個名稱類似的topic,因此監控程序可以訂閱這個mirrored queue topic。為了啟用Mirrored Queues,首先要將BrokerService的useMirroredQueues屬性設置成true,然后可以通過destinationInterceptors設置其它屬性,如mirror topic的前綴,缺省是"VirtualTopic.Mirror."。以下是ActiveMQ配置文件的一個例子:
Xml代碼
1. <broker xmlns="http://activemq.org/config/1.0" brokerName="MirroredQueuesBroker1" useMirroredQueues="true">
2.
3. <transportConnectors>
4. <transportConnector uri="tcp://localhost:61616"/>
5. </transportConnectors>
6.
7. <destinationInterceptors>
8. <mirroredQueue copyMessage = "true" prefix="Mirror.Topic"/>
9. </destinationInterceptors>
10. ...
11.</broker>
假如某個producer向名為Foo.Bar的queue中發送消息,那么你可以通過訂閱名為Mirror.Topic.Foo.Bar的topic來獲得發送到Foo.Bar中的所有消息。
2.6.7 Wildcards
Wildcards用來支持聯合的名字分層體系(federated name hierarchies)。它不是JMS規范的一部分,而是ActiveMQ的擴展。ActiveMQ支持以下三種wildcards:
• "." 用於作為路徑上名字間的分隔符。
• "*" 用於匹配路徑上的任何名字。
• ">" 用於遞歸地匹配任何以這個名字開始的destination。
作為一種組織事件和訂閱感興趣那部分信息的一種方法,這個概念在金融市場領域已經流行了一段時間了。設想你有以下兩個destination:
• PRICE.STOCK.NASDAQ.IBM (IBM在NASDAQ的股價)
• PRICE.STOCK.NYSE.SUNW (SUN在紐約證券交易所的股價)
訂閱者可以明確地指定destination的名字來訂閱消息,或者它也可以使用wildcards來定義一個分層的模式來匹配它希望訂閱的destination。例如:
Subscription Meaning
PRICE.> Any price for any product on any exchange
PRICE.STOCK.> Any price for a stock on any exchange
PRICE.STOCK.NASDAQ.* Any stock price on NASDAQ
PRICE.STOCK.*.IBM Any IBM stock price on any exchange
2.6.8 Async Sends
ActiveMQ支持以同步(sync)方式或者異步(async)方式向broker發送消息。 使用何種方式對send方法的延遲有巨大的影響。對於生產者來說,既然延遲是決定吞吐量的重要因素,那么使用異步發送方式會極大地提高系統的性能。 ActiveMQ缺省使用異步傳輸方式。但是按照JMS規范,當在事務外發送持久化消息的時候,ActiveMQ會強制使用同步發送方式。在這種情況下,每一次發送都是同步的,而且阻塞到收到broker的應答。這個應答保證了broker已經成功地將消息持久化,而且不會丟失。但是這樣作也嚴重地影響了性能。 如果你的系統可以容忍少量的消息丟失,那么可以在事務外發送持久消息的時候,選擇使用異步方式。以下是幾種不同的配置方式:
Java代碼
1. cf = new ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true");
2. ((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);
3. ((ActiveMQConnection)connection).setUseAsyncSend(true);
2.6.9 Dispatch Policies
2.6.9.1 Round Robin Dispatch Policy
在2.6.4小節介紹過ActiveMQ的prefetch機制,ActiveMQ的缺省參數是針對處理大量消息時的高性能和高吞吐量而設置的。所以缺省的prefetch參數比較大,而且缺省的dispatch policies會嘗試盡可能快的填滿prefetch緩沖。然而在有些情況下,例如只有少量的消息而且單個消息的處理時間比較長,那么在缺省的prefetch和dispatch policies下,這些少量的消息總是傾向於被分發到個別的consumer上。這樣就會因為負載的不均衡分配而導致處理時間的增加。 Round robin dispatch policy會嘗試平均分發消息,以下是ActiveMQ配置文件的一個例子:
Xml代碼
1. <destinationPolicy>
2. <policyMap>
3. <policyEntries>
4. <policyEntry topic="FOO.>">
5. <dispatchPolicy>
6. <roundRobinDispatchPolicy />
7. </dispatchPolicy>
8. </policyEntry>
9. </policyEntries>
10. </policyMap>
11.</destinationPolicy>
2.6.9.2 Strict Order Dispatch Policy
有時候需要保證不同的topic consumer以相同的順序接收消息。通常ActiveMQ會保證topic consumer以相同的順序接收來自同一個producer的消息。然而,由於多線程和異步處理,不同的topic consumer可能會以不同的順序接收來自不同producer的消息。例如有兩個producer,分別是P和Q。差不多是同一時間內,P發送了P1、P2和P3三個消息;Q發送了Q1和Q2兩個消息。兩個不同的consumer可能會以以下順序接收到消息:
consumer1: P1 P2 Q1 P3 Q2
consumer2: P1 Q1 Q2 P2 P3
Strict order dispatch policy 會保證每個topic consumer會以相同的順序接收消息,代價是性能上的損失。以下是采用了strict order dispatch policy后,兩個不同的consumer可能以以下的順序接收消息:
consumer1: P1 P2 Q1 P3 Q2
consumer2: P1 P2 Q1 P3 Q2
以下是ActiveMQ配置文件的一個例子:
Xml代碼
1. <destinationPolicy>
2. <policyMap>
3. <policyEntries>
4. <policyEntry topic=""FOO.>">
5. <dispatchPolicy>
6. <strictOrderDispatchPolicy />
7. </dispatchPolicy>
8. </policyEntry>
9. </policyEntries>
10. </policyMap>
11.</destinationPolicy>
2.6.10 Message Cursors
當producer發送的持久化消息到達broker之后,broker首先會把它保存在持久存儲中。接下來,如果發現當前有活躍的consumer,而且這個consumer消費消息的速度能跟上producer生產消息的速度,那么ActiveMQ會直接把消息傳遞給broker內部跟這個consumer關聯的dispatch queue;如果當前沒有活躍的consumer或者consumer消費消息的速度跟不上producer生產消息的速度,那么ActiveMQ會使用Pending Message Cursors保存對消息的引用。在需要的時候,Pending Message Cursors把消息引用傳遞給broker內部跟這個consumer關聯的dispatch queue。以下是兩種Pending Message Cursors:
• VM Cursor。在內存中保存消息的引用。
• File Cursor。首先在內存中保存消息的引用,如果內存使用量達到上限,那么會把消息引用保存到臨時文件中。
在缺省情況下,ActiveMQ 5.0根據使用的Message Store來決定使用何種類型的Message Cursors,但是你可以根據destination來配置Message Cursors。
對於topic,可以使用的pendingSubscriberPolicy 有vmCursor和fileCursor。可以使用的PendingDurableSubscriberMessageStoragePolicy有vmDurableCursor 和 fileDurableSubscriberCursor。以下是ActiveMQ配置文件的一個例子:
Xml代碼
1. <destinationPolicy>
2. <policyMap>
3. <policyEntries>
4. <policyEntry topic="org.apache.>">
5. <pendingSubscriberPolicy>
6. <vmCursor />
7. </pendingSubscriberPolicy>
8. <PendingDurableSubscriberMessageStoragePolicy>
9. <vmDurableCursor/>
10. </PendingDurableSubscriberMessageStoragePolicy>
11. </policyEntry>
12. </policyEntries>
13. </policyMap>
14.</destinationPolicy>
對於queue,可以使用的pendingQueuePolicy有vmQueueCursor 和 fileQueueCursor。以下是ActiveMQ配置文件的一個例子:
Xml代碼
1. <destinationPolicy>
2. <policyMap>
3. <policyEntries>
4. <policyEntry queue="org.apache.>">
5. <pendingQueuePolicy>
6. <vmQueueCursor />
7. </pendingQueuePolicy>
8. </policyEntry>
9. </policyEntries>
10. </policyMap>
11.</destinationPolicy>
2.6.11 Optimized Acknowledgement
ActiveMQ缺省支持批量確認消息。由於批量確認會提高性能,因此這是缺省的確認方式。如果希望在應用程序中禁止經過優化的確認方式,那么可以采用如下方法:
Java代碼
1. cf = new ActiveMQConnectionFactory ("tcp://locahost:61616?jms.optimizeAcknowledge=false");
2. ((ActiveMQConnectionFactory)connectionFactory).setOptimizeAcknowledge(false);
3. ((ActiveMQConnection)connection).setOptimizeAcknowledge(false);
2.6.12 Producer Flow Control
同步發送消息的producer會自動使用producer flow control ;對於異步發送消息的producer,要使用producer flow control,你先要為connection配置一個ProducerWindowSize參數,如下:
Java代碼
1. ((ActiveMQConnectionFactory)cf).setProducerWindowSize(1024000);
ProducerWindowSize是producer在發送消息的過程中,收到broker對於之前發送消息的確認之前, 能夠發送消息的最大字節數。你也可以禁用producer flow control,以下是ActiveMQ配置文件的一個例子:
Java代碼
1. <destinationPolicy>
2. <policyMap>
3. <policyEntries>
4. <policyEntry topic="FOO.>" producerFlowControl="false">
5. <dispatchPolicy>
6. <strictOrderDispatchPolicy/>
7. </dispatchPolicy>
8. </policyEntry>
9. </policyEntries>
10. </policyMap>
11.</destinationPolicy>
2.6.13 Message Transformation
有時候需要在JMS Provider內部進行message的轉換。從4.2版本起,ActiveMQ 提供了一個MessageTransformer 接口用於進行消息轉換,如下:
Java代碼
1. public interface MessageTransformer {
2. Message producerTransform(Session session, MessageProducer producer, Message message) throws JMSException;
3. Message consumerTransform(Session session, MessageConsumer consumer, Message message)throws JMSException;
4. }
通過在以下對象上通過調用setTransformer方法來設置MessageTransformer:
• ActiveMQConnectionFactory
• ActiveMQConnection
• ActiveMQSession
• ActiveMQMessageConsumer
• ActiveMQMessageProducer
MessageTransformer接口支持:
• 在消息被發送到JMS Provider的消息總線前進行轉換。通過producerTransform方法。
• 在消息到達消息總線后,但是在consumer接收到消息前進行轉換。通過consumerTransform方法。
以下是個簡單的例子:
Java代碼
public class SimpleMessage implements Serializable { private static final long serialVersionUID = 2251041841871975105L; private String id; private String text; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getText() { return text; } public void setText(String text) { this.text = text; } }
在producer內發送ObjectMessage,如下:
Java代碼
SimpleMessage sm = new SimpleMessage(); sm.setId("1"); sm.setText("this is a sample message"); ObjectMessage message = session.createObjectMessage(); message.setObject(sm); producer.send(message); 在consumer的session上設置一個MessageTransformer用於將ObjectMessage轉換成TextMessage,如下: Java代碼 ((ActiveMQSession)session).setTransformer(new MessageTransformer() { public Message consumerTransform(Session session, MessageConsumer consumer, Message message) throws JMSException { ObjectMessage om = (ObjectMessage)message; XStream xstream = new XStream(); xstream.alias("simple message", SimpleMessage.class); String xml = xstream.toXML(om.getObject()); return session.createTextMessage(xml); } public Message producerTransform(Session session, MessageProducer consumer, Message message) throws JMSException { return null; } });
歡迎關注