1.引入rocketmq的依賴
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.3.0</version> </dependency>
2.啟動Rocketmq-dashboard管理台
具體參見:https://www.cnblogs.com/luckyplj/p/16007605.html
3. 向RocketMq發送消息
3.1 配置RocketMq發送消息的相關屬性
rocketmq.address=127.0.0.1:9876 rocketmq.producer.groupId=FLEP_FILE rocketmq.producer.sendMsgTimeout=10000 rocketmq.producer.retryWhenSendFailed=3
3.2 發送fileId和createDate等內容到RocketMq
package com.ttbank.flep.core.controller; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.ttbank.flep.core.util.PropertyUtil; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.List; import java.util.Random; /** * @Author lucky * @Date 2022/3/15 15:50 */ @Slf4j @RestController @RequestMapping("/rocketmq") public class RocketMqController { @PostMapping("/sendMq") public void sendMq(){ // 1 獲取消息生產者 DefaultMQProducer defaultMQProducer = getRocketMqProducer(); // 2 啟動生產者 try { defaultMQProducer.start(); } catch (MQClientException e) { e.printStackTrace(); } // 3 構建消息對象,主要是設置消息的主題、標簽、內容 JSONObject jsonObject = generateMsgContent(); Message message = new Message("lucky-topic", "lucky-tag", jsonObject.toString().getBytes()); // 4 發送消息 SendResult result = null; try { result = defaultMQProducer.send(message); } catch (Exception e) { e.printStackTrace(); } System.out.println("SendResult-->" + result); // TODO 6 關閉生產者 defaultMQProducer.shutdown(); } /** * 讀取配置文件中設置的rocketmq相關屬性,創建消息生產者 */ private DefaultMQProducer getRocketMqProducer(){ String mqAddress = PropertyUtil.getProperties("rocketmq.address"); String groupId = PropertyUtil.getProperties("rocketmq.producer.groupId"); String msgTimeout = PropertyUtil.getProperties("rocketmq.producer.sendMsgTimeout"); String retryWhenSendFailed = PropertyUtil.getProperties("rocketmq.producer.retryWhenSendFailed"); // 1 創建消息生產者,指定生成組名 DefaultMQProducer defaultMQProducer = new DefaultMQProducer(groupId); // 2 指定NameServer的地址 defaultMQProducer.setNamesrvAddr(mqAddress); // 3 設置消息超時時間 defaultMQProducer.setSendMsgTimeout(Integer.parseInt(msgTimeout)); // 4 同步發送消息,如果SendMsgTimeout時間內沒有發送成功,則重試retryWhenSendFailed次 defaultMQProducer.setRetryTimesWhenSendFailed(Integer.parseInt(retryWhenSendFailed)); return defaultMQProducer; } /** * 模擬生成消息體的內容 */ private JSONObject generateMsgContent(){ JSONObject jsonObject=new JSONObject(); Random random=new Random(); int fileId = random.nextInt(10000); jsonObject.put("fileId",String.valueOf(fileId)); LocalDateTime localDateTime=LocalDateTime.now(); String fileCreateDate = localDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); jsonObject.put("fileCreateDate",fileCreateDate ); return jsonObject; } }
3.3 postman測試
控制台輸出:
2022-03-16 17:39:19.776 | INFO | http-nio-7010-exec-3 | com.ttbank.flep.core.interceptor.AccessLogInterceptor:18 | [] -進入到攔截器AccessLogInterceptor中:preHandle() 方法 2022-03-16 17:39:19.780 | INFO | http-nio-7010-exec-3 | com.ttbank.flep.core.interceptor.AccessLogInterceptor:20 | [] -接收到來自[null]請求 SendResult-->SendResult [sendStatus=SEND_OK, msgId=AC1210C6D1C818B4AAC2510941600002, offsetMsgId=AC1210C600002A9F000000000000027D, messageQueue=MessageQueue [topic=lucky-topic, brokerName=LAPTOP-NCTKP3GJ, queueId=2], queueOffset=0] 2022-03-16 17:39:21.076 | INFO | NettyClientSelector_1 | RocketmqRemoting:95 | [] -closeChannel: close the connection to remote address[127.0.0.1:9876] result: true 2022-03-16 17:39:21.077 | INFO | NettyClientSelector_1 | RocketmqRemoting:95 | [] -closeChannel: close the connection to remote address[172.18.16.198:10911] result: true 2022-03-16 17:39:21.078 | INFO | NettyClientSelector_1 | RocketmqRemoting:95 | [] -closeChannel: close the connection to remote address[172.18.16.198:10909] result: true
3.4 Rocketmq-dashboard管理台查看消息內容
4.從RocketMq接收消息
4.1 配置RocketMq發送消息的相關屬性
rocketmq.address=127.0.0.1:9876
rocketmq.consumer.consumerGroup=FLEP-CONSUMER-TEST
4.2 java代碼
package com.ttbank.flep.core.controller; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.ttbank.flep.core.util.PropertyUtil; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.List; import java.util.Random; /** * @Author lucky * @Date 2022/3/15 15:50 */ @Slf4j @RestController @RequestMapping("/rocketmq") public class RocketMqController { @PostMapping("/receiveMqMsg") public void receiveMqMsg(){ // 1 獲取消息消費者 DefaultMQPushConsumer defaultMQPushConsumer = getRocketMqConsumer(); // 2 進行訂閱:注冊回調函數,編寫處理消息的邏輯 defaultMQPushConsumer.registerMessageListener((List<MessageExt> list, ConsumeConcurrentlyContext context) -> { // try catch(throwable)確保不會因為業務邏輯的異常,導致消息出現重復消費的現象 // org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest.run()中會對Throwable進行捕獲, //並且返回ConsumeConcurrentlyStatus.RECONSUME_LATER try { System.out.println("收到消息--》" + list); for (MessageExt messageExt : list) { String message=new String(messageExt.getBody(),RemotingHelper.DEFAULT_CHARSET); JSONObject object=JSONObject.parseObject(message); String fileId = (String) object.get("fileId"); String fileCreateDate = (String) object.get("fileCreateDate"); log.info(fileId+":"+fileCreateDate); } } catch (Throwable throwable) { throwable.printStackTrace(); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); // 5 啟動消費者 try { defaultMQPushConsumer.start(); } catch (MQClientException e) { e.printStackTrace(); } System.out.println("消費者啟動成功。。。"); } private DefaultMQPushConsumer getRocketMqConsumer(){ String mqAddress = PropertyUtil.getProperties("rocketmq.address"); String consumerGroup = PropertyUtil.getProperties("rocketmq.consumer.consumerGroup"); // 1 創建消費者,指定所屬的消費者組名 DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer(consumerGroup); // 2 指定NameServer的地址 defaultMQPushConsumer.setNamesrvAddr(mqAddress); // 3 指定消費者訂閱的主題和標簽 try { defaultMQPushConsumer.subscribe("lucky-topic", "*"); } catch (MQClientException e) { e.printStackTrace(); } return defaultMQPushConsumer; } }
4.3 postman測試
控制台輸出:
消費者啟動成功。。。 收到消息--》[MessageExt [queueId=9, storeSize=225, queueOffset=1, sysFlag=0, bornTimestamp=1647425585106, bornHost=/172.18.16.198:50655, storeTimestamp=1647425585109, storeHost=/172.18.16.198:10911, msgId=AC1210C600002A9F00000000000005FB, commitLogOffset=1531, bodyCRC=1406563058, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='lucky-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=2, CONSUME_START_TIME=1647425739660, UNIQ_KEY=AC1210C6E21818B4AAC2512823D20001, WAIT=true, TAGS=lucky-tag}, body=[123, 34, 102, 105, 108, 101, 67, 114, 101, 97, 116, 101, 68, 97, 116, 101, 34, 58, 34, 50, 48, 50, 50, 45, 48, 51, 45, 49, 54, 32, 49, 56, 58, 49, 51, 58, 48, 53, 34, 44, 34, 102, 105, 108, 101, 73, 100, 34, 58, 34, 56, 55, 52, 55, 34, 125], transactionId='null'}]] 收到消息--》[MessageExt [queueId=1, storeSize=224, queueOffset=0, sysFlag=0, bornTimestamp=1647425511883, bornHost=/172.18.16.198:50575, storeTimestamp=1647425511888, storeHost=/172.18.16.198:10911, msgId=AC1210C600002A9F000000000000051B, commitLogOffset=1307, bodyCRC=1064386839, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='lucky-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1647425739660, UNIQ_KEY=AC1210C6E21818B4AAC2512705CA0000, WAIT=true, TAGS=lucky-tag}, body=[123, 34, 102, 105, 108, 101, 67, 114, 101, 97, 116, 101, 68, 97, 116, 101, 34, 58, 34, 50, 48, 50, 50, 45, 48, 51, 45, 49, 54, 32, 49, 56, 58, 49, 49, 58, 53, 48, 34, 44, 34, 102, 105, 108, 101, 73, 100, 34, 58, 34, 51, 50, 55, 34, 125], transactionId='null'}]] 2022-03-16 18:15:39.660 | INFO | ConsumeMessageThread_2 | com.ttbank.flep.core.controller.RocketMqController:117 | [] -8747:2022-03-16 18:13:05 2022-03-16 18:15:39.660 | INFO | ConsumeMessageThread_1 | com.ttbank.flep.core.controller.RocketMqController:117 | [] -327:2022-03-16 18:11:50
此時,對比Rocketmq-dashboard管理台中查看到的消息內容
參考文獻:https://blog.csdn.net/qq_31155349/article/details/108626778