開發生產者代碼
第一步:創建很普通的 SpringBoot 項目
第二步:加入相關依賴
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.3.0</version> </dependency>
第三步:寫代碼

PayProducer 類如下所示:
package net.xdclass.xdclassmq.jms; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.springframework.stereotype.Component; @Component public class PayProducer { private String producerGroup = "pay_group"; private String nameServerAddr = "192.168.0.104:9876"; private DefaultMQProducer producer; public PayProducer() { producer = new DefaultMQProducer(producerGroup); //指定NameServer地址,多個地址以 ; 隔開 //如 producer.setNamesrvAddr("192.168.100.141:9876;192.168.100.142:9876;192.168.100.149:9876"); producer.setNamesrvAddr(nameServerAddr); start(); } public DefaultMQProducer getProducer() { return this.producer; } /** * 對象在使用之前必須要調用一次,只能初始化一次 */ public void start() { try { this.producer.start(); } catch (MQClientException e) { e.printStackTrace(); } } /** * 一般在應用上下文,使用上下文監聽器,進行關閉 */ public void shutdown() { this.producer.shutdown(); } }
PayController 類如下所示:
package net.xdclass.xdclassmq.controller; import net.xdclass.xdclassmq.jms.PayProducer; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.exception.RemotingException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.HashMap; @RestController public class PayController { @Autowired private PayProducer payProducer; private static final String topic = "pay_test_topic"; @RequestMapping("/api/v1/pay_cb") public Object callback(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { Message message = new Message(topic,"taga", ("hello rocketmq = "+text).getBytes() ); SendResult sendResult = payProducer.getProducer().send(message); System.out.println(sendResult); return new HashMap<>(); } }
第四步:測試
通過可視化管理后台查看消息


Message對象
- topic:主題名稱
- tag:標簽,用於過濾
- key:消息唯一標示,可以是業務字段組合
- body:消息體,字節數組
注意:發送消息到 Broker 前,需要判斷是否有此 Topic。啟動 Broker 的時候,本地環境建議開啟自動創建 Topic,生產環境建議關閉自動化創建 Topic。建議先手工創建 Topic,如果靠程序自動創建,然后再投遞消息,會出現延遲情況。自動創建topic: autoCreateTopicEnable=true 無效原因:客戶端版本要和服務端版本保持一致。
概念模型: 一個 Topic 下面對應多個 Queue,可以在創建 Topic 時指定,如訂單類 Topic。
常見錯誤一
org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout
原因:阿里雲存在多網卡,rocketmq都會根據當前網卡選擇一個IP使用,當你的機器有多塊網卡時,很有可能會有問題。比如,我遇到的問題是我機器上有兩個IP,一個公網IP,一個私網IP, 因此需要配置broker.conf 指定當前的公網ip, 然后重新啟動broker 新增配置:conf/broker.conf (屬性名稱brokerIP1=broker所在的公網ip地址 ) 新增這個配置:brokerIP1=120.76.62.13 啟動命令:nohup sh bin/mqbroker -n localhost:9876 -c ./conf/broker.conf &
常見錯誤二
MQClientException: No route info of this topic, TopicTest1 原因:Broker 禁止自動創建 Topic,且用戶沒有通過手工方式創建 此Topic, 或者broker和Nameserver網絡不通 解決: 通過 sh bin/mqbroker -m 查看配置 autoCreateTopicEnable=true 則自動創建topic Centos7關閉防火牆 systemctl stop firewalld
常見錯誤三
控制台查看不了數據,提示連接 10909錯誤 原因:Rocket默認開啟了VIP通道,VIP通道端口為10911-2=10909 解決:阿里雲安全組需要增加一個端口 10909
其他錯誤:
https://blog.csdn.net/qq_14853889/article/details/81053145 https://blog.csdn.net/wangmx1993328/article/details/81588217#%E5%BC%82%E5%B8%B8%E8%AF%B4%E6%98%8E https://www.jianshu.com/p/bfd6d849f156 https://blog.csdn.net/wangmx1993328/article/details/81588217
開發消費者代碼
接着上面的工程,直接上代碼,PayConsumer 類如下所示:
package net.xdclass.xdclassmq.jms; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; import org.springframework.stereotype.Component; import java.io.UnsupportedEncodingException; import java.util.List; @Component public class PayConsumer { private DefaultMQPushConsumer consumer; private String CONSUMER_GROUP = "pay_consumer_group"; private String NAME_SERVER = "192.168.0.104:9876"; private String TOPIC = "pay_test_topic"; public PayConsumer() throws MQClientException { consumer = new DefaultMQPushConsumer(CONSUMER_GROUP); consumer.setNamesrvAddr(this.NAME_SERVER); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.subscribe(this.TOPIC, "*"); // consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { // try { // Message msg = msgs.get(0); // System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody())); // String topic = msg.getTopic(); // String body = new String(msg.getBody(), "utf-8"); // String tags = msg.getTags(); // String keys = msg.getKeys(); // System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body); // return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // } catch (UnsupportedEncodingException e) { // e.printStackTrace(); // return ConsumeConcurrentlyStatus.RECONSUME_LATER; // } // }); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { try { Message msg = msgs.get(0); System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody())); String topic = msg.getTopic(); String body = new String(msg.getBody(), "utf-8"); String tags = msg.getTags(); String keys = msg.getKeys(); System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (UnsupportedEncodingException e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } }); consumer.start(); System.out.println("consumer start ..."); } }
注釋掉的部分采用 Lambda 表達式寫法,效果是一樣的。
常見問題
1、Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <172.17.42.1:10911> failed
2、com.alibaba.rocketmq.client.exception.MQClientException: Send [1] times, still failed, cost [1647]ms, Topic: TopicTest1, BrokersSent: [broker-a, null, null]
3、org.apache.rocketmq.client.exception.MQClientException: Send [3] times, still failed, cost [497]ms, Topic: TopicTest, BrokersSent: [Book-Air.local, MacBook-Air.local, MacBook-Air.local]
解決:多網卡問題處理
1、設置producer: producer.setVipChannelEnabled(false);
2、編輯ROCKETMQ 配置文件:broker.conf(下列ip為自己的ip)
namesrvAddr = 192.168.0.101:9876
brokerIP1 = 192.168.0.101
4、DESC: service not available now, maybe disk full, CL:
解決:修改啟動腳本runbroker.sh,在里面增加一句話即可:
JAVA_OPT="${JAVA_OPT} -Drocketmq.broker.diskSpaceWarningLevelRatio=0.98"
(磁盤保護的百分比設置成98%,只有磁盤空間使用率達到98%時才拒絕接收producer消息)
常見問題處理
https://blog.csdn.net/sqzhao/article/details/54834761
https://blog.csdn.net/mayifan0/article/details/67633729
https://blog.csdn.net/a906423355/article/details/78192828
