目錄
六、消息的重復消費問題

1.什么冪等性
冪等性:多次操作造成的結果是一致的。對於非冪等的操作,冪等性如何保證?——使用分布式鎖。
1)在請求方式中的冪等性的體現
- get:多次get 結果是一致的
- post:添加,非冪等
- put:修改:冪等,根據id修改
- delete:根據id刪除,冪等
對於非冪等的請求,我們在業務里要做冪等性保證。
2)在消息隊列中的冪等性體現
消息隊列中,很可能一條消息被冗余部署的多個消費者收到,對於非冪等的操作,比如用戶的注冊,就需要做冪等性保證,否則消息將會被重復消費。使用分布式鎖解決冪等性問題

2.業務代碼中實現冪等性
1)生產者端修改配置文件
publisher-confirm-type: correlated 開啟confirm 請求頭中將會使用“spring_returned_message_correlation”鍵來傳遞業務id
server:
port: 9090
spring:
rabbitmq:
host: 172.16.253.8
port: 5672
username: xiaoming
password: 123456
virtual-host: java2007
# # 開啟confirm simple:簡單的執行ack的判斷;correlated: 執行ack的時候還會攜帶數據;none: 不ack 默認的
publisher-confirm-type: correlated
# # 開啟return機制
# publisher-returns: true
2)生產者端傳遞業務id
@Test
public void testSendMessage(){
//業務id
String id = UUID.randomUUID().toString();
//封裝了業務id的消息元數據
CorrelationData correlationData = new CorrelationData(id);
//發送消息,並且攜帶消息的業務id
rabbitTemplate.convertAndSend("my_boot_topic_exchange",
"product.add",
"hello message",
correlationData
);
}
3)消費者端進行業務邏輯判斷
/**
* 消費端的冪等性的實現
*/
@RabbitListener(queues = "my_boot_topic_queue")
public void processByMSG(Message message,Channel channel) throws IOException {
//如何獲得消息的業務id
String messageId = message.getMessageProperties().getHeader("spring_returned_message_correlation");
//設置分布式鎖
Boolean lock = redisTemplate.opsForValue().setIfAbsent(messageId, 1,100000, TimeUnit.MILLISECONDS);
if(lock){
//做消費
System.out.println("添加用戶成功");
//手動ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}else{
//不做消費
System.out.println("已重復消費");
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
}
}
為什么redis-cluster在極端情況下不適合做分布式鎖

