實戰,用案例來說話
前面已經說了JMS和RocketMQ一些概念和安裝,下面使用SpringBoot來親身操作一下.
生產者的操作
- SpringBoot項目創建完成,引入依賴是第一步:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.3.0</version>
</dependency>
- 創建生產者是第二步,生產者必須依賴於生產組,而且需要指定nameServer
@Component
public class PayProducer {
/**
* 生產組,生產者必須在生產組內
*/
private String producerGroup = "pay_group";
/**
* 端口
*/
private String nameServer = "39.106.214.179:9876";
private DefaultMQProducer producer;
public PayProducer() {
producer = new DefaultMQProducer(producerGroup);
// 指定nameServer地址,多個地址之間以 ; 隔開
producer.setNamesrvAddr(nameServer);
start();
}
public DefaultMQProducer getProducer() {
return producer;
}
/**
* 對象在使用之前必須調用一次,並且只能初始化一次
*/
public void start() {
try {
this.producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
/**
* 一般在應用上下文,使用上下文監聽器,進行關閉
*/
public void shutdown() {
producer.shutdown();
}
}
- 創建Controller進行測試發送消息,必須要指定topic,消息依賴於主題
@RestController
public class PayController {
@Autowired
private PayProducer payProducer;
/**
* topic,消息依賴於topic
*/
private static final String topic = "pay_test_topic";
@RequestMapping("/api/v1/pay_cb")
public Object callback(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
// 創建消息 主題 二級分類 消息內容好的字節數組
Message message = new Message(topic, "taga", ("hello rocketMQ " + text).getBytes());
SendResult send = payProducer.getProducer().send(message);
System.out.println(send);
return new HashMap<>();
}
}
-
采坑記錄
- 上面完成就可以啟動項目了,訪問之后報錯了:
MQClientException: No route info of this topic, TopicTest1 這個的原因就是Broker禁止自動創建Topic且用戶沒有通過手動方式創建Topic, 或者是broker與Nameserver網絡不通 解決: 使用手動創建Topic,在RocketMQ控制台的主題中創建就好,最主要的是指定topic name,如下圖 出現創建不了的情況往下看 如果還出現這個問題,請關閉防火牆
-
這次說下上面可能創建不了的問題,前面說了安裝開放安全組,這次就是因為rocketMQ虛擬的端口問題,需要開放10909,也就是說ECS最終開放的端口號: 8080,10911,9876,10909
-
繼續采坑
org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout 這個問題是阿里雲服務器存在多個網卡,rocketMQ會根據當前網卡選擇一個IP使用,我們需要制定一個IP: 路徑是: /usr/local/software/rocketmq/distribution/target/apache-rocketmq vim ./conf/broker.conf 添加配置: brokerIP1=公網IP 重新啟動: nohup sh bin/mqbroker -n localhost:9876 -c ./conf/broker.conf & tail -f nohup.out
- 其他問題
https://blog.csdn.net/qq_14853889/article/details/81053145 https://blog.csdn.net/wangmx1993328/article/details/81588217#%E5%BC%82%E5%B8%B8%E8%AF%B4%E6%98%8E https://www.jianshu.com/p/bfd6d849f156 https://blog.csdn.net/wangmx1993328/article/details/81588217
消費者操作
-
在前一個項目的基礎上,將公共內容提取出來,創建一個JsmConfig的類,來聲明公共內容:
public class JmsConfig { /** * 端口 */ public static final String NAME_SERVER = "39.106.214.179:9876"; /** * topic,消息依賴於topic */ public static final String TOPIC = "pay_test_topic"; }
-
生產者內容變為
@Component public class PayProducer { /** * 生產組,生產者必須在生產組內 */ private String producerGroup = "pay_producer_group"; private DefaultMQProducer producer; public PayProducer() { producer = new DefaultMQProducer(producerGroup); // 指定nameServer地址,多個地址之間以 ; 隔開 producer.setNamesrvAddr(JmsConfig.NAME_SERVER); start(); } public DefaultMQProducer getProducer() { return producer; } /** * 對象在使用之前必須調用一次,並且只能初始化一次 */ public void start() { try { this.producer.start(); } catch (MQClientException e) { e.printStackTrace(); } } /** * 一般在應用上下文,使用上下文監聽器,進行關閉 */ public void shutdown() { producer.shutdown(); } }
-
創建消費者
@Component public class PayConsumer { private DefaultMQPushConsumer consumer; private String consumerGroup = "pay_consumer_group"; public PayConsumer() throws MQClientException { consumer = new DefaultMQPushConsumer(consumerGroup); consumer.setNamesrvAddr(JmsConfig.NAME_SERVER); // 設置消費地點,從最后一個進行消費(其實就是消費策略) consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); // 訂閱主題的哪些標簽 consumer.subscribe(JmsConfig.TOPIC, "*"); // 注冊監聽器 consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { try { // 獲取Message Message msg = msgs.get(0); System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody())); String topic = msg.getTopic(); String body = new String(msg.getBody(), "utf-8"); // 標簽 String tags = msg.getTags(); String keys = msg.getKeys(); System.out.println("topic=" + topic + ", tags=" + tags + ",keys=" + keys + ", msg=" + body); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (UnsupportedEncodingException e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } }); consumer.start(); System.out.println("Consumer Listener"); } }
-
Controller的變化:
@RestController public class PayController { @Autowired private PayProducer payProducer; @RequestMapping("/api/v1/pay_cb") public Object callback(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { // 創建消息 主題 二級分類 消息內容好的字節數組 Message message = new Message(JmsConfig.TOPIC, "taga", ("hello rocketMQ " + text).getBytes()); SendResult send = payProducer.getProducer().send(message); System.out.println(send); return new HashMap<>(); } }
梳理一下整個流程,生產者存在於生產組,所以生產組很重要,創建生產者需要指定生產組.消費者同理,創建消費者也需要指定消費組. 並且二者都需要指定NameServer. 有了生產者就要發送消息,也就是Message,創建Message需要指定Topic,二級分類和消息體等信息. 那消費者如何獲取呢? 無非就是綁定Topic和二級分類就可以,這就是整個流程. 中間少說了消息的存放,消息是在broker中,這個相當於倉庫,所以就是生產者生產消息到Broker,Consumer從Broker中獲取消息進行消費.