1.前言
RabbleMQ這種消息中間件,主要的功能是使用異步操作,來達到解耦的目的,比如,有系統a和系統b,
系統a通過消息中間件通知系統b來做業務,那么系統a只需要把要做的事情【也就是消息】發給消息中間件后,
消息中間件就會把消息轉發給系統b,系統a不需要關心系統b是怎么完成業務的,也不需要關心業務完成的結果,
這是就是異步操作。
如果系統a希望獲得系統b的處理結果,那么系統a使用消息中間件發送消息后需要原地等待,做阻塞操作,但是
等待時長不能超過最大超時時間,可設置RabbleMQ自定義超時時間,這樣還不如直接調用該業務呢,何必再加個消息消息中間件通知他來做?
所以,一般不會做這樣的同步阻塞操作,違背了消息中間件的開發初衷【提高吞吐量和系統業務的響應速度】,雖然
不影響解耦度,但是這樣的操作使得消息中間件變得不倫不類了。
那么要問了,使用RabbleMQ消息中間件的具體好處是什么?
第一,解耦,是非常明顯的好處,通過消息中間件,系統a通知系統b做什么業務,不需要關心系統b是怎么實現業務的。
第二,異步,非必要的業務,不需要特別關心結果的業務,可以寫入消息中間件以異步方式操作,橫向編程,加快響應速度。
第三,削峰,並發量大的時候直接將所有數據懟到數據庫,數據庫會異常的,使用消息隊列的特性,所有的操作會進行排隊,
消息也不會丟失,當消息被消費后才會銷毀,比如秒殺系統就是這樣實現的。
那么,雖然同步阻塞操作偶很大的閉端,那么到底該怎么操作?
這篇隨筆 以 隨筆 https://www.cnblogs.com/c2g5201314/p/13156932.html 為基礎 ,修改部分代碼實現演示
2.操作
(1)修改消息生產者的消息生產類的 rabbitTemplate 模板發送消息方法 ,同步則使用 convertSendAndReceive(),參數與異步非阻塞操作的一樣。

源碼
package com.example.rabbitmqproducer1004.rabbitmqFactory; import com.example.rabbitmqproducer1004.config.RabbitmqConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import java.util.UUID; /** * 消息生產類 */ @Component //實現接口 public class SendMessage { //需要設置回調方法,獲取消費結果才需要實現 RabbitTemplate.ConfirmCallback 接口, //public class SendMessage implements RabbitTemplate.ConfirmCallback { Logger logger = LoggerFactory.getLogger(this.getClass()); //====================================================================== // /** // * 方法一:設置回調方法,獲取消費結果, // * <p> // * 缺點是:必須手動配置RabbitTemplate模板 ,代碼量大 // */ // // //存儲 rabbitmq模板的臨時變量 // private final RabbitTemplate rabbitTemplate; // // /** // * 構造注入rabbitmq模板,這樣可以設置回調方法,獲取消費結果,但是必須手動配置RabbitTemplate模板 // */ // @Autowired // public SendMessage(RabbitTemplate rabbitTemplate) { // this.rabbitTemplate = rabbitTemplate; // //設置確認回調的方法,參數類型為ConfirmCallback // this.rabbitTemplate.setConfirmCallback(this); // } // // /** // * 回調方法,獲取消費結果 // * // * @param correlationData 關聯數據 // * @param b 消息是否被消費成功,成功為true ,失敗為false // * @param s 原因 ,消費成功則返回null,否則返回失敗原因 // */ // @Override // public void confirm(CorrelationData correlationData, boolean b, String s) { // logger.warn("回調的連接數據:" + correlationData); // if (correlationData != null) { // //CorrelationData [id=1bcab025-2b4c-4f74-a22d-41007e30f551] // logger.warn("獲取correlationData的id值:" + correlationData.getId()); // } // //1bcab025-2b4c-4f74-a22d-41007e30f551 // if (b) { // logger.warn("回調結果:消息消費成功"); // } else { // logger.warn("回調結果:失敗。原因:" + s); // } // } //======================================================================== // /** // * 方法二 :不需要獲取獲取消費結果,只需要發送即可 // * // * 優點:自動裝配,代碼量少 // */ @Autowired private RabbitTemplate rabbitTemplate; //======================================================================== /** * 發送消息 * <p> * 參數是消息內容 */ public void send(String message) { logger.warn("發送消息,內容:" + message); /** * 方法一:異步操作,不等待消費者端返回處理結果,設置在回調操作的關聯數據,用於識別是哪一條消息和確認是否執行成功 */ //// 實例關聯數據對象,使用UUID隨機數 作為 回調id // CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); //// 發送消息 ,參數分別是 : 指定的交換機名字 、指定的路由關鍵字、消息字符串、關聯數據 // rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANG_1,RabbitmqConfig.ROUTINGKEY_1,message,correlationData); /** * 方法二:異步操作,不等待消費者端返回處理結果,且在消息回調操作的關聯數據為null,如果不做回調操作,則建議這樣使用 */ // //發送消息 ,參數分別是 : 指定的交換機名字 、指定的路由關鍵字、消息字符串 // rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANG_1, RabbitmqConfig.ROUTINGKEY_1, message); /** * 方法三:同步操作,等待消費者端返回處理結果 */ Object dd = rabbitTemplate.convertSendAndReceive(RabbitmqConfig.EXCHANG_1, RabbitmqConfig.ROUTINGKEY_1, message); logger.warn("你大爺的,有么有延遲?是不是同步阻塞?"); logger.warn("結果是什么???==" + dd); } }
(2)修改消息消費者的監聽方法 ,可設置任意類型返回值,但是在生產者端需要解析,我這里使用 字符串,休眠3秒

源碼
package com.example.rabbitmqconsumer1002.rabbitmqListener; import com.example.rabbitmqconsumer1002.config.RabbitConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * 消息監聽類--發短信 */ //注冊bean @Component //設置需要監聽的消息隊列 @RabbitListener(queues = RabbitConfig.QUEUE_1) public class SendMessageListener { Logger logger = LoggerFactory.getLogger(getClass()); //消息事件處理--有返回結果 @RabbitHandler public String sendMessage(String msg) throws InterruptedException { logger.warn("我是端口1002的消費者,收到信息:" + msg); logger.warn("休眠3秒"); Thread.sleep(3000); logger.warn("休眠結束"); return "發送成功,是同步阻塞的么?"; } }
(3)啟動工程
訪問網址 http://localhost:1004/mq?msg=你大爺,幫我發短信3999

查看消費者端控制台打印

查看生產者端控制台打印

生產者端等待了3秒后才收到結果
3.如果換成10秒會怎么樣?
修改消息消費者的監聽方法

啟動工程后 訪問 網址 http://localhost:1004/mq?msg=你大爺,幫我發短信3999
查看消費者端控制台打印

查看生產者端控制台打印


打印控制台源碼
2020-06-19 02:40:17.100 WARN http-nio-1004-exec-4 | com.example.rabbitmqproducer1004.rabbitmqFactory.SendMessage | 發送消息,內容:你大爺,幫我發短信3999 2020-06-19 02:40:17.111 INFO http-nio-1004-exec-4 | org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler | Initializing ExecutorService 2020-06-19 02:40:17.117 INFO http-nio-1004-exec-4 | org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer | Container initialized for queues: [amq.rabbitmq.reply-to] 2020-06-19 02:40:17.128 INFO http-nio-1004-exec-4 | org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer | SimpleConsumer [queue=amq.rabbitmq.reply-to, consumerTag=amq.ctag-pPnAaXXfnkytocGRd7ES5Q identity=7ba94c9e] started 2020-06-19 02:40:22.137 WARN http-nio-1004-exec-4 | com.example.rabbitmqproducer1004.rabbitmqFactory.SendMessage | 你大爺的,有么有延遲?是不是同步阻塞? 2020-06-19 02:40:22.137 WARN http-nio-1004-exec-4 | com.example.rabbitmqproducer1004.rabbitmqFactory.SendMessage | 結果是什么???==null 2020-06-19 02:40:27.158 WARN pool-1-thread-4 | org.springframework.amqp.rabbit.core.RabbitTemplate | Reply received after timeout for 1 2020-06-19 02:40:27.163 WARN pool-1-thread-4 | org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler | Execution of Rabbit message listener failed. org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1651) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1555) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1478) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1466) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1461) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1410) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE] at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:996) [spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE] at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:956) [spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE] at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149) [amqp-client-5.4.3.jar:5.4.3] at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104) [amqp-client-5.4.3.jar:5.4.3] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_221] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_221] at java.lang.Thread.run(Thread.java:748) [na:1.8.0_221] Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: Reply received after timeout at org.springframework.amqp.rabbit.core.RabbitTemplate.onMessage(RabbitTemplate.java:2535) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE] at org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer.lambda$setMessageListener$1(DirectReplyToMessageListenerContainer.java:115) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1552) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE] ... 11 common frames omitted 2020-06-19 02:40:27.164 ERROR pool-1-thread-4 | org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer | Failed to invoke listener org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1651) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1555) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1478) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1466) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1461) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1410) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE] at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:996) [spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE] at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:956) [spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE] at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149) [amqp-client-5.4.3.jar:5.4.3] at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104) [amqp-client-5.4.3.jar:5.4.3] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_221] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_221] at java.lang.Thread.run(Thread.java:748) [na:1.8.0_221] Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: Reply received after timeout
拋出了監聽異常,顯然是等待超時了,然后繼續執行執行下面的操作,返回結果以null處理
4.分析與思考
如果等待10分鍾才能運行完這個業務,使用同步阻塞操作,豈不是需要等10分鍾?那還要消息中間件有什么用?
因此,一般是使用異步非阻塞操作消息中間件的,因此,會盡可能的把一些不需要返回結果的操作用消息中間件轉發完成。
對於失敗和拋出異常的操作,可以制作一個死信隊列,將這些無法完成的消息重寫放入死信隊列,讓專門的系統監聽進行處理,一般這樣的業務很少出現,基本上是可以成功完成的,
對那些重要的,必須獲取處理結果的業務,直接調用實現就好了,因為這樣是默認同步阻塞操作的,不需要再轉一趟消息中間件啦。
