spring boot整合RabbitMQ詳解;消息的確認機制,發送確認(ConfirmCallback, ReturnsCallback),消費手動確認(ACK)


簡介

什么叫消息隊列?

消息(Message)是指在應用間傳送的數據。消息可以非常簡單,比如只包含文本字符串,也可以更復雜,可能包含嵌入對象。

消息隊列(Message Queue)是一種應用間的通信方式,消息發送后可以立即返回,由消息系統來確保消息的可靠傳遞。消息發布者只管把消息發布到 MQ 中而不用管誰來取,消息使用者只管從 MQ 中取消息而不管是誰發布的。這樣發布者和使用者都不用知道對方的存在。

消息隊列的應用場景

可以看出消息隊列是一種應用間的異步協作機制,那什么時候需要使用 MQ 呢?

以常見的訂單系統為例,用戶點擊【下單】按鈕之后的業務邏輯可能包括:扣減庫存、生成相應單據、發紅包、發短信通知。在業務發展初期這些邏輯可能放在一起同步執行,隨着業務的發展訂單量增長,需要提升系統服務的性能,這時可以將一些不需要立即生效的操作拆分出來異步執行,比如發放紅包、發短信通知等。這種場景下就可以用 MQ ,在下單的主流程(比如扣減庫存、生成相應單據)完成之后發送一條消息到 MQ 讓主流程快速完結,而由另外的單獨線程拉取MQ的消息(或者由 MQ 推送消息),當發現 MQ 中有發紅包或發短信之類的消息時,執行相應的業務邏輯。

以上是用於業務解耦的情況,其它常見場景包括最終一致性、廣播、錯峰流控等等。

RabbitMQ 特點

RabbitMQ 是一個由 Erlang 語言開發的 AMQP 的開源實現。

AMQP :Advanced Message Queue,高級消息隊列協議。它是應用層協議的一個開放標准,為面向消息的中間件設計,基於此協議的客戶端與消息中間件可傳遞消息,並不受產品、開發語言等條件的限制。

RabbitMQ 最初起源於金融系統,用於在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。

RabbitMQ 基本概念

RabbitMQ 內部結構
image

1、Message
消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對於其他消息的優先權)、delivery-mode(指出該消息可能需要持久性存儲)等。
2、Publisher
消息的生產者,也是一個向交換器發布消息的客戶端應用程序。
3、Exchange
交換器,用來接收生產者發送的消息並將這些消息路由給服務器中的隊列。
4、Binding
綁定,用於消息隊列和交換器之間的關聯。一個綁定就是基於路由鍵將交換器和消息隊列連接起來的路由規則,所以可以將交換器理解成一個由綁定構成的路由表。
5、Queue
消息隊列,用來保存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直在隊列里面,等待消費者連接到這個隊列將其取走。
6、Connection
網絡連接,比如一個TCP連接。
7、Channel
信道,多路復用連接中的一條獨立的雙向數據流通道。信道是建立在真實的TCP連接內地虛擬連接,AMQP 命令都是通過信道發出去的,不管是發布消息、訂閱隊列還是接收消息,這些動作都是通過信道完成。因為對於操作系統來說建立和銷毀 TCP 都是非常昂貴的開銷,所以引入了信道的概念,以復用一條 TCP 連接。
8、Consumer
消息的消費者,表示一個從消息隊列中取得消息的客戶端應用程序。
9、Virtual Host
虛擬主機,表示一批交換器、消息隊列和相關對象。虛擬主機是共享相同的身份認證和加密環境的獨立服務器域。每個 vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁有自己的隊列、交換器、綁定和權限機制。vhost 是 AMQP 概念的基礎,必須在連接時指定,RabbitMQ 默認的 vhost 是 / 。
10、Broker
表示消息隊列服務器實體。

Exchange 類型

Exchange分發消息時根據類型的不同分發策略有區別,目前共四種類型:direct、fanout、topic、headers 。
headers 匹配 AMQP 消息的 header 而不是路由鍵,此外 headers 交換器和 direct 交換器完全一致,但性能差很多,目前幾乎用不到了
下面的代碼中也不會編寫 headers類型的代碼

Direct Exchange (直連型交換機)

直連型交換機,根據消息攜帶的路由鍵將消息投遞給對應隊列。

大致流程,有一個隊列綁定到一個直連交換機上,同時賦予一個路由鍵 routing key 。
然后當一個消息攜帶着路由值為X,這個消息通過生產者發送給交換機時,交換機就會根據這個路由值X去尋找綁定值也是X的隊列。
image

Fanout Exchange(扇型交換機)

扇型交換機,這個交換機沒有路由鍵概念,就算你綁了路由鍵也是無視的。 這個交換機在接收到消息后,會直接轉發到綁定到它上面的所有隊列。
image

Topic Exchange(主題交換機)

主題交換機,這個交換機其實跟直連交換機流程差不多,但是它的特點就是在它的路由鍵和綁定鍵之間是有規則的。
簡單地介紹下規則:

*  (星號) 用來表示一個單詞 (必須出現的)
#  (井號) 用來表示任意數量(零個或多個)單詞
通配的綁定鍵是跟隊列進行綁定的,舉個小例子
隊列Q1 綁定鍵為 *.TT.*          隊列Q2綁定鍵為  TT.#
如果一條消息攜帶的路由鍵為 A.TT.B,那么隊列Q1將會收到;
如果一條消息攜帶的路由鍵為TT.AA.BB,那么隊列Q2將會收到;

image

RabbitMQ安裝

一、這是RabbitMQ的官網地址(https://www.rabbitmq.com/install-windows.html) 可以按照官網的文檔進行安裝
ps:看了下官方文檔rabbitMQ是用Erlang 語言開發的,所以需要先安裝Erlang 語言。
二、docker-compose安裝
建議通過docker進行安裝,畢竟用docker命令安裝很方便,下面就是我用docker-compose安裝rabbitmq的yml文件配置:

version: '3'
services:
  my_rabbitMQ:
    image: "rabbitmq:3.8.3-management"
    container_name: my_rabbitMQ
    restart: always
    privileged: true
    ports:
      - "15672:15672"
      - "5672:5672"
    environment: 
     - "RABBITMQ_DEFAULT_USER=rabbitMQ"
     - "RABBITMQ_DEFAULT_PASS=rabbitMQ"

docker-compose 安裝rabbitMQ命令

docker-compose -f rabbitMq-docker-compose.yml up -d

1、docker-compose安裝成功后
2、瀏覽器訪問 http://127.0.0.1:15672/ ,並輸入賬號/密碼 rabbitMQ/rabbitMQ 能正常訪問rabbitMQ管理界面,就代表安裝完成。

image
image
image

spring boot集成RabbitMQ(編碼)

項目有兩個rabbit-provider(生產者)和rabbit-consumer(消費者)

集成rabbitMQ 主要需要依賴spring-boot-starter-amqp;
java-testdata-generator 是我在gitee上看到 隨機測試數據生成器,包括身份證號碼,銀行卡號,姓名,漢字、手機號,電子郵箱地址和生成insert sql參數列表字符串等的工具包。主要是用來不讓測試數據看起來那么單調;

maven依賴
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<optional>true</optional>
		</dependency>

        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.7.4</version>
        </dependency>

        <!--測試接口 添加swagger start-->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.6.1</version>
        </dependency>

        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.6.1</version>
        </dependency>
        <!--測試接口 添加swagger end-->

        <!--Java實現的各種隨機測試數據生成器,包括身份證號碼,銀行卡號,姓名,漢字、手機號 start-->
        <dependency>
            <groupId>com.github.binarywang</groupId>
            <artifactId>java-testdata-generator</artifactId>
            <version>1.1.2</version>
        </dependency>
        <!--Java實現的各種隨機測試數據生成器,包括身份證號碼,銀行卡號,姓名,漢字、手機號 end-->
application.yml配置
server:
  port: 8999
spring:
  #項目名稱
  application:
    name: rabbitmq-provider
  #配置rabbitMq 服務器
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: rabbitMQ
    password: rabbitMQ
#    #虛擬host 可以不設置,使用server默認host
#    virtual-host: xkq
    #確認消息已發送到交換機(Exchange)
#    publisher-confirm-type: SIMPLE
    publisher-confirm-type: CORRELATED
    #確認消息已發送到隊列(Queue)
    publisher-returns: true

Direct Exchange (直連型交換機)

項目rabbit-provider(生產者)
DirectRabbitConfig配置
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectRabbitConfig {

    public static final String TestDirectQueue = "TestDirectQueue";
    public static final String TestDirectExchange = "TestDirectExchange";
    public static final String TestDirectRouting = "TestDirectRouting";

    //隊列 起名:TestDirectQueue
    @Bean
    public Queue TestDirectQueue() {
        // durable:是否持久化,默認是false,持久化隊列:會被存儲在磁盤上,當消息代理重啟時仍然存在,暫存隊列:當前連接有效
        // exclusive:默認也是false,只能被當前創建的連接使用,而且當連接關閉后隊列即被刪除。此參考優先級高於durable
        // autoDelete:是否自動刪除,當沒有生產者或者消費者使用此隊列,該隊列會自動刪除。
        //   return new Queue("TestDirectQueue",true,true,false);

        //一般設置一下隊列的持久化就好,其余兩個就是默認false
        return new Queue(TestDirectQueue,true);
    }

    //Direct交換機 起名:TestDirectExchange
    @Bean
    DirectExchange TestDirectExchange() {
        //  return new DirectExchange("TestDirectExchange",true,true);
        return new DirectExchange(TestDirectExchange,true,false);
    }

    //綁定  將隊列和交換機綁定, 並設置用於匹配鍵:TestDirectRouting
    @Bean
    Binding bindingDirect() {
        return BindingBuilder.bind(TestDirectQueue())
                .to(TestDirectExchange()).with(TestDirectRouting);
    }

}

SendDirectMessageController業務發送消息
import cn.binarywang.tools.generator.ChineseAddressGenerator;
import cn.binarywang.tools.generator.ChineseNameGenerator;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;
import com.example.rabbitmqprovider.direct.config.DirectRabbitConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;


@RestController
public class SendDirectMessageController {

    @Autowired
    RabbitTemplate rabbitTemplate;  //使用RabbitTemplate,這提供了接收/發送等等方法

    @ResponseBody
    @GetMapping("/sendDirectMessage")
    public Object sendDirectMessage() {
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = StrUtil.format("hello 我叫:{} 我住在:{}", ChineseNameGenerator.getInstance().generate(),
                ChineseAddressGenerator.getInstance()
                        .generate());
        Map<String,Object> map = new HashMap<>();
        map.put("messageId",messageId);
        map.put("messageData",messageData);
        map.put("createTime", DateUtil.now());
        //將消息攜帶綁定鍵值:TestDirectRouting 發送到交換機TestDirectExchange
        rabbitTemplate.convertAndSend(DirectRabbitConfig.TestDirectExchange, DirectRabbitConfig.TestDirectRouting, map);
        return map;
    }

}
項目rabbit-consumer(消費者)
DirectReceiver接收消息
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

@Slf4j
@Component
public class DirectReceiver {
    public static final String TestDirectQueue = "TestDirectQueue";
    public static final String TestDirectExchange = "TestDirectExchange";
    public static final String TestDirectRouting = "TestDirectRouting";

    @RabbitListener(queues = DirectReceiver.TestDirectQueue)
    @RabbitHandler
    public void receiver(@Payload HashMap message) {
        log.info("receiver 消費者1號 收到消息  --- message:{}" ,message);
    }

    @RabbitListener(queues = TestDirectQueue)
    @RabbitHandler
    public void receiver2(Map testMessage) {
        log.info("receiver2 消費者2號 收到消息 getClass:{}  ---  {}" ,testMessage.getClass(), testMessage);
    }
}

測試Direct Exchange (直連型交換機)

正常調用接口【http://127.0.0.1:8999/sendDirectMessage 】 ;並且rabbitMQ管理界面看到TestDirectQueue隊列有消息新增,代表消息發送成功;
image
image
image

看到控制台日志輸出如下,代表消費者 成功接收到推送的消息;多個消費者的情況下 默認是采用輪詢的方式進行消費。
image

Fanout Exchange(扇型交換機)
項目rabbit-provider(生產者)
FanoutRabbitConfig配置
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutRabbitConfig {

    public final static String fanoutExchange = "fanoutExchange";
    public static final String fanout_A = "fanout.A";
    public static final String fanout_B = "fanout.B";
    public static final String fanout_C = "fanout.C";
    /**
     *  創建三個隊列 :fanout.A   fanout.B  fanout.C
     *  將三個隊列都綁定在交換機 fanoutExchange 上
     *  因為是扇型交換機, 路由鍵無需配置,配置也不起作用
     */
    @Bean
    public Queue queueA() {
        return new Queue(fanout_A);
    }

    @Bean
    public Queue queueB() {
        return new Queue(fanout_B);
    }

    @Bean
    public Queue queueC() {
        return new Queue(fanout_C);
    }

    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeA() {
        return BindingBuilder.bind(queueA()).to(fanoutExchange());
    }

    @Bean
    Binding bindingExchangeB() {
        return BindingBuilder.bind(queueB()).to(fanoutExchange());
    }

    @Bean
    Binding bindingExchangeC() {
        return BindingBuilder.bind(queueC()).to(fanoutExchange());
    }
}
SendFanoutMessageController業務發送消息
import cn.hutool.core.date.DateUtil;
import com.example.rabbitmqprovider.direct.config.FanoutRabbitConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;

/**
 * 扇型交換機,這個交換機沒有路由鍵概念,就算你綁了路由鍵也是無視的。
 * 這個交換機在接收到消息后,會直接轉發到綁定到它上面的所有隊列。
 */
@RestController
public class SendFanoutMessageController {

    @Autowired
    RabbitTemplate rabbitTemplate;  //使用RabbitTemplate,這提供了接收/發送等等方法

    @GetMapping("/sendFanoutMessage")
    public String sendFanoutMessage() {
        String messageData = "FanoutMessage routingKey is null";
        Map<String, Object> map = new HashMap<>();
        map.put("createTime", DateUtil.now());
        map.put("messageData", messageData);
        rabbitTemplate.convertAndSend(FanoutRabbitConfig.fanoutExchange,null, map);
        return "ok";
    }
    @GetMapping("/sendFanoutMessage1")
    public String sendFanoutMessage1() {
        String messageData = "FanoutMessage routingKey is 'xxx'";
        Map<String, Object> map = new HashMap<>();
        map.put("createTime", DateUtil.now());
        map.put("messageData", messageData);
        //扇型交換機,這個交換機沒有路由鍵概念,就算你綁了路由鍵也是無視的。 這個接口綁定一下路由鍵
        rabbitTemplate.convertAndSend(FanoutRabbitConfig.fanoutExchange,"xxx", map);
        return "ok";
    }
}
項目rabbit-consumer(消費者)
FanoutReceiver接收消息
import java.util.Map;

@Slf4j
@Component
public class FanoutReceiver {
    public static final String fanout_A = "fanout.A";
    public static final String fanout_B = "fanout.B";
    public static final String fanout_C = "fanout.C";

    @RabbitListener(queues = fanout_A)
    @RabbitHandler
    public void fanout_A(Map testMessage) {
        log.info("fanout_A  {}" , testMessage);
    }

    @RabbitListener(queues = fanout_B)
    @RabbitHandler
    public void fanout_B(Map testMessage) {
        log.info("fanout_B  {}" , testMessage);
    }

    @RabbitListener(queues = fanout_C)
    @RabbitHandler
    public void fanout_C(Map testMessage) {
        log.info("fanout_C  {}" , testMessage);
    }

}

測試 Fanout Exchange(扇型交換機)

先調用http://127.0.0.1:8999/sendFanoutMessage ,在調用接口 http://127.0.0.1:8999/sendFanoutMessage1
日志輸出如下,可以看到 扇型交換機,這個交換機沒有路由鍵概念,就算你綁了路由鍵也是無視的。
這個交換機在接收到消息后,會直接轉發到綁定到它上面的所有隊列。
image

Topic Exchange(主題交換機)
項目rabbit-provider(生產者)
TopicRabbitConfig配置
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class TopicRabbitConfig {
    //綁定鍵
    public final static String man = "topic.man";
    public final static String woman = "topic.woman";
    public final static String xxx = "xxx";
    public final static String topicExchange = "topicExchange";

    @Bean
    public Queue firstQueue() {
        return new Queue(TopicRabbitConfig.man);
    }

    @Bean
    public Queue secondQueue() {
        return new Queue(TopicRabbitConfig.woman);
    }

    @Bean
    public Queue thirdQueue() {
        return new Queue(TopicRabbitConfig.xxx);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange(topicExchange);
    }


    //將firstQueue和topicExchange綁定,而且綁定的鍵值為topic.man
    //這樣只要是消息攜帶的路由鍵是topic.man,才會分發到該隊列
    @Bean
    Binding bindingExchangeMessage() {
        return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);
    }

    //將secondQueue和topicExchange綁定,而且綁定的鍵值為用上通配路由鍵規則topic.#
    // 這樣只要是消息攜帶的路由鍵是以topic.開頭,都會分發到該隊列
    @Bean
    Binding bindingExchangeMessage2() {
        return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
    }

    @Bean
    Binding bindingExchangeMessage3() {
        return BindingBuilder.bind(thirdQueue()).to(exchange()).with("#");
    }

}
SendTopicMessageController業務發送消息
import com.example.rabbitmqprovider.direct.config.TopicRabbitConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;


