http://activemq.apache.org/consumer-priority.html consumer priority
http://activemq.apache.org/activemq-message-properties.html message priority
1. Setting the consumer priority:
queue = new ActiveMQQueue("TEST.QUEUE?consumer.priority=10");
consumer = session.createConsumer(queue);
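The priority value ranges from 0 to 127. The broker sorts a queue's consumers by priority and dispatches messages to the highest-priority consumer first. Once that consumer's prefetch buffer is full, the broker dispatches to the next-highest-priority consumer whose prefetch buffer is not yet full.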
// org.apache.activemq.broker.region.Queue
// Comparator for consumer priority
private final Comparator<Subscription> orderedCompare = new Comparator<Subscription>() {
    @Override
    public int compare(Subscription s1, Subscription s2) {
        // We want the list sorted in descending order,
        // i.e. a larger value means a higher priority
        int val = s2.getConsumerInfo().getPriority() - s1.getConsumerInfo().getPriority();
        if (val == 0 && messageGroupOwners != null) {
            // then ascending order of assigned message groups to favour less loaded consumers
            // Long.compare in jdk7
            long x = s1.getConsumerInfo().getLastDeliveredSequenceId();
            long y = s2.getConsumerInfo().getLastDeliveredSequenceId();
            val = (x < y) ? -1 : ((x == y) ? 0 : 1);
        }
        return val;
    }
};

// Adding a consumer triggers a re-sort.
// Consumers nearer the front of the consumers list are dispatched to first.
private void addToConsumerList(Subscription sub) {
    if (useConsumerPriority) {
        consumers.add(sub);
        Collections.sort(consumers, orderedCompare);
    } else {
        consumers.add(sub);
    }
}
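The ordering and dispatch rule described above can be tried out in isolation. The following is only a simplified model under stated assumptions, not ActiveMQ code: the Consumer class, BY_PRIORITY_DESC and dispatch() are hypothetical stand-ins, and a real broker also has to deal with acknowledgements, message groups and concurrent consumers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Simplified model of priority-ordered dispatch; these are NOT ActiveMQ classes.
class ConsumerPrioritySketch {

    // Hypothetical stand-in for a subscription: a name, a priority (0..127)
    // and a prefetch buffer with a fixed capacity.
    static final class Consumer {
        final String name;
        final int priority;
        final int prefetchSize;
        final List<String> buffer = new ArrayList<>();

        Consumer(String name, int priority, int prefetchSize) {
            this.name = name;
            this.priority = priority;
            this.prefetchSize = prefetchSize;
        }

        boolean isFull() {
            return buffer.size() >= prefetchSize;
        }
    }

    // Descending order: the larger priority value sorts first.
    static final Comparator<Consumer> BY_PRIORITY_DESC =
            (a, b) -> Integer.compare(b.priority, a.priority);

    // Hands each message to the first consumer, in priority order, that still
    // has room in its prefetch buffer. Messages that find no free consumer
    // are returned as still pending.
    static List<String> dispatch(List<Consumer> consumers, List<String> messages) {
        List<Consumer> ordered = new ArrayList<>(consumers);
        ordered.sort(BY_PRIORITY_DESC);

        List<String> pending = new ArrayList<>();
        for (String message : messages) {
            boolean delivered = false;
            for (Consumer c : ordered) {
                if (!c.isFull()) {
                    c.buffer.add(message);
                    delivered = true;
                    break;
                }
            }
            if (!delivered) {
                pending.add(message);
            }
        }
        return pending;
    }

    public static void main(String[] args) {
        Consumer low = new Consumer("low", 1, 2);
        Consumer high = new Consumer("high", 10, 2);

        List<String> pending = dispatch(
                Arrays.asList(low, high),
                Arrays.asList("m1", "m2", "m3", "m4", "m5"));

        System.out.println(high.name + " " + high.buffer);
        System.out.println(low.name + " " + low.buffer);
        System.out.println("pending " + pending);
    }
}
```

In this model the consumer with priority 10 fills its two-slot buffer first, the consumer with priority 1 takes the next two messages, and the fifth message stays pending until a buffer frees up.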
2. Setting the message priority requires configuration on both the broker side and the producer side:
2.1 On the broker, set prioritizedMessages="true" for the TEST.BAT queue
<policyEntry queue="TEST.BAT" prioritizedMessages="true" producerFlowControl="true" memoryLimit="1mb">
    <deadLetterStrategy>
        <individualDeadLetterStrategy queuePrefix="TEST"/>
    </deadLetterStrategy>
    <pendingQueuePolicy>
        <storeCursor/>
    </pendingQueuePolicy>
</policyEntry>
2.2 When the producer sends a message, set the message priority
TextMessage message = session.createTextMessage(text);
producer.send(destination, message, DeliveryMode.NON_PERSISTENT, 1, 0);
To set the message priority, you need to call:
void javax.jms.MessageProducer.send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive)
throws JMSException
It cannot be written like this:
TextMessage message = session.createTextMessage(text);
message.setJMSPriority(0);
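At first glance this looks like an ActiveMQ bug, but it is standard JMS behaviour: the provider sets JMSPriority when the message is sent, so a value assigned beforehand with setJMSPriority() is overwritten. Message priority values range from 0 to 9. Once prioritized messages are enabled, pending messages are kept in a PrioritizedPendingList.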
// Some code omitted
private class PrioritizedPendingListIterator implements Iterator<MessageReference> {
    private int index = 0;
    private int currentIndex = 0;
    List<PendingNode> list = new ArrayList<PendingNode>(size());

    PrioritizedPendingListIterator() {
        for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
            // A larger priority value means a higher priority
            OrderedPendingList orderedPendingList = lists[i];
            if (!orderedPendingList.isEmpty()) {
                list.addAll(orderedPendingList.getAsList());
            }
        }
    }
}
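The iteration order used by the iterator above can be illustrated with a small standalone model. This is a sketch under assumptions rather than the ActiveMQ implementation: PrioritizedPendingSketch, add() and drainOrder() are hypothetical names, messages are plain strings, and out-of-range priorities are simply clamped to 0..9 here.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of one FIFO bucket per priority level; NOT the ActiveMQ class.
class PrioritizedPendingSketch {
    static final int MAX_PRIORITY = 10; // message priorities 0..9

    private final List<List<String>> buckets = new ArrayList<>();

    PrioritizedPendingSketch() {
        for (int i = 0; i < MAX_PRIORITY; i++) {
            buckets.add(new ArrayList<>());
        }
    }

    // Appends the message to the bucket of its priority.
    // Assumption of this sketch: out-of-range values are clamped to 0..9.
    void add(String message, int priority) {
        int p = Math.max(0, Math.min(MAX_PRIORITY - 1, priority));
        buckets.get(p).add(message);
    }

    // Walks the buckets from the highest priority down to 0, keeping
    // arrival order inside each bucket.
    List<String> drainOrder() {
        List<String> out = new ArrayList<>();
        for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
            out.addAll(buckets.get(i));
        }
        return out;
    }

    public static void main(String[] args) {
        PrioritizedPendingSketch pending = new PrioritizedPendingSketch();
        pending.add("a", 1);
        pending.add("b", 9);
        pending.add("c", 4);
        pending.add("d", 9);
        System.out.println(pending.drainOrder());
    }
}
```

Messages with the same priority keep their arrival order, so priority only reorders messages across levels, not within one.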
