http://activemq.apache.org/exclusive-consumer.html
producer發送消息是有先后順序的,這種順序保持到了broker中。如果希望消息按順序被消費掉,則應該把消息投送給單獨一個consumer。如果隊列只有一個consumer,那就很ok了,broker沒有選擇。但是,一旦唯一的consumer掛了,會造成服務不可用。因此出現了exclusive consumer,配置如下:
new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true");
如果有2個consumer都是這樣配置的,broker只會把隊列消息發送給其中一個consumer,如果這個consumer掛掉了,broker會把消息推送給另外的consumer,這樣就保證了按順序消費消息。
那麼,ActiveMQ是怎樣實現這種邏輯的呢?
org.apache.activemq.broker.region.Queue中維持了一個consumer列表,分發消息的時候,會去遍歷這個列表,在列表中靠前的consumer會優先被分發消息。
// org.apache.activemq.broker.region.Queue // 該方法把消息分發給consumer,PendingList是消息列表 private PendingList doActualDispatch(PendingList list) throws Exception { //消費者列表 List<Subscription> consumers; consumersLock.writeLock().lock(); try { if (this.consumers.isEmpty()) { // 消費者為空,直接返回 // slave dispatch happens in processDispatchNotification return list; } consumers = new ArrayList<Subscription>(this.consumers); } finally { consumersLock.writeLock().unlock(); } // 初始化fullConsumers Set<Subscription> fullConsumers = new HashSet<Subscription>(this.consumers.size()); // 遍歷消息列表 for (Iterator<MessageReference> iterator = list.iterator(); iterator.hasNext();) { MessageReference node = iterator.next(); Subscription target = null; // 遍歷消費者 for (Subscription s : consumers) { if (s instanceof QueueBrowserSubscription) { continue; } if (!fullConsumers.contains(s)) { if (!s.isFull()) { //消費者not full //滿足以下條件可以分發: //1. 符合QueueDispatchSelector的規則 //2. 消息的group屬性和消費者匹配 //3. 消息沒有被應答 if (dispatchSelector.canSelect(s, node) && assignMessageGroup(s, (QueueMessageReference)node) && !((QueueMessageReference) node).isAcked() ) { // Dispatch it. s.add(node); LOG.trace("assigned {} to consumer {}", node.getMessageId(), s.getConsumerInfo().getConsumerId()); //從發送列表中刪除,消息並不會真的刪除 iterator.remove(); //設置消息的target target = s; break; } } else { // no further dispatch of list to a full consumer to // avoid out of order message receipt fullConsumers.add(s); LOG.trace("Subscription full {}", s); } } } if (target == null && node.isDropped()) { iterator.remove(); } // return if there are no consumers or all consumers are full if (target == null && consumers.size() == fullConsumers.size()) { return list; } // 在列表中調整consumer的順序 // 如果是exlusive consumer,則不會進分支,那么exlusive consumer的順序不會變 // 一旦進入這個分支,當前的consumer會被放到最后 // If it got dispatched, rotate the consumer list to get round robin // distribution. 
if (target != null && !strictOrderDispatch && consumers.size() > 1 && !dispatchSelector.isExclusiveConsumer(target)) { consumersLock.writeLock().lock(); try { // 先從this.consumers中刪除當前consumer if (removeFromConsumerList(target)) { // 然后把當前consumer添加到this.consumers中 addToConsumerList(target); consumers = new ArrayList<Subscription>(this.consumers); } } finally { consumersLock.writeLock().unlock(); } } } return list; } private void addToConsumerList(Subscription sub) { if (useConsumerPriority) { consumers.add(sub); Collections.sort(consumers, orderedCompare); } else { consumers.add(sub); } }
QueueDispatchSelector保證:如果配置了 exclusive consumer,消息只會被分發給這個 exclusive consumer(其他非browser的consumer都不會被選中)。
// org.apache.activemq.broker.region.QueueDispatchSelector public boolean canSelect(Subscription subscription, MessageReference m) throws Exception { boolean result = super.canDispatch(subscription, m); if (result && !subscription.isBrowser()) { // 沒有配置exclusiveConsumer,或者exclusiveConsumer就是當前消費者 result = exclusiveConsumer == null || exclusiveConsumer == subscription; } return result; }
在添加消費者的時候,設置exclusive consumer:
//org.apache.activemq.broker.region.Queue public void addSubscription(ConnectionContext context, Subscription sub) throws Exception { LOG.debug("{} add sub: {}, dequeues: {}, dispatched: {}, inflight: {}", new Object[]{ getActiveMQDestination().getQualifiedName(), getDestinationStatistics().getDequeues().getCount(), getDestinationStatistics().getDispatched().getCount(), getDestinationStatistics().getInflight().getCount() }); super.addSubscription(context, sub); // synchronize with dispatch method so that no new messages are sent // while setting up a subscription. avoid out of order messages, // duplicates, etc. pagedInPendingDispatchLock.writeLock().lock(); try { sub.add(context, this); // needs to be synchronized - so no contention with dispatching // consumersLock. consumersLock.writeLock().lock(); try { // set a flag if this is a first consumer if (consumers.size() == 0) { firstConsumer = true; if (consumersBeforeDispatchStarts != 0) { consumersBeforeStartsLatch = new CountDownLatch(consumersBeforeDispatchStarts - 1); } } else { if (consumersBeforeStartsLatch != null) { consumersBeforeStartsLatch.countDown(); } } addToConsumerList(sub); //設置 exclusive consumer if (sub.getConsumerInfo().isExclusive() || isAllConsumersExclusiveByDefault()) { Subscription exclusiveConsumer = dispatchSelector.getExclusiveConsumer(); if (exclusiveConsumer == null) { // exclusiveConsumer為空 exclusiveConsumer = sub; } else if (sub.getConsumerInfo().getPriority() == Byte.MAX_VALUE || sub.getConsumerInfo().getPriority() > exclusiveConsumer.getConsumerInfo().getPriority()) { //如果當前訂閱者的優先級比已有的exclusiveConsumer高 exclusiveConsumer = sub; } dispatchSelector.setExclusiveConsumer(exclusiveConsumer); } } finally { consumersLock.writeLock().unlock(); } if (sub instanceof QueueBrowserSubscription) { // tee up for dispatch in next iterate QueueBrowserSubscription browserSubscription = (QueueBrowserSubscription) sub; BrowserDispatch browserDispatch = new BrowserDispatch(browserSubscription); 
browserDispatches.add(browserDispatch); } if (!this.optimizedDispatch) { wakeup(); } } finally { pagedInPendingDispatchLock.writeLock().unlock(); } if (this.optimizedDispatch) { // Outside of dispatchLock() to maintain the lock hierarchy of // iteratingMutex -> dispatchLock. - see // https://issues.apache.org/activemq/browse/AMQ-1878 wakeup(); } }
