消息的重復消費及處理


重復消費概述

當消息回退到隊列里面后,會被再次消費,但是我們不能讓消息消費成功2次其實, MQ 自己就可以保證消息不被重復消費,因為 MQ 可以把消息投遞給消費者時,是阻塞的,不會把一個消息投遞給多個消費者!但是面試時,有人問你,消息怎么保證不被重復消費!無論在 RabbitMQ, 或者 ActiveMQ 里面,解決思路都是一樣的!!對於重復消費 → 去重操作 → 接口的冪等操作( → 找到該操作的有個唯一標識 → 具體的情況,具體討論)Eg: 微信里面,若有人關注我了,我給他發紅包,我的紅包只能發送一次,我們可以使用用戶的 openId 做一個去重的操作訂單不重復(訂單的編號)(先按訂單編號查詢,再操作)數據庫不能重復(數據庫的 id)(先查詢是否存在,再進行操作)總結:去重操作就是找到一個該操作的唯一標識,把該標識,放在一個空間里面,在此操作時,我們可以先判斷該空間是否已經操作過它了。

利用 BloomFilter 實現去重

官方地址:https://www.hutool.cn/docs/#/bloomFilter/%E6%A6%82%E8%BF%B0

添加 Hutool 的依賴

<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.1.5</version>
</dependency>

配置 BitMapBloomFilter

/**
 * @author BNTang
 */
@Configuration
public class BloomFilterConfig {

    @Bean
    public BitMapBloomFilter bitMapBloomFilter() {
        return new BitMapBloomFilter(1024);
    }

}

使用 Hutool-BloomFilter 去重

改造之前的消費代碼如下所示:

/**
 * @author BNTang
 */
@Component
public class MessageReceive {

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Autowired
    private BitMapBloomFilter bitMapBloomFilter;

    /**
     * 消息的前綴
     */
    private String MESSAGE = "message:";

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue("queue"),
                    key = {"error"},
                    exchange = @Exchange(value = "directs")
            )
    })
    public void receiveMessage(String content, Message message, Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        String messageId = message.getMessageProperties().getMessageId();

        System.out.println("消息投遞ID → :" + deliveryTag);
        System.out.println("消息自定義ID → :" + messageId);

        if (this.bitMapBloomFilter.contains(messageId)) {
            System.out.println("該消息被消費過,不能重復進行消費");
            try {
                // 如果進入到這里面,說明這個消息之前被消費過,但是 MQ 認為你沒有消費,所以我們要簽收這條消息
                channel.basicAck(deliveryTag, false);
                return;
            } catch (Exception e) {
                System.out.println(e);
            }
        }

        // 如果消息內容為 123456 就簽收它
        if (content.equals("123456")) {
            channel.basicAck(deliveryTag, true);
            System.out.println("消息簽收成功");

            // 消費成功之后放到布隆過濾器里面
            this.bitMapBloomFilter.add(messageId);
        } else {
            String count = this.redisTemplate.opsForValue().get(MESSAGE + messageId);

            if (count != null && Long.valueOf(count) >= 3) {
                channel.basicNack(deliveryTag, false, false);
                System.out.println("該消息消費【3】次都失敗,我們記錄它,人工處理" + content);
            } else {
                // 如果不是 123456 就決絕簽收
                channel.basicNack(deliveryTag, false, true);
                System.out.println("消息被決絕簽收");

                // 因為拒絕了,我們把消息ID放到Redis里面
                this.redisTemplate.opsForValue().increment(MESSAGE + messageId);
            }
        }
    }
}

發消息的代碼和前面章節一樣,不在貼了。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM