RocketMq學習筆記03---JAVA代碼實現RocketMQ消息發送和接收


1.引入rocketmq的依賴

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.3.0</version>
        </dependency>

2.啟動Rocketmq-dashboard管理台

具體參見:https://www.cnblogs.com/luckyplj/p/16007605.html

3. 向RocketMq發送消息

3.1 配置RocketMq發送消息的相關屬性

rocketmq.address=127.0.0.1:9876
rocketmq.producer.groupId=FLEP_FILE
rocketmq.producer.sendMsgTimeout=10000
rocketmq.producer.retryWhenSendFailed=3

3.2 發送fileId和createDate等內容到RocketMq

package com.ttbank.flep.core.controller;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ttbank.flep.core.util.PropertyUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Random;

/**
 * @Author lucky
 * @Date 2022/3/15 15:50
 */
@Slf4j
@RestController
@RequestMapping("/rocketmq")
public class RocketMqController {

    @PostMapping("/sendMq")
    public void sendMq(){
        // 1 獲取消息生產者
        DefaultMQProducer defaultMQProducer = getRocketMqProducer();

        // 2 啟動生產者
        try {
            defaultMQProducer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
        // 3 構建消息對象,主要是設置消息的主題、標簽、內容
        JSONObject jsonObject = generateMsgContent();
        Message message = new Message("lucky-topic", "lucky-tag", jsonObject.toString().getBytes());
        // 4 發送消息
        SendResult result = null;
        try {
            result = defaultMQProducer.send(message);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("SendResult-->" + result);
        // TODO 6 關閉生產者
        defaultMQProducer.shutdown();

    }

    /**
     * 讀取配置文件中設置的rocketmq相關屬性,創建消息生產者
     */
    private DefaultMQProducer getRocketMqProducer(){
        String mqAddress = PropertyUtil.getProperties("rocketmq.address");
        String groupId = PropertyUtil.getProperties("rocketmq.producer.groupId");
        String msgTimeout = PropertyUtil.getProperties("rocketmq.producer.sendMsgTimeout");
        String retryWhenSendFailed = PropertyUtil.getProperties("rocketmq.producer.retryWhenSendFailed");
        // 1 創建消息生產者,指定生成組名
        DefaultMQProducer defaultMQProducer = new DefaultMQProducer(groupId);
        // 2 指定NameServer的地址
        defaultMQProducer.setNamesrvAddr(mqAddress);
        // 3 設置消息超時時間
        defaultMQProducer.setSendMsgTimeout(Integer.parseInt(msgTimeout));
        // 4 同步發送消息,如果SendMsgTimeout時間內沒有發送成功,則重試retryWhenSendFailed次
        defaultMQProducer.setRetryTimesWhenSendFailed(Integer.parseInt(retryWhenSendFailed));
        return defaultMQProducer;

    }

    /**
     * 模擬生成消息體的內容
     */
    private JSONObject generateMsgContent(){
        JSONObject jsonObject=new JSONObject();
        Random random=new Random();
        int fileId = random.nextInt(10000);
        jsonObject.put("fileId",String.valueOf(fileId));
        LocalDateTime localDateTime=LocalDateTime.now();
        String fileCreateDate = localDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        jsonObject.put("fileCreateDate",fileCreateDate );
        return jsonObject;
    }
}

3.3 postman測試

控制台輸出:

2022-03-16 17:39:19.776 |  INFO  | http-nio-7010-exec-3 | com.ttbank.flep.core.interceptor.AccessLogInterceptor:18 | [] -進入到攔截器AccessLogInterceptor中:preHandle() 方法
2022-03-16 17:39:19.780 |  INFO  | http-nio-7010-exec-3 | com.ttbank.flep.core.interceptor.AccessLogInterceptor:20 | [] -接收到來自[null]請求
SendResult-->SendResult [sendStatus=SEND_OK, msgId=AC1210C6D1C818B4AAC2510941600002, offsetMsgId=AC1210C600002A9F000000000000027D, messageQueue=MessageQueue [topic=lucky-topic, brokerName=LAPTOP-NCTKP3GJ, queueId=2], queueOffset=0]
2022-03-16 17:39:21.076 |  INFO  | NettyClientSelector_1 | RocketmqRemoting:95 | [] -closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
2022-03-16 17:39:21.077 |  INFO  | NettyClientSelector_1 | RocketmqRemoting:95 | [] -closeChannel: close the connection to remote address[172.18.16.198:10911] result: true
2022-03-16 17:39:21.078 |  INFO  | NettyClientSelector_1 | RocketmqRemoting:95 | [] -closeChannel: close the connection to remote address[172.18.16.198:10909] result: true

3.4 Rocketmq-dashboard管理台查看消息內容

4.從RocketMq接收消息

4.1 配置RocketMq發送消息的相關屬性

rocketmq.address=127.0.0.1:9876
rocketmq.consumer.consumerGroup=FLEP-CONSUMER-TEST

4.2 java代碼

package com.ttbank.flep.core.controller;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ttbank.flep.core.util.PropertyUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Random;

/**
 * @Author lucky
 * @Date 2022/3/15 15:50
 */
@Slf4j
@RestController
@RequestMapping("/rocketmq")
public class RocketMqController {

    @PostMapping("/receiveMqMsg")
    public void receiveMqMsg(){
        // 1 獲取消息消費者
        DefaultMQPushConsumer defaultMQPushConsumer = getRocketMqConsumer();

        // 2 進行訂閱:注冊回調函數,編寫處理消息的邏輯
        defaultMQPushConsumer.registerMessageListener((List<MessageExt> list, ConsumeConcurrentlyContext context) -> {

            // try catch(throwable)確保不會因為業務邏輯的異常,導致消息出現重復消費的現象
            // org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest.run()中會對Throwable進行捕獲,
            //並且返回ConsumeConcurrentlyStatus.RECONSUME_LATER
            try {
                System.out.println("收到消息--》" + list);
                for (MessageExt messageExt : list) {
                   String message=new String(messageExt.getBody(),RemotingHelper.DEFAULT_CHARSET);
                   JSONObject object=JSONObject.parseObject(message);
                    String fileId = (String) object.get("fileId");
                    String fileCreateDate = (String) object.get("fileCreateDate");
                    log.info(fileId+":"+fileCreateDate);
                }

            } catch (Throwable throwable) {
                throwable.printStackTrace();
            }

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

        // 5 啟動消費者
        try {
            defaultMQPushConsumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
        System.out.println("消費者啟動成功。。。");

    }


    private DefaultMQPushConsumer getRocketMqConsumer(){
        String mqAddress = PropertyUtil.getProperties("rocketmq.address");
        String consumerGroup = PropertyUtil.getProperties("rocketmq.consumer.consumerGroup");

        // 1 創建消費者,指定所屬的消費者組名
        DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer(consumerGroup);
        // 2 指定NameServer的地址
        defaultMQPushConsumer.setNamesrvAddr(mqAddress);
        // 3 指定消費者訂閱的主題和標簽
        try {
            defaultMQPushConsumer.subscribe("lucky-topic", "*");
        } catch (MQClientException e) {
            e.printStackTrace();
        }
        return defaultMQPushConsumer;

    }

}

4.3 postman測試

控制台輸出:

消費者啟動成功。。。
收到消息--》[MessageExt [queueId=9, storeSize=225, queueOffset=1, sysFlag=0, bornTimestamp=1647425585106, bornHost=/172.18.16.198:50655, storeTimestamp=1647425585109, storeHost=/172.18.16.198:10911, msgId=AC1210C600002A9F00000000000005FB, commitLogOffset=1531, bodyCRC=1406563058, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='lucky-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=2, CONSUME_START_TIME=1647425739660, UNIQ_KEY=AC1210C6E21818B4AAC2512823D20001, WAIT=true, TAGS=lucky-tag}, body=[123, 34, 102, 105, 108, 101, 67, 114, 101, 97, 116, 101, 68, 97, 116, 101, 34, 58, 34, 50, 48, 50, 50, 45, 48, 51, 45, 49, 54, 32, 49, 56, 58, 49, 51, 58, 48, 53, 34, 44, 34, 102, 105, 108, 101, 73, 100, 34, 58, 34, 56, 55, 52, 55, 34, 125], transactionId='null'}]]
收到消息--》[MessageExt [queueId=1, storeSize=224, queueOffset=0, sysFlag=0, bornTimestamp=1647425511883, bornHost=/172.18.16.198:50575, storeTimestamp=1647425511888, storeHost=/172.18.16.198:10911, msgId=AC1210C600002A9F000000000000051B, commitLogOffset=1307, bodyCRC=1064386839, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='lucky-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1647425739660, UNIQ_KEY=AC1210C6E21818B4AAC2512705CA0000, WAIT=true, TAGS=lucky-tag}, body=[123, 34, 102, 105, 108, 101, 67, 114, 101, 97, 116, 101, 68, 97, 116, 101, 34, 58, 34, 50, 48, 50, 50, 45, 48, 51, 45, 49, 54, 32, 49, 56, 58, 49, 49, 58, 53, 48, 34, 44, 34, 102, 105, 108, 101, 73, 100, 34, 58, 34, 51, 50, 55, 34, 125], transactionId='null'}]]
2022-03-16 18:15:39.660 |  INFO  | ConsumeMessageThread_2 | com.ttbank.flep.core.controller.RocketMqController:117 | [] -8747:2022-03-16 18:13:05
2022-03-16 18:15:39.660 |  INFO  | ConsumeMessageThread_1 | com.ttbank.flep.core.controller.RocketMqController:117 | [] -327:2022-03-16 18:11:50

此時,對比Rocketmq-dashboard管理台中查看到的消息內容

 

參考文獻:https://blog.csdn.net/qq_31155349/article/details/108626778


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM