ActiveMQ高級特性


一、常用配置屬性

  以下配置文件目錄均為:${activemq_home}/conf/activemq.xml

  1、定期掃描清理

     ActiveMQ中有一項功能:Delete Inactive Destination。可以處理 “ 沒有消費者且未處理的Destination”,也就是 queue 或者 topic 在規定時間內,沒有入隊記錄或者有效訂閱,會被清理刪除。

    下面基於Queue的配置,Topic的配置類似。

     

    其中屬性定義:schedulePeriodForDestinationPurge - 必填。聲明掃描閑置隊列的周期,單位毫秒。默認值0,為不掃描。

           gcInactiveDestinations - 必填。針對Destination的配置,聲明Broker掃描閑置隊列時,是否掃描此隊列。默認值false

           inactiveTimoutBeforeGC - 選填。配合gcInactiveDestinations=true時才生效。聲明Destination閑置多久可以進行刪除,單位毫秒。默認值60。

  2、存儲空間設置

     

    其中的配置標簽: memoryUsage - 表示ActiveMQ使用的內存。這個值必須大於等於destinationPolicy中設置的所有Destination的內存之和。

            storeUsage - 表示持久化存儲文件的大小

            tempUsage - 非持久化消息的臨時內存大小

  3、kahadb方式的持久化的刷盤方式

   

    其中屬性定義:journalDiskSyncStrategy - 聲明持久化方式,可選值always :periodic :never 。默認always

           journalDiskSyncInterval - 聲明數據落盤的時間間隔。單位毫秒

  4、JMX配置使用

     broker標簽增加:

    

    managementContext標簽修改:

    

  5、消息過期時間

   

    其中標簽及屬性定義:timeStampingBrokerPlugin - 為持久化的消息設置過期時間,zeroExpirationOverride 為沒有設置有效時間的消息設置過期時間。

                ttlCeiling 表示過期時間上線,如果程序中設置的時間超過這個值,以此值為准。

              expireMessagesPeriod -  聲明掃描過期消息的時間間隔,單位毫秒。

              queue=">" 、topic=">" 表示匹配所有的Destination

              queue="tp_>" 、topic="tp_>"  表示匹配所有的 tp_ 開頭的Destination

二、消息的持久化

  ActiveMQ的消息,Queue中的消息會根據配置的不同實現持久化,Topic如果不存在持久化訂閱者的話會直接丟棄消息。

  持久化的配置,在${activemq_home}/conf/activemq.xml配置文件中,標簽 broker內部的 persistenceAdapter 中配置持久化方案

  1、kahadb

   5.4版本之后默認的持久化方式。文件型的數據庫存儲,配置如下:

<persistenceAdapter>
<!-- directory:保存數據的目錄; journalMaxFileLength:保存消息的文件大小 -->
<kahaDB directory="${activemq.data}/kahadb" journalMaxFileLength="16mb"/>
</persistenceAdapter>

  2、JDBC

    ActiveMQ將消息持久化保存到數據庫中,沒有限定使用某一個數據庫。

    配置成功后,需要在數據庫中建立相應的database,后續ActiveMQ啟動才能正常訪問。ActiveMQ正常啟動后,能夠自動的創建相應的數據表:

    activemq_msgs - 用於存儲消息的表

    activemq_acks - 存儲訂閱關系

    activemq_lock - 集群環境中使用,同一時間只能有一個Master Broker

    下文配置中以MySQL為例:

    2.1、定義數據源bean(與broker同級)

<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
  <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
  <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
  <property name="username" value="activemq"/>
  <property name="password" value="activemq"/>
  <!-- 屬性雷同可根據具體連接池配置 -->   <property name="maxActive" value="200"/>   <property name="poolPreparedStatements" value="true"/> </bean>

  其中數據連接池可以任意其他產品,只需要將相應的jar包拷貝到 ${activemq_home}/lib 目錄下即可。

    2.2、配置 persistenceAdapter 標簽

<persistenceAdapter>
    <!-- createTablesOnStartup標簽是否啟動時創建數據表,默認值為true。一般第一次啟動的時候true,后續修改為false --> 
    <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="false"/>
</persistenceAdapter>

 

三、ActiveMQ的橋接模式 (network)

  ${activemq_home}/conf/activemq.xml 配置文件中,broker標簽中,增加如下:

<networkConnectors>
 <networkConnectoruri="static://(tcp://ip:61616)"/>
</networkConnector>

  在兩個ActiveMQ應用中,分別配置對方的地址信息,則兩個Broker就通過一個static協議進行網絡連接。一個consumer連接到Broker1上目的地A,當producer發送到Broker2上的相同目的地A的時候,消息會轉發到Broker1上面供消費使用。

 

四、虛擬Topic(Virtual Topic)

  1、Topic的兩種訂閱模式

    以下測試。為了能夠清晰的查看消息是否持久化,可以配置成JDBC方式的持久化方案,數據庫查看。

    消息生產者

package com.cfang.prebo.activemq.topic;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicPublisher;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Producer {

	private static String brokerURL = "tcp://localhost:61618";
	private static ConnectionFactory connectionFactory = null;
	private static Connection connection = null;
	private static Session session = null;
	private static MessageProducer messageProducer = null;
	
	public static void main(String[] args) throws JMSException {
		connectionFactory = new ActiveMQConnectionFactory(brokerURL);
		connection = connectionFactory.createConnection();
		connection.start();
		session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		Destination destination = session.createTopic("TP_TEST_03");
		messageProducer = session.createProducer(destination);
		Message message = session.createTextMessage("test01");
		messageProducer.send(message);
		connection.close();
	}
}

    1.1、普通訂閱

      Topic不會持久化消息,如果不存在訂閱關系,則Topic會直接丟棄消息,后續再訂閱也不會收到之前的任何消息。

package com.cfang.prebo.activemq.topic;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * @author cfang
 *	普通訂閱
 */
public class SimpleSubscribe {

	private static String brokerURL = "tcp://localhost:61618";
	private static ConnectionFactory connectionFactory = null;
	private static Connection connection = null;
	private static Session session = null;
	
	public static void main(String[] args) throws JMSException {
		connectionFactory = new ActiveMQConnectionFactory(brokerURL);
		connection = connectionFactory.createConnection();
		connection.start();
		session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		Destination topic = session.createTopic("TP_TEST_03");
		MessageConsumer consumer = session.createConsumer(topic);
		consumer.setMessageListener(new MessageListener() {
			public void onMessage(Message message) {
				System.out.println(message);
			}
		});
	}
}

  生產者發送消息,數據庫不保存:

    1.2、持久化訂閱

    持久化訂閱者一定得預先加載一次,也就是向broker注冊這個消費訂閱者。之后,無論訂閱者是否在線,最終都會收到消息。不在線的話,等到再次連接的時候,會將沒有受過的消息全部接收處理。

package com.cfang.prebo.activemq.topic;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * @author cfang
 *	持久訂閱
 */
public class DurableSubscribe {
	
	private static String brokerURL = "tcp://localhost:61618";
	private static ConnectionFactory connectionFactory = null;
	private static Connection connection = null;
	private static Session session = null;

	public static void main(String[] args) throws JMSException {
		connectionFactory = new ActiveMQConnectionFactory(brokerURL);
		connection = connectionFactory.createConnection();
		connection.setClientID("test01");
		connection.start();
		session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		Topic topic = session.createTopic("TP_TEST_03");
		TopicSubscriber consumer = session.createDurableSubscriber(topic, "Test-subscriber");
		consumer.setMessageListener(new MessageListener() {
			public void onMessage(Message message) {
				System.out.println(message);
			}
		});
	}
}

  運行上面訂閱程序,然后關閉程序,訂閱關系會保存到表 activemq_acks 中:

  

  發送消息,會持久化到表 activemq_msgs中,消息會在持久化訂閱者在線的時候供消費使用:

  

    1.3、離線的持久訂閱的清理

  

  上述配置說明:broker每小時檢查並刪除離線1天以上的持久訂閱者。  

  其中標簽、屬性定義: offlineDurableSubscriberTimeout -  刪除的離線的持久訂閱者時間,單位毫秒,默認 -1

             offlineDurableSubscriberTaskSchedule - 聲明掃描時間間隔,單位毫秒。

  2、虛擬Topic的應用

    Virtual Topic,對生產者來說是個topic,對消費者來說是個queue。內部處理機制是由Broker將接收到的消息進行二次分發到每個queue,然后不同的queue對應不同的應用實現持久化,不同的消費端只需要關注自己的queue就可以了。這樣,對每個queue可以做應用內可以做failover處理,也可達到了對topic的多consumer共同處理。

    下面是簡單的代碼測試

package com.cfang.prebo.activemq.topic;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * 	VirtualTopic生產者
 */
public class VirtualProducer {

	public static void main(String[] args) throws JMSException {
		
		ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61618");
		Connection connection = factory.createConnection();
		connection.start();
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		Destination destination = session.createTopic("VirtualTopic.TP");
		MessageProducer producer = session.createProducer(destination);
		producer.setDeliveryMode(DeliveryMode.PERSISTENT);
		Message message = session.createTextMessage("hello world");
		message.setStringProperty("keys", "value-bak");
		producer.send(message);
		session.close();
		connection.close();
	}
}

  

package com.cfang.prebo.activemq.topic;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * VirtualTopic 消費端
 *
 */
public class VirtualConsumer {

	public static void main(String[] args) throws Exception {
		ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61618");
		Connection connection = factory.createConnection();
		connection.start();
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		Destination destinationA = session.createQueue("Consumer.A.VirtualTopic.TP");
		Destination destinationB = session.createQueue("Consumer.B.VirtualTopic.TP");
		
		MessageConsumer consumerA1 = session.createConsumer(destinationA);
		consumerA1.setMessageListener(new MessageListener() {
			public void onMessage(Message message) {
				System.out.println("A1:"+message);
			}
		});
		MessageConsumer consumerA2 = session.createConsumer(destinationA);
		consumerA2.setMessageListener(new MessageListener() {
			public void onMessage(Message message) {
				System.out.println("A2:"+message);
			}
		});
		
		MessageConsumer consumerB = session.createConsumer(destinationB);
		consumerB.setMessageListener(new MessageListener() {
			public void onMessage(Message message) {
				System.out.println("B:"+message);
			}
		});
		System.in.read();
	}
}

  其中,消費端的A1和A2為一個應用,組成內部負載和failover。

  VirtualTopic的消費端queue名稱的前后綴可通過配置文件${activemq_home}/conf/activemq.xml修改,一般也不需要修改前后綴,極少應用

  在 broker標簽中,加入以下內容,則前綴就修改成了VT,之前例子中的消費端,監聽的queue名稱就改為 “VT.A.VirtualTopic.TP” 和 “VT.B.VirtualTopic.TP”

  selectorAware 屬性表明 consumer 有selector 的話,則對消息進行過濾,只有符合selector的消息才會分發到queue中。

<destinationInterceptors>
			<virtualDestinationInterceptor>
				<virtualDestinations>
					<virtualTopic name=">" prefix="VT.*." selectorAware="false"/>
				</virtualDestinations>
			</virtualDestinationInterceptor>
		</destinationInterceptors>

  

  總結點VirtualTopic:

    1、虛擬Topic是一種特殊命名的Topic,系統根據命名規則將該Topic內的消息分發給當前存在的名稱對應的Queue,分發是非持久化的,新加入的Queue是接收不到過去的消息的。

    2、虛擬Topic的功能完全是中間件本身額外附加的機制,對於生產者和消費者都是無感知的。

    3、虛擬Topic是非持久化的,不存在積壓。
 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM