訂閱關系一致指的是同一個消費者Group ID下所有Consumer實例所訂閱的Topic、Tag必須完全一致。如果訂閱關系不一致,消息消費的邏輯就會混亂,甚至導致消息丟失。本文提供訂閱關系一致的正確示例代碼以及訂閱關系不一致的可能原因,幫助您順暢地訂閱消息。
背景信息
消息隊列RocketMQ版里的一個消費者Group ID代表一個Consumer實例群組。對於大多數分布式應用來說,一個消費者Group ID下通常會掛載多個Consumer實例。
-
訂閱的Topic必須一致,例如:Consumer1訂閱TopicA和TopicB,Consumer2也必須訂閱TopicA和TopicB,不能只訂閱TopicA、只訂閱TopicB或訂閱TopicA和TopicC。
-
訂閱的同一個Topic中的Tag必須一致,包括Tag的數量和Tag的順序,例如:Consumer1訂閱TopicB且Tag為Tag1||Tag2,Consumer2訂閱TopicB的Tag也必須是Tag1||Tag2,不能只訂閱Tag1、只訂閱Tag2或者訂閱Tag2||Tag1。
正確的訂閱關系如下,多個Group ID分別訂閱了不同的Topic,但是同一個Group ID下的多個Consumer實例C1、C2、C3訂閱的Topic和Tag都一致。
注意: 消息隊列RocketMQ版支持使用TCP協議和HTTP協議的SDK客戶端收發消息,除了保證同一Group ID下的Consumer實例訂閱關系一致,還必須保證訂閱消息的Group ID的協議版本和SDK的協議版本一致,例如,使用TCP協議的SDK收發消息,訂閱消息時也必須使用創建的TCP協議的Group ID,否則會導致消息消費失敗。
正確訂閱關系一:訂閱一個Topic且訂閱一個Tag
如下圖所示,同一Group ID下的三個Consumer實例C1、C2和C3分別都訂閱了TopicA,且訂閱TopicA的Tag也都是Tag1,符合訂閱關系一致原則。
正確示例代碼一
C1、C2、C3的訂閱關系一致,即C1、C2、C3訂閱消息的代碼必須完全一致,代碼示例如下:
Properties properties = new Properties();
properties.put(PropertyKeyConst.GROUP_ID, "GID_test_1");
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("TopicA", "Tag1", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
System.out.println(message.getMsgID());
return Action.CommitMessage;
}
});
正確訂閱關系二:訂閱一個Topic且訂閱多個Tag
如下圖所示,同一Group ID下的三個Consumer實例C1、C2和C3分別都訂閱了TopicB,訂閱TopicB的Tag也都是Tag1和Tag2,表示訂閱TopicB中所有Tag為Tag1或Tag2的消息,且順序一致都是Tag1||Tag2,符合訂閱關系一致性原則。
正確示例代碼二
C1、C2、C3的訂閱關系一致,即C1、C2、C3訂閱消息的代碼必須完全一致,代碼示例如下:
Properties properties = new Properties();
properties.put(PropertyKeyConst.GROUP_ID, "GID_test_2");
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("TopicB", "Tag1||Tag2", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
System.out.println(message.getMsgID());
return Action.CommitMessage;
}
});
正確訂閱關系三:訂閱多個Topic且訂閱多個Tag
如下圖所示,同一Group ID下的三個Consumer實例C1、C2和C3分別都訂閱了TopicA和TopicB,且訂閱的TopicA都未指定Tag,即訂閱TopicA中的所有消息,訂閱的TopicB的Tag都是Tag1和Tag2,表示訂閱TopicB中所有Tag為Tag1或Tag2的消息,且順序一致都是Tag||Tag2,符合訂閱關系一致原則。
正確示例代碼三
C1、C2、C3的訂閱關系一致,即C1、C2、C3訂閱消息的代碼必須完全一致,代碼示例如下:
Properties properties = new Properties();
properties.put(PropertyKeyConst.GROUP_ID, "GID_test_3");
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("TopicA", "*", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
System.out.println(message.getMsgID());
return Action.CommitMessage;
}
});
consumer.subscribe("TopicB", "Tag1||Tag2", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
System.out.println(message.getMsgID());
return Action.CommitMessage;
}
});
查看訂閱關系一致性
您可在消息消息隊列RocketMQ版控制台Group 詳情頁面查看指定Group的訂閱關系是否一致。具體操作,請參見
常見訂閱關系不一致問題
使用消息隊列RocketMQ版收發消息時,Consumer收到的消息不符合預期並且在消息隊列RocketMQ版控制台查看到訂閱關系不一致,則Consumer實例可能存在以下問題:
-
錯誤示例一:同一Group ID下的Consumer實例訂閱的Topic不同。
如下圖所示,同一Group ID下的三個Consumer實例C1、C2和C3分別訂閱了TopicA、TopicB和TopicC,訂閱的Topic不一致,不符合訂閱關系一致性原則。
錯誤示例代碼一
-
Consumer實例1-1:
Properties properties = new Properties();
properties.put(PropertyKeyConst.GROUP_ID, "GID_test_1");
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("TopicA", "*", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
System.out.println(message.getMsgID());
return Action.CommitMessage;
}
});
-
Consumer實例1-2:
Properties properties = new Properties();
properties.put(PropertyKeyConst.GROUP_ID, "GID_test_1");
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("TopicC", "*", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
System.out.println(message.getMsgID());
return Action.CommitMessage;
}
});
-
Consumer實例1-3:
Properties properties = new Properties();
properties.put(PropertyKeyConst.GROUP_ID, "GID_test_1");
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("TopicB", "*", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
System.out.println(message.getMsgID());
return Action.CommitMessage;
}
});
錯誤示例二:同一Group ID下的Consumer實例訂閱的Topic相同,但訂閱Topic的Tag不同。
如下圖所示,同一Group ID下的三個Consumer實例C1、C2和C3分別都訂閱了TopicA,但是C1訂閱TopicA的Tag為Tag1,C2和C3訂閱的TopicA的Tag為Tag2,訂閱同一Topic的Tag不一致,不符合訂閱關系一致性原則。
錯誤示例代碼二
-
Consumer實例2-1:
Properties properties = new Properties();
properties.put(PropertyKeyConst.GROUP_ID, "GID_test_2");
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("TopicA", "Tag1", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
System.out.println(message.getMsgID());
return Action.CommitMessage;
}
});
-
Consumer實例2-2:
Properties properties = new Properties();
properties.put(PropertyKeyConst.GROUP_ID, "GID_test_2");
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("TopicA", "Tag2", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
System.out.println(message.getMsgID());
return Action.CommitMessage;
}
});
-
Consumer實例2-3:
Properties properties = new Properties();
properties.put(PropertyKeyConst.GROUP_ID, "GID_test_2");
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("TopicA", "Tag2", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
System.out.println(message.getMsgID());
return Action.CommitMessage;
}
});
錯誤示例三:同一Group ID下的Consumer實例訂閱的Topic及Topic的Tag都相同,但訂閱的Tag順序不同。
如下圖所示,同一Group ID下的三個Consumer實例C1、C2和C3分別都訂閱了TopicA和TopicB,並且訂閱的TopicA都沒有指定Tag,訂閱TopicB的Tag都是Tag1和Tag2,但是C1訂閱TopicB的Tag為Tag1||Tag2,C2和C3訂閱的Tag為Tag2||Tag1,順序不一致,不符合訂閱關系一致性原則。
錯誤示例代碼三
-
Consumer實例3-1:
Properties properties = new Properties();
properties.put(PropertyKeyConst.GROUP_ID, "GID_test_3");
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("TopicA", "*", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
System.out.println(message.getMsgID());
return Action.CommitMessage;
}
});
consumer.subscribe("TopicB", "Tag1||Tag2", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
System.out.println(message.getMsgID());
return Action.CommitMessage;
}
});
-
Consumer實例3-2:
Properties properties = new Properties();
properties.put(PropertyKeyConst.GROUP_ID, "GID_test_3");
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("TopicA", "*", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
System.out.println(message.getMsgID());
return Action.CommitMessage;
}
});
consumer.subscribe("TopicB", "Tag2||Tag1", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
System.out.println(message.getMsgID());
return Action.CommitMessage;
}
});
-
Consumer實例3-3:
Properties properties = new Properties();
properties.put(PropertyKeyConst.GROUP_ID, "GID_test_3");
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("TopicA", "*", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
System.out.println(message.getMsgID());
return Action.CommitMessage;
}
});
consumer.subscribe("TopicB", "Tag2||Tag1", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
System.out.println(message.getMsgID());
return Action.CommitMessage;
}
});
文章來源:https://help.aliyun.com/document_detail/43523.html#section-cu2-w4u-kpn