目录
六、消息的重复消费问题
1.什么幂等性
幂等性:多次操作造成的结果是一致的。对于非幂等的操作,幂等性如何保证?——使用分布式锁。
1)在请求方式中的幂等性的体现
- get:多次get 结果是一致的
- post:添加,非幂等
- put:修改:幂等,根据id修改
- delete:根据id删除,幂等
对于非幂等的请求,我们在业务里要做幂等性保证。
2)在消息队列中的幂等性体现
消息队列中,很可能一条消息被冗余部署的多个消费者收到,对于非幂等的操作,比如用户的注册,就需要做幂等性保证,否则消息将会被重复消费。使用分布式锁解决幂等性问题
2.业务代码中实现幂等性
1)生产者端修改配置文件
publisher-confirm-type: correlated 开启confirm 请求头中将会使用“spring_returned_message_correlation”键来传递业务id
server:
port: 9090
spring:
rabbitmq:
host: 172.16.253.8
port: 5672
username: xiaoming
password: 123456
virtual-host: java2007
# # 开启confirm simple:简单的执行ack的判断;correlated: 执行ack的时候还会携带数据;none: 不ack 默认的
publisher-confirm-type: correlated
# # 开启return机制
# publisher-returns: true
2)生产者端传递业务id
@Test
public void testSendMessage(){
//业务id
String id = UUID.randomUUID().toString();
//封装了业务id的消息元数据
CorrelationData correlationData = new CorrelationData(id);
//发送消息,并且携带消息的业务id
rabbitTemplate.convertAndSend("my_boot_topic_exchange",
"product.add",
"hello message",
correlationData
);
}
3)消费者端进行业务逻辑判断
/**
* 消费端的幂等性的实现
*/
@RabbitListener(queues = "my_boot_topic_queue")
public void processByMSG(Message message,Channel channel) throws IOException {
//如何获得消息的业务id
String messageId = message.getMessageProperties().getHeader("spring_returned_message_correlation");
//设置分布式锁
Boolean lock = redisTemplate.opsForValue().setIfAbsent(messageId, 1,100000, TimeUnit.MILLISECONDS);
if(lock){
//做消费
System.out.println("添加用户成功");
//手动ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}else{
//不做消费
System.out.println("已重复消费");
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
}
}
为什么redis-cluster在极端情况下不适合做分布式锁