1. Messaage Properties
ActiveMQ支持很多消息屬性,具體可以參考
http://activemq.apache.org/activemq-message-properties.html
常見得一些屬性說明:
1. queue得消息默認是持久化得
2. 消息得優先級默認是4.
3. 消息發送時設置了時間戳。
4. 消息的過期時間默認是永不過期,過期的消息進入DLQ,可以配置DLQ及其處理策略。
5. 如果消息是重發的,將會被標記出來。
6. JMSReplyTo標識響應消息發送到哪個queue.
7. JMSCorelationID標識此消息相關聯的消息id,可以用這個標識把多個消息連接起來。
8. JMS同時也記錄了消息重發的次數。默認是6次
9. 如果有一組相關聯的消息需要處理,可以分組;只需要設置消息組的名字和這個消息的第幾個消息。
10. 如果消息中一個事務環境,則TXID將會被設置。
11. 此外ActiveMQ在服務器端額外設置了消息入隊和出隊的時間戳。
12. ActiveMQ里消息屬性的值,不僅可以用基本類型,還可以用List或Map類型
2. Advisory Message
Advisory Message是ActiveMQ自身的系統消息地址,可以監聽該地址來獲取activemq的系統消息。目前支持獲取如下信息:
1: consumers, producers和connections的啟動和停止
2. 創建和銷毀temporary destinations
3. toppics 和queues 的消息過期
4. brokers發送消息給destination,但是沒有consumers
5. connections啟動和停止
說明:
1. 所有advisory的topic,前綴是:ActiveMQ.Advisory
2. 所有Advisory的消息類型是:‘Advisory’,所有的Advisory都有的消息屬性有:originBrokerId,originBrokerName,originBrokerURL
3. 具體支持的topic和queue,請參照:
http://activemq.apache.org/advisory-message.html
Advisory功能默認是關閉的,打開Advisorie的具體實現如下:
<destinationPolicy> <policyMap> <policyEntries> <policyEntry topic=">" advisoryForConsumed="true"/> </policyEntries> </policyMap> </destinationPolicy>
開啟之后啟動ActiveMQ
查看控制台:
已經可以看到以ActiveMQ.Advisory為前綴的topic了,
監聽ActiveMQ.Advisory.Producer.Queue.my-queue,實現如下:
package com.wangx.activemq; import com.wangx.activemq.util.MQUtil; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQMessage; import javax.jms.*; public class MessageReceiver { /** * topic名字 */ private static final String QUEUENAME = "ActiveMQ.Advisory.Producer.Queue.my-queue"; public void receive() throws JMSException { ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection connection = factory.createConnection(); connection.start(); final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); //獲取Session //創建隊列 Topic queue = session.createTopic(QUEUENAME); //創建消費者 MessageConsumer messageConsumer = session.createConsumer(queue); //監聽生產者信息 messageConsumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } //將類型轉換成ActiveMQMessage ActiveMQMessage activeMQMessage = (ActiveMQMessage)message; try { //打印message System.out.println(activeMQMessage.getMessage()); session.commit(); } catch (JMSException e) { e.printStackTrace(); } } }); } public static void main(String[] args) throws JMSException { MessageReceiver messageReceiver = new MessageReceiver(); messageReceiver.receive(); } }
當生產者發送消息時,將會打印如下信息:
ActiveMQMessage {commandId = 0, responseRequired = false, messageId = ID:DESKTOP-A6T5N2R-57210-1541398368987-1:1:0:0:82, originalDestination = null, originalTransactionId = null, producerId = ID:DESKTOP-A6T5N2R-57210-1541398368987-1:1:0:0, destination = topic://ActiveMQ.Advisory.Producer.Queue.my-queue, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 1541399457800, brokerOutTime = 1541399457800, correlationId = null, replyTo = null, persistent = false, type = Advisory, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = org.apache.activemq.util.ByteSequence@1098f4d, dataStructure = ProducerInfo {commandId = 4, responseRequired = true, producerId = ID:DESKTOP-A6T5N2R-57597-1541399457549-1:1:1:1, destination = queue://my-queue, brokerPath = null, dispatchAsync = false, windowSize = 0, sentCount = 0}, redeliveryCounter = 0, size = 0, properties = {producerCount=1, originBrokerName=localhost, originBrokerURL=tcp://DESKTOP-A6T5N2R:61616, originBrokerId=ID:DESKTOP-A6T5N2R-57210-1541398368987-0:1}, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false} ActiveMQMessage {commandId = 0, responseRequired = false, messageId = ID:DESKTOP-A6T5N2R-57210-1541398368987-1:1:0:0:83, originalDestination = null, originalTransactionId = null, producerId = ID:DESKTOP-A6T5N2R-57210-1541398368987-1:1:0:0, destination = topic://ActiveMQ.Advisory.Producer.Queue.my-queue, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 1541399457890, brokerOutTime = 1541399457892, correlationId = null, replyTo = null, persistent = false, type = Advisory, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = org.apache.activemq.util.ByteSequence@1385d8ba, dataStructure = RemoveInfo {commandId = 0, responseRequired = true, objectId = ID:DESKTOP-A6T5N2R-57597-1541399457549-1:1:1:1, lastDeliveredSequenceId = -2}, redeliveryCounter = 0, size = 0, properties = {producerCount=0, originBrokerName=localhost, originBrokerURL=tcp://DESKTOP-A6T5N2R:61616, originBrokerId=ID:DESKTOP-A6T5N2R-57210-1541398368987-0:1}, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false}
總結Advisory的使用方式:
1. 要在配置文件里面開啟Advisories.
2. 消息發送端沒什么變化,不做多余改變或配置,
3. 消息接收端:
1)根據你要接收的消息類型,來設置不同的topic,當然也可以使用AdvisorySupport這個類來輔助創建,比如你想要得到消息生產者的信息,你可以:
Topic queue = session.createTopic(QUEUENAME);
Destination destination = AdvisorySupport.getProducerAdvisoryTopic(queue);
2)由於這個topic默認不是持久化的,所有要先看起接收端,然后再發送消息。
3) 接收消息的時候,接收到的消息類型是ActiveMQMessage,所以需要先轉換成ActiveMQMessage,然后再通過getDataStructure方法來得到具體的信息對象。
代碼如下:
//將類型轉換成ActiveMQMessage ActiveMQMessage activeMQMessage = (ActiveMQMessage)message; try { //打印message System.out.println(activeMQMessage.getDataStructure()); session.commit(); } catch (JMSException e) { e.printStackTrace(); }
這樣可以可以拿到相關信息,
控制台如下:
ProducerInfo {commandId = 4, responseRequired = true, producerId = ID:DESKTOP-A6T5N2R-58264-1541401841935-1:1:1:1, destination = queue://my-queue, brokerPath = null, dispatchAsync = false, windowSize = 0, sentCount = 0} RemoveInfo {commandId = 0, responseRequired = true, objectId = ID:DESKTOP-A6T5N2R-58264-1541401841935-1:1:1:1, lastDeliveredSequenceId = -2}
2. 延遲和定時消息投遞
延遲和定時消息傳遞(Delay and schedule Message Delivery)
有時候我們不希望消息馬上被broker投遞出去,而是想要消息60s以后發送給消費者,或者是我們想要讓消息每隔一段時間投遞一次,一共投遞指定的次數。。。類似這種需求,ActiveMQ提供了一種broker端消息定時調度機制。
我們只需要把幾個描述消息定時調度的方式參數作為屬性添加到消息,broker端的調度器酒睡按照我們想要的行為去處理消息。當然需要再xml中配置schedulerSupport屬性為true,
一共四個屬性
AMQ_SCHEDULED_DELAY: 延遲投遞的時間
AMQ_SCHEDULED_PERIOD: 重復投遞的時間間隔
AMQ_SCHEDULED_REPEAT:重復投遞次數
AMQ_SCHEDULED_CRON: Cron表達式
ActiveMQ也提供了一個封裝的消息類型:org.apache.activemq.ScheduledMessage,可以使用這個類來輔助設置,使用例子如下:延遲60s
在broker上設置schedulerSupport="true",然后使用如下代碼設置:
TextMessage textMessage = session.createTextMessage("message" + i);
long time = 30 * 1000; textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
這樣,當生產者發送消息之后,消費者不會馬上收到消息,而是會等待30s之后才會開始接收消息
延遲10s,投遞3次,間隔5秒的例子
TextMessage textMessage = session.createTextMessage("message" + i);
long delay = 10 * 1000; long period= 5 * 1000; int repeat = 3; textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay); textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period); textMessage.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat); messageProducer.send(textMessage);
使用CRON表達式,每個小時發送一次
textMessage.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON,"0 * * * *");
CRON表達式的優先級高於另外三個參數,如果在設置了延時時間,也有repeat和period參數,則會在每次CRON執行的時候,重復投遞repeat次,每次間隔period,就是說設置的是疊加效果,例如每小時都會發生消息被投遞10次,延遲0秒開始,每次間隔1秒。