ActiveMQ Message Groups


http://activemq.apache.org/message-groups.html

與Exclusive Consumer相比,Message Groups的對消息分組的粒度更細。具有相同groupId的消息會被投送到同一個消費者,除非這個消費者掛了。

代碼示例:

Mesasge message = session.createTextMessage("<foo>hey</foo>");
// 設置groupId
message.setStringProperty("JMSXGroupID", "IBM_NASDAQ_20/4/05");
// 設置sequence
message.setIntProperty("JMSXGroupSeq", -1);

producer.send(message);

對應的代碼在 org.apache.activemq.broker.region.Queue 中:

// 判斷消息能否分發給消費者,返回true表示可以
// Subscription 表示消費者,QueueMessageReference 表示消息
protected boolean assignMessageGroup(Subscription subscription, QueueMessageReference node) 
            throws Exception {
    // 默認為true
    boolean result = true;
    // Keep message groups together.
    // 獲取消息的"JMSXGroupID"屬性
    String groupId = node.getGroupID();
    // 獲取消息的"JMSXGroupSeq"屬性
    int sequence = node.getGroupSequence();
    if (groupId != null) {
        // MessageGroupMap是一個Map,鍵是groupId,值是消費者
        MessageGroupMap messageGroupOwners = getMessageGroupOwners();
        // If we can own the first, then no-one else should own the
        // rest.
        if (sequence == 1) {
            assignGroup(subscription, messageGroupOwners, node, groupId);
        } else {

            // Make sure that the previous owner is still valid, we may
            // need to become the new owner.
            ConsumerId groupOwner;
            // 根據groupId取出消費者
            groupOwner = messageGroupOwners.get(groupId);
            if (groupOwner == null) {
                assignGroup(subscription, messageGroupOwners, node, groupId);
            } else {
                if (groupOwner.equals(subscription.getConsumerInfo().getConsumerId())) {
                    // A group sequence < 1 is an end of group signal.
                    if (sequence < 0) {
                        messageGroupOwners.removeGroup(groupId);
                        subscription.getConsumerInfo().
                        setLastDeliveredSequenceId(subscription.getConsumerInfo().getLastDeliveredSequenceId() - 1);
                    }
                } else {
                    result = false;
                }
            }
        }
    }

    return result;
}

// 往MessageGroupMap中插入鍵值對
protected void assignGroup(Subscription subs, MessageGroupMap messageGroupOwners, 
                        MessageReference n, String groupId) throws IOException {
    messageGroupOwners.put(groupId, subs.getConsumerInfo().getConsumerId());
    Message message = n.getMessage();
    message.setJMSXGroupFirstForConsumer(true);
    subs.getConsumerInfo().
        setLastDeliveredSequenceId(subs.getConsumerInfo().getLastDeliveredSequenceId() + 1);
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM