直觀的結果:當生產者向 topic 發送消息,
1. 若不存在持久訂閱者和在線的普通訂閱者,這個消息不會保存,當普通訂閱者上線后,它是收不到消息的。
2. 若存在離線的持久訂閱者,broker 會為該持久訂閱者保存消息,當該持久訂閱者上線后,會收到消息。
本質:producer 發送消息給 topic,broker 接收消息並且分發給 consumers。consumers 包括持久訂閱者和在線的普通訂閱者,對於持久訂閱者,broker 把消息添加到它的 message cursor 中;對於普通訂閱者,broker 直接分發消息。
如果,1個 topic 有2個持久訂閱者,並且這2個持久訂閱者都不在線,這時 producer 向 topic 發送1條消息,則 broker 會保存2條消息。因此,如果1個 topic 有很多不在線的持久訂閱者,會導致 broker 消耗過多存儲。
對於持久化的消息,這很好驗證:為方便查看消息,將 broker 持久化方式配置為jdbc,則可以在 ACTIVEMQ_MSGS 表中看到持久化消息。
持久訂閱者1:
new ActiveMQConnectionFactory("tcp://localhost:61616?jms.clientID=10086"); ... TopicSubscriber consumer = session.createDurableSubscriber(destination, "subscriber_zhang");
持久訂閱者2:
new ActiveMQConnectionFactory("tcp://localhost:61616?jms.clientID=10087"); ... TopicSubscriber consumer = session.createDurableSubscriber(destination, "subscriber_zhang");
對於持久消息,驗證 broker 為每個持久訂閱者保存1條消息:
1. 啟動 broker;
2. 分別啟動2個持久訂閱者,然后關閉它們,這樣 broker 有了2個離線的持久訂閱者;
3. 啟動 producer 向 topic 發送1條消息;
4. 查看 ACTIVEMQ_MSGS 表
對於非持久消息,直接跟代碼了,這里不說明。
下圖是 Topic 接收消息並分發的調用棧:

// org.apache.activemq.broker.region.policy.SimpleDispatchPolicy // 分發策略很簡單,就是遍歷consumers public boolean dispatch(MessageReference node, MessageEvaluationContext msgContext, List<Subscription> consumers) throws Exception { int count = 0; for (Subscription sub : consumers) { // Don't deliver to browsers if (sub.getConsumerInfo().isBrowser()) { continue; } // Only dispatch to interested subscriptions if (!sub.matches(node, msgContext)) { sub.unmatched(node); continue; } //持久化訂閱是 DurableTopicSubscription //普通訂閱是 TopicSubscription sub.add(node); count++; } return count > 0; }
持久化訂閱:
// org.apache.activemq.broker.region.DurableTopicSubscription public void add(MessageReference node) throws Exception { if (!active.get() && !keepDurableSubsActive) { return; } // 調用 PrefetchSubscription.add super.add(node); } // org.apache.activemq.broker.region.PrefetchSubscription public void add(MessageReference node) throws Exception { synchronized (pendingLock) { // The destination may have just been removed... if( !destinations.contains(node.getRegionDestination()) && node!=QueueMessageReference.NULL_MESSAGE) { // perhaps we should inform the caller that we are no longer valid to dispatch to? return; } // Don't increment for the pullTimeout control message. if (!node.equals(QueueMessageReference.NULL_MESSAGE)) { enqueueCounter++; } //首先加入到 message cursor 中,pending 類型為 StoreDurableSubscriberCursor pending.addMessageLast(node); } dispatchPending(); }
持久化訂閱者上線后,也會觸發消息分發動作即 dispatchPending,調用棧如下圖:

持久化訂閱者使用的 message cursor 是 StoreDurableSubscriberCursor。
普通訂閱:
org.apache.activemq.broker.region.TopicSubscription.add(MessageReference node)
