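I. The answer
No, it is not possible.
Moreover, a later registration replaces an earlier one: MqConsumer2 replaces MqConsumer, and only messages tagged tag-2 are received.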
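    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Component;
    import org.springframework.transaction.annotation.Propagation;
    import org.springframework.transaction.annotation.Transactional;

    /**
     * @date 2019/05/28
     */
    @Component
    @Slf4j
    // MessageConsumer is the project's own interface (not shown here).
    public class MqConsumer implements MessageConsumer {

        @Override
        @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRED)
        public void onMessage(String msg) {
            log.info("Received inventory MQ message: {}", msg);
        }

        @Override
        public String getTopic() {
            return "topic-1";
        }

        @Override
        public String getTag() {
            return "tag-1";
        }
    }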
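    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Component;
    import org.springframework.transaction.annotation.Propagation;
    import org.springframework.transaction.annotation.Transactional;

    @Component
    @Slf4j
    // Same topic as MqConsumer, but a different tag.
    public class MqConsumer2 implements MessageConsumer {

        @Override
        @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRED)
        public void onMessage(String msg) {
            log.info("Received inventory MQ message: {}", msg);
        }

        @Override
        public String getTopic() {
            return "topic-1";
        }

        @Override
        public String getTag() {
            return "tag-2";
        }
    }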
II. Why?
Let's analyze this from the source code.
1. The method that subscribes to messages is public void subscribe(String topic, String subExpression, MessageListener listener), where subExpression is the tag.
    package com.aliyun.openservices.ons.api.impl.rocketmq;

    ....

    @Generated("ons-client")
    public class ConsumerImpl extends ONSConsumerAbstract implements Consumer {

        private final ConcurrentHashMap<String, MessageListener> subscribeTable =
                new ConcurrentHashMap<String, MessageListener>();

        public ConsumerImpl(final Properties properties) {
            super(properties);
            boolean postSubscriptionWhenPull = Boolean.parseBoolean(
                    properties.getProperty(PropertyKeyConst.PostSubscriptionWhenPull, "false"));
            this.defaultMQPushConsumer.setPostSubscriptionWhenPull(postSubscriptionWhenPull);

            String messageModel = properties.getProperty(
                    PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
            this.defaultMQPushConsumer.setMessageModel(MessageModel.valueOf(messageModel));
        }

        @Override
        public void start() {
            this.defaultMQPushConsumer.registerMessageListener(new MessageListenerImpl());
            super.start();
        }

        @Override
        public void subscribe(String topic, String subExpression, MessageListener listener) {
            if (null == topic) {
                throw new ONSClientException("topic is null");
            }
            if (null == listener) {
                throw new ONSClientException("listener is null");
            }
            // The listener is stored under the topic alone; the tag is not part of the key.
            this.subscribeTable.put(topic, listener);
            super.subscribe(topic, subExpression);
        }

        .....
    }
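In the class above, the line this.subscribeTable.put(topic, listener); shows that subscribeTable is a Map keyed by topic only. The tag plays no part in it, so a second listener registered for the same topic overwrites the first.
2. Next, look at the super.subscribe(topic, subExpression) method, which belongs to the ONSConsumerAbstract class.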
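The overwrite can be reproduced without the SDK. The following is a minimal sketch, not the ONS implementation: DemoListener and SubscribeTableDemo are hypothetical stand-ins introduced only for illustration, and the only thing they share with ConsumerImpl is a ConcurrentHashMap keyed by topic.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the ONS MessageListener interface, reduced to a name.
interface DemoListener {
    String name();
}

class SubscribeTableDemo {
    // Same shape as ConsumerImpl.subscribeTable: the key is the topic only.
    private final ConcurrentHashMap<String, DemoListener> subscribeTable =
            new ConcurrentHashMap<>();

    // Mirrors the put in ConsumerImpl.subscribe; subExpression (the tag) is ignored by the map.
    // Returns the listener that was displaced, or null if the topic had none.
    DemoListener subscribe(String topic, String subExpression, DemoListener listener) {
        return subscribeTable.put(topic, listener);
    }

    DemoListener listenerFor(String topic) {
        return subscribeTable.get(topic);
    }

    public static void main(String[] args) {
        SubscribeTableDemo demo = new SubscribeTableDemo();
        demo.subscribe("topic-1", "tag-1", () -> "MqConsumer");
        DemoListener displaced = demo.subscribe("topic-1", "tag-2", () -> "MqConsumer2");
        System.out.println("displaced: " + displaced.name());                // displaced: MqConsumer
        System.out.println("active: " + demo.listenerFor("topic-1").name()); // active: MqConsumer2
    }
}
```

Because Map.put returns the previous value, the second call hands back the MqConsumer stand-in, which is exactly the listener that no longer receives anything.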
    protected void subscribe(String topic, String subExpression) {
        try {
            this.defaultMQPushConsumer.subscribe(topic, subExpression);
        } catch (MQClientException e) {
            throw new ONSClientException("defaultMQPushConsumer subscribe exception", e);
        }
    }

In DefaultMQPushConsumer:

    @Override
    public void subscribe(String topic, String subExpression) throws MQClientException {
        this.defaultMQPushConsumerImpl.subscribe(withNamespace(topic), subExpression);
    }

In DefaultMQPushConsumerImpl:
    public void subscribe(String topic, String subExpression) throws MQClientException {
        try {
            // Build the subscription data; this is where the tag is specified.
            SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(
                    this.defaultMQPushConsumer.getConsumerGroup(), topic, subExpression);

            // Store the topic and its subscription data in the subscriptionInner Map, declared as:
            // protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner =
            //         new ConcurrentHashMap<String, SubscriptionData>();
            this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);

            if (this.mQClientFactory != null) {
                this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
            }
        } catch (Exception e) {
            throw new MQClientException("subscription exception", e);
        }
    }
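The tag therefore lives in the value of subscriptionInner, not in its key. As a rough illustration (DemoSubscription and SubscriptionInnerDemo are hypothetical, simplified stand-ins, not RocketMQ's SubscriptionData or RebalanceImpl), subscribing twice to the same topic leaves only the last tag expression in place:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class SubscriptionInnerDemo {

    // Hypothetical, simplified stand-in for SubscriptionData: a topic plus its tag expression.
    static final class DemoSubscription {
        final String topic;
        final String subString;

        DemoSubscription(String topic, String subString) {
            this.topic = topic;
            this.subString = subString;
        }
    }

    // Same key type as subscriptionInner in the excerpt: topic -> subscription data.
    private final ConcurrentMap<String, DemoSubscription> subscriptionInner =
            new ConcurrentHashMap<>();

    // Mirrors the put in DefaultMQPushConsumerImpl.subscribe.
    void subscribe(String topic, String subExpression) {
        subscriptionInner.put(topic, new DemoSubscription(topic, subExpression));
    }

    // Tag expression currently in effect for the topic, or null if not subscribed.
    String activeExpression(String topic) {
        DemoSubscription data = subscriptionInner.get(topic);
        return data == null ? null : data.subString;
    }

    int size() {
        return subscriptionInner.size();
    }

    public static void main(String[] args) {
        SubscriptionInnerDemo demo = new SubscriptionInnerDemo();
        demo.subscribe("topic-1", "tag-1");
        demo.subscribe("topic-1", "tag-2");
        System.out.println(demo.size());                      // 1
        System.out.println(demo.activeExpression("topic-1")); // tag-2
    }
}
```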
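III. Summary
The source excerpts above use two Maps, subscribeTable and subscriptionInner, and both are keyed by topic. A second subscribe call for the same topic therefore overwrites both the listener and the subscription data of the first. So we can be confident that, within one project (more precisely, on one consumer instance), RocketMQ supports registering only one consumer per topic, and that consumer can carry only one tag expression.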
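One way to live with this constraint, sketched here as an assumption rather than as the approach used in the project above, is to register a single listener per topic and route by tag inside it. RocketMQ tag expressions accept several tags joined with "||", so the single subscription can still cover both tags. TagDispatcher and its methods are hypothetical names; in a real listener the tag would come from the received message (for example via its getTag() accessor).

```java
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical helper: one instance per topic, holding one handler per tag.
class TagDispatcher {
    private final Map<String, Consumer<String>> handlersByTag = new ConcurrentHashMap<>();

    // Registers the handler for a tag; returns this to allow chaining.
    TagDispatcher register(String tag, Consumer<String> handler) {
        handlersByTag.put(tag, handler);
        return this;
    }

    // Tag expression for the single subscription, with tags sorted for a stable result.
    String subExpression() {
        return String.join(" || ", new TreeSet<>(handlersByTag.keySet()));
    }

    // Routes a message body to the handler for its tag; returns false if no handler matches.
    boolean dispatch(String tag, String body) {
        if (tag == null) {
            return false;
        }
        Consumer<String> handler = handlersByTag.get(tag);
        if (handler == null) {
            return false;
        }
        handler.accept(body);
        return true;
    }

    public static void main(String[] args) {
        TagDispatcher dispatcher = new TagDispatcher()
                .register("tag-1", body -> System.out.println("tag-1 handler: " + body))
                .register("tag-2", body -> System.out.println("tag-2 handler: " + body));
        System.out.println(dispatcher.subExpression()); // tag-1 || tag-2
        dispatcher.dispatch("tag-2", "hello");          // tag-2 handler: hello
    }
}
```

With this shape, the topic is subscribed once with dispatcher.subExpression() as the subExpression, and the one registered listener calls dispatch for each message, so neither Map entry is overwritten.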