SpringBoot2.X 整合 RocketMQ4.X


開發生產者代碼

第一步:創建很普通的 SpringBoot 項目

第二步:加入相關依賴

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.3.0</version>
</dependency>

第三步:寫代碼

PayProducer 類如下所示:

package net.xdclass.xdclassmq.jms;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.stereotype.Component;

@Component
public class PayProducer {

    private String producerGroup = "pay_group";

    private String nameServerAddr = "192.168.0.104:9876";

    private DefaultMQProducer producer;

    public PayProducer() {
        producer = new DefaultMQProducer(producerGroup);

        //指定NameServer地址,多個地址以 ; 隔開
        //如 producer.setNamesrvAddr("192.168.100.141:9876;192.168.100.142:9876;192.168.100.149:9876");

        producer.setNamesrvAddr(nameServerAddr);
        start();
    }

    public DefaultMQProducer getProducer() {
        return this.producer;
    }

    /**
     * 對象在使用之前必須要調用一次,只能初始化一次
     */
    public void start() {
        try {
            this.producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }
    
    /**
     * 一般在應用上下文,使用上下文監聽器,進行關閉
     */
    public void shutdown() {
        this.producer.shutdown();
    }
}

PayController 類如下所示:

package net.xdclass.xdclassmq.controller;

import net.xdclass.xdclassmq.jms.PayProducer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;

@RestController
public class PayController {

    @Autowired
    private PayProducer payProducer;

    private static  final String topic = "pay_test_topic";

    @RequestMapping("/api/v1/pay_cb")
    public Object callback(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        Message message = new Message(topic,"taga", ("hello rocketmq = "+text).getBytes() );
        SendResult sendResult = payProducer.getProducer().send(message);
        System.out.println(sendResult);
        return new HashMap<>();
    }
}

第四步:測試

通過可視化管理后台查看消息

 

 

Message對象

  • topic:主題名稱
  • tag:標簽,用於過濾
  • key:消息唯一標示,可以是業務字段組合
  • body:消息體,字節數組

注意:發送消息到 Broker 前,需要判斷是否有此 Topic。啟動 Broker 的時候,本地環境建議開啟自動創建 Topic,生產環境建議關閉自動化創建 Topic。建議先手工創建 Topic,如果靠程序自動創建,然后再投遞消息,會出現延遲情況。自動創建topic: autoCreateTopicEnable=true 無效原因:客戶端版本要和服務端版本保持一致。

概念模型: 一個 Topic 下面對應多個 Queue,可以在創建 Topic 時指定,如訂單類 Topic。

常見錯誤一

org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException:
sendDefaultImpl call timeout
原因:阿里雲存在多網卡,rocketmq都會根據當前網卡選擇一個IP使用,當你的機器有多塊網卡時,很有可能會有問題。比如,我遇到的問題是我機器上有兩個IP,一個公網IP,一個私網IP, 因此需要配置broker.conf 指定當前的公網ip, 然后重新啟動broker 
新增配置:conf/broker.conf  (屬性名稱brokerIP1=broker所在的公網ip地址 )
新增這個配置:brokerIP1=120.76.62.13  

啟動命令:nohup sh bin/mqbroker -n localhost:9876  -c ./conf/broker.conf &

常見錯誤二

MQClientException: No route info of this topic, TopicTest1
原因:Broker 禁止自動創建 Topic,且用戶沒有通過手工方式創建 此Topic, 或者broker和Nameserver網絡不通
解決:
通過 sh bin/mqbroker -m  查看配置
autoCreateTopicEnable=true 則自動創建topic

Centos7關閉防火牆  systemctl stop firewalld

常見錯誤三

控制台查看不了數據,提示連接 10909錯誤

原因:Rocket默認開啟了VIP通道,VIP通道端口為10911-2=10909

解決:阿里雲安全組需要增加一個端口 10909

其他錯誤:

https://blog.csdn.net/qq_14853889/article/details/81053145
https://blog.csdn.net/wangmx1993328/article/details/81588217#%E5%BC%82%E5%B8%B8%E8%AF%B4%E6%98%8E
https://www.jianshu.com/p/bfd6d849f156
https://blog.csdn.net/wangmx1993328/article/details/81588217

 

開發消費者代碼

接着上面的工程,直接上代碼,PayConsumer 類如下所示:

package net.xdclass.xdclassmq.jms;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;
import java.util.List;

@Component
public class PayConsumer {
    private DefaultMQPushConsumer consumer;

    private String CONSUMER_GROUP = "pay_consumer_group";
    private String NAME_SERVER = "192.168.0.104:9876";
    private String TOPIC = "pay_test_topic";

    public PayConsumer() throws MQClientException {

        consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
        consumer.setNamesrvAddr(this.NAME_SERVER);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

        consumer.subscribe(this.TOPIC, "*");

//        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
//            try {
//                Message msg = msgs.get(0);
//                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody()));
//                String topic = msg.getTopic();
//                String body = new String(msg.getBody(), "utf-8");
//                String tags = msg.getTags();
//                String keys = msg.getKeys();
//                System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body);
//                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
//            } catch (UnsupportedEncodingException e) {
//                e.printStackTrace();
//                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
//            }
//        });

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                try {
                    Message msg = msgs.get(0);
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody()));

                    String topic = msg.getTopic();
                    String body = new String(msg.getBody(), "utf-8");
                    String tags = msg.getTags();
                    String keys = msg.getKeys();
                    System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body);

                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (UnsupportedEncodingException e) {

                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
        });

        consumer.start();
        System.out.println("consumer start ...");
    }
}

注釋掉的部分采用 Lambda 表達式寫法,效果是一樣的。

常見問題

1、Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <172.17.42.1:10911> failed 

2、com.alibaba.rocketmq.client.exception.MQClientException: Send [1] times, still failed, cost [1647]ms, Topic: TopicTest1, BrokersSent: [broker-a, null, null]

3、org.apache.rocketmq.client.exception.MQClientException: Send [3] times, still failed, cost [497]ms, Topic: TopicTest, BrokersSent: [Book-Air.local, 	MacBook-Air.local, MacBook-Air.local]
解決:多網卡問題處理
	1、設置producer:  producer.setVipChannelEnabled(false);
	2、編輯ROCKETMQ 配置文件:broker.conf(下列ip為自己的ip)
		namesrvAddr = 192.168.0.101:9876
		brokerIP1 = 192.168.0.101

4、DESC: service not available now, maybe disk full, CL:
	解決:修改啟動腳本runbroker.sh,在里面增加一句話即可:		
	JAVA_OPT="${JAVA_OPT} -Drocketmq.broker.diskSpaceWarningLevelRatio=0.98"
	(磁盤保護的百分比設置成98%,只有磁盤空間使用率達到98%時才拒絕接收producer消息)
	
常見問題處理
	https://blog.csdn.net/sqzhao/article/details/54834761
	https://blog.csdn.net/mayifan0/article/details/67633729
	https://blog.csdn.net/a906423355/article/details/78192828

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM