RocketMQ (7): Why Consumers in the Same Consumer Group with Different Tags Behave Strangely


I. Reproducing the Problem

1. Description

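Two Consumers in the same Consumer Group subscribe to the same Topic, but with different tags: Consumer1 subscribes to tag1 of the Topic and Consumer2 to tag2, and both are started. Then 10 messages with tag1 and 10 messages with tag2 are sent to the Topic. You would expect Consumer1 and Consumer2 to each receive their own 10 messages. Instead, only Consumer2 receives anything, and only 4 to 6 messages, a number that varies from run to run.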

2. Code

2.1 Consumer

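import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

public class Consumer {
    public static void main(String[] args) throws MQClientException {
        // Both consumer processes use the same consumer group name
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-consumer");
        consumer.setNamesrvAddr("124.57.180.156:9876");
        // Start once with "tag1", then change it to "tag2" and start a second process
        consumer.subscribe("TopicTest2", "tag1");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(
                    List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                // Print the tag of every message delivered in this batch
                for (MessageExt msg : msgs) {
                    System.out.println(msg.getTags());
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer started.");
    }
}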

Start this Consumer, which subscribes to tag1 of TopicTest2, then change tag1 to tag2 and start the Consumer again. This gives two Consumer processes: one subscribed to tag1 of TopicTest2 and the other to tag2 of TopicTest2.

2.2 Producer

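import java.nio.charset.StandardCharsets;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class Producer {
    public static void main(String[] args) throws MQClientException {
        final DefaultMQProducer producer = new DefaultMQProducer("test-producer");
        producer.setNamesrvAddr("124.57.180.156:9876");
        producer.start();

        // Send 10 messages with tag1; change both "tag1" strings to "tag2" for the second run
        for (int i = 0; i < 10; i++) {
            try {
                Message msg = new Message("TopicTest2", "tag1",
                        ("Hello tag1 - " + i).getBytes(StandardCharsets.UTF_8));
                SendResult sendResult = producer.send(msg);
                System.out.println(sendResult);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        // Release the producer's network resources before exiting
        producer.shutdown();
    }
}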

Start the Producer to send 10 messages with tag1 to TopicTest2. Then change tag1 to tag2 and run the Producer again, so that TopicTest2 holds 10 messages with tag1 and 10 messages with tag2.

3. Result

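After both Consumers and the Producer have been started, we observe the following:

  • The Producer sent all 20 messages successfully.
  • Consumer1 consumed none of the tag1 messages.
  • Consumer2 consumed about half of its messages (the exact number varies: sometimes 5, sometimes 6).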

II. The Answer

  • First of all, this behavior is decided by the Broker, not by the Consumer.

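I once read a well-argued article that blamed the Consumer side. It even posted source code from a debugging session and claimed that the later subscription overwrites the earlier one. But the two Consumers you started are two independent processes, each running in its own JVM. They share no JVM and no memory, so there is nothing on the Consumer side that could be overwritten.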

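  • The Consumer sends heartbeats to the Broker, and the Broker stores what it receives in consumerTable (just a Map), whose key is the group name and whose value is a ConsumerGroupInfo.
  • ConsumerGroupInfo holds the topic subscriptions, and the problem lies in that key: it is the group name, so when two Consumers share a group name, the subscription of the one registered later overwrites the earlier one. The effect is roughly that of:
map.put(groupName, ConsumerGroupInfo);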
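To make the "same key, later wins" effect concrete, here is a toy model in plain Java. It is not RocketMQ's code: HeartbeatModel and register are hypothetical names, the group name and topic are modeled as nested keys, and channels and subscription versions are ignored on purpose.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the Broker's consumerTable, NOT RocketMQ's real classes:
// group name -> (topic -> tag expression).
class HeartbeatModel {
    private final Map<String, Map<String, String>> consumerTable = new HashMap<>();

    // Hypothetical stand-in for one registration: the group entry is shared,
    // and the topic entry inside it is simply replaced.
    void register(String group, String topic, String tagExpression) {
        consumerTable.computeIfAbsent(group, g -> new HashMap<>()).put(topic, tagExpression);
    }

    // Returns the tag expression the Broker currently keeps for this group and topic.
    String storedTags(String group, String topic) {
        Map<String, String> subscriptions = consumerTable.get(group);
        return subscriptions == null ? null : subscriptions.get(topic);
    }

    public static void main(String[] args) {
        HeartbeatModel broker = new HeartbeatModel();
        broker.register("test-consumer", "TopicTest2", "tag1"); // Consumer1
        broker.register("test-consumer", "TopicTest2", "tag2"); // Consumer2, same group and topic
        System.out.println(broker.storedTags("test-consumer", "TopicTest2")); // tag2
    }
}
```

In this model, registering the same topic under a different group name creates a separate entry and leaves the first group's tag2 untouched.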

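With the same key, an overwrite is inevitable, which is why Consumer1 receives no messages at all. But why does Consumer2 receive only about half of its messages, and a varying number at that?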

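That is because the consumers run in clustering mode: the topic's queues are load-balanced across the Consumers in the group, so the queues holding roughly half of the messages (the exact count varies) are assigned to Consumer1. The Broker filters those queues with the surviving tag2 subscription, and Consumer1, which subscribed to tag1, discards what it receives, so it prints nothing.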
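The following plain-Java simulation walks through that reasoning with numbers. It is a hypothetical model, not RocketMQ code, and rests on stated assumptions: one Broker with 4 queues for TopicTest2; the producer picks queues round-robin, with the starting queue differing between runs; the 4 queues are split into contiguous halves between the two Consumers, as the default averaging allocation does for this case; the Broker filters by the single surviving subscription (tag2); and each client re-checks the tag against its own subscription.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation, NOT RocketMQ code. Assumptions: one Broker, 4 queues,
// producer round-robin from a start queue, queues split into contiguous halves,
// Broker filters by the surviving subscription, client re-checks its own tag.
class RebalanceSketch {
    static final int QUEUE_COUNT = 4;
    static final int MESSAGES_PER_TAG = 10;

    // Queues owned by consumer `index` when QUEUE_COUNT divides evenly among `consumers`.
    static List<Integer> allocate(int index, int consumers) {
        int perConsumer = QUEUE_COUNT / consumers;
        List<Integer> owned = new ArrayList<>();
        for (int q = index * perConsumer; q < (index + 1) * perConsumer; q++) {
            owned.add(q);
        }
        return owned;
    }

    // How many messages consumer `index` prints. The tag1 run starts at queue
    // tag1Start, the tag2 run at queue tag2Start.
    static int printed(int index, String ownTag, String brokerTag, int tag1Start, int tag2Start) {
        List<Integer> owned = allocate(index, 2);
        String[] tags = {"tag1", "tag2"};
        int[] starts = {tag1Start, tag2Start};
        int count = 0;
        for (int run = 0; run < tags.length; run++) {
            for (int i = 0; i < MESSAGES_PER_TAG; i++) {
                int queue = (starts[run] + i) % QUEUE_COUNT;
                if (!owned.contains(queue)) continue;       // pulled by the other consumer
                if (!tags[run].equals(brokerTag)) continue; // dropped by the Broker-side filter
                if (!tags[run].equals(ownTag)) continue;    // dropped by the client-side re-check
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        for (int start = 0; start < QUEUE_COUNT; start++) {
            System.out.println("tag2 run starting at queue " + start
                    + ": Consumer1 prints " + printed(0, "tag1", "tag2", 0, start)
                    + ", Consumer2 prints " + printed(1, "tag2", "tag2", 0, start));
        }
    }
}
```

Across the four possible start queues, Consumer2 prints 4, 5, 6 and 5 messages, which lines up with the 4 to 6 observed above, while Consumer1 prints 0 every time.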

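If you switch to BROADCASTING, the later Consumer will definitely receive all of its messages rather than half, because broadcasting delivers every message to every Consumer.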
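Here is a toy model of the broadcasting case (hypothetical names, not RocketMQ code). It assumes that in BROADCASTING mode every Consumer pulls every queue, while the Broker still filters with the single subscription it kept for the group and each client re-checks its own tag. Under those assumptions the later Consumer prints all 10 of its messages, and Consumer1 still prints nothing.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model, NOT RocketMQ code. Assumption: in BROADCASTING mode every
// consumer sees every message, but the Broker still filters with the one
// subscription it kept for the group, and the client re-checks its own tag.
class BroadcastSketch {
    static int printed(String ownTag, String brokerTag, List<String> sentTags) {
        int count = 0;
        for (String tag : sentTags) {
            if (tag.equals(brokerTag) && tag.equals(ownTag)) {
                count++; // passes both the Broker-side and the client-side filter
            }
        }
        return count;
    }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>(Collections.nCopies(10, "tag1"));
        sent.addAll(Collections.nCopies(10, "tag2"));
        System.out.println("Consumer1: " + printed("tag1", "tag2", sent)); // Consumer1: 0
        System.out.println("Consumer2: " + printed("tag2", "tag2", sent)); // Consumer2: 10
    }
}
```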

III. Verifying in the Source Code

1. Call chain

# The core is this method
org.apache.rocketmq.broker.client.ConsumerManager#registerConsumer()

# Key call chain
# The entry point is Broker startup
org.apache.rocketmq.broker.BrokerStartup#start()
org.apache.rocketmq.broker.BrokerController#start()
org.apache.rocketmq.remoting.netty.NettyRemotingServer#start() 
org.apache.rocketmq.remoting.netty.NettyRemotingServer#prepareSharableHandlers()
org.apache.rocketmq.remoting.netty.NettyRemotingServer.NettyServerHandler#channelRead0()
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processMessageReceived()
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processRequestCommand()
org.apache.rocketmq.broker.processor.ClientManageProcessor#processRequest()
org.apache.rocketmq.broker.processor.ClientManageProcessor#heartBeat()
org.apache.rocketmq.broker.client.ConsumerManager#registerConsumer()

2. Source code

2.1 registerConsumer

/**
 * Consumer group information
 */
public class ConsumerGroupInfo {
    // Group name
    private final String groupName;
    // Subscription information per topic, e.g. topic and tags
    private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable =
        new ConcurrentHashMap<String, SubscriptionData>();
    // Client information per connection, e.g. clientId
    private final ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
        new ConcurrentHashMap<Channel, ClientChannelInfo>(16);
    // PULL/PUSH
    private volatile ConsumeType consumeType;
    // Message model: BROADCASTING/CLUSTERING
    private volatile MessageModel messageModel;
    // Where to start consuming from
    private volatile ConsumeFromWhere consumeFromWhere;
}

/**
 * Registers the Consumer information on the Broker via the heartbeat.
 */
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
        ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
        final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {

    // consumerTable: holds every consumer group, keyed by group name
    ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
    // If the group is not registered yet, create its entry
    if (null == consumerGroupInfo) {
        ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
        // putIfAbsent: if another registration got there first, reuse its entry
        ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
        consumerGroupInfo = prev != null ? prev : tmp;
    }

    // Update the consumer settings and the client (channel) information
    boolean r1 =
        consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
                                        consumeFromWhere);
    // Update the topic subscription information
    boolean r2 = consumerGroupInfo.updateSubscription(subList);

    if (r1 || r2) {
        if (isNotifyConsumerIdsChangedEnable) {
            this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
        }
    }

    this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);

    return r1 || r2;
}

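This step shows that consumer information is stored in a map (consumerTable) with the group name as the key and ConsumerGroupInfo as the value. Because the key is the same, both Consumers end up in the same ConsumerGroupInfo, and inside it the later Consumer's subscription overwrites the earlier one: the later tag is tag2, so it replaces tag1. The tags are stored in ConsumerGroupInfo's subscriptionTable: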

private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable =
    new ConcurrentHashMap<String, SubscriptionData>();

SubscriptionData holds the topic, the tags and related information:

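public class SubscriptionData implements Comparable<SubscriptionData> {
    // topic
    private String topic;
    // raw subscription expression, e.g. "tag1"
    private String subString;
    // tags parsed from subString
    private Set<String> tagsSet = new HashSet<String>();
    // hash codes of those tags
    private Set<Integer> codeSet = new HashSet<Integer>();
    // version compared in updateSubscription; set when the subscription is created
    private long subVersion = System.currentTimeMillis();
}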
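How does a string like tag1 end up in tagsSet and codeSet? My understanding of the client's FilterAPI.buildSubscriptionData (treat the exact details as an assumption) is this: "*" or an empty string means all tags; otherwise the expression is split on "||", each part is trimmed, and both the tag and its String.hashCode() are stored. The Broker filters on hash codes, and different tags can share a hash code, so the client checks the tag strings again after pulling. Below is a hypothetical re-implementation for illustration:

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical re-implementation for illustration, NOT the real FilterAPI:
// parse a tag expression such as "tag1 || tag2" into tags and their hash codes.
class TagExpressionSketch {
    final Set<String> tagsSet = new LinkedHashSet<>();
    final Set<Integer> codeSet = new LinkedHashSet<>();
    final boolean subscribeAll;

    TagExpressionSketch(String subString) {
        // "*" or an empty expression subscribes to every tag
        subscribeAll = subString == null || subString.isEmpty() || subString.equals("*");
        if (!subscribeAll) {
            for (String part : subString.split("\\|\\|")) {
                String tag = part.trim();
                if (!tag.isEmpty()) {
                    tagsSet.add(tag);
                    codeSet.add(tag.hashCode());
                }
            }
        }
    }

    // Broker-style check: compare only the hash code of the message's tag.
    boolean matchesByCode(String messageTag) {
        return subscribeAll || codeSet.contains(messageTag.hashCode());
    }

    public static void main(String[] args) {
        TagExpressionSketch expr = new TagExpressionSketch("tag1 || tag2");
        System.out.println(expr.tagsSet);               // [tag1, tag2]
        System.out.println(expr.matchesByCode("tag2")); // true
        System.out.println(expr.matchesByCode("tag3")); // false
    }
}
```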

2.2 Two questions

1. How do the topic and tag entries get overwritten?

boolean r2 = consumerGroupInfo.updateSubscription(subList);
/**
 * Quite simple: the key is the topic and the value is the SubscriptionData. Since
 * SubscriptionData carries the tags, a newer subscription replaces the old one outright.
 * (Trimmed excerpt.)
 */
public boolean updateSubscription(final Set<SubscriptionData> subList) {
    boolean updated = false;
    for (SubscriptionData sub : subList) {
        SubscriptionData old = this.subscriptionTable.get(sub.getTopic());
        if (old == null) {
            // First subscription for this topic in the group
            SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);
            if (null == prev) {
                updated = true;
            }
        } else if (sub.getSubVersion() > old.getSubVersion()) {
            // A newer subscription (e.g. tag2) replaces the older one (tag1)
            this.subscriptionTable.put(sub.getTopic(), sub);
        }
    }
    return updated;
}
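Note the sub.getSubVersion() > old.getSubVersion() condition. As far as I can tell from the 4.x client, subVersion is filled with System.currentTimeMillis() when the SubscriptionData is built in subscribe(), and every periodic heartbeat resends that same object. If so, "later" means the Consumer that subscribed later, not whichever heartbeat happened to arrive last: the earlier Consumer keeps sending heartbeats but loses the version comparison every time. Here is a plain-Java model of that rule (class and field names are hypothetical, and the version numbers stand in for timestamps):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical model of the version check in updateSubscription, NOT RocketMQ code.
class SubVersionSketch {
    // Simplified SubscriptionData: topic, tag expression, and the version it was created with
    static final class Sub {
        final String topic;
        final String tags;
        final long subVersion;

        Sub(String topic, String tags, long subVersion) {
            this.topic = topic;
            this.tags = tags;
            this.subVersion = subVersion;
        }
    }

    private final ConcurrentMap<String, Sub> subscriptionTable = new ConcurrentHashMap<>();

    // Same branches as the excerpt: insert if absent, replace only if strictly newer.
    // Returns true only when a new topic entry was added.
    boolean update(Sub sub) {
        Sub old = subscriptionTable.get(sub.topic);
        if (old == null) {
            return subscriptionTable.putIfAbsent(sub.topic, sub) == null;
        }
        if (sub.subVersion > old.subVersion) {
            subscriptionTable.put(sub.topic, sub);
        }
        return false;
    }

    String tagsFor(String topic) {
        Sub current = subscriptionTable.get(topic);
        return current == null ? null : current.tags;
    }

    public static void main(String[] args) {
        SubVersionSketch group = new SubVersionSketch();
        Sub consumer1 = new Sub("TopicTest2", "tag1", 1_000L); // subscribed first
        Sub consumer2 = new Sub("TopicTest2", "tag2", 2_000L); // subscribed later
        // Periodic heartbeats arrive interleaved; Consumer1's arrives last
        group.update(consumer1);
        group.update(consumer2);
        group.update(consumer1);
        System.out.println(group.tagsFor("TopicTest2")); // tag2
    }
}
```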

Wait, there seems to be something new here: ConsumerGroupInfo#subscriptionTable

// {@link org.apache.rocketmq.broker.client.ConsumerGroupInfo#subscriptionTable}
private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable =
        new ConcurrentHashMap<String, SubscriptionData>();

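An unexpected bonus: since the topic is the map key, doesn't that mean one Consumer can subscribe to multiple Topics? Yes. The source code confirms it, and I have tested it too.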
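On the client, this just means calling DefaultMQPushConsumer#subscribe once per topic before start(). Below is a plain-Java model of the topic-keyed table (not RocketMQ code; TopicTest3 is a made-up topic name). It also shows why the trouble in this article requires both the same group and the same topic: different topics get separate entries, and only a second subscription to the same topic replaces one.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of one group's subscriptionTable, NOT RocketMQ code:
// topic -> tag expression. Version checks are left out for brevity.
class MultiTopicSketch {
    private final Map<String, String> subscriptionTable = new ConcurrentHashMap<>();

    // Each topic gets its own entry; only a second subscription to the SAME topic replaces one.
    void subscribe(String topic, String tagExpression) {
        subscriptionTable.put(topic, tagExpression);
    }

    int topicCount() {
        return subscriptionTable.size();
    }

    String tagsFor(String topic) {
        return subscriptionTable.get(topic);
    }

    public static void main(String[] args) {
        MultiTopicSketch group = new MultiTopicSketch();
        group.subscribe("TopicTest2", "tag1");
        group.subscribe("TopicTest3", "*"); // hypothetical second topic
        System.out.println(group.topicCount()); // 2
    }
}
```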

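2. By that logic, wouldn't each group only ever have one Consumer process, since registering under the same group overwrites the previous one?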

Ah, but look at channelInfoTable in ConsumerGroupInfo:

// Client information per connection, e.g. clientId
private final ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
    new ConcurrentHashMap<Channel, ClientChannelInfo>(16);

ClientChannelInfo contains the clientId and other details and represents one Consumer. It is registered by:

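boolean r1 = consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel, consumeFromWhere);
/**
 * Trimmed code below: the Channel is the key, and every Consumer has its own Channel,
 * so one group can hold multiple Consumer clients.
 */
public boolean updateChannel(final ClientChannelInfo infoNew, ConsumeType consumeType,
        MessageModel messageModel, ConsumeFromWhere consumeFromWhere) {
    boolean updated = false;
    ClientChannelInfo infoOld = this.channelInfoTable.get(infoNew.getChannel());
    if (null == infoOld) {
        // A new connection means a new Consumer in this group
        ClientChannelInfo prev = this.channelInfoTable.put(infoNew.getChannel(), infoNew);
        if (null == prev) {
            updated = true;
        }
    }
    return updated;
}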
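So the group keeps two Consumers, which is what the queue load balancing divides the queues between, but only one subscription per topic. Below is a plain-Java model of the channel side (not RocketMQ code). As an assumption for illustration, strings stand in for the Netty Channel key and the clientId. When updateChannel reports a new channel, registerConsumer above fires a CHANGE event (if notification is enabled), which prompts the Consumers to re-run the queue allocation.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical model of channelInfoTable, NOT RocketMQ code: a String stands in
// for the Netty Channel (key) and another for the ClientChannelInfo's clientId (value).
class ChannelTableSketch {
    private final ConcurrentMap<String, String> channelInfoTable = new ConcurrentHashMap<>();

    // Returns true only when a channel not seen before is added, like r1 in registerConsumer.
    // The real method does more (e.g. timestamps); that is omitted here.
    boolean updateChannel(String channel, String clientId) {
        return channelInfoTable.putIfAbsent(channel, clientId) == null;
    }

    int consumerCount() {
        return channelInfoTable.size();
    }

    public static void main(String[] args) {
        ChannelTableSketch group = new ChannelTableSketch();
        group.updateChannel("channel-A", "Consumer1"); // first heartbeat from Consumer1
        group.updateChannel("channel-B", "Consumer2"); // first heartbeat from Consumer2
        group.updateChannel("channel-A", "Consumer1"); // later heartbeat, same connection
        System.out.println(group.consumerCount()); // 2
    }
}
```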


Guangdong ICP Registration No. 18138465   © 2018-2025 CODEPRJ.COM