概念:
Producer
消息生產者,生產者的作用就是將消息發送到 MQ,生產者本身既可以產生消息,如讀取文本信息等。也可以對外提供接口,由外部應用來調用接口,再由生產者將收到的消息發送到 MQ。
Producer Group
生產者組,簡單來說就是多個發送同一類消息的生產者稱之為一個生產者組。在這里可以不用關心,只要知道有這么一個概念即可。
Consumer
消息消費者,簡單來說,消費 MQ 上的消息的應用程序就是消費者,至於消息是否進行邏輯處理,還是直接存儲到數據庫等取決於業務需要。
Consumer Group
消費者組,和生產者類似,消費同一類消息的多個 consumer 實例組成一個消費者組。
Topic
Topic 是一種消息的邏輯分類,比如說你有訂單類的消息,也有庫存類的消息,那么就需要進行分類,一個是訂單 Topic 存放訂單相關的消息,一個是庫存 Topic 存儲庫存相關的消息。
Message
Message 是消息的載體。一個 Message 必須指定 topic,相當於寄信的地址。Message 還有一個可選的 tag 設置,以便消費端可以基於 tag 進行過濾消息。也可以添加額外的鍵值對,例如你需要一個業務 key 來查找 broker 上的消息,方便在開發過程中診斷問題。
Tag
標簽可以被認為是對 Topic 進一步細化。一般在相同業務模塊中通過引入標簽來標記不同用途的消息。一個Tag標識為一類消息中的二級分類。
Broker
Broker 是 RocketMQ 系統的主要角色,其實就是前面一直說的 MQ。Broker 接收來自生產者的消息,儲存以及為消費者拉取消息的請求做好准備。
Name Server
Name Server 為 producer 和 consumer 提供路由信息。
1..生產者代碼
這里生產者組為TopicTest007Group,消息主題為TopicTest007,消息標簽為TagA 。Tag標簽可以不填,當需要將同一主題的消息分類時,可以利用Tag。
package com.example.rocketmq; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class RocketmqApplication { public static void main(String[] args) throws MQClientException, InterruptedException { SpringApplication.run(RocketmqApplication.class, args); DefaultMQProducer producer = new DefaultMQProducer("TopicTest007Group"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); for (int i = 100; i < 120; i++) { try { Message msg = new Message("TopicTest007" , "TagA" , ("Hello RocketMQ,消息" + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); SendResult sendResult = producer.send(msg); System.out.println( new String(msg.getBody())); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } producer.shutdown(); } }
2.消費者代碼,這里為了測試多個消費者情況 ,寫了兩個消費者。
兩個消費者內容一樣,消費者組為TopicTest007Group,消費的消息主題為TopicTest007。這就表示,這兩個消費者共同處理TopicTest007的消息。
package com.example.rocketmq; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.springframework.boot.autoconfigure.SpringBootApplication; import java.util.List; @SpringBootApplication public class RocketMQConsumer1 { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TopicTest007Group"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTest007", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for(MessageExt msg:msgs){ System.out.println("RocketMQConsumer1:"+new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); } }
package com.example.rocketmq; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.springframework.boot.autoconfigure.SpringBootApplication; import java.util.List; @SpringBootApplication public class RocketMQConsumer2 { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TopicTest007Group"); consumer.setNamesrvAddr("27.0.0.1:9876"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTest007", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for(MessageExt msg:msgs){ System.out.println("RocketMQConsumer2:"+new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); } }
運行上面兩個消費者代碼,然后運行生產者代碼,生成20條消息。結果可以看到,兩個消費者分別消費了一部分消息,加起來一共20條。
問題1,生產者group和消費者group名稱必須一致么?
需要注意的是,生產者group只對生產者有效,和消費者的group無關。所以上面的消費者的group名稱可以和生產者的group名稱不一致。
問題2:消費者的group名稱不同,是每組group都能消費到生產者發出的消息嗎?
將上面的RocketMQConsumer1的Group改為ConsumerGroup1,RocketMQConsumer2 的Group改為ConsumerGroup2。
可以發現生產者發送消息后,這兩個消費者都收到了消息。
