RabbitMQ --- 直連交換機 【 同步操作,等到消費者處理完后返回處理結果 】


1.前言

  RabbleMQ這種消息中間件,主要的功能是使用異步操作,來達到解耦的目的,比如,有系統a和系統b,

系統a通過消息中間件通知系統b來做業務,那么系統a只需要把要做的事情【也就是消息】發給消息中間件后,

消息中間件就會把消息轉發給系統b,系統a不需要關心系統b是怎么完成業務的,也不需要關心業務完成的結果,

這是就是異步操作。

  如果系統a希望獲得系統b的處理結果,那么系統a使用消息中間件發送消息后需要原地等待,做阻塞操作,但是

等待時長不能超過最大超時時間,可設置RabbleMQ自定義超時時間,這樣還不如直接調用該業務呢,何必再加個消息消息中間件通知他來做?

  所以,一般不會做這樣的同步阻塞操作,違背了消息中間件的開發初衷【提高吞吐量和系統業務的響應速度】,雖然

不影響解耦度,但是這樣的操作使得消息中間件變得不倫不類了。

  那么要問了,使用RabbleMQ消息中間件的具體好處是什么?

  第一,解耦,是非常明顯的好處,通過消息中間件,系統a通知系統b做什么業務,不需要關心系統b是怎么實現業務的。
  第二,異步,非必要的業務,不需要特別關心結果的業務,可以寫入消息中間件以異步方式操作,橫向編程,加快響應速度。
  第三,削峰,並發量大的時候直接將所有數據懟到數據庫,數據庫會異常的,使用消息隊列的特性,所有的操作會進行排隊,
    消息也不會丟失,當消息被消費后才會銷毀,比如秒殺系統就是這樣實現的。

  那么,雖然同步阻塞操作偶很大的閉端,那么到底該怎么操作?

這篇隨筆 以 隨筆 https://www.cnblogs.com/c2g5201314/p/13156932.html 為基礎 ,修改部分代碼實現演示

2.操作

(1)修改消息生產者的消息生產類的 rabbitTemplate 模板發送消息方法 ,同步則使用 convertSendAndReceive(),參數與異步非阻塞操作的一樣。

 

 

 

 源碼

package com.example.rabbitmqproducer1004.rabbitmqFactory;


import com.example.rabbitmqproducer1004.config.RabbitmqConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.util.UUID;

/**
 * 消息生產類
 */
@Component
//實現接口
public class SendMessage  {

//需要設置回調方法,獲取消費結果才需要實現 RabbitTemplate.ConfirmCallback 接口,
//public class SendMessage implements RabbitTemplate.ConfirmCallback {
    Logger logger = LoggerFactory.getLogger(this.getClass());

//======================================================================
//    /**
//     * 方法一:設置回調方法,獲取消費結果,
//     * <p>
//     * 缺點是:必須手動配置RabbitTemplate模板 ,代碼量大
//     */
//
//    //存儲 rabbitmq模板的臨時變量
//    private final RabbitTemplate rabbitTemplate;
//
//    /**
//     * 構造注入rabbitmq模板,這樣可以設置回調方法,獲取消費結果,但是必須手動配置RabbitTemplate模板
//     */
//    @Autowired
//    public SendMessage(RabbitTemplate rabbitTemplate) {
//        this.rabbitTemplate = rabbitTemplate;
//        //設置確認回調的方法,參數類型為ConfirmCallback
//        this.rabbitTemplate.setConfirmCallback(this);
//    }
//
//    /**
//     * 回調方法,獲取消費結果
//     *
//     * @param correlationData 關聯數據
//     * @param b               消息是否被消費成功,成功為true ,失敗為false
//     * @param s               原因 ,消費成功則返回null,否則返回失敗原因
//     */
//    @Override
//    public void confirm(CorrelationData correlationData, boolean b, String s) {
//        logger.warn("回調的連接數據:" + correlationData);
//        if (correlationData != null) {
//            //CorrelationData [id=1bcab025-2b4c-4f74-a22d-41007e30f551]
//            logger.warn("獲取correlationData的id值:" + correlationData.getId());
//        }
//        //1bcab025-2b4c-4f74-a22d-41007e30f551
//        if (b) {
//            logger.warn("回調結果:消息消費成功");
//        } else {
//            logger.warn("回調結果:失敗。原因:" + s);
//        }
//    }

//========================================================================
//    /**
//     * 方法二 :不需要獲取獲取消費結果,只需要發送即可
//     *
//     * 優點:自動裝配,代碼量少
//     */
    @Autowired
    private RabbitTemplate rabbitTemplate;

//========================================================================

    /**
     * 發送消息
     * <p>
     * 參數是消息內容
     */
    public void send(String message) {
        logger.warn("發送消息,內容:" + message);
        /**
         * 方法一:異步操作,不等待消費者端返回處理結果,設置在回調操作的關聯數據,用於識別是哪一條消息和確認是否執行成功
         */
////        實例關聯數據對象,使用UUID隨機數 作為 回調id
//        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
////        發送消息 ,參數分別是 : 指定的交換機名字 、指定的路由關鍵字、消息字符串、關聯數據
//        rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANG_1,RabbitmqConfig.ROUTINGKEY_1,message,correlationData);
        /**
         * 方法二:異步操作,不等待消費者端返回處理結果,且在消息回調操作的關聯數據為null,如果不做回調操作,則建議這樣使用
         */
//        //發送消息 ,參數分別是 : 指定的交換機名字 、指定的路由關鍵字、消息字符串
//        rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANG_1, RabbitmqConfig.ROUTINGKEY_1, message);
        /**
         * 方法三:同步操作,等待消費者端返回處理結果
         */
        Object dd = rabbitTemplate.convertSendAndReceive(RabbitmqConfig.EXCHANG_1, RabbitmqConfig.ROUTINGKEY_1, message);
        logger.warn("你大爺的,有么有延遲?是不是同步阻塞?");
        logger.warn("結果是什么???==" + dd);


    }
}
View Code

 

 (2)修改消息消費者的監聽方法 ,可設置任意類型返回值,但是在生產者端需要解析,我這里使用 字符串,休眠3秒

 

 源碼

package com.example.rabbitmqconsumer1002.rabbitmqListener;

import com.example.rabbitmqconsumer1002.config.RabbitConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 消息監聽類--發短信
 */
//注冊bean
@Component
//設置需要監聽的消息隊列
@RabbitListener(queues = RabbitConfig.QUEUE_1)
public class SendMessageListener {
    Logger logger = LoggerFactory.getLogger(getClass());



    //消息事件處理--有返回結果
    @RabbitHandler
    public String sendMessage(String msg) throws InterruptedException {
        logger.warn("我是端口1002的消費者,收到信息:" + msg);
        logger.warn("休眠3秒");
        Thread.sleep(3000);
        logger.warn("休眠結束");
        return "發送成功,是同步阻塞的么?";
    }
}
View Code

(3)啟動工程

訪問網址     http://localhost:1004/mq?msg=你大爺,幫我發短信3999

 

 

查看消費者端控制台打印

 

 查看生產者端控制台打印

 

 

 生產者端等待了3秒后才收到結果

 

3.如果換成10秒會怎么樣?

 修改消息消費者的監聽方法

 

 

 啟動工程后 訪問 網址  http://localhost:1004/mq?msg=你大爺,幫我發短信3999

查看消費者端控制台打印

 

 

  查看生產者端控制台打印

 

 

 打印控制台源碼

2020-06-19 02:40:17.100 WARN  http-nio-1004-exec-4 | com.example.rabbitmqproducer1004.rabbitmqFactory.SendMessage | 發送消息,內容:你大爺,幫我發短信3999
2020-06-19 02:40:17.111 INFO  http-nio-1004-exec-4 | org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler | Initializing ExecutorService
2020-06-19 02:40:17.117 INFO  http-nio-1004-exec-4 | org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer | Container initialized for queues: [amq.rabbitmq.reply-to]
2020-06-19 02:40:17.128 INFO  http-nio-1004-exec-4 | org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer | SimpleConsumer [queue=amq.rabbitmq.reply-to, consumerTag=amq.ctag-pPnAaXXfnkytocGRd7ES5Q identity=7ba94c9e] started
2020-06-19 02:40:22.137 WARN  http-nio-1004-exec-4 | com.example.rabbitmqproducer1004.rabbitmqFactory.SendMessage | 你大爺的,有么有延遲?是不是同步阻塞?
2020-06-19 02:40:22.137 WARN  http-nio-1004-exec-4 | com.example.rabbitmqproducer1004.rabbitmqFactory.SendMessage | 結果是什么???==null
2020-06-19 02:40:27.158 WARN  pool-1-thread-4 | org.springframework.amqp.rabbit.core.RabbitTemplate | Reply received after timeout for 1
2020-06-19 02:40:27.163 WARN  pool-1-thread-4 | org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler | Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1651) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1555) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1478) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1466) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1461) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1410) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
    at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:996) [spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
    at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:956) [spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
    at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149) [amqp-client-5.4.3.jar:5.4.3]
    at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104) [amqp-client-5.4.3.jar:5.4.3]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_221]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_221]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_221]
Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: Reply received after timeout
    at org.springframework.amqp.rabbit.core.RabbitTemplate.onMessage(RabbitTemplate.java:2535) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
    at org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer.lambda$setMessageListener$1(DirectReplyToMessageListenerContainer.java:115) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1552) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
    ... 11 common frames omitted
2020-06-19 02:40:27.164 ERROR pool-1-thread-4 | org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer | Failed to invoke listener
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1651) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1555) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1478) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1466) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1461) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1410) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
    at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:996) [spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
    at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:956) [spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
    at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149) [amqp-client-5.4.3.jar:5.4.3]
    at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104) [amqp-client-5.4.3.jar:5.4.3]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_221]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_221]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_221]
Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: Reply received after timeout
View Code

 

 拋出了監聽異常,顯然是等待超時了,然后繼續執行執行下面的操作,返回結果以null處理

 

4.分析與思考

如果等待10分鍾才能運行完這個業務,使用同步阻塞操作,豈不是需要等10分鍾?那還要消息中間件有什么用?

因此,一般是使用異步非阻塞操作消息中間件的,因此,會盡可能的把一些不需要返回結果的操作用消息中間件轉發完成。

對於失敗和拋出異常的操作,可以制作一個死信隊列,將這些無法完成的消息重寫放入死信隊列,讓專門的系統監聽進行處理,一般這樣的業務很少出現,基本上是可以成功完成的,

對那些重要的,必須獲取處理結果的業務,直接調用實現就好了,因為這樣是默認同步阻塞操作的,不需要再轉一趟消息中間件啦。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM