Activemq獲取消息信息


Activemq的公告消息

下面是來自官網的翻譯:http://activemq.apache.org/advisory-message.html

ActiveMQ消息屬性

消息屬性

屬性名 類型 默認值 描述
JMSDestination javax.jms.Destination 生產者set進去 發送消息目的地
JMSReplyTo javax.jms.Destination null 用戶定義
JMSType String “” 用戶定義
JMSDeliveryMode int DeliveryMode.PERSISTENT 消息是否要持久化
JMSPriority int 4 0-9
JMSMessageID String unique 消息唯一標示符
JMSTimestamp long 消息發送時間 毫秒
JMSCorrelationID String null 用戶定義
JMSExpiration long 0 0表示用不過氣,消息過期時間
JMSRedelivered boolean false 重新傳遞

JMS定義屬性

屬性名 類型 默認值 描述
JMSXDeliveryCount int 0 嘗試發送消息的次數
JMSXGroupID String null 表示消息的分組
JMSXGroupSeq int 0 消息的序列號
JMSXProducerTXID String null 事務標示

ActiveMQ定義屬性

屬性名 類型 默認值 描述
JMSActiveMQBrokerInTime long 0 消息到達broker的時間
JMSActiveMQBrokerOutTime long 0 消息離開broker的時間

公告消息(Advisory Message)

ActiveMQ支持的公告消息(相當於監聽器)

  • 消費者,生產者和connection開始和停止

  • 臨時目的地的創建和銷毀

  • 話題和隊列上的消息過期

  • broker發送消息到沒有消費者的目的地

  • connections開啟和停止

    公告消息是一種管理JMS的渠道,我們可以接收到在broker上的生產者消費者目的地的變化。
    當我們通過JMX查看broker時候,可以看到公告話題以ActiveMQ.Advisory為前綴,每個公告擁有類型為Advisory的消息和一些預定義的消息屬性

屬性名 類型 描述
originBrokerId StringProperty 公告起源的broker的ID
originBrokerName StringProperty 公告起源的broker的名稱
originBrokerURL StringProperty 公告起源的第一個broker url

我們可以獲得到消息的一些元數據

 Destination advisoryDestination = AdvisorySupport.getProducerAdvisoryTopic(destination)
    MessageConsumer consumer = session.createConsumer(advisoryDestination);
    consumer.setMessageListener(this);
public void onMessage(Message msg){
    if (msg instanceof ActiveMQMessage){
        try {
             ActiveMQMessage aMsg =  (ActiveMQMessage)msg;
             ProducerInfo prod = (ProducerInfo) aMsg.getDataStructure();
        } catch (JMSException e) {
            log.error("Failed to process message: " + msg);
        }
    }
}

支持的公告話題

客戶端的公告消息
我們可以用consumerCount頭信息獲取當前消費者的數量來得知當前的公告的消息是否已經發送。

Advisory Topics Description properties Data Structure
ActiveMQ.Advisory.Connection ActiveMQ.Advisory.Connection ConnectionInfo, RemoveInfo
ActiveMQ.Advisory.Producer.Queue Producer start & stop messages on a Queue String=’producerCount’ - the number of producers ProducerInfo
ActiveMQ.Advisory.Producer.Topic Producer start & stop messages on a Topic String=’producerCount’ - the number of producers ProducerInfo
ActiveMQ.Advisory.Consumer.Queue Consumer start & stop messages on a Queue String=’consumerCount’ - the number of Consumers ConsumerInfo, RemoveInfo
ActiveMQ.Advisory.Consumer.Topic Consumer start & stop messages on a Topic String=’consumerCount’ - the number of Consumers ConsumerInfo, RemoveInfo

目的地和消息的公告

Advisory Topics 描述 屬性 數據結構 默認值 策略屬性
ActiveMQ.Advisory.Queue Queue create & destroy null DestinationInfo true none
ActiveMQ.Advisory.Topic Topic create & destroy null DestinationInfo true none
ActiveMQ.Advisory.TempQueue Temporary Queue create & destroy null DestinationInfo true none
ActiveMQ.Advisory.TempTopic Temporary Topic create & destroy null DestinationInfo true none
ActiveMQ.Advisory.Expired.Queue Expired messages on a Queue String=’orignalMessageId’ - the expired id Message true none
ActiveMQ.Advisory.Expired.Topic Expired messages on a Topic String=’orignalMessageId’ - the expired id Message true none
ActiveMQ.Advisory.NoConsumer.Queue No consumer is available to process messages being sent on a Queue null Message false sendAdvisoryIfNoConsumers
ActiveMQ.Advisory.NoConsumer.Topic No consumer is available to process messages being sent on a Topic null Message false sendAdvisoryIfNoConsumers

ActiveMQ5.2新的公告消息

Advisory Topics 描述 屬性 數據結構 默認值 策略屬性
ActiveMQ.Advisory.SlowConsumer.Queue Slow Queue Consumer String=’consumerId’ - the consumer id ConsumerInfo false advisoryForSlowConsumers
ActiveMQ.Advisory.SlowConsumer.Topic Slow Topic Consumer String=’consumerId’ - the consumer id ConsumerInfo false advisoryForSlowConsumers
ActiveMQ.Advisory.FastProducer.Queue Fast Queue producer String=’producerId’ - the producer id ProducerInfo false advisdoryForFastProducers
ActiveMQ.Advisory. FastProducer.Topic Fast Topic producer String=’consumerId’ - the producer id ProducerInfo false advisdoryForFastProducers
ActiveMQ.Advisory.MessageDiscarded.Queue Message discarded String=’orignalMessageId’ - the discarded id Message false advisoryForDiscardingMessages
ActiveMQ.Advisory.MessageDiscarded.Topic Message discarded String=’orignalMessageId’ - the discarded id Message false advisoryForDiscardingMessages
ActiveMQ.Advisory.MessageDelivered.Queue Message delivered to the broker String=’orignalMessageId’ - the delivered id Message false advisoryForDelivery
ActiveMQ.Advisory.MessageDelivered.Topic Message delivered to the broker String=’orignalMessageId’ - the delivered id Message false advisoryForDelivery
ActiveMQ.Advisory.MessageConsumed.Queue Message consumed by a client String=’orignalMessageId’ - the delivered id Message false advisoryForConsumed
ActiveMQ.Advisory.MessageConsumed.Topic Message consumed by a client String=’orignalMessageId’ - the delivered id Message false advisoryForConsumed
ActiveMQ.Advisory.FULL A Usage resource is at its limit String=’usageName’ - the name of Usage resource null false advisoryWhenFull
ActiveMQ.Advisory.MasterBroker A broker is now the master in a master/slave configuration null null true none

5.4后新公告

Advisory Topics 描述 屬性 數據結構 默認值 策略屬性
ActiveMQ.Advisory.MessageDLQd.Queue Message sent to DLQ String=’orignalMessageId’ - the delivered id Message Always on advisoryForConsumed
ActiveMQ.Advisory.MessageDLQd.Topic Message sent to DLQ String=’orignalMessageId’ - the delivered id Message Always on advisoryForConsumed

Network bridge advisories

Advisory Topics 描述 屬性 數據結構 默認值
ActiveMQ.Advisory.NetworkBridge Network bridge being stopped or started Boolean=”started” - true if bridge is started, false if it is stopped Boolean=”createdByDuplex” - true if the bridge is created by remote network connector BrokerInfo - provides data of the remote broker Always on

開啟公告消息

公告消息默認是不開啟的,我們可以使用下面的配置開啟公告配置

<destinationPolicy>
   <policyMap><policyEntries> 
      <policyEntry topic=">" advisoryForConsumed="true" />
   </policyEntries></policyMap>
</destinationPolicy>

關閉公告消息

xml配置如下

<broker advisorySupport="false">

java code 配置

BrokerService broker = new BrokerService();
broker.setAdvisorySupport(false);
broker.start();

url鏈接串

"tcp://localhost:61616?jms.watchTopicAdvisories=false"

設置ConnectionFactory的屬性

ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
factory.setWatchTopicAdvisories(false);

ActiveMQ一些靜態方法

AdvisorySupport.getConsumerAdvisoryTopic()
AdvisorySupport.getProducerAdvisoryTopic()
AdvisorySupport.getExpiredTopicMessageAdvisoryTopic()
AdvisorySupport.getExpiredQueueMessageAdvisoryTopic()
AdvisorySupport.getNoTopicConsumersAdvisoryTopic()
AdvisorySupport.getNoQueueConsumersAdvisoryTopic()
AdvisorySupport.getDestinationAdvisoryTopic()
AdvisorySupport.getExpiredQueueMessageAdvisoryTopic()
AdvisorySupport.getExpiredTopicMessageAdvisoryTopic()
AdvisorySupport.getNoQueueConsumersAdvisoryTopic()
AdvisorySupport.getNoTopicConsumersAdvisoryTopic()

//Version 5.2 onwards


AdvisorySupport.getSlowConsumerAdvisoryTopic()
AdvisorySupport.getFastProducerAdvisoryTopic()
AdvisorySupport.getMessageDiscardedAdvisoryTopic()
AdvisorySupport.getMessageDeliveredAdvisoryTopic()
AdvisorySupport.getMessageConsumedAdvisoryTopic()
AdvisorySupport.getMasterBrokerAdvisoryTopic()
AdvisorySupport.getFullAdvisoryTopic()

具體可以看一下AdvisorySupport這個類就懂了

image-20201015223216280

應用場景:

本來工作中用websocket當聊天的,結果經過F5后要做了一層代理,消息就不通了,於是就用的mq,需要捕獲一個隊列的消費者數量的變化,就用了Activemq的公告消息

    @Test
    public void test1(){
        Connection connection = null;
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            // 創建 advisory topic : ActiveMQ.Advisory.Consumer.Topic.chat1,用於監控topic消費者的狀態變化
            ActiveMQTopic advisoryTopic = AdvisorySupport.getConsumerAdvisoryTopic(new ActiveMQTopic("topic01"));
            MessageConsumer consumer = session.createConsumer(advisoryTopic);
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    if (message instanceof ActiveMQMessage) {
                        try {
                            ActiveMQMessage aMsg = (ActiveMQMessage) message;
                            System.out.println(aMsg.getStringProperty("consumerCount"));
                            System.out.println(aMsg.getStringProperty("producerCount"));
                            if (aMsg.getDataStructure() instanceof ConsumerInfo) {
                                // Consumer start
                                ConsumerInfo consumerInfo = (ConsumerInfo) aMsg.getDataStructure();
                                System.out.println(consumerInfo);
                            } else if (aMsg.getDataStructure() instanceof RemoveInfo) {
                                // Consumer stop
                                RemoveInfo removeInfo = (RemoveInfo) aMsg.getDataStructure();
                                System.out.println(removeInfo);
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            Thread.sleep(2000000);
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            try {
                if(null != connection){
                    connection.close();
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

測試:先上線后下線

1
null
ConsumerInfo {commandId = 6, responseRequired = true, consumerId = ID:WGR-PC-6298-1602771436318-1:1:2:1, destination = topic://topic01, prefetchSize = 32767, maximumPendingMessageLimit = 0, browser = false, dispatchAsync = true, selector = null, clientId = wgr, subscriptionName = null, noLocal = false, exclusive = false, retroactive = false, priority = 0, brokerPath = null, optimizedAcknowledge = false, noRangeAcks = false, additionalPredicate = null}


0
null
RemoveInfo {commandId = 0, responseRequired = true, objectId = ID:WGR-PC-6298-1602771436318-1:1:2:1, lastDeliveredSequenceId = -2}

其實除了JMX,咨詢消息,還有統計插件,沒有深入研究,官網說明如下:

http://activemq.apache.org/statisticsplugin.html

示例:https://segmentfault.com/a/1190000004522121?utm_source=debugrun&utm_medium=referral

從ActiveMQ 5.3開始,其中包括一個統計信息插件,可用於從代理或其目的地檢索統計信息。請注意,該消息必須包含replyTo標頭(jmsReplyTo如果使用的是JMS,則為標頭),否則該消息將被忽略。該replyTo標題必須包含從中要獲取統計信息(一個或多個)目標的名稱。統計消息中MapMessage填充了目標(即,代理或目標)的統計信息。

要檢索代理的統計信息,只需向標ActiveMQ.Statistics.BrokerreplyTo標題的目的地發送一條空消息。要檢索目的地統計,只需發送一個空的消息載明的目的地ActiveMQ.Statistics.Destination.<destination-name>ActiveMQ.Statistics.Destination.<wildcard-expression>連同replyTo頭。如果許多目的地與給定的通配符表達式匹配,則將向每個目的地發送一條狀態消息replyTo

要將ActiveMQ配置為使用統計信息插件,只需將以下內容添加到ActiveMQ XML配置中:

<broker ...>
  <plugins>
    <statisticsBrokerPlugin/>
  </plugins>
</broker>

統計信息插件查找發送到特定目的地的消息。以下是使用統計信息插件從代理獲取統計信息的示例:

Queue replyTo = session.createTemporaryQueue();
MessageConsumer consumer = session.createConsumer(replyTo);

String queueName = "ActiveMQ.Statistics.Broker";
Queue testQueue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(testQueue);
Message msg = session.createMessage();
msg.setJMSReplyTo(replyTo);
producer.send(msg);

MapMessage reply = (MapMessage) consumer.receive();
assertNotNull(reply);

for (Enumeration e = reply.getMapNames();e.hasMoreElements();) {
  String name = e.nextElement().toString();
  System.out.println(name + "=" + reply.getObject(name));
}

上面代碼的輸出如下所示:

vm=vm://localhost  
memoryUsage=0  
storeUsage=3330  
tempPercentUsage=0  
ssl=  
openwire=tcp://localhost:50059  
brokerId=ID:bigmac-50057-1253605065511-0:0  
consumerCount=2  
brokerName=localhost  
expiredCount=0  
dispatchCount=1  
maxEnqueueTime=5.0  
storePercentUsage=0  
dequeueCount=0  
inflightCount=1  
messagesCached=0  
tempLimit=107374182400  
averageEnqueueTime=5.0  
stomp+ssl=  
memoryPercentUsage=0  
size=10  
tempUsage=0  
producerCount=1  
minEnqueueTime=5.0  
dataDirectory=/Users/rajdavies/dev/projects/activemq/activemq-core/activemq-data  
enqueueCount=10  
stomp=  
storeLimit=107374182400  
memoryLimit=67108864

同樣,要查詢目標的統計信息,只需將消息發送到以開頭的目標名稱即可ActiveMQ.Statistics.Destination。例如,要檢索名稱為TEST.FOO的隊列的統計信息,請將空消息發送到名為的隊列ActiveMQ.Statistics.Destination.TEST.FOO。下面是一個示例:

Queue replyTo = session.createTemporaryQueue();
MessageConsumer consumer = session.createConsumer(replyTo);

Queue testQueue = session.createQueue("TEST.FOO");
MessageProducer producer = session.createProducer(null);

String queueName = "ActiveMQ.Statistics.Destination." + testQueue.getQueueName()
Queue query = session.createQueue(queueName);

Message msg = session.createMessage();

producer.send(testQueue, msg) 
msg.setJMSReplyTo(replyTo);
producer.send(query, msg);
MapMessage reply = (MapMessage) consumer.receive();
assertNotNull(reply);
assertTrue(reply.getMapNames().hasMoreElements());
        
for (Enumeration e = reply.getMapNames();e.hasMoreElements();) {
    String name = e.nextElement().toString();
    System.err.println(name + "=" + reply.getObject(name));
}

上面代碼的輸出如下所示:

memoryUsage=0  
dequeueCount=0  
inflightCount=0  
messagesCached=0  
averageEnqueueTime=0.0  
destinationName=queue://TEST.FOO  
size=1  
memoryPercentUsage=0  
producerCount=0  
consumerCount=0  
minEnqueueTime=0.0  
maxEnqueueTime=0.0  
dispatchCount=0  
expiredCount=0  
enqueueCount=1  
memoryLimit=67108864

您也可以在隊列名稱中使用通配符。對於通配符匹配的每個目的地,這將導致單獨的統計信息。確實非常方便。

訂閱統計

從5.6.0開始,您還可以檢索所有隊列和主題訂閱的統計信息。您所需要做的只是將空消息發送到ActiveMQ.Statistics. Subscription帶有replyTo標題的目的地。響應將以一個或多個消息的形式出現,每個消息都包含關於Broker上恰好一個訂閱的統計信息。

以下是使用統計信息插件從代理獲取統計信息的示例:

Queue replyTo = session.createTemporaryQueue();
MessageConsumer consumer = session.createConsumer(replyTo);

String queueName = "ActiveMQ.Statistics.Subscription";
Queue testQueue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(testQueue);
Message msg = session.createMessage();
msg.setJMSReplyTo(replyTo);
producer.send(msg);

MapMessage reply = (MapMessage) consumer.receive();
assertNotNull(reply);

for (Enumeration e = reply.getMapNames();e.hasMoreElements();) {
  String name = e.nextElement().toString();
  System.out.println(name + "=" + reply.getObject(name));
}

上面的代碼示例輸出如下所示:

selector=null  
dispatchedQueueSize=1  
maximumPendingMessageLimit=0  
exclusive=false  
connectionId=ID:dejan-bosanacs-macbook-pro-2.local-64989-1335528942875-4:1  
destinationName=Test.Queue  
clientId=ID:dejan-bosanacs-macbook-pro-2.local-64989-1335528942875-3:1  
slowConsumer=false  
prefetchSize=1000  
sessionId=1  
dequeueCounter=0  
enqueueCounter=1  
retroactive=false  
dispatchedCounter=1


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM