1. Composite Destinations 組合目的地
組合隊列Composite Destinations : 允許用一個虛擬的destination代表多個destinations,這樣就可以通過composite destinations在一個操作中同時向多個queue/topic發送消息。
有兩種實現方式:
第一種:在客戶端編碼實現
第二種:在activemq.xml配置文件中實現
- 第一種:在客戶端編碼實現
在composite destinations中,多個destination之間采用","分隔。如下:這里有2個destination "my-queue3"和"topic://topic-1",這個代表主題模式的topic-1
private static final String queueName = "my-queue3,topic://topic-1";
默認是queue模式。
package cn.qlq.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TemporaryQueue; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * 生產消息 */ public class MsgProducer { // 默認端口61616 private static final String url = "tcp://localhost:61616/"; private static final String queueName = "my-queue3,topic://topic-1"; private static Session session = null; public static void main(String[] args) throws JMSException { // 1創建ConnectionFactory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); // 2.由connectionFactory創建connection Connection connection = connectionFactory.createConnection(); // 3.啟動connection connection.start(); // 4.創建Session===第一個參數是是否事務管理,第二個參數是應答模式 session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 5.創建Destination(Queue繼承Queue) Queue destination = session.createQueue(queueName); TemporaryQueue temporaryQueue = session.createTemporaryQueue(); // 6.創建生產者producer MessageProducer producer = session.createProducer(destination); for (int i = 0; i < 5; i++) { // 7.創建Message,有好多類型,這里用最簡單的TextMessage TextMessage tms = session.createTextMessage("textMessage:" + i); // 設置附加屬性 tms.setStringProperty("str", "stringProperties" + i); tms.setJMSPriority(6); tms.setJMSReplyTo(temporaryQueue); // 8.生產者發送消息 producer.send(tms); } // 9.提交事務 session.commit(); // 10.關閉connection session.close(); connection.close(); } }
結果會創建5條 my-queue3 隊列消息 與 5條 主題模式 topic-1 消息。
消費者正常消費即可,與隊列模型的消息和主題模式的消息消費一樣。
- 第二種:在activemq.xml配置文件中實現
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="brokerName" dataDirectory="${activemq.data}"> <destinationInterceptors> <virtualDestinationInterceptor> <virtualDestinations> <compositeQueue name="comQueue"> <forwardTo> <queue physicalName="queue88" /> <topic physicalName="topic88" /> </forwardTo> </compositeQueue> </virtualDestinations> </virtualDestinationInterceptor> </destinationInterceptors> 。。。
程序中向組合隊列發送消息:
package cn.qlq.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TemporaryQueue; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * 生產消息 */ public class MsgProducer { // 默認端口61616 private static final String url = "tcp://localhost:61616/"; private static final String queueName = "comQueue"; private static Session session = null; public static void main(String[] args) throws JMSException { // 1創建ConnectionFactory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); // 2.由connectionFactory創建connection Connection connection = connectionFactory.createConnection(); // 3.啟動connection connection.start(); // 4.創建Session===第一個參數是是否事務管理,第二個參數是應答模式 session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 5.創建Destination(Queue繼承Queue) Queue destination = session.createQueue(queueName); TemporaryQueue temporaryQueue = session.createTemporaryQueue(); // 6.創建生產者producer MessageProducer producer = session.createProducer(destination); for (int i = 0; i < 5; i++) { // 7.創建Message,有好多類型,這里用最簡單的TextMessage TextMessage tms = session.createTextMessage("textMessage:" + i); // 設置附加屬性 tms.setStringProperty("str", "stringProperties" + i); tms.setJMSPriority(6); tms.setJMSReplyTo(temporaryQueue); // 8.生產者發送消息 producer.send(tms); } // 9.提交事務 session.commit(); // 10.關閉connection session.close(); connection.close(); } }
結果:
queue88產生五條消息:
topic88生產五條消息:
2 .Configure Startup Destinations--啟動創建隊列和主題,只是沒有消息
在啟動ActiveMQ的時候如果需要創建Destination的話,可以在activemq.xml中配置:
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="brokerName" dataDirectory="${activemq.data}"> <destinations> <queue physicalName="autoqueue" /> <topic physicalName="autotopic" /> </destinations> ...
3.Delete Inactive Destinations---刪除沒有消息的隊列或主題
在ActiveMQ的queue在不使用之后,可以通過web控制台或者JMX方式來刪除掉,當然,也可以通過配置,使得broker可以自動探測到無用的隊列並刪除掉,回收響應資源。
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="brokerName" dataDirectory="${activemq.data}" schedulePeriodForDestinationPurge="1000"> <destinationPolicy> <policyMap> <policyEntries> <policyEntry queue=">" gcInactiveDestinations="true" inactiveTimeoutBeforeGC="30000" /> </policyEntries> </policyMap> </destinationPolicy> 。。。
說明:
schedulePeriodForDestinationPurge: 設置多長時間檢查一次,這里是1秒。
inactiveTimoutBeforeGC: 設置當Destination為空后,多長時間被刪除,這里是30秒。
gcInactiveDestinations:設置刪除掉不活動的隊列,默認為false
4.wildcars(通配符)
Wildcars用來支持名字分層體系,它不是JMS規范的一部分,是ActiveMQ的擴展。
ActiveMQ支持以下三種wildcars:
. 用於作為路徑上名字間的分隔符
* 用於匹配路徑上的任何名字
> 用於遞歸地匹配任何以這個名字開始的destination
5. Destination 選項
這個是給消費者在JMS規范之外添加的功能特性,通過在隊列名稱后面使用類似url的語法添加多個選項。包括:
1 consumer.perfetchSize,消費者持有的未確認的最大消費數量
2 consumer.maximumPendingMessageLimit: 用來控制非持久化的topic在存在慢消費者的情況下,丟棄的數量,默認為0
3 consumer.noLocal: 默認false
4 consumer.dispatchAsync: 是否異步分發,默認true
5 consumer.retroactive: 是否為回溯消費者,默認false
6 consumer.selector: JMS的selector,默認null
7 consumer.exclusive: 是否為獨占消費者,默認false
8 consumer.priority:設置消費者的優先級,默認0
使用示例:
Queue queue = new ActiveMQQueue("TEST.QUEUE?consumer.dispatchAsync= false&consumer.perfetchSize=10"); Consumer consumer = session.createConsumer(queue);
6. 虛擬destination用來創建邏輯destination,客戶端可以通過它來生產和消費消息,它會把消息映射到物理destination.
ActiveMQ支持2種方式:
1:虛擬主題(Virtual Topics)
2:組合Destinations(Composite Destinations)
為什么使用虛擬主題?
ActiveMQ只有在持久訂閱才是持久化的。持久訂閱時,每一個持久訂閱者,都相當於一個queue的客戶端,它會收取所有消息。這種情況下存在兩個問題:
第一:同一應用內消費者端護在均衡的問題。也就是說一個應用程序內的持久化消息,不能使用對個消費者共同承擔消息處理能力。因為每個消費者都會獲取所有消息。因為每一個消費者都會獲取所有信息。
Queue到時可以解決這個問題,但broker端又不能將消息發送到多個應用端,所以紀要發布訂閱,又要讓消費者分組,這個功能JMS本身是沒有的
第二:同一應用內消費者端failover問題,由於只能使用單個的持久訂閱者,如果這個訂閱者出錯,則應用就無法處理,系統的健壯性不高。
如何使用虛擬topic?
第一:對於消息發布者來說,就是一個正常的topic,名稱以VirtualTopic.開始,比如VirtualTopic.Orders,代碼示例如下:
Topicdestination = session.createTopic("VirtualTopic.Orders");
第二:對於消息接收端來說,是個隊列,不同應用里使用不同的前綴作為隊列名稱,即可表明自己的身份即可實現消費端應用分組。
例如Consumer.A.VirtualTopic.Orders說明它是名稱為A的消費端,同理Consumer.B VirtualTopic.Orders說明是一名稱為B的消費端。可以在同一個應用中使用多個消費者消費這個隊列
又因為不同應用使用的topic名稱不一樣,前綴不同,所以不同應用中都可以接受到全部消息。每一個客戶端相當於一個持久訂閱者,而且這個客戶端可以使用多個消費者共同來承擔消費任務。
代碼示例:
Destination dest = session.createQueue("Consumer.A.VirtualTopic.Orders");
生產者代碼:
package cn.qlq.activemq.topic; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class MsgProducer { private static final String url = "tcp://127.0.0.1:61616"; private static final String topicName = "VirtualTopic.Orders"; public static void main(String[] args) throws JMSException { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createTopic(topicName); MessageProducer producer = session.createProducer(destination); for (int i = 0; i < 10; i++) { TextMessage tms = session.createTextMessage("textMessage:" + i); producer.send(tms); System.out.println("send:" + tms.getText()); } connection.close(); } }
消費者代碼:
package cn.qlq.activemq; import java.util.Enumeration; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * 消費消息 * * @author QiaoLiQiang * @time 2018年9月18日下午11:26:41 */ public class MsgConsumer { // 默認端口61616 private static final String url = "tcp://localhost:61616/"; private static final String queueName = "Consumer.A.VirtualTopic.Orders"; public static void main(String[] args) throws JMSException { // 1創建ConnectionFactory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); // 2.由connectionFactory創建connection Connection connection = connectionFactory.createConnection(); Enumeration jmsxPropertyNames = connection.getMetaData().getJMSXPropertyNames(); while (jmsxPropertyNames.hasMoreElements()) { String nextElement = (String) jmsxPropertyNames.nextElement(); System.out.println("JMSX name ===" + nextElement); } // 3.啟動connection connection.start(); // 4.創建Session===第一個參數是是否事務管理,第二個參數是應答模式 Session session = connection.createSession(true, Session.SESSION_TRANSACTED); // 5.創建Destination(Queue繼承Queue) Queue destination = session.createQueue(queueName); // 6.創建消費者consumer MessageConsumer consumer = session.createConsumer(destination); int i = 0; while (i < 5) { TextMessage textMessage = (TextMessage) consumer.receive(); System.out.println("接收消息:" + textMessage.getText() + ";屬性" + textMessage.getStringProperty("str")); i++; if (i == 5) {// 確保消費完所有的消息再進行確認 textMessage.acknowledge(); } } // 提交事務,進行確認收到消息 session.commit(); session.close(); connection.close(); } }
其實把消費者隊列化了。
修改虛擬主題的前綴:
默認前綴是VirtualTopic.>
自定義消費虛擬地址默認格式:Consumer.*.VirtualTopic.>
修改配置:
<broker xmlns="http://activemq.apache.org/schema/core"> <destinationInterceptors> <virtualDestinationInterceptor> <virtualDestinations> <virtualTopic name=">" prefix="VirtualTopicConsumers.*." selectorAware="false" /> </virtualDestinations> </virtualDestinationInterceptor> </destinationInterceptors> </broker>