1、問題描述
兩個一樣的Consumer Group的Consumer訂閱同一個Topic,但是是不同的tag,Consumer1訂閱Topic的tag1,Consumer2訂閱Topic的tag2,然后分別啟動。這時候往Topic的tag1里發送10條數據,Topic的tag2里發送10條。目測應該是Consumer1和Consumer2分別收到對應的10條消息。結果卻是只有Consumer2收到了消息,而且只收到了4-6條消息,不固定。
2、代碼
consumer:
public class Consumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-consumer"); consumer.setNamesrvAddr("124.57.180.156:9876"); consumer.subscribe("TopicTest2","tag1"); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> msgs, ConsumeConcurrentlyContext context) { MessageExt msg = msgs.get(0); System.out.println(msg.getTags()); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("ConsumerStarted."); } }
啟動這個訂閱了TopicTest2的tag1標簽的Consumer,然后將tag1改為tag2再次啟動Consumer。這就相當於啟動了兩個Consumer進程,一個訂閱了TopicTest2的tag1標簽,另一個訂閱了TopicTest2的tag2標簽。
producer:
public class Producer { public static void main(String[] args) throws MQClientException { final DefaultMQProducer producer = new DefaultMQProducer("test-producer"); producer.setNamesrvAddr("124.57.180.156:9876"); producer.start(); for (int i = 0; i < 10; i++){ try { Message msg = new Message("TopicTest2", "tag1", ("Hello tag1 - "+i).getBytes()); SendResult sendResult = producer.send(msg); System.out.println(sendResult); }catch(Exception e) { e.printStackTrace(); } } } }
結果:
Consumer和Producer都啟動后,發現如下:
- Producer發送了20條消息正常。
- Consumer1沒有消費到tag1下的數據
- Consumer2消費了一半(不一定是幾條,有時候5條,有時候6條的)消息。
3、問題分析
(1)首先這是broker決定的,不是consumer端決定的
(2)Consumer端發心跳給Broker,Broker收到后存到consumerTable里(就是個Map),key是GroupName,value是ConsumerGroupInfo。
(3)ConsumerGroupInfo包含topic、tag等信息,但是問題就出在上一步驟,key是groupName,相同GroupName的話Broker心跳最后收到的Consumer會覆蓋前者的。
這樣同key,肯定產生了覆蓋。所以Consumer1不會收到任何消息,但是Consumer2為什么只收到了一半(不固定)消息呢?
那是因為:你是集群模式消費,它會負載均衡分配到各個節點去消費,所以一半消息(不固定個數)跑到了Consumer1上,結果Consumer1訂閱的是tag1,所以不會任何輸出。
如果換成BROADCASTING,那后者會收到全部消息,而不是一半,因為廣播是廣播全部Consumer。
4、源碼驗證
調用鏈:
# 核心在於如下這個方法
org.apache.rocketmq.broker.client.ConsumerManager#registerConsumer()
# 關鍵調用鏈如下
# 入口是Broker啟動的時候
org.apache.rocketmq.broker.BrokerStartup#start()
org.apache.rocketmq.broker.BrokerController#start()
org.apache.rocketmq.remoting.netty.NettyRemotingServer#start()
org.apache.rocketmq.remoting.netty.NettyRemotingServer#prepareSharableHandlers()
org.apache.rocketmq.remoting.netty.NettyRemotingServer.NettyServerHandler#channelRead0()
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processMessageReceived()
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processRequestCommand()
org.apache.rocketmq.broker.processor.ClientManageProcessor#processRequest()
org.apache.rocketmq.broker.processor.ClientManageProcessor#heartBeat()
org.apache.rocketmq.broker.client.ConsumerManager#registerConsumer()
registerConsumer
/** * Consumer信息 */ public class ConsumerGroupInfo { // 組名 private final String groupName; // topic信息,比如topic、tag等 private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable = new ConcurrentHashMap<String, SubscriptionData>(); // 客戶端信息,比如clientId等 private final ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable = new ConcurrentHashMap<Channel, ClientChannelInfo>(16); // PULL/PUSH private volatile ConsumeType consumeType; // 消費模式:BROADCASTING/CLUSTERING private volatile MessageModel messageModel; // 消費到哪了 private volatile ConsumeFromWhere consumeFromWhere; } /** * 通過心跳將Consumer信息注冊到Broker端。 */ public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo, ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere, final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) { // consumerTable:維護所有的Consumer ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group); // 如果沒有Consumer,則put到map里 if (null == consumerGroupInfo) { ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere); // put到map里 ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp); consumerGroupInfo = prev != null ? prev : tmp; } // 更新Consumer信息,客戶端信息 boolean r1 = consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel, consumeFromWhere); // 更新訂閱Topic信息 boolean r2 = consumerGroupInfo.updateSubscription(subList); if (r1 || r2) { if (isNotifyConsumerIdsChangedEnable) { this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel()); } } this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList); return r1 || r2; }
從這一步可以看出消費者信息是以groupName為key,ConsumerGroupInfo為value存到map(consumerTable)里的,那很明顯了,后者肯定會覆蓋前者的,因為key是一樣的。而后者的tag是tag2,那肯定覆蓋了前者的tag1,這部分是存到ConsumerGroupInfo的subscriptionTable里面的。
private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable = new ConcurrentHashMap<String, SubscriptionData>();
SubscriptionData包含了topic等信息
public class SubscriptionData implements Comparable<SubscriptionData> { // topic private String topic; private String subString; // tags private Set<String> tagsSet = new HashSet<String>(); private Set<Integer> codeSet = new HashSet<Integer>(); }
