ActiveMQ的Destination高級特性


1.    Composite Destinations  組合目的地

組合隊列Composite Destinations : 允許用一個虛擬的destination代表多個destinations,這樣就可以通過composite destinations在一個操作中同時向多個queue/topic發送消息。
  有兩種實現方式:
    第一種:在客戶端編碼實現
    第二種:在activemq.xml配置文件中實現

  • 第一種:在客戶端編碼實現

    在composite destinations中,多個destination之間采用","分隔。如下:這里有2個destination  "my-queue3"和"topic://topic-1",這個代表主題模式的topic-1

private static final String queueName = "my-queue3,topic://topic-1";

 

  默認是queue模式。

 

package cn.qlq.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 生產消息
 */
public class MsgProducer {

    // 默認端口61616
    private static final String url = "tcp://localhost:61616/";
    private static final String queueName = "my-queue3,topic://topic-1";
    private static Session session = null;

    public static void main(String[] args) throws JMSException {
        // 1創建ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        // 2.由connectionFactory創建connection
        Connection connection = connectionFactory.createConnection();
        // 3.啟動connection
        connection.start();
        // 4.創建Session===第一個參數是是否事務管理,第二個參數是應答模式
        session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
        // 5.創建Destination(Queue繼承Queue)
        Queue destination = session.createQueue(queueName);

        TemporaryQueue temporaryQueue = session.createTemporaryQueue();

        // 6.創建生產者producer
        MessageProducer producer = session.createProducer(destination);
        for (int i = 0; i < 5; i++) {
            // 7.創建Message,有好多類型,這里用最簡單的TextMessage
            TextMessage tms = session.createTextMessage("textMessage:" + i);

            // 設置附加屬性
            tms.setStringProperty("str", "stringProperties" + i);
            tms.setJMSPriority(6);
            tms.setJMSReplyTo(temporaryQueue);

            // 8.生產者發送消息
            producer.send(tms);
        }

        // 9.提交事務
        session.commit();

        // 10.關閉connection
        session.close();
        connection.close();
    }

}

 

  結果會創建5條  my-queue3  隊列消息 與 5條 主題模式   topic-1   消息。

  消費者正常消費即可,與隊列模型的消息和主題模式的消息消費一樣。

  • 第二種:在activemq.xml配置文件中實現
    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="brokerName" dataDirectory="${activemq.data}">
    
        <destinationInterceptors>
           <virtualDestinationInterceptor>
              <virtualDestinations>
                     <compositeQueue name="comQueue">
                            <forwardTo>
                                   <queue physicalName="queue88" />
                                   <topic physicalName="topic88" />
                            </forwardTo>
                     </compositeQueue>
              </virtualDestinations>
           </virtualDestinationInterceptor>
        </destinationInterceptors>
    。。。

 

程序中向組合隊列發送消息:

package cn.qlq.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 生產消息
 */
public class MsgProducer {

    // 默認端口61616
    private static final String url = "tcp://localhost:61616/";
    private static final String queueName = "comQueue";
    private static Session session = null;

    public static void main(String[] args) throws JMSException {
        // 1創建ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        // 2.由connectionFactory創建connection
        Connection connection = connectionFactory.createConnection();
        // 3.啟動connection
        connection.start();
        // 4.創建Session===第一個參數是是否事務管理,第二個參數是應答模式
        session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
        // 5.創建Destination(Queue繼承Queue)
        Queue destination = session.createQueue(queueName);

        TemporaryQueue temporaryQueue = session.createTemporaryQueue();

        // 6.創建生產者producer
        MessageProducer producer = session.createProducer(destination);
        for (int i = 0; i < 5; i++) {
            // 7.創建Message,有好多類型,這里用最簡單的TextMessage
            TextMessage tms = session.createTextMessage("textMessage:" + i);

            // 設置附加屬性
            tms.setStringProperty("str", "stringProperties" + i);
            tms.setJMSPriority(6);
            tms.setJMSReplyTo(temporaryQueue);

            // 8.生產者發送消息
            producer.send(tms);
        }

        // 9.提交事務
        session.commit();

        // 10.關閉connection
        session.close();
        connection.close();
    }

}

結果:

  queue88產生五條消息:

 

   topic88生產五條消息:

 

 

 2 .Configure Startup Destinations--啟動創建隊列和主題,只是沒有消息

  在啟動ActiveMQ的時候如果需要創建Destination的話,可以在activemq.xml中配置:

    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="brokerName" dataDirectory="${activemq.data}">
        <destinations>
              <queue physicalName="autoqueue" />
              <topic physicalName="autotopic" />
       </destinations>
    ...

 

 3.Delete Inactive Destinations---刪除沒有消息的隊列或主題

  在ActiveMQ的queue在不使用之后,可以通過web控制台或者JMX方式來刪除掉,當然,也可以通過配置,使得broker可以自動探測到無用的隊列並刪除掉,回收響應資源。

    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="brokerName" dataDirectory="${activemq.data}" schedulePeriodForDestinationPurge="1000">
        <destinationPolicy>
              <policyMap>
                <policyEntries>
                     <policyEntry queue=">" gcInactiveDestinations="true" inactiveTimeoutBeforeGC="30000" />
                </policyEntries>
              </policyMap>
       </destinationPolicy>
    。。。

 說明:
  schedulePeriodForDestinationPurge: 設置多長時間檢查一次,這里是1秒。
  inactiveTimoutBeforeGC: 設置當Destination為空后,多長時間被刪除,這里是30秒。
  gcInactiveDestinations:設置刪除掉不活動的隊列,默認為false

 4.wildcars(通配符)

  Wildcars用來支持名字分層體系,它不是JMS規范的一部分,是ActiveMQ的擴展。

  ActiveMQ支持以下三種wildcars:
    .     用於作為路徑上名字間的分隔符
    *    用於匹配路徑上的任何名字
    >   用於遞歸地匹配任何以這個名字開始的destination

 

5.  Destination 選項

這個是給消費者在JMS規范之外添加的功能特性,通過在隊列名稱后面使用類似url的語法添加多個選項。包括:
1 consumer.perfetchSize,消費者持有的未確認的最大消費數量
2 consumer.maximumPendingMessageLimit: 用來控制非持久化的topic在存在慢消費者的情況下,丟棄的數量,默認為0
3 consumer.noLocal: 默認false
4 consumer.dispatchAsync: 是否異步分發,默認true
5 consumer.retroactive: 是否為回溯消費者,默認false
6 consumer.selector: JMS的selector,默認null
7 consumer.exclusive: 是否為獨占消費者,默認false
8 consumer.priority:設置消費者的優先級,默認0

 使用示例:

Queue queue = new ActiveMQQueue("TEST.QUEUE?consumer.dispatchAsync=
false&consumer.perfetchSize=10");
Consumer consumer = session.createConsumer(queue);

 

6.    虛擬destination用來創建邏輯destination,客戶端可以通過它來生產和消費消息,它會把消息映射到物理destination.

ActiveMQ支持2種方式:
  1:虛擬主題(Virtual Topics)
  2:組合Destinations(Composite Destinations)

為什么使用虛擬主題?
  ActiveMQ只有在持久訂閱才是持久化的。持久訂閱時,每一個持久訂閱者,都相當於一個queue的客戶端,它會收取所有消息。這種情況下存在兩個問題:
第一:同一應用內消費者端護在均衡的問題。也就是說一個應用程序內的持久化消息,不能使用對個消費者共同承擔消息處理能力。因為每個消費者都會獲取所有消息。因為每一個消費者都會獲取所有信息。
Queue到時可以解決這個問題,但broker端又不能將消息發送到多個應用端,所以紀要發布訂閱,又要讓消費者分組,這個功能JMS本身是沒有的
第二:同一應用內消費者端failover問題,由於只能使用單個的持久訂閱者,如果這個訂閱者出錯,則應用就無法處理,系統的健壯性不高。

如何使用虛擬topic?
第一:對於消息發布者來說,就是一個正常的topic,名稱以VirtualTopic.開始,比如VirtualTopic.Orders,代碼示例如下:

Topicdestination = session.createTopic("VirtualTopic.Orders");

第二:對於消息接收端來說,是個隊列,不同應用里使用不同的前綴作為隊列名稱,即可表明自己的身份即可實現消費端應用分組。
例如Consumer.A.VirtualTopic.Orders說明它是名稱為A的消費端,同理Consumer.B VirtualTopic.Orders說明是一名稱為B的消費端。可以在同一個應用中使用多個消費者消費這個隊列
又因為不同應用使用的topic名稱不一樣,前綴不同,所以不同應用中都可以接受到全部消息。每一個客戶端相當於一個持久訂閱者,而且這個客戶端可以使用多個消費者共同來承擔消費任務。

代碼示例:

Destination dest = session.createQueue("Consumer.A.VirtualTopic.Orders");

 

生產者代碼:

package cn.qlq.activemq.topic;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class MsgProducer {

    private static final String url = "tcp://127.0.0.1:61616";
    private static final String topicName = "VirtualTopic.Orders";

    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);

        Connection connection = connectionFactory.createConnection();

        connection.start();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        Destination destination = session.createTopic(topicName);

        MessageProducer producer = session.createProducer(destination);
        for (int i = 0; i < 10; i++) {

            TextMessage tms = session.createTextMessage("textMessage:" + i);

            producer.send(tms);

            System.out.println("send:" + tms.getText());
        }
        connection.close();
    }

}

 

消費者代碼:

package cn.qlq.activemq;

import java.util.Enumeration;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消費消息
 * 
 * @author QiaoLiQiang
 * @time 2018年9月18日下午11:26:41
 */
public class MsgConsumer {

    // 默認端口61616
    private static final String url = "tcp://localhost:61616/";
    private static final String queueName = "Consumer.A.VirtualTopic.Orders";

    public static void main(String[] args) throws JMSException {
        // 1創建ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        // 2.由connectionFactory創建connection
        Connection connection = connectionFactory.createConnection();
        Enumeration jmsxPropertyNames = connection.getMetaData().getJMSXPropertyNames();
        while (jmsxPropertyNames.hasMoreElements()) {
            String nextElement = (String) jmsxPropertyNames.nextElement();
            System.out.println("JMSX name ===" + nextElement);
        }
        // 3.啟動connection
        connection.start();
        // 4.創建Session===第一個參數是是否事務管理,第二個參數是應答模式
        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
        // 5.創建Destination(Queue繼承Queue)
        Queue destination = session.createQueue(queueName);
        // 6.創建消費者consumer
        MessageConsumer consumer = session.createConsumer(destination);

        int i = 0;
        while (i < 5) {
            TextMessage textMessage = (TextMessage) consumer.receive();
            System.out.println("接收消息:" + textMessage.getText() + ";屬性" + textMessage.getStringProperty("str"));
            i++;

            if (i == 5) {// 確保消費完所有的消息再進行確認
                textMessage.acknowledge();
            }
        }

        // 提交事務,進行確認收到消息
        session.commit();

        session.close();
        connection.close();
    }
}

 

其實把消費者隊列化了。

修改虛擬主題的前綴:

默認前綴是VirtualTopic.>

自定義消費虛擬地址默認格式:Consumer.*.VirtualTopic.>

修改配置:

<broker xmlns="http://activemq.apache.org/schema/core">
       <destinationInterceptors>
              <virtualDestinationInterceptor>
                     <virtualDestinations>
                            <virtualTopic name=">" prefix="VirtualTopicConsumers.*." selectorAware="false" />
                     </virtualDestinations>
              </virtualDestinationInterceptor>
       </destinationInterceptors>
</broker>

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM