SpringBoot-RabbitMQ消息的消費與簽收機制


消息的簽收機制說明

消息消費成功后,我們在客戶端簽收后,消息就從MQ服務器里面刪除了若消息沒有消費成功,我們讓他回到MQ里面,讓別人再次重試消費。

自動簽收

消息只要被客戶端接收到,無論你客戶端發生了什么,我們服務器都不管你了,直接把消息刪除了,這是它是默認的行為。

手動簽收

創建項目 springboot-rabbitmq,創建方式和之前的方式一樣依賴也是。

修改application.yml配置文件:

server:
  port: 8080
spring:
  application:
    name: Springboot-RabbitMQ
  rabbitmq:
    username: user
    password: 123456
    host: 139.196.183.130
    port: 5672
    virtual-host: v-it6666
    # NONE 值是禁用發布確認模式,是默認值
    # CORRELATED 值是發布消息成功到交換機后會觸發回調方法
    publisher-confirm-type: correlated
    # 這個是老版本的用法
    # publisher-confirms: true
    # 消息由交換機到達隊列失敗時觸發
    publisher-returns: true
    listener:
      simple:
        # 自動簽收,這個是默認行為
        # acknowledge-mode: auto
        # 手動簽收
        acknowledge-mode: manual
      direct:
        # 設置直連交換機的簽收類型
        acknowledge-mode: manual

消息投遞的 ID 說明

獲取投遞的 ID

/**
 * @author BNTang
 */
@Component
public class MessageReceive {

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,
                    key = {"error"},
                    exchange = @Exchange(value = "directs", type = ExchangeTypes.DIRECT)
            )
    })
    public void receiveMessage(String content, Message message, Channel channel) throws IOException {
        // 消息投遞ID
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        // messageId 就是消息的唯一的標識,自己定義
        String messageId = message.getMessageProperties().getMessageId();

        System.out.println("消費者收到消息 → 消息對象:" + message);
        System.out.println("消費者收到消息 → 內容為:" + content);
        System.out.println("消費者收到消息 → 信道:" + channel);
        System.out.println("消息投遞ID → :" + deliveryTag);
        System.out.println("消息自定義ID → :" + messageId);

        channel.basicAck(deliveryTag, false);
    }
}

basicAck方法參數的解釋如下:

  • deliveryTag:消息投遞ID,要簽收的投遞ID。
  • multiple:是否批量簽收。

投遞 ID 存在的問題及消息永久 ID 設置的問題

什么能代表消息的唯一的標識,顯然投送的 ID 不行,因為一個消息可能會有多個投送的 ID,我們就需要給消息一個唯一的值,這個伴隨消息終身,不會變化!我們需要發送消息時,給消息設置一個 ID,然后保證該 ID 唯一就可以了,如下所示!

@Test
void sendMsg() throws IOException {
    for (int i = 0; i < 5; i++) {
        this.rabbitTemplate.convertAndSend("directs", "error", "我是一個測試消息" + i,
                message -> {
                    String messageId = UUID.randomUUID().toString().replace("-", "");
                    // 自己給消息設置自定義的ID
                    message.getMessageProperties().setMessageId(messageId);
                    return message;
                });
        System.out.println("消息發送成功");
        System.in.read();
    }
}

關於批量簽收消息

若我們此時簽收了編號為4的消息,但是前面的0,1,2,3 都沒有簽收,則MQ若是批量的簽收,它會把0,1,2,3 都簽收,因為MQ認為,比他晚投遞的已經簽收,前面的肯定已經消費成功了。

生產者

static int a = 1;

@Test
public void sendMessage() throws IOException {
    for (int i = 0; i <= 3; i++) {
        this.rabbitTemplate.convertAndSend("directs", "error", "ABC - " + i, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                // 自己給消息設置自定義的ID
                message.getMessageProperties().setMessageId((a++) + "");
                return message;
            }
        });
    }
    System.out.println("消息發送成功");
    System.in.read();
}

消費者

/**
 * @author BNTang
 */
@Component
public class MessageReceive {

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue("queue"),
                    key = {"error"},
                    exchange = @Exchange(value = "directs")
            )
    })
    public void receiveMessage(String content, Message message, Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        String messageId = message.getMessageProperties().getMessageId();

        System.out.println("消息投遞ID → :" + deliveryTag);
        System.out.println("消息自定義ID → :" + messageId);

        if (content.equals("ABC - 3")) {
            channel.basicAck(deliveryTag, true);
            System.out.println("消息簽收成功 → 內容為:" + content);
        }
    }
}

可以發現只簽收了ABC - 3 但是隊列里面沒有消息了,說明前面的12都被批量簽收了。

不簽收

當我們認為消息不合格時,或不是我們要的消息時,我們可以選擇不簽收它。

生產者

@Test
public void sendMessage() throws IOException {
    this.rabbitTemplate.convertAndSend("directs", "error", "1234567", new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            String messageId = UUID.randomUUID().toString().replace("-", "");
            // 自己給消息設置自定義的ID
            message.getMessageProperties().setMessageId(messageId);
            return message;
        }
    });
    System.out.println("消息發送成功");
    System.in.read();
}

消費者

/**
 * @author BNTang
 */
@Component
public class MessageReceive {

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue("queue"),
                    key = {"error"},
                    exchange = @Exchange(value = "directs")
            )
    })
    public void receiveMessage(String content, Message message, Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        String messageId = message.getMessageProperties().getMessageId();

        System.out.println("消息投遞ID → :" + deliveryTag);
        System.out.println("消息自定義ID → :" + messageId);

        if (content.equals("1234567")) {
            channel.basicAck(deliveryTag, true);
            System.out.println("消息簽收成功");
        } else {
            // 如果不是 1234567 就決絕簽收
            channel.basicNack(deliveryTag, false, true);
            System.out.println("消息被決絕簽收");
        }
    }
}

如上的代碼測試方式你先發送一個消息,消息內容為 1234567 這是正常的情況,然后在發送一個 123456 就會發現效果,消息消費死循環了。

我們選擇不簽收,其實是為了保護消息,當消費消息發生異常時,我們可以把消息放在隊列里面,讓它重新投遞,重新讓別人消費!而不是丟了它!

解決不簽收消息的死循環

不簽收,並且讓它回到隊列里面,想法很好,但是很容易造成死循環,因為沒有任何人能消費她! 我們設計一個機制,當一個消息被消費3次還沒有消費成功,我們就直接把它記錄下來,人工處理! 消息消費3次(消息的標識,消息的計數)我們引入Redis,使用Redis計數,若超過3次,直接拒絕消息,並且不回到隊列里面。

引入 Redis 依賴,並使用 Docker 運行 Redis,Redis 依賴如下:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

Docker 運行 Redis 命令腳本如下所示,當然也可以使用本地的Redis圖方便,我這里是有機子我就用我的機子了:

docker run -d --name myredis -p 6390:6379 redis --requirepass "1234"

修改消費者的配置文件

server:
  port: 8002
spring:
  application:
    name: consumer
  rabbitmq:
    host: 139.196.183.130
    port: 5672
    username: user
    password: 123456
    virtual-host: v-it6666
  # Redis的配置
  redis:
    host: 139.196.183.130
    port: 6390
    password: 1234

改造消費者,改造之后的代碼如下:

/**
 * @author BNTang
 */
@Component
public class MessageReceive {

    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * 消息的前綴
     */
    private String MESSAGE = "message:";

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue("queue"),
                    key = {"error"},
                    exchange = @Exchange(value = "directs")
            )
    })
    public void receiveMessage(String content, Message message, Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        String messageId = message.getMessageProperties().getMessageId();

        System.out.println("消息投遞ID → :" + deliveryTag);
        System.out.println("消息自定義ID → :" + messageId);

        if (content.equals("1234567")) {
            channel.basicAck(deliveryTag, true);
            System.out.println("消息簽收成功");
        } else {
            String count = this.redisTemplate.opsForValue().get(MESSAGE + messageId);

            if (count != null && Long.valueOf(count) >= 3) {
                channel.basicNack(deliveryTag, false, false);
                System.out.println("該消息消費【3】次都失敗,我們記錄它,人工處理" + content);
            } else {
                // 如果不是 1234567 就決絕簽收
                // 處理業務邏輯【可能邏輯處理的出現了問題啥的】
                channel.basicNack(deliveryTag, false, true);
                System.out.println("消息被決絕簽收");

                // 因為拒絕了,我們把消息ID放到Redis里面
                this.redisTemplate.opsForValue().increment(MESSAGE + messageId);
            }
        }
    }
}

如上basicNack方法參數的解釋如下所示:

  • deliveryTag:消息的投遞ID,要簽收的投遞ID是多少
  • multiple:是否批量簽收
  • requeue:true,代表決絕簽收,並把消息重新放回隊列里面,false,直接拒絕簽收

測試注意,因為統計計數時,消息的次數,是通過消息的 ID 來計數的,我們在發送消息時,要設置消息的頭:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM