Activemq的公告消息
下面是來自官網的翻譯:http://activemq.apache.org/advisory-message.html
ActiveMQ消息屬性
消息屬性
屬性名 | 類型 | 默認值 | 描述 |
---|---|---|---|
JMSDestination | javax.jms.Destination | 生產者set進去 | 發送消息目的地 |
JMSReplyTo | javax.jms.Destination | null | 用戶定義 |
JMSType | String | “” | 用戶定義 |
JMSDeliveryMode | int | DeliveryMode.PERSISTENT | 消息是否要持久化 |
JMSPriority | int | 4 | 0-9 |
JMSMessageID | String | unique | 消息唯一標示符 |
JMSTimestamp | long | 消息發送時間 | 毫秒 |
JMSCorrelationID | String | null | 用戶定義 |
JMSExpiration | long | 0 | 0表示用不過氣,消息過期時間 |
JMSRedelivered | boolean | false | 重新傳遞 |
JMS定義屬性
屬性名 | 類型 | 默認值 | 描述 |
---|---|---|---|
JMSXDeliveryCount | int | 0 | 嘗試發送消息的次數 |
JMSXGroupID | String | null | 表示消息的分組 |
JMSXGroupSeq | int | 0 | 消息的序列號 |
JMSXProducerTXID | String | null | 事務標示 |
ActiveMQ定義屬性
屬性名 | 類型 | 默認值 | 描述 |
---|---|---|---|
JMSActiveMQBrokerInTime | long | 0 | 消息到達broker的時間 |
JMSActiveMQBrokerOutTime | long | 0 | 消息離開broker的時間 |
公告消息(Advisory Message)
ActiveMQ支持的公告消息(相當於監聽器)
-
消費者,生產者和connection開始和停止
-
臨時目的地的創建和銷毀
-
話題和隊列上的消息過期
-
broker發送消息到沒有消費者的目的地
-
connections開啟和停止
公告消息是一種管理JMS的渠道,我們可以接收到在broker上的生產者消費者目的地的變化。
當我們通過JMX查看broker時候,可以看到公告話題以ActiveMQ.Advisory為前綴,每個公告擁有類型為Advisory的消息和一些預定義的消息屬性
屬性名 | 類型 | 描述 |
---|---|---|
originBrokerId | StringProperty | 公告起源的broker的ID |
originBrokerName | StringProperty | 公告起源的broker的名稱 |
originBrokerURL | StringProperty | 公告起源的第一個broker url |
我們可以獲得到消息的一些元數據
Destination advisoryDestination = AdvisorySupport.getProducerAdvisoryTopic(destination)
MessageConsumer consumer = session.createConsumer(advisoryDestination);
consumer.setMessageListener(this);
public void onMessage(Message msg){
if (msg instanceof ActiveMQMessage){
try {
ActiveMQMessage aMsg = (ActiveMQMessage)msg;
ProducerInfo prod = (ProducerInfo) aMsg.getDataStructure();
} catch (JMSException e) {
log.error("Failed to process message: " + msg);
}
}
}
支持的公告話題
客戶端的公告消息
我們可以用consumerCount頭信息獲取當前消費者的數量來得知當前的公告的消息是否已經發送。
Advisory Topics | Description | properties | Data Structure |
---|---|---|---|
ActiveMQ.Advisory.Connection | ActiveMQ.Advisory.Connection | ConnectionInfo, RemoveInfo | |
ActiveMQ.Advisory.Producer.Queue | Producer start & stop messages on a Queue | String=’producerCount’ - the number of producers | ProducerInfo |
ActiveMQ.Advisory.Producer.Topic | Producer start & stop messages on a Topic | String=’producerCount’ - the number of producers | ProducerInfo |
ActiveMQ.Advisory.Consumer.Queue | Consumer start & stop messages on a Queue | String=’consumerCount’ - the number of Consumers | ConsumerInfo, RemoveInfo |
ActiveMQ.Advisory.Consumer.Topic | Consumer start & stop messages on a Topic | String=’consumerCount’ - the number of Consumers | ConsumerInfo, RemoveInfo |
目的地和消息的公告
Advisory Topics | 描述 | 屬性 | 數據結構 | 默認值 | 策略屬性 |
---|---|---|---|---|---|
ActiveMQ.Advisory.Queue | Queue create & destroy | null | DestinationInfo | true | none |
ActiveMQ.Advisory.Topic | Topic create & destroy | null | DestinationInfo | true | none |
ActiveMQ.Advisory.TempQueue | Temporary Queue create & destroy | null | DestinationInfo | true | none |
ActiveMQ.Advisory.TempTopic | Temporary Topic create & destroy | null | DestinationInfo | true | none |
ActiveMQ.Advisory.Expired.Queue | Expired messages on a Queue | String=’orignalMessageId’ - the expired id | Message | true | none |
ActiveMQ.Advisory.Expired.Topic | Expired messages on a Topic | String=’orignalMessageId’ - the expired id | Message | true | none |
ActiveMQ.Advisory.NoConsumer.Queue | No consumer is available to process messages being sent on a Queue | null | Message | false | sendAdvisoryIfNoConsumers |
ActiveMQ.Advisory.NoConsumer.Topic | No consumer is available to process messages being sent on a Topic | null | Message | false | sendAdvisoryIfNoConsumers |
ActiveMQ5.2新的公告消息
Advisory Topics | 描述 | 屬性 | 數據結構 | 默認值 | 策略屬性 |
---|---|---|---|---|---|
ActiveMQ.Advisory.SlowConsumer.Queue | Slow Queue Consumer | String=’consumerId’ - the consumer id | ConsumerInfo | false | advisoryForSlowConsumers |
ActiveMQ.Advisory.SlowConsumer.Topic | Slow Topic Consumer | String=’consumerId’ - the consumer id | ConsumerInfo | false | advisoryForSlowConsumers |
ActiveMQ.Advisory.FastProducer.Queue | Fast Queue producer | String=’producerId’ - the producer id | ProducerInfo | false | advisdoryForFastProducers |
ActiveMQ.Advisory. FastProducer.Topic | Fast Topic producer | String=’consumerId’ - the producer id | ProducerInfo | false | advisdoryForFastProducers |
ActiveMQ.Advisory.MessageDiscarded.Queue | Message discarded | String=’orignalMessageId’ - the discarded id | Message | false | advisoryForDiscardingMessages |
ActiveMQ.Advisory.MessageDiscarded.Topic | Message discarded | String=’orignalMessageId’ - the discarded id | Message | false | advisoryForDiscardingMessages |
ActiveMQ.Advisory.MessageDelivered.Queue | Message delivered to the broker | String=’orignalMessageId’ - the delivered id | Message | false | advisoryForDelivery |
ActiveMQ.Advisory.MessageDelivered.Topic | Message delivered to the broker | String=’orignalMessageId’ - the delivered id | Message | false | advisoryForDelivery |
ActiveMQ.Advisory.MessageConsumed.Queue | Message consumed by a client | String=’orignalMessageId’ - the delivered id | Message | false | advisoryForConsumed |
ActiveMQ.Advisory.MessageConsumed.Topic | Message consumed by a client | String=’orignalMessageId’ - the delivered id | Message | false | advisoryForConsumed |
ActiveMQ.Advisory.FULL | A Usage resource is at its limit | String=’usageName’ - the name of Usage resource | null | false | advisoryWhenFull |
ActiveMQ.Advisory.MasterBroker | A broker is now the master in a master/slave configuration | null | null | true | none |
5.4后新公告
Advisory Topics | 描述 | 屬性 | 數據結構 | 默認值 | 策略屬性 |
---|---|---|---|---|---|
ActiveMQ.Advisory.MessageDLQd.Queue | Message sent to DLQ | String=’orignalMessageId’ - the delivered id | Message | Always on | advisoryForConsumed |
ActiveMQ.Advisory.MessageDLQd.Topic | Message sent to DLQ | String=’orignalMessageId’ - the delivered id | Message | Always on | advisoryForConsumed |
Network bridge advisories
Advisory Topics | 描述 | 屬性 | 數據結構 | 默認值 |
---|---|---|---|---|
ActiveMQ.Advisory.NetworkBridge | Network bridge being stopped or started | Boolean=”started” - true if bridge is started, false if it is stopped Boolean=”createdByDuplex” - true if the bridge is created by remote network connector | BrokerInfo - provides data of the remote broker | Always on |
開啟公告消息
公告消息默認是不開啟的,我們可以使用下面的配置開啟公告配置
<destinationPolicy>
<policyMap><policyEntries>
<policyEntry topic=">" advisoryForConsumed="true" />
</policyEntries></policyMap>
</destinationPolicy>
關閉公告消息
xml配置如下
<broker advisorySupport="false">
java code 配置
BrokerService broker = new BrokerService();
broker.setAdvisorySupport(false);
broker.start();
url鏈接串
"tcp://localhost:61616?jms.watchTopicAdvisories=false"
設置ConnectionFactory的屬性
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
factory.setWatchTopicAdvisories(false);
ActiveMQ一些靜態方法
AdvisorySupport.getConsumerAdvisoryTopic()
AdvisorySupport.getProducerAdvisoryTopic()
AdvisorySupport.getExpiredTopicMessageAdvisoryTopic()
AdvisorySupport.getExpiredQueueMessageAdvisoryTopic()
AdvisorySupport.getNoTopicConsumersAdvisoryTopic()
AdvisorySupport.getNoQueueConsumersAdvisoryTopic()
AdvisorySupport.getDestinationAdvisoryTopic()
AdvisorySupport.getExpiredQueueMessageAdvisoryTopic()
AdvisorySupport.getExpiredTopicMessageAdvisoryTopic()
AdvisorySupport.getNoQueueConsumersAdvisoryTopic()
AdvisorySupport.getNoTopicConsumersAdvisoryTopic()
//Version 5.2 onwards
AdvisorySupport.getSlowConsumerAdvisoryTopic()
AdvisorySupport.getFastProducerAdvisoryTopic()
AdvisorySupport.getMessageDiscardedAdvisoryTopic()
AdvisorySupport.getMessageDeliveredAdvisoryTopic()
AdvisorySupport.getMessageConsumedAdvisoryTopic()
AdvisorySupport.getMasterBrokerAdvisoryTopic()
AdvisorySupport.getFullAdvisoryTopic()
具體可以看一下AdvisorySupport這個類就懂了
應用場景:
本來工作中用websocket當聊天的,結果經過F5后要做了一層代理,消息就不通了,於是就用的mq,需要捕獲一個隊列的消費者數量的變化,就用了Activemq的公告消息
@Test
public void test1(){
Connection connection = null;
try {
connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
// 創建 advisory topic : ActiveMQ.Advisory.Consumer.Topic.chat1,用於監控topic消費者的狀態變化
ActiveMQTopic advisoryTopic = AdvisorySupport.getConsumerAdvisoryTopic(new ActiveMQTopic("topic01"));
MessageConsumer consumer = session.createConsumer(advisoryTopic);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (message instanceof ActiveMQMessage) {
try {
ActiveMQMessage aMsg = (ActiveMQMessage) message;
System.out.println(aMsg.getStringProperty("consumerCount"));
System.out.println(aMsg.getStringProperty("producerCount"));
if (aMsg.getDataStructure() instanceof ConsumerInfo) {
// Consumer start
ConsumerInfo consumerInfo = (ConsumerInfo) aMsg.getDataStructure();
System.out.println(consumerInfo);
} else if (aMsg.getDataStructure() instanceof RemoveInfo) {
// Consumer stop
RemoveInfo removeInfo = (RemoveInfo) aMsg.getDataStructure();
System.out.println(removeInfo);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
});
Thread.sleep(2000000);
} catch (Exception e) {
e.printStackTrace();
}finally {
try {
if(null != connection){
connection.close();
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
測試:先上線后下線
1
null
ConsumerInfo {commandId = 6, responseRequired = true, consumerId = ID:WGR-PC-6298-1602771436318-1:1:2:1, destination = topic://topic01, prefetchSize = 32767, maximumPendingMessageLimit = 0, browser = false, dispatchAsync = true, selector = null, clientId = wgr, subscriptionName = null, noLocal = false, exclusive = false, retroactive = false, priority = 0, brokerPath = null, optimizedAcknowledge = false, noRangeAcks = false, additionalPredicate = null}
0
null
RemoveInfo {commandId = 0, responseRequired = true, objectId = ID:WGR-PC-6298-1602771436318-1:1:2:1, lastDeliveredSequenceId = -2}
其實除了JMX,咨詢消息,還有統計插件,沒有深入研究,官網說明如下:
http://activemq.apache.org/statisticsplugin.html
示例:https://segmentfault.com/a/1190000004522121?utm_source=debugrun&utm_medium=referral
從ActiveMQ 5.3開始,其中包括一個統計信息插件,可用於從代理或其目的地檢索統計信息。請注意,該消息必須包含replyTo
標頭(jmsReplyTo
如果使用的是JMS,則為標頭),否則該消息將被忽略。該replyTo
標題必須包含從中要獲取統計信息(一個或多個)目標的名稱。統計消息中MapMessage
填充了目標(即,代理或目標)的統計信息。
要檢索代理的統計信息,只需向標ActiveMQ.Statistics.Broker
有replyTo
標題的目的地發送一條空消息。要檢索目的地統計,只需發送一個空的消息載明的目的地ActiveMQ.Statistics.Destination.<destination-name>
或ActiveMQ.Statistics.Destination.<wildcard-expression>
連同replyTo
頭。如果許多目的地與給定的通配符表達式匹配,則將向每個目的地發送一條狀態消息replyTo
。
要將ActiveMQ配置為使用統計信息插件,只需將以下內容添加到ActiveMQ XML配置中:
<broker ...>
<plugins>
<statisticsBrokerPlugin/>
</plugins>
</broker>
統計信息插件查找發送到特定目的地的消息。以下是使用統計信息插件從代理獲取統計信息的示例:
Queue replyTo = session.createTemporaryQueue();
MessageConsumer consumer = session.createConsumer(replyTo);
String queueName = "ActiveMQ.Statistics.Broker";
Queue testQueue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(testQueue);
Message msg = session.createMessage();
msg.setJMSReplyTo(replyTo);
producer.send(msg);
MapMessage reply = (MapMessage) consumer.receive();
assertNotNull(reply);
for (Enumeration e = reply.getMapNames();e.hasMoreElements();) {
String name = e.nextElement().toString();
System.out.println(name + "=" + reply.getObject(name));
}
上面代碼的輸出如下所示:
vm=vm://localhost
memoryUsage=0
storeUsage=3330
tempPercentUsage=0
ssl=
openwire=tcp://localhost:50059
brokerId=ID:bigmac-50057-1253605065511-0:0
consumerCount=2
brokerName=localhost
expiredCount=0
dispatchCount=1
maxEnqueueTime=5.0
storePercentUsage=0
dequeueCount=0
inflightCount=1
messagesCached=0
tempLimit=107374182400
averageEnqueueTime=5.0
stomp+ssl=
memoryPercentUsage=0
size=10
tempUsage=0
producerCount=1
minEnqueueTime=5.0
dataDirectory=/Users/rajdavies/dev/projects/activemq/activemq-core/activemq-data
enqueueCount=10
stomp=
storeLimit=107374182400
memoryLimit=67108864
同樣,要查詢目標的統計信息,只需將消息發送到以開頭的目標名稱即可ActiveMQ.Statistics.Destination
。例如,要檢索名稱為TEST.FOO的隊列的統計信息,請將空消息發送到名為的隊列ActiveMQ.Statistics.Destination.TEST.FOO
。下面是一個示例:
Queue replyTo = session.createTemporaryQueue();
MessageConsumer consumer = session.createConsumer(replyTo);
Queue testQueue = session.createQueue("TEST.FOO");
MessageProducer producer = session.createProducer(null);
String queueName = "ActiveMQ.Statistics.Destination." + testQueue.getQueueName()
Queue query = session.createQueue(queueName);
Message msg = session.createMessage();
producer.send(testQueue, msg)
msg.setJMSReplyTo(replyTo);
producer.send(query, msg);
MapMessage reply = (MapMessage) consumer.receive();
assertNotNull(reply);
assertTrue(reply.getMapNames().hasMoreElements());
for (Enumeration e = reply.getMapNames();e.hasMoreElements();) {
String name = e.nextElement().toString();
System.err.println(name + "=" + reply.getObject(name));
}
上面代碼的輸出如下所示:
memoryUsage=0
dequeueCount=0
inflightCount=0
messagesCached=0
averageEnqueueTime=0.0
destinationName=queue://TEST.FOO
size=1
memoryPercentUsage=0
producerCount=0
consumerCount=0
minEnqueueTime=0.0
maxEnqueueTime=0.0
dispatchCount=0
expiredCount=0
enqueueCount=1
memoryLimit=67108864
您也可以在隊列名稱中使用通配符。對於通配符匹配的每個目的地,這將導致單獨的統計信息。確實非常方便。
訂閱統計
從5.6.0開始,您還可以檢索所有隊列和主題訂閱的統計信息。您所需要做的只是將空消息發送到ActiveMQ.Statistics. Subscription
帶有replyTo
標題的目的地。響應將以一個或多個消息的形式出現,每個消息都包含關於Broker上恰好一個訂閱的統計信息。
以下是使用統計信息插件從代理獲取統計信息的示例:
Queue replyTo = session.createTemporaryQueue();
MessageConsumer consumer = session.createConsumer(replyTo);
String queueName = "ActiveMQ.Statistics.Subscription";
Queue testQueue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(testQueue);
Message msg = session.createMessage();
msg.setJMSReplyTo(replyTo);
producer.send(msg);
MapMessage reply = (MapMessage) consumer.receive();
assertNotNull(reply);
for (Enumeration e = reply.getMapNames();e.hasMoreElements();) {
String name = e.nextElement().toString();
System.out.println(name + "=" + reply.getObject(name));
}
上面的代碼示例輸出如下所示:
selector=null
dispatchedQueueSize=1
maximumPendingMessageLimit=0
exclusive=false
connectionId=ID:dejan-bosanacs-macbook-pro-2.local-64989-1335528942875-4:1
destinationName=Test.Queue
clientId=ID:dejan-bosanacs-macbook-pro-2.local-64989-1335528942875-3:1
slowConsumer=false
prefetchSize=1000
sessionId=1
dequeueCounter=0
enqueueCounter=1
retroactive=false
dispatchedCounter=1