一、分布式锁
分布式锁一般有三种实现方式:1. 数据库乐观锁;2. 基于Redis的分布式锁;3. 基于ZooKeeper的分布式锁。本篇博客将介绍第二种方式,基于Redis实现分布式锁。
二、分布式锁的演进
业务:电商网站卖东西需要去减库存,本篇文章假设下的订单数量都为1;
第1版的代码:
@Service
public class RedisLockDemo {
@Autowired
private StringRedisTemplate redisTemplate;
public String deduceStock() {
ValueOperations<String, String> valueOperations = redisTemplate.opsForValue();
//获取redis中的库存
int stock = Integer.valueOf(valueOperations.get("stock"));
if(stock > 0) {
int newStock = stock - 1;
valueOperations.set("stock", newStock + "");
System.out.println("扣减库存成功, 剩余库存:" + newStock);
}
else {
System.out.println("库存已经为0,不能继续扣减");
}
return "success";
}
}
以上代码在高并发的场景下会产生超卖的问题,所以我们修改一下代码(增加synchronized);
第2版代码:
@Service
public class RedisLockDemo {
@Autowired
private StringRedisTemplate redisTemplate;
public String deduceStock() {
ValueOperations<String, String> valueOperations = redisTemplate.opsForValue();
synchronized (this) {
//获取redis中的库存
int stock = Integer.valueOf(valueOperations.get("stock"));
if (stock > 0) {
int newStock = stock - 1;
//减库存
valueOperations.set("stock", newStock + "");
System.out.println("扣减库存成功, 剩余库存:" + newStock);
} else {
System.out.println("库存已经为0,不能继续扣减");
}
}
return "success";
}
}
以上代码在服务为多实例的情况下,还是会出现超卖的问题,这个时候就要引入分布式锁来解决了。
第3版代码:
@Service
public class RedisLockDemo {
@Autowired
private StringRedisTemplate redisTemplate;
public String deduceStock() {
String lockKey = "lockKey";
//加锁: setnx
Boolean isSuccess = redisTemplate.opsForValue().setIfAbsent(lockKey, "1");
if(null == isSuccess || isSuccess) {
System.out.println("服务器繁忙, 请稍后重试");
return "error";
}
//------ 执行业务逻辑 ----start------
int stock = Integer.valueOf(redisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int newStock = stock - 1;
//执行业务操作减库存
redisTemplate.opsForValue().set("stock", newStock + "");
System.out.println("扣减库存成功, 剩余库存:" + newStock);
} else {
System.out.println("库存已经为0,不能继续扣减");
}
//------ 执行业务逻辑 ----end------
//释放锁
redisTemplate.delete(lockKey);
return "success";
}
}
以上代码的问题:
(1)若在执行业务逻辑的过程中出现了异常,则会造成锁不会被释放,使其他有关的线程全部阻塞住(死锁);我们可以把锁释放操作放入到 finally 语句中来解决;
(2)若在执行业务逻辑的过程中服务给挂掉了,仍然会造成锁不会被释放,使其他有关的线程全部阻塞住(死锁);我们可以给 redis 的 key 增加一个超时时间(超过指定的时间则会删除key及其对应的数据),虽然在超时时间到达之前其他有关的线程会一直阻塞住,但是这个时间比较小,且可以解决死锁的问题,所以这个解决方案也是可以接受的。代码如下:
第4版代码:
@Service
public class RedisLockDemo {
@Autowired
private StringRedisTemplate redisTemplate;
public String deduceStock() {
String lockKey = "lockKey";
try {
//加锁: setnx,expire(10秒超时)
Boolean isSuccess = redisTemplate.opsForValue().setIfAbsent(lockKey, "1", 10, TimeUnit.SECONDS);
if(null == isSuccess || isSuccess) {
System.out.println("服务器繁忙, 请稍后重试");
return "error";
}
//------ 执行业务逻辑 ----start------
int stock = Integer.valueOf(redisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int newStock = stock - 1;
//执行业务操作减库存
redisTemplate.opsForValue().set("stock", newStock + "");
System.out.println("扣减库存成功, 剩余库存:" + newStock);
} else {
System.out.println("库存已经为0,不能继续扣减");
}
//------ 执行业务逻辑 ----end------
} finally {
//释放锁 redisTemplate.delete(lockKey);
}
return "success";
}
}
以上代码还是会出现问题:
当线程1的业务执行到一半的时候,设置的锁超时时间到了,则锁的key会被删除;线程2就加锁成功了,线程2还在执行的时候,线程1的业务执行完了,线程1接着执行删除锁的操作,但是线程1删除的锁实际上是线程2加的锁,导致锁失效的问题。
方法一:可以使用 “只要自己加锁,只能自己去释放” 来解决这个问题(第5版代码);
第5版代码:
@Service
public class RedisLockDemo {
@Autowired
private StringRedisTemplate redisTemplate;
public String deduceStock() {
String lockKey = "lockKey";
String clientId = UUID.randomUUID().toString();
try {
//加锁: setnx,expire Boolean isSuccess = redisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 10, TimeUnit.SECONDS);
if(null == isSuccess || isSuccess) {
System.out.println("服务器繁忙, 请稍后重试");
return "error";
}
//------ 执行业务逻辑 ----start------
int stock = Integer.valueOf(redisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int newStock = stock - 1;
//执行业务操作减库存
redisTemplate.opsForValue().set("stock", newStock + "");
System.out.println("扣减库存成功, 剩余库存:" + newStock);
} else {
System.out.println("库存已经为0,不能继续扣减");
}
//------ 执行业务逻辑 ----end------
} finally {
if(clientId.equals(redisTemplate.opsForValue().get(lockKey))) { //释放锁 redisTemplate.delete(lockKey); }
}
return "success";
}
}
以上代码虽然解决了锁被其他线程释放的问题,但是还是会出现问题;当前线程的业务还没有执行完,锁的超时时间到了,这样其他线程就可以去加锁并执行业务逻辑了,这样就有两个线程都在执行了,有可能导致bug。
方法二:可以给锁进行续命,每次锁快超时的时候就给锁重新在设置一个时间(引入另一个redis的java客户端 Redisson)
三、分布式锁的Redisson实现
(1)引入maven坐标;
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.13.4</version>
</dependency>
(2)增加配置文件,将Redisson注入到容器中;
@Configuration
public class RedissonConfig {
@Bean
public Redisson redisson() {
Config config = new Config();
//单机版
//config.useSingleServer().setAddress("redis://192.168.1.1:8001").setDatabase(0);
//集群版
config.useClusterServers()
.addNodeAddress("redis://192.168.1.1:8001")
.addNodeAddress("redis://192.168.1.1:8002")
.addNodeAddress("redis://192.168.1.2:8001")
.addNodeAddress("redis://192.168.1.2:8002")
.addNodeAddress("redis://192.168.1.3:8001")
.addNodeAddress("redis://192.168.1.3:8002");
return (Redisson) Redisson.create(config);
}
}
(3)分布式锁的实现
@Service
public class RedisLockDemo {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired private Redisson redisson;
public String deduceStock() {
String lockKey = "lockKey";
RLock redissonLock = redisson.getLock(lockKey);
try {
//加锁(超时默认30s), 实现锁续命的功能(后台启动一个timer, 默认每10s检测一次是否持有锁) redissonLock.lock();
//------ 执行业务逻辑 ----start------
int stock = Integer.valueOf(redisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int newStock = stock - 1;
//执行业务操作减库存
redisTemplate.opsForValue().set("stock", newStock + "");
System.out.println("扣减库存成功, 剩余库存:" + newStock);
} else {
System.out.println("库存已经为0,不能继续扣减");
}
//------ 执行业务逻辑 ----end------
} finally {
//解锁 redissonLock.unlock();
}
return "success";
}
}
Redisson的实现原理
RedissonLock的使用介绍
Redisson的官网:https://redisson.org/
// 锁默认有效时间30秒,每10秒去检查并重新设置超时时间
void lock();
// 超过锁有效时间 leaseTime,就会释放锁
void lock(long leaseTime, TimeUnit unit);
// 尝试获取锁;成功则返回true,失败则返回false
boolean tryLock();
// 不会去启动定时任务;在 time 时间内还没有获取到锁,则返回false
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
// 不会去启动定时任务;当 waitTime 的时间到了,还没有获取到锁则返回false;若获取到锁了,锁的有效时间设置为 leaseTime
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
Jedis和Redisson的比较
Jedis提供了比Redisson更丰富的操作;
Redisson底层多使用 lua 脚本实现,对原子性的操作封装较好,尤其是在分布式锁上的封装;
Redis实现的分布式锁还会出现一点问题:
线程1加了锁去执行业务了,此时Redis的 master 挂掉了,还没有将数据同步到 slave 上。因为集群会选举一个新的 master 出来,但是新的 master 上并没有这个锁;线程2可以在新选举产生的 master 上去加锁,然后处理业务。
(1)针对以上问题,我们可以使用 zookeeper 去实现分布式锁,因为它是强一致性的。但是zookeeper的性能是低于Redis,使用Redis是完全够了。
(2)当然,对于以上的问题,我们也可以使用 RedLock 去解决Redis上的那个问题,RedLock 实现的原理:给多个Redis节点发送加锁的消息,只有超过一半以上的节点加锁成功才算加锁成功。
但是不推荐使用RedLock,当前的 RedLock 是有bug的,它的实现原理和 zookeeper 是差不多的。
高并发的高性能的Redis
怎么在高并发的场景去实现一个高性能的分布式锁呢?
电商网站在大促的时候并发量很大:
(1)若抢购不是同一个商品,则可以增加Redis集群的cluster来实现,因为不是同一个商品,所以通过计算 key 的hash会落到不同的 cluster上;
(2)若抢购的是同一个商品,则计算key的hash值会落同一个cluster上,所以加机器也是么有用的。
我们可以使用库存分段锁的方式去实现。
分段锁
假如产品1有200个库存,我们可以将这200个库存分为10个段存储(每段20个),每段存储到一个cluster上;将key使用hash计算,使这些key最后落在不同的cluster上。
每个下单请求锁了一个库存分段,然后在业务逻辑里面,就对数据库或者是Redis中的那个分段库存进行操作即可,包括查库存 -> 判断库存是否充足 -> 扣减库存。
可以参照 ConcurrentHashMap 的源码去实现,它使用的就是分段锁。
高性能分布式锁参考链接:https://blog.csdn.net/eluanshi12/article/details/84616173