rabbitmq監控之消息確認ack


 

 

一直以來,學習rabbitmq都是跟着各種各樣的教程、博客、視頻和文檔,擼起袖子就是干!!!最后,也成功了。

當然,成功的標志也僅僅是生產者發送了消息,消費者消費了消息enter description here

真正在實際項目中,一旦出問題,需要分析問題的時候,僅僅了解這些是不夠的。

老祖宗說過:實踐,是檢驗真理的唯一標准。所以,研究分析一下消息確認模式ack的整個過程,到底怎么回事

一、測試環境

使用springboot環境:

  • 一個Fanout交換機fanout.exchange
  • 兩個隊列:fanout.queue1fanout.queue2

pom依賴:

<!-- 添加springboot對amqp的支持 -->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application配置:

# RabbitMQ 基本配置
spring.rabbitmq.host=192.168.183.220
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

## 生產端配置
# 開啟發布確認,就是confirm模式. 消費端ack應答后,才將消息從隊列中刪除
spring.rabbitmq.publisher-confirms=true
# 發布返回
spring.rabbitmq.publisher-returns=true

## 消費端配置
# 手動ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 消費者最小數量
spring.rabbitmq.listener.simple.concurrency=1
# 消費者最大數量
spring.rabbitmq.listener.simple.max-concurrency=10
# 在單個請求中處理的消息個數,他應該大於等於事務數量(unack的最大數量)
spring.rabbitmq.listener.simple.prefetch=1

## 模板配置
#設置為 true 后 消費者在消息沒有被路由到合適隊列情況下會被return監聽,而不會自動刪除
spring.rabbitmq.template.mandatory=true

RabbitConfig.java

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    private static final Logger log= LoggerFactory.getLogger(RabbitConfig.class);

    @Bean
    public Queue queue() {
        return new Queue("queue");
    }

    @Bean(name = "FQ1")
    public Queue fanoutQueue1() {
        return new Queue("fanout.queue1");
    }

    @Bean(name = "FQ2")
    public Queue fanoutQueue2() {
        return new Queue("fanout.queue2");
    }

    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanout.exchange");
    }

    @Bean
    public Binding bindingFQ1(@Qualifier("FQ1") Queue queue, FanoutExchange exchange){
        return BindingBuilder.bind(queue).to(exchange);
    }

    @Bean
    public Binding bindingFQ2(@Qualifier("FQ2") Queue queue, FanoutExchange exchange){
        return BindingBuilder.bind(queue).to(exchange);
    }

    /** * 定制化amqp模版 * * ConfirmCallback接口用於ack回調 即消息發送到exchange ack * ReturnCallback接口用於消息發送失敗回調 即消息發送不到任何一個隊列中 ack */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

        // 消息返回, 需要配置 publisher-returns: true
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            String correlationId = message.getMessageProperties().getCorrelationId();
            log.debug("消息:{} 發送失敗, 應答碼:{} 原因:{} 交換機: {} 路由鍵: {}", correlationId, replyCode, replyText, exchange, routingKey);
        });

        // 消息確認, 需要配置 publisher-confirms: true
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
// log.debug("消息發送到exchange成功,id: {}", correlationData.getId());
                 log.debug("消息發送到exchange成功");
            } else {
                log.debug("消息發送到exchange失敗,原因: {}", cause);
            }
        });
        return rabbitTemplate;
    }
}

HelloSender.java

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class HelloSender {
    @Autowired
    private AmqpTemplate template;

    public void sendAck(String msg) {
        template.convertAndSend("fanout.exchange","",msg);
    }
	
}

HelloReceive.java

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;

@Component
public class HelloReceive {

    //手動確認消息
    @RabbitListener(queues = "fanout.queue1")
    public void FQ1(Message message, Channel channel) throws IOException {
        // 采用手動應答模式, 手動確認應答更為安全穩定
        System.out.println("FQ1:" + new String(message.getBody()));
        // 第一個參數是消息標識, 第二個是批量確認; false當前消息確認, true此次之前的消息確認
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

    // 不確認消息,消息會重回隊列
    @RabbitListener(queues = "fanout.queue2")
    public void FQ2(String str) {
        System.out.println("FQ2:" + str);
    }

}

單元測試


import com.lyf.springboot.SpringbootApplication;
import com.lyf.springboot.rabbitmq.HelloSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

// SpringbootApplication Springboo啟動類
@SpringBootTest(classes= SpringbootApplication.class)
@RunWith(SpringJUnit4ClassRunner.class)
public class TestRabbitMQ {

    @Autowired
    private HelloSender helloSender;

    @Test
    public void testRabbit2() {
        for (int i = 0; i < 10; i++) {
            helloSender.sendAck("haha~"+i);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

二、啟動測試

在確認消息的地方加上斷點,方便查看消息確認的過程。

 

enter description here

斷點

 

rabbitmq后台管理界面:

 

enter description here

監控平台

 

Message

  • Ready: 隊列中等待消費的消息
  • Unacked:隊列中等待被確認的消息(此時消息已到達消費者,但是未被確認)
  • Total:隊列中消息總數

啟動測試

 

enter description here

第一次

 

一開始兩個隊列都收到了1條消息,因為開啟了confirm模式,所以Message的Unacked狀態都為1,Total為1。

 

enter description here

第二次

收到第2條消息后,隊列queue1執行了ack確認,所以隊列中只有1條消息,1條消息等待被確認;隊列queue2沒有被ack確認,所以Ready=1,Unacked=1,Total=2。

 

 

enter description here

第十次

收到第10條消息后,隊列queue1依然是Ready=0,Unacked=1,Total=1;而隊列queue2一直沒有被ack確認,所以Ready=9,Unacked=1,Total=10。

 

 

enter description here

最終結果

消息發送完后,隊列queue1已經沒有消息了,隊列queue2還有10條等待被消費的消息。默認未被ack的消息重回隊列中。
spring.rabbitmq.listener.simple.default-requeue-rejected=true

 

 



參考文檔:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM