[RocketMQ] Can multiple consumers exist for the same topic within the same project?


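I. The Answer

  No, they cannot.

Moreover, the consumer registered later replaces the one registered earlier. With the two classes below, MqConsumer2 replaces MqConsumer, and only messages tagged tag-2 are received.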

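import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

/**
 * @date 2019/05/28
 */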
@Component
@Slf4j
public class MqConsumer implements MessageConsumer {


    @Override
    @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRED)
    public void onMessage(String msg) {
        log.info("Received inventory MQ message: {}", msg);
        log.info("Received inventory MQ message: {}", msg);
        log.info("Received inventory MQ message: {}", msg);
    }

    @Override
    public String getTopic() {
        return "topic-1";
    }

    @Override
    public String getTag() {
        return "tag-1";
    }
}
@Component
@Slf4j
public class MqConsumer2 implements MessageConsumer {


    @Override
    @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRED)
    public void onMessage(String msg) {
        log.info("Received inventory MQ message: {}", msg);
        log.info("Received inventory MQ message: {}", msg);
        log.info("Received inventory MQ message: {}", msg);
    }

    @Override
    public String getTopic() {
        return "topic-1";
    }

    @Override
    public String getTag() {
        return "tag-2";
    }
}

II. Why?

Let's analyze this from the source code.

1. Messages are subscribed to through public void subscribe(String topic, String subExpression, MessageListener listener), where subExpression is the tag.

package com.aliyun.openservices.ons.api.impl.rocketmq;
....
@Generated("ons-client")
public class ConsumerImpl extends ONSConsumerAbstract implements Consumer {
    private final ConcurrentHashMap<String, MessageListener> subscribeTable = new ConcurrentHashMap<String, MessageListener>();

    public ConsumerImpl(final Properties properties) {
        super(properties);
        boolean postSubscriptionWhenPull = Boolean.parseBoolean(properties.getProperty(PropertyKeyConst.PostSubscriptionWhenPull, "false"));
        this.defaultMQPushConsumer.setPostSubscriptionWhenPull(postSubscriptionWhenPull);

        String messageModel = properties.getProperty(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
        this.defaultMQPushConsumer.setMessageModel(MessageModel.valueOf(messageModel));
    }

    @Override
    public void start() {
        this.defaultMQPushConsumer.registerMessageListener(new MessageListenerImpl());
        super.start();
    }


    @Override
    public void subscribe(String topic, String subExpression, MessageListener listener) {
        if (null == topic) {
            throw new ONSClientException("topic is null");
        }

        if (null == listener) {
            throw new ONSClientException("listener is null");
        }
        this.subscribeTable.put(topic, listener);
        super.subscribe(topic, subExpression);
    }

.....
}

In the class above, the call this.subscribeTable.put(topic, listener); shows that subscribeTable is a Map keyed by topic alone. The tag plays no part in it.
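To make the effect of a topic-keyed map concrete, here is a minimal, self-contained sketch. It does not use the ONS client: the class SubscribeTableDemo and the Listener interface are hypothetical stand-ins, and only the put(topic, listener) call mirrors the source above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SubscribeTableDemo {
    /** Hypothetical stand-in for the ONS MessageListener. */
    public interface Listener {
        String name();
    }

    // Same shape as ConsumerImpl.subscribeTable: topic -> listener.
    private final Map<String, Listener> subscribeTable = new ConcurrentHashMap<>();

    // Mirrors the put in ConsumerImpl.subscribe: subExpression is not part of the key.
    public void subscribe(String topic, String subExpression, Listener listener) {
        subscribeTable.put(topic, listener);
    }

    /** Registers two listeners on one topic and returns the name of the one left in the map. */
    public static String survivingListener() {
        SubscribeTableDemo consumer = new SubscribeTableDemo();
        consumer.subscribe("topic-1", "tag-1", () -> "MqConsumer");
        consumer.subscribe("topic-1", "tag-2", () -> "MqConsumer2");
        return consumer.subscribeTable.get("topic-1").name();
    }

    public static void main(String[] args) {
        System.out.println(survivingListener());
    }
}
```

Because Map.put replaces the value stored under an existing key, the second registration for topic-1 is the only one that remains.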

2. Next, look at super.subscribe(topic, subExpression), which is defined in the ONSConsumerAbstract class:

    protected void subscribe(String topic, String subExpression) {
        try {
            this.defaultMQPushConsumer.subscribe(topic, subExpression);
        } catch (MQClientException e) {
            throw new ONSClientException("defaultMQPushConsumer subscribe exception", e);
        }
    }

In DefaultMQPushConsumer:

    @Override
    public void subscribe(String topic, String subExpression) throws MQClientException {
        this.defaultMQPushConsumerImpl.subscribe(withNamespace(topic), subExpression);
    }

In DefaultMQPushConsumerImpl:

    public void subscribe(String topic, String subExpression) throws MQClientException {
        try {
            // Build the subscription data for this topic; the tag (subExpression) is set here
            SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), topic, subExpression);
            // Store the topic and its subscription data in the subscriptionInner Map, declared as:
            // protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner = new ConcurrentHashMap<String, SubscriptionData>();
            this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
            if (this.mQClientFactory != null) {
                this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
            }
        } catch (Exception e) {
            throw new MQClientException("subscription exception", e);
        }
    }
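The consequence of storing subscription data under the topic can be sketched without a broker. The simulation below is an illustration under assumptions: SubscriptionInnerDemo and its subscribe and accepts methods are hypothetical, the Map value is reduced to a plain tag string instead of SubscriptionData, and the equality check is a simplified stand-in for RocketMQ's real tag filtering. It only shows that a second subscription to the same topic replaces the tag of the first.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class SubscriptionInnerDemo {
    // Keyed by topic like subscriptionInner; the value is simplified to the tag string.
    private final ConcurrentMap<String, String> subscriptionInner = new ConcurrentHashMap<>();

    // Mirrors getSubscriptionInner().put(topic, subscriptionData).
    public void subscribe(String topic, String subExpression) {
        subscriptionInner.put(topic, subExpression);
    }

    // Simplified filter (assumption): accept a message only if its tag equals the stored tag.
    public boolean accepts(String topic, String tag) {
        return tag.equals(subscriptionInner.get(topic));
    }

    /** Subscribes to topic-1 twice, first with tag-1 and then with tag-2. */
    public static SubscriptionInnerDemo afterBothSubscriptions() {
        SubscriptionInnerDemo consumer = new SubscriptionInnerDemo();
        consumer.subscribe("topic-1", "tag-1");
        consumer.subscribe("topic-1", "tag-2");
        return consumer;
    }

    public static void main(String[] args) {
        SubscriptionInnerDemo consumer = afterBothSubscriptions();
        System.out.println("tag-1 accepted: " + consumer.accepts("topic-1", "tag-1"));
        System.out.println("tag-2 accepted: " + consumer.accepts("topic-1", "tag-2"));
    }
}
```

This matches the behaviour described in section I: after both registrations, only tag-2 is still subscribed for topic-1.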

 

III. Summary

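The source excerpts above involve two Maps:

subscribeTable and subscriptionInner, and both are keyed by topic. We can therefore conclude that within the same project (more precisely, on a single Consumer instance, as in the setup above) RocketMQ supports only one registered consumer per topic, and so only one tag expression (subExpression) can be specified for that topic.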
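One way to work within this constraint, offered as a sketch and not taken from the analysis above, is to register a single listener per topic and route messages by tag inside it. RocketMQ's tag expressions allow several tags joined with "||", so one subscription can cover both tag-1 and tag-2. The TagDispatcher class below is hypothetical and uses no RocketMQ types; wiring it into a real listener (for example, passing the message's tag and body to dispatch) is left as an assumption.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

public class TagDispatcher {
    // tag -> handler; insertion order is kept so the generated expression is stable.
    private final Map<String, Consumer<String>> handlers = new LinkedHashMap<>();

    public TagDispatcher register(String tag, Consumer<String> handler) {
        handlers.put(tag, handler);
        return this;
    }

    /** Builds the expression to pass as subExpression, e.g. "tag-1 || tag-2". */
    public String subExpression() {
        return String.join(" || ", handlers.keySet());
    }

    /** Routes one message to the handler for its tag; returns false if no handler is registered. */
    public boolean dispatch(String tag, String body) {
        Consumer<String> handler = handlers.get(tag);
        if (handler == null) {
            return false;
        }
        handler.accept(body);
        return true;
    }

    public static void main(String[] args) {
        TagDispatcher dispatcher = new TagDispatcher()
                .register("tag-1", body -> System.out.println("tag-1 handler: " + body))
                .register("tag-2", body -> System.out.println("tag-2 handler: " + body));
        System.out.println(dispatcher.subExpression());
        dispatcher.dispatch("tag-2", "hello");
    }
}
```

With this layout there is exactly one entry per topic in both Maps, and the per-tag logic that MqConsumer and MqConsumer2 carried separately lives in the registered handlers.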

 

