http://activemq.apache.org/message-groups.html
與Exclusive Consumer相比,Message Groups的對消息分組的粒度更細。具有相同groupId的消息會被投送到同一個消費者,除非這個消費者掛了。
代碼示例:
Mesasge message = session.createTextMessage("<foo>hey</foo>");
// 設置groupId
message.setStringProperty("JMSXGroupID", "IBM_NASDAQ_20/4/05");
// 設置sequence
message.setIntProperty("JMSXGroupSeq", -1);
producer.send(message);
對應的代碼在 org.apache.activemq.broker.region.Queue 中:
// 判斷消息能否分發給消費者,返回true表示可以 // Subscription 表示消費者,QueueMessageReference 表示消息 protected boolean assignMessageGroup(Subscription subscription, QueueMessageReference node) throws Exception { // 默認為true boolean result = true; // Keep message groups together. // 獲取消息的"JMSXGroupID"屬性 String groupId = node.getGroupID(); // 獲取消息的"JMSXGroupSeq"屬性 int sequence = node.getGroupSequence(); if (groupId != null) { // MessageGroupMap是一個Map,鍵是groupId,值是消費者 MessageGroupMap messageGroupOwners = getMessageGroupOwners(); // If we can own the first, then no-one else should own the // rest. if (sequence == 1) { assignGroup(subscription, messageGroupOwners, node, groupId); } else { // Make sure that the previous owner is still valid, we may // need to become the new owner. ConsumerId groupOwner; // 根據groupId取出消費者 groupOwner = messageGroupOwners.get(groupId); if (groupOwner == null) { assignGroup(subscription, messageGroupOwners, node, groupId); } else { if (groupOwner.equals(subscription.getConsumerInfo().getConsumerId())) { // A group sequence < 1 is an end of group signal. if (sequence < 0) { messageGroupOwners.removeGroup(groupId); subscription.getConsumerInfo(). setLastDeliveredSequenceId(subscription.getConsumerInfo().getLastDeliveredSequenceId() - 1); } } else { result = false; } } } } return result; } // 往MessageGroupMap中插入鍵值對 protected void assignGroup(Subscription subs, MessageGroupMap messageGroupOwners, MessageReference n, String groupId) throws IOException { messageGroupOwners.put(groupId, subs.getConsumerInfo().getConsumerId()); Message message = n.getMessage(); message.setJMSXGroupFirstForConsumer(true); subs.getConsumerInfo(). setLastDeliveredSequenceId(subs.getConsumerInfo().getLastDeliveredSequenceId() + 1); }
