重復消費概述
當消息回退到隊列里面后,會被再次消費,但是我們不能讓消息消費成功2次其實, MQ 自己就可以保證消息不被重復消費,因為 MQ 可以把消息投遞給消費者時,是阻塞的,不會把一個消息投遞給多個消費者!但是面試時,有人問你,消息怎么保證不被重復消費!無論在 RabbitMQ, 或者 ActiveMQ 里面,解決思路都是一樣的!!對於重復消費 → 去重操作 → 接口的冪等操作( → 找到該操作的有個唯一標識 → 具體的情況,具體討論)Eg: 微信里面,若有人關注我了,我給他發紅包,我的紅包只能發送一次,我們可以使用用戶的 openId 做一個去重的操作訂單不重復(訂單的編號)(先按訂單編號查詢,再操作)數據庫不能重復(數據庫的 id)(先查詢是否存在,再進行操作)總結:去重操作就是找到一個該操作的唯一標識,把該標識,放在一個空間里面,在此操作時,我們可以先判斷該空間是否已經操作過它了。
利用 BloomFilter 實現去重
官方地址:https://www.hutool.cn/docs/#/bloomFilter/%E6%A6%82%E8%BF%B0
添加 Hutool 的依賴
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.1.5</version>
</dependency>
配置 BitMapBloomFilter
/**
* @author BNTang
*/
@Configuration
public class BloomFilterConfig {
@Bean
public BitMapBloomFilter bitMapBloomFilter() {
return new BitMapBloomFilter(1024);
}
}
使用 Hutool-BloomFilter 去重
改造之前的消費代碼如下所示:
/**
* @author BNTang
*/
@Component
public class MessageReceive {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private BitMapBloomFilter bitMapBloomFilter;
/**
* 消息的前綴
*/
private String MESSAGE = "message:";
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue("queue"),
key = {"error"},
exchange = @Exchange(value = "directs")
)
})
public void receiveMessage(String content, Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
String messageId = message.getMessageProperties().getMessageId();
System.out.println("消息投遞ID → :" + deliveryTag);
System.out.println("消息自定義ID → :" + messageId);
if (this.bitMapBloomFilter.contains(messageId)) {
System.out.println("該消息被消費過,不能重復進行消費");
try {
// 如果進入到這里面,說明這個消息之前被消費過,但是 MQ 認為你沒有消費,所以我們要簽收這條消息
channel.basicAck(deliveryTag, false);
return;
} catch (Exception e) {
System.out.println(e);
}
}
// 如果消息內容為 123456 就簽收它
if (content.equals("123456")) {
channel.basicAck(deliveryTag, true);
System.out.println("消息簽收成功");
// 消費成功之后放到布隆過濾器里面
this.bitMapBloomFilter.add(messageId);
} else {
String count = this.redisTemplate.opsForValue().get(MESSAGE + messageId);
if (count != null && Long.valueOf(count) >= 3) {
channel.basicNack(deliveryTag, false, false);
System.out.println("該消息消費【3】次都失敗,我們記錄它,人工處理" + content);
} else {
// 如果不是 123456 就決絕簽收
channel.basicNack(deliveryTag, false, true);
System.out.println("消息被決絕簽收");
// 因為拒絕了,我們把消息ID放到Redis里面
this.redisTemplate.opsForValue().increment(MESSAGE + messageId);
}
}
}
}
發消息的代碼和前面章節一樣,不在貼了。