@RestController
public class SendTopicMessageController {

    @Autowired
    RabbitTemplate rabbitTemplate;  //使用RabbitTemplate,這提供了接收/發送等等方法


    @ResponseBody
    @GetMapping("/sendTopicMessage1")
    public Object sendTopicMessage1() {
        String messageData = "message: M A N ";
        Map<String, Object> manMap = new HashMap<>();
        manMap.put("messageData", messageData);
        rabbitTemplate.convertAndSend(TopicRabbitConfig.topicExchange, "topic.man", manMap);
        return "ok";
    }

    @ResponseBody
    @GetMapping("/sendTopicMessage2")
    public Object sendTopicMessage2() {
        String messageData = "message: woman is all ";
        Map<String, Object> womanMap = new HashMap<>();
        womanMap.put("messageData", messageData);
        rabbitTemplate.convertAndSend(TopicRabbitConfig.topicExchange, "topic.woman", womanMap);
        return "ok";
    }

    @ResponseBody
    @GetMapping("/sendTopicMessage3")
    public Object sendTopicMessage3() {
        String messageData = "message: xxx ";
        Map<String, Object> womanMap = new HashMap<>();
        womanMap.put("messageData", messageData);
        //routingKey 設置'abc';xxx隊列 routingKey配置了# 看能否收到消息
        rabbitTemplate.convertAndSend(TopicRabbitConfig.topicExchange, "abc", womanMap);
        return "ok";
    }

}
項目rabbit-consumer(消費者)
TopicReceiver接收消息
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

@Slf4j
@Component
public class TopicReceiver {
    public static final String topic_man = "topic.man";
    public static final String topic_woman = "topic.woman";
    public final static String xxx = "xxx";

    @RabbitListener(queues = topic_man)
    @RabbitHandler
    public void topic_man(Map testMessage) {
        log.info("我是隊列{} 收到消息:{}" ,topic_man, testMessage);
    }

    @RabbitListener(queues = topic_woman)
    @RabbitHandler
    public void topic_woman(Map testMessage) {
        log.info("我是隊列{} 收到消息:{}" ,topic_woman, testMessage);
    }

    @RabbitListener(queues = xxx)
    @RabbitHandler
    public void xxx(Map testMessage) {
        log.info("我是隊列{} 收到消息:{}" ,xxx, testMessage);
    }

}

測試Topic Exchange(主題交換機)
  1. 調用sendTopicMessage1接口
    routingKey設置的是"topic.man",所以三個隊列都收到消息;
    image
  2. 調用sendTopicMessage2接口
    routingKey設置的是"topic.woman",“topic.man”隊列設置的routingKey是“topic.man”所以沒收到消息;
    image
  3. 調用sendTopicMessage3接口
    routingKey設置的是"abc",隊列“xxx”配置的routingKey是“#”,所以只有隊列“xxx”收到了消息
    image

消息可靠性

rabbitmq 的消息確認分為兩部分:發送消息確認 和 消息接收確認。

項目rabbit-provider(生產者)消息發送確認

發送消息確認:用來確認生產者 producer 將消息發送到 broker ,broker 上的交換機 exchange 再投遞給隊列 queue的過程中,消息是否成功投遞。

消息從 producer 到 rabbitmq broker有一個 confirmCallback 確認模式。

消息從 exchange 到 queue 投遞失敗有一個 returnCallback 退回模式。

我們可以利用這兩個Callback來確保消的100%送達。

1、 ConfirmCallback確認模式

  • ConfirmCallback機制只確認消息是否到達exchange(交換器),不保證消息可以路由到正確的queue;
  • 配置參數需要設置:publisher-confirm-type: CORRELATED;springboot版本較低的話參數設置改成:publisher-confirms: true

2、 ReturnCallback 退回模式

  • ReturnsCallback 消息機制用於處理一個不可路由的消息。在某些情況下,如果我們在發送消息的時候,當前的 exchange 不存在或者指定路由 key 路由不到,這個時候我們需要監聽這種不可達的消息

  • 配置參數需要設置:publisher-returns: true

  • 配置參數在application.yml文件(publisher-confirm-type、publisher-returns)

消息發送確認配置如下:

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class RabbitConfig {

    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
       // Mandatory為true時,消息通過交換器無法匹配到隊列會返回給生產者,為false時匹配不到會直接被丟棄
        rabbitTemplate.setMandatory(true);

        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             *  ConfirmCallback機制只確認消息是否到達exchange(交換器),不保證消息可以路由到正確的queue;
             *  需要設置:publisher-confirm-type: CORRELATED;
             *  springboot版本較低 參數設置改成:publisher-confirms: true
             *
             *  以實現方法confirm中ack屬性為標准,true到達
             *  config : 需要開啟rabbitmq得ack publisher-confirm-type
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("ConfirmCallback  確認結果 (true代表發送成功) : {}  消息唯一標識 : {} 失敗原因 :{}",ack,correlationData,cause);
            }
        });

        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            /**
             *  ReturnsCallback 消息機制用於處理一個不可路由的消息。在某些情況下,如果我們在發送消息的時候,當前的 exchange 不存在或者指定路由 key 路由不到,這個時候我們需要監聽這種不可達的消息
             *   就需要這種return機制
             *
             *  config : 需要開啟rabbitmq發送失敗回退; publisher-returns 或rabbitTemplate.setMandatory(true); 設置為true
             */
            @Override
            public void returnedMessage(ReturnedMessage returned) {
//                實現接口ReturnCallback,重寫 returnedMessage() 方法,
//                方法有五個參數
//                message(消息體)、
//                replyCode(響應code)、
//                replyText(響應內容)、
//                exchange(交換機)、
//                routingKey(隊列)。

                log.info("ReturnsCallback    returned : {}",returned);
            }
        });

        return rabbitTemplate;
    }

}

消息發送確認測試接口:

import cn.binarywang.tools.generator.ChineseAddressGenerator;
import cn.binarywang.tools.generator.ChineseNameGenerator;
import cn.hutool.core.util.StrUtil;
import com.example.rabbitmqprovider.direct.config.DirectRabbitConfig;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;


@RestController
public class SendCallbackMessageController {

    @Autowired
    RabbitTemplate rabbitTemplate;  //使用RabbitTemplate,這提供了接收/發送等等方法

    @ResponseBody
    @GetMapping("/sendMessageToExchangeFail")
    public Object sendMessageToExchangeFail() {
        String messageData = StrUtil.format("hello 我叫:{} 我住在:{}", ChineseNameGenerator.getInstance().generate(),
                ChineseAddressGenerator.getInstance()
                        .generate());
        Map<String,Object> map = new HashMap<>();
        map.put("messageData",messageData);
        //發送一個消息到 不存在到exchange
        rabbitTemplate.convertAndSend(DirectRabbitConfig.TestDirectExchange.concat("test"), DirectRabbitConfig.TestDirectRouting, map,new CorrelationData(UUID.randomUUID().toString()));
        return map;
    }
    @ResponseBody
    @GetMapping("/sendMessageToQueueFail")
    public Object sendMessageToQueueFail() {
        String messageData = StrUtil.format("hello 我叫:{} 我住在:{}", ChineseNameGenerator.getInstance().generate(),
                ChineseAddressGenerator.getInstance()
                        .generate());
        Map<String,Object> map = new HashMap<>();
        map.put("messageData",messageData);
        //發送一個消息到 不存的隊列里;
        rabbitTemplate.convertAndSend(DirectRabbitConfig.TestDirectExchange, "xxx", map,new CorrelationData(UUID.randomUUID().toString()));
        return map;
    }
}

消息發送確認測試

  1. 調用sendMessageToExchangeFail接口
2022-03-06 20:31:00.488 ERROR 32087 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'TestDirectExchangetest' in vhost '/', class-id=60, method-id=40)
2022-03-06 20:31:00.489  INFO 32087 --- [nectionFactory2] c.e.r.direct.config.RabbitConfig         : ConfirmCallback  確認結果 (true代表發送成功) : false  消息唯一標識 : CorrelationData [id=5aaf1d44-85cc-42ae-8ae5-cbca5074cefd] 失敗原因 :channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'TestDirectExchangetest' in vhost '/', class-id=60, method-id=40)
  1. 調用sendMessageToQueueFail接口
2022-03-06 20:31:30.186  INFO 32087 --- [nectionFactory2] c.e.r.direct.config.RabbitConfig         : ReturnsCallback    returned : ReturnedMessage [message=(Body:'[serialized object]' MessageProperties [headers={spring_returned_message_correlation=8699b6b8-0b12-420f-b866-73f54ba0a002}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), replyCode=312, replyText=NO_ROUTE, exchange=TestDirectExchange, routingKey=xxx]
2022-03-06 20:31:30.186  INFO 32087 --- [nectionFactory1] c.e.r.direct.config.RabbitConfig         : ConfirmCallback  確認結果 (true代表發送成功) : true  消息唯一標識 : CorrelationData [id=8699b6b8-0b12-420f-b866-73f54ba0a002] 失敗原因 :null

項目rabbit-consumer(消費者)消息接收確認

消息通過 ACK 確認是否被正確接收,每個 Message 都要被確認(acknowledged),可以手動去 ACK 或自動 ACK

消息確認模式有:
  • AcknowledgeMode.NONE:自動確認
  • AcknowledgeMode.AUTO:根據情況確認(默認值)
  • AcknowledgeMode.MANUAL:手動確認
手動確認消息
1、basicAck

表示成功確認,使用此回執方法后,消息會被rabbitmq broker 刪除。

void basicAck(long deliveryTag, boolean multiple)
  • deliveryTag:表示消息投遞序號,每次消費消息或者消息重新投遞后,deliveryTag都會增加。deliveryTag(唯一標識 ID):當一個消費者向 RabbitMQ 注冊后,會建立起一個 Channel ,RabbitMQ 會用 basic.deliver 方法向消費者推送消息,這個方法攜帶了一個 delivery tag, 它代表了 RabbitMQ 向該 Channel 投遞的這條消息的唯一標識 ID,是一個單調遞增的正整數,delivery tag 的范圍僅限於 Channel

  • multiple:是否批量確認,值為 true 則會一次性 ack所有小於當前消息 deliveryTag 的消息。

  • 舉個栗子: 假設我先發送三條消息deliveryTag分別是5、6、7,可它們都沒有被確認,當我發第四條消息此時deliveryTag為8,multiple設置為 true,會將5、6、7、8的消息全部進行確認。

2、basicNack

表示失敗確認,一般在消費消息業務異常時用到此方法,可以將消息重新投遞入隊列。

void basicNack(long deliveryTag, boolean multiple, boolean requeue)
  • deliveryTag:表示消息投遞序號。
  • multiple:是否批量確認。
  • requeue:值為 true 消息將重新入隊列。
3、basicReject

拒絕消息,與basicNack區別在於不能進行批量操作,其他用法很相似。

void basicReject(long deliveryTag, boolean requeue)
  • deliveryTag:表示消息投遞序號。
  • requeue:值為 true 消息將重新入隊列。

開啟手動確認消息

  • application.yml配置文件開啟
    acknowledge-mode設置manual
spring:
  #項目名稱
  application:
    name: rabbitmq-custom
  #配置rabbitMq 服務器
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: rabbitMQ
    password: rabbitMQ
    listener:
      simple:
        acknowledge-mode: manual
  • 注解開啟手動確認
    @RabbitListener注解中設置參數ackMode= "MANUAL"開啟
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.HashMap;

/**
 * 先注釋 DirectReceiver的@Component
 * 用TestAckDirectReceiver方法來驗證 手動ack
 */
@Slf4j
@Component
public class TestAckDirectReceiver {
    public static final String TestDirectQueue = "TestDirectQueue";
    public static final String TestDirectExchange = "TestDirectExchange";
    public static final String TestDirectRouting = "TestDirectRouting";

    @RabbitListener(queues = TestAckDirectReceiver.TestDirectQueue,
            ackMode= "MANUAL")
    @RabbitHandler
    public void receiver(@Payload HashMap dataMsg, Channel channel, Message message) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        log.info("消費者1號 deliveryTag:{} dataMsg:{} ",deliveryTag ,dataMsg);
        try {
           int i = 1/0;
        } catch (Exception e) {
            log.error("MyAckReceiver error : {}  deliveryTag:{}",e.getMessage(),deliveryTag);
            channel.basicReject(deliveryTag, true);
        }

    }

    @RabbitListener(queues = TestAckDirectReceiver.TestDirectQueue,
            ackMode= "MANUAL")
    @RabbitHandler
    public void receiver1(@Payload HashMap dataMsg, Channel channel, Message message) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        log.info("消費者2號 deliveryTag:{} dataMsg:{} ",deliveryTag ,dataMsg);
        channel.basicAck(deliveryTag,true);
    }

}

手動消息確認消費測試

可以看到消費者1號調用basicReject方法拒絕了消息;消費者2號調用channel.basicAck方法手動確認了消費

2022-03-06 21:23:16.816  INFO 51530 --- [ntContainer#4-1] c.e.c.direct.TestAckDirectReceiver       : 消費者1號 deliveryTag:5 dataMsg:{createTime=2022-03-06 21:23:16, messageId=33ae8439-4635-4499-ae7d-c167f100c26a, messageData=hello 我叫:毛兒 我住在:湖南省湘西土家族苗族自治州露勁路7725號純辟頑小區8單元1122室} 
2022-03-06 21:23:16.816 ERROR 51530 --- [ntContainer#4-1] c.e.c.direct.TestAckDirectReceiver       : MyAckReceiver error : / by zero  deliveryTag:5
2022-03-06 21:23:16.818  INFO 51530 --- [ntContainer#3-1] c.e.c.direct.TestAckDirectReceiver       : 消費者2號 deliveryTag:6 dataMsg:{createTime=2022-03-06 21:23:16, messageId=33ae8439-4635-4499-ae7d-c167f100c26a, messageData=hello 我叫:毛兒 我住在:湖南省湘西土家族苗族自治州露勁路7725號純辟頑小區8單元1122室} 

代碼我已經上傳到gitee 代碼傳送門

參考資料:
https://www.jianshu.com/p/79ca08116d57
https://blog.csdn.net/qq_35387940/article/details/100514134
https://blog.csdn.net/weixin_32820639/article/details/111240447
https://www.cnblogs.com/gyjx2016/p/13705307.html
https://zhuanlan.zhihu.com/p/152325703


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM