一、常用配置屬性
以下配置文件目錄均為:${activemq_home}/conf/activemq.xml
1、定期掃描清理
ActiveMQ中有一項功能:Delete Inactive Destination。可以處理 “ 沒有消費者且未處理的Destination”,也就是 queue 或者 topic 在規定時間內,沒有入隊記錄或者有效訂閱,會被清理刪除。
下面基於Queue的配置,Topic的配置類似。
其中屬性定義:schedulePeriodForDestinationPurge - 必填。聲明掃描閑置隊列的周期,單位毫秒。默認值0,為不掃描。
gcInactiveDestinations - 必填。針對Destination的配置,聲明Broker掃描閑置隊列時,是否掃描此隊列。默認值false
inactiveTimoutBeforeGC - 選填。配合gcInactiveDestinations=true時才生效。聲明Destination閑置多久可以進行刪除,單位毫秒。默認值60。
2、存儲空間設置
其中的配置標簽: memoryUsage - 表示ActiveMQ使用的內存。這個值必須大於等於destinationPolicy中設置的所有Destination的內存之和。
storeUsage - 表示持久化存儲文件的大小
tempUsage - 非持久化消息的臨時內存大小
3、kahadb方式的持久化的刷盤方式
其中屬性定義:journalDiskSyncStrategy - 聲明持久化方式,可選值always :periodic :never 。默認always
journalDiskSyncInterval - 聲明數據落盤的時間間隔。單位毫秒
4、JMX配置使用
broker標簽增加:
managementContext標簽修改:
5、消息過期時間
其中標簽及屬性定義:timeStampingBrokerPlugin - 為持久化的消息設置過期時間,zeroExpirationOverride 為沒有設置有效時間的消息設置過期時間。
ttlCeiling 表示過期時間上線,如果程序中設置的時間超過這個值,以此值為准。
expireMessagesPeriod - 聲明掃描過期消息的時間間隔,單位毫秒。
queue=">" 、topic=">" 表示匹配所有的Destination
queue="tp_>" 、topic="tp_>" 表示匹配所有的 tp_ 開頭的Destination
二、消息的持久化
ActiveMQ的消息,Queue中的消息會根據配置的不同實現持久化,Topic如果不存在持久化訂閱者的話會直接丟棄消息。
持久化的配置,在${activemq_home}/conf/activemq.xml配置文件中,標簽 broker內部的 persistenceAdapter 中配置持久化方案
1、kahadb
5.4版本之后默認的持久化方式。文件型的數據庫存儲,配置如下:
<persistenceAdapter> <!-- directory:保存數據的目錄; journalMaxFileLength:保存消息的文件大小 --> <kahaDB directory="${activemq.data}/kahadb" journalMaxFileLength="16mb"/> </persistenceAdapter>
2、JDBC
ActiveMQ將消息持久化保存到數據庫中,沒有限定使用某一個數據庫。
配置成功后,需要在數據庫中建立相應的database,后續ActiveMQ啟動才能正常訪問。ActiveMQ正常啟動后,能夠自動的創建相應的數據表:
activemq_msgs - 用於存儲消息的表
activemq_acks - 存儲訂閱關系
activemq_lock - 集群環境中使用,同一時間只能有一個Master Broker
下文配置中以MySQL為例:
2.1、定義數據源bean(與broker同級)
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/> <property name="username" value="activemq"/> <property name="password" value="activemq"/>
<!-- 屬性雷同可根據具體連接池配置 --> <property name="maxActive" value="200"/> <property name="poolPreparedStatements" value="true"/> </bean>
其中數據連接池可以任意其他產品,只需要將相應的jar包拷貝到 ${activemq_home}/lib 目錄下即可。
2.2、配置 persistenceAdapter 標簽
<persistenceAdapter> <!-- createTablesOnStartup標簽是否啟動時創建數據表,默認值為true。一般第一次啟動的時候true,后續修改為false --> <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="false"/> </persistenceAdapter>
三、ActiveMQ的橋接模式 (network)
${activemq_home}/conf/activemq.xml 配置文件中,broker標簽中,增加如下:
<networkConnectors> <networkConnectoruri="static://(tcp://ip:61616)"/> </networkConnector>
在兩個ActiveMQ應用中,分別配置對方的地址信息,則兩個Broker就通過一個static協議進行網絡連接。一個consumer連接到Broker1上目的地A,當producer發送到Broker2上的相同目的地A的時候,消息會轉發到Broker1上面供消費使用。
四、虛擬Topic(Virtual Topic)
1、Topic的兩種訂閱模式
以下測試。為了能夠清晰的查看消息是否持久化,可以配置成JDBC方式的持久化方案,數據庫查看。
消息生產者
package com.cfang.prebo.activemq.topic; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.Topic; import javax.jms.TopicPublisher; import org.apache.activemq.ActiveMQConnectionFactory; public class Producer { private static String brokerURL = "tcp://localhost:61618"; private static ConnectionFactory connectionFactory = null; private static Connection connection = null; private static Session session = null; private static MessageProducer messageProducer = null; public static void main(String[] args) throws JMSException { connectionFactory = new ActiveMQConnectionFactory(brokerURL); connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createTopic("TP_TEST_03"); messageProducer = session.createProducer(destination); Message message = session.createTextMessage("test01"); messageProducer.send(message); connection.close(); } }
1.1、普通訂閱
Topic不會持久化消息,如果不存在訂閱關系,則Topic會直接丟棄消息,后續再訂閱也不會收到之前的任何消息。
package com.cfang.prebo.activemq.topic; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.Topic; import javax.jms.TopicSubscriber; import org.apache.activemq.ActiveMQConnectionFactory; /** * @author cfang * 普通訂閱 */ public class SimpleSubscribe { private static String brokerURL = "tcp://localhost:61618"; private static ConnectionFactory connectionFactory = null; private static Connection connection = null; private static Session session = null; public static void main(String[] args) throws JMSException { connectionFactory = new ActiveMQConnectionFactory(brokerURL); connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination topic = session.createTopic("TP_TEST_03"); MessageConsumer consumer = session.createConsumer(topic); consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { System.out.println(message); } }); } }
生產者發送消息,數據庫不保存:
1.2、持久化訂閱
持久化訂閱者一定得預先加載一次,也就是向broker注冊這個消費訂閱者。之后,無論訂閱者是否在線,最終都會收到消息。不在線的話,等到再次連接的時候,會將沒有受過的消息全部接收處理。
package com.cfang.prebo.activemq.topic; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.Topic; import javax.jms.TopicSubscriber; import org.apache.activemq.ActiveMQConnectionFactory; /** * @author cfang * 持久訂閱 */ public class DurableSubscribe { private static String brokerURL = "tcp://localhost:61618"; private static ConnectionFactory connectionFactory = null; private static Connection connection = null; private static Session session = null; public static void main(String[] args) throws JMSException { connectionFactory = new ActiveMQConnectionFactory(brokerURL); connection = connectionFactory.createConnection(); connection.setClientID("test01"); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("TP_TEST_03"); TopicSubscriber consumer = session.createDurableSubscriber(topic, "Test-subscriber"); consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { System.out.println(message); } }); } }
運行上面訂閱程序,然后關閉程序,訂閱關系會保存到表 activemq_acks 中:
發送消息,會持久化到表 activemq_msgs中,消息會在持久化訂閱者在線的時候供消費使用:
1.3、離線的持久訂閱的清理
上述配置說明:broker每小時檢查並刪除離線1天以上的持久訂閱者。
其中標簽、屬性定義: offlineDurableSubscriberTimeout - 刪除的離線的持久訂閱者時間,單位毫秒,默認 -1
offlineDurableSubscriberTaskSchedule - 聲明掃描時間間隔,單位毫秒。
2、虛擬Topic的應用
Virtual Topic,對生產者來說是個topic,對消費者來說是個queue。內部處理機制是由Broker將接收到的消息進行二次分發到每個queue,然后不同的queue對應不同的應用實現持久化,不同的消費端只需要關注自己的queue就可以了。這樣,對每個queue可以做應用內可以做failover處理,也可達到了對topic的多consumer共同處理。
下面是簡單的代碼測試
package com.cfang.prebo.activemq.topic; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * VirtualTopic生產者 */ public class VirtualProducer { public static void main(String[] args) throws JMSException { ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61618"); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createTopic("VirtualTopic.TP"); MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.PERSISTENT); Message message = session.createTextMessage("hello world"); message.setStringProperty("keys", "value-bak"); producer.send(message); session.close(); connection.close(); } }
package com.cfang.prebo.activemq.topic; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; /** * VirtualTopic 消費端 * */ public class VirtualConsumer { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61618"); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destinationA = session.createQueue("Consumer.A.VirtualTopic.TP"); Destination destinationB = session.createQueue("Consumer.B.VirtualTopic.TP"); MessageConsumer consumerA1 = session.createConsumer(destinationA); consumerA1.setMessageListener(new MessageListener() { public void onMessage(Message message) { System.out.println("A1:"+message); } }); MessageConsumer consumerA2 = session.createConsumer(destinationA); consumerA2.setMessageListener(new MessageListener() { public void onMessage(Message message) { System.out.println("A2:"+message); } }); MessageConsumer consumerB = session.createConsumer(destinationB); consumerB.setMessageListener(new MessageListener() { public void onMessage(Message message) { System.out.println("B:"+message); } }); System.in.read(); } }
其中,消費端的A1和A2為一個應用,組成內部負載和failover。
VirtualTopic的消費端queue名稱的前后綴可通過配置文件${activemq_home}/conf/activemq.xml修改,一般也不需要修改前后綴,極少應用
在 broker標簽中,加入以下內容,則前綴就修改成了VT,之前例子中的消費端,監聽的queue名稱就改為 “VT.A.VirtualTopic.TP” 和 “VT.B.VirtualTopic.TP”
selectorAware 屬性表明 consumer 有selector 的話,則對消息進行過濾,只有符合selector的消息才會分發到queue中。
<destinationInterceptors> <virtualDestinationInterceptor> <virtualDestinations> <virtualTopic name=">" prefix="VT.*." selectorAware="false"/> </virtualDestinations> </virtualDestinationInterceptor> </destinationInterceptors>
總結點VirtualTopic:
1、虛擬Topic是一種特殊命名的Topic,系統根據命名規則將該Topic內的消息分發給當前存在的名稱對應的Queue,分發是非持久化的,新加入的Queue是接收不到過去的消息的。
2、虛擬Topic的功能完全是中間件本身額外附加的機制,對於生產者和消費者都是無感知的。