ActiveMQ topic 普通訂閱和持久訂閱


直觀的結果:當生產者向 topic 發送消息,

1. 若不存在持久訂閱者和在線的普通訂閱者,這個消息不會保存,當普通訂閱者上線后,它是收不到消息的。

2. 若存在離線的持久訂閱者,broker 會為該持久訂閱者保存消息,當該持久訂閱者上線后,會收到消息。

本質:producer 發送消息給 topic,broker 接收消息並且分發給 consumers。consumers 包括持久訂閱者和在線的普通訂閱者,對於持久訂閱者,broker 把消息添加到它的 message cursor 中;對於普通訂閱者,broker 直接分發消息。

如果,1個 topic 有2個持久訂閱者,並且這2個持久訂閱者都不在線,這時 producer 向 topic 發送1條消息,則 broker 會保存2條消息。因此,如果1個 topic 有很多不在線的持久訂閱者,會導致 broker 消耗過多存儲。

對於持久化的消息,這很好驗證:為方便查看消息,將 broker 持久化方式配置為jdbc,則可以在 ACTIVEMQ_MSGS 表中看到持久化消息。

持久訂閱者1:

new ActiveMQConnectionFactory("tcp://localhost:61616?jms.clientID=10086");
...
TopicSubscriber consumer = session.createDurableSubscriber(destination, "subscriber_zhang");

持久訂閱者2:

new ActiveMQConnectionFactory("tcp://localhost:61616?jms.clientID=10087");
...
TopicSubscriber consumer = session.createDurableSubscriber(destination, "subscriber_zhang");

對於持久消息,驗證 broker 為每個持久訂閱者保存1條消息:
1. 啟動 broker;
2. 分別啟動2個持久訂閱者,然后關閉它們,這樣 broker 有了2個離線的持久訂閱者;
3. 啟動 producer 向 topic 發送1條消息;
4. 查看 ACTIVEMQ_MSGS 表

對於非持久消息,直接跟代碼了,這里不說明。

下圖是 Topic 接收消息並分發的調用棧:

// org.apache.activemq.broker.region.policy.SimpleDispatchPolicy
// 分發策略很簡單,就是遍歷consumers
public boolean dispatch(MessageReference node, MessageEvaluationContext msgContext, List<Subscription> consumers)
        throws Exception {
    int count = 0;
    for (Subscription sub : consumers) {
        // Don't deliver to browsers
        if (sub.getConsumerInfo().isBrowser()) {
            continue;
        }
        // Only dispatch to interested subscriptions
        if (!sub.matches(node, msgContext)) {
            sub.unmatched(node);
            continue;
        }

        //持久化訂閱是 DurableTopicSubscription
        //普通訂閱是 TopicSubscription
        sub.add(node);
        count++;
    }

    return count > 0;
}

持久化訂閱:

// org.apache.activemq.broker.region.DurableTopicSubscription
public void add(MessageReference node) throws Exception {
    if (!active.get() && !keepDurableSubsActive) {
        return;
    }
    // 調用 PrefetchSubscription.add
    super.add(node);
}

//  org.apache.activemq.broker.region.PrefetchSubscription
public void add(MessageReference node) throws Exception {
    synchronized (pendingLock) {
        // The destination may have just been removed...
        if( !destinations.contains(node.getRegionDestination()) && node!=QueueMessageReference.NULL_MESSAGE) {
            // perhaps we should inform the caller that we are no longer valid to dispatch to?
            return;
        }

        // Don't increment for the pullTimeout control message.
        if (!node.equals(QueueMessageReference.NULL_MESSAGE)) {
            enqueueCounter++;
        }
        //首先加入到 message cursor 中,pending 類型為 StoreDurableSubscriberCursor
        pending.addMessageLast(node);
    }
    dispatchPending();
}

持久化訂閱者上線后,也會觸發消息分發動作即 dispatchPending,調用棧如下圖:

持久化訂閱者使用的 message cursor 是 StoreDurableSubscriberCursor。

 

普通訂閱:

org.apache.activemq.broker.region.TopicSubscription.add(MessageReference node)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM