Redis之分布式锁的使用


一、分布式锁

  分布式锁一般有三种实现方式:1. 数据库乐观锁;2. 基于Redis的分布式锁;3. 基于ZooKeeper的分布式锁。本篇博客将介绍第二种方式,基于Redis实现分布式锁。

二、分布式锁的演进

业务:电商网站卖东西需要去减库存,本篇文章假设下的订单数量都为1;

第1版的代码

@Service
public class RedisLockDemo {

    @Autowired
    private StringRedisTemplate redisTemplate;

    public String deduceStock() {
        ValueOperations<String, String> valueOperations = redisTemplate.opsForValue();
        //获取redis中的库存
        int stock = Integer.valueOf(valueOperations.get("stock"));
        if(stock > 0) {
            int newStock = stock - 1;
            valueOperations.set("stock", newStock + "");
            System.out.println("扣减库存成功, 剩余库存:" + newStock);
        }
        else {
            System.out.println("库存已经为0,不能继续扣减");
        }
     return "success"; } }

以上代码在高并发的场景下会产生超卖的问题,所以我们修改一下代码(增加synchronized);

第2版代码

@Service
public class RedisLockDemo {
    @Autowired
    private StringRedisTemplate redisTemplate;

    public String deduceStock() {
        ValueOperations<String, String> valueOperations = redisTemplate.opsForValue();
        synchronized (this) {
            //获取redis中的库存
            int stock = Integer.valueOf(valueOperations.get("stock"));
            if (stock > 0) {
                int newStock = stock - 1;
                //减库存
                valueOperations.set("stock", newStock + "");
                System.out.println("扣减库存成功, 剩余库存:" + newStock);
            } else {
                System.out.println("库存已经为0,不能继续扣减");
            }
        }
     
return "success"; } }

以上代码在服务为多实例的情况下,还是会出现超卖的问题,这个时候就要引入分布式锁来解决了。

第3版代码

@Service
public class RedisLockDemo {
    @Autowired
    private StringRedisTemplate redisTemplate;

    public String deduceStock() {
        String lockKey = "lockKey";

        //加锁: setnx
        Boolean isSuccess = redisTemplate.opsForValue().setIfAbsent(lockKey, "1");
        if(null == isSuccess || isSuccess) {
            System.out.println("服务器繁忙, 请稍后重试");
            return "error";
        }
        
        //------ 执行业务逻辑 ----start------
        int stock = Integer.valueOf(redisTemplate.opsForValue().get("stock"));
        if (stock > 0) {
            int newStock = stock - 1;
            //执行业务操作减库存
            redisTemplate.opsForValue().set("stock", newStock + "");
            System.out.println("扣减库存成功, 剩余库存:" + newStock);
        } else {
            System.out.println("库存已经为0,不能继续扣减");
        }
        //------ 执行业务逻辑 ----end------

        //释放锁
        redisTemplate.delete(lockKey);
        return "success";
    }
}

以上代码的问题:

(1)若在执行业务逻辑的过程中出现了异常,则会造成锁不会被释放,使其他有关的线程全部阻塞住(死锁);我们可以把锁释放操作放入到 finally 语句中来解决;

(2)若在执行业务逻辑的过程中服务给挂掉了,仍然会造成锁不会被释放,使其他有关的线程全部阻塞住(死锁);我们可以给 redis 的 key 增加一个超时时间(超过指定的时间则会删除key及其对应的数据),虽然在超时时间到达之前其他有关的线程会一直阻塞住,但是这个时间比较小,且可以解决死锁的问题,所以这个解决方案也是可以接受的。代码如下:

第4版代码:

@Service
public class RedisLockDemo {
    @Autowired
    private StringRedisTemplate redisTemplate;

    public String deduceStock() {
        String lockKey = "lockKey";

        try {
            //加锁: setnx,expire(10秒超时)
            Boolean isSuccess = redisTemplate.opsForValue().setIfAbsent(lockKey, "1", 10, TimeUnit.SECONDS);
            if(null == isSuccess || isSuccess) {
                System.out.println("服务器繁忙, 请稍后重试");
                return "error";
            }

            //------ 执行业务逻辑 ----start------
            int stock = Integer.valueOf(redisTemplate.opsForValue().get("stock"));
            if (stock > 0) {
                int newStock = stock - 1;
                //执行业务操作减库存
                redisTemplate.opsForValue().set("stock", newStock + "");
                System.out.println("扣减库存成功, 剩余库存:" + newStock);
            } else {
                System.out.println("库存已经为0,不能继续扣减");
            }
            //------ 执行业务逻辑 ----end------
        } finally {
            //释放锁 redisTemplate.delete(lockKey);
        }
        return "success";
    }
}

以上代码还是会出现问题:

  当线程1的业务执行到一半的时候,设置的锁超时时间到了,则锁的key会被删除;线程2就加锁成功了,线程2还在执行的时候,线程1的业务执行完了,线程1接着执行删除锁的操作,但是线程1删除的锁实际上是线程2加的锁,导致锁失效的问题。

方法一:可以使用 “只要自己加锁,只能自己去释放” 来解决这个问题(第5版代码);

第5版代码:

@Service
public class RedisLockDemo {
    @Autowired
    private StringRedisTemplate redisTemplate;

    public String deduceStock() {
        String lockKey = "lockKey";

        String clientId = UUID.randomUUID().toString();
        try {
            //加锁: setnx,expire Boolean isSuccess = redisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 10, TimeUnit.SECONDS);
            if(null == isSuccess || isSuccess) {
                System.out.println("服务器繁忙, 请稍后重试");
                return "error";
            }

            //------ 执行业务逻辑 ----start------
            int stock = Integer.valueOf(redisTemplate.opsForValue().get("stock"));
            if (stock > 0) {
                int newStock = stock - 1;
                //执行业务操作减库存
                redisTemplate.opsForValue().set("stock", newStock + "");
                System.out.println("扣减库存成功, 剩余库存:" + newStock);
            } else {
                System.out.println("库存已经为0,不能继续扣减");
            }
            //------ 执行业务逻辑 ----end------
        } finally {
            if(clientId.equals(redisTemplate.opsForValue().get(lockKey))) { //释放锁 redisTemplate.delete(lockKey); }
        }
        return "success";
    }
}

以上代码虽然解决了锁被其他线程释放的问题,但是还是会出现问题;当前线程的业务还没有执行完,锁的超时时间到了,这样其他线程就可以去加锁并执行业务逻辑了,这样就有两个线程都在执行了,有可能导致bug。

方法二:可以给锁进行续命,每次锁快超时的时候就给锁重新在设置一个时间(引入另一个redis的java客户端 Redisson

三、分布式锁的Redisson实现

(1)引入maven坐标;

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.13.4</version>
</dependency>

(2)增加配置文件,将Redisson注入到容器中;

@Configuration
public class RedissonConfig {

    @Bean
    public Redisson redisson() {
        Config config = new Config();
        //单机版
        //config.useSingleServer().setAddress("redis://192.168.1.1:8001").setDatabase(0);

        //集群版
        config.useClusterServers()
                .addNodeAddress("redis://192.168.1.1:8001")
                .addNodeAddress("redis://192.168.1.1:8002")
                .addNodeAddress("redis://192.168.1.2:8001")
                .addNodeAddress("redis://192.168.1.2:8002")
                .addNodeAddress("redis://192.168.1.3:8001")
                .addNodeAddress("redis://192.168.1.3:8002");
        return (Redisson) Redisson.create(config);
    }
}

(3)分布式锁的实现

@Service
public class RedisLockDemo {
    @Autowired
    private StringRedisTemplate redisTemplate;

    @Autowired private Redisson redisson;

    public String deduceStock() {
        String lockKey = "lockKey";
        RLock redissonLock = redisson.getLock(lockKey);

        try {
            //加锁(超时默认30s), 实现锁续命的功能(后台启动一个timer, 默认每10s检测一次是否持有锁) redissonLock.lock();

            //------ 执行业务逻辑 ----start------
            int stock = Integer.valueOf(redisTemplate.opsForValue().get("stock"));
            if (stock > 0) {
                int newStock = stock - 1;
                //执行业务操作减库存
                redisTemplate.opsForValue().set("stock", newStock + "");
                System.out.println("扣减库存成功, 剩余库存:" + newStock);
            } else {
                System.out.println("库存已经为0,不能继续扣减");
            }
            //------ 执行业务逻辑 ----end------
        } finally {
            //解锁 redissonLock.unlock();
        }
        return "success";
    }
}

Redisson的实现原理

RedissonLock的使用介绍 

Redisson的官网:https://redisson.org/

// 锁默认有效时间30秒,每10秒去检查并重新设置超时时间
void lock();  

// 超过锁有效时间 leaseTime,就会释放锁
void lock(long leaseTime, TimeUnit unit);

// 尝试获取锁;成功则返回true,失败则返回false
boolean tryLock();

// 不会去启动定时任务;在 time 时间内还没有获取到锁,则返回false
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

// 不会去启动定时任务;当 waitTime 的时间到了,还没有获取到锁则返回false;若获取到锁了,锁的有效时间设置为 leaseTime
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;

  

 

Jedis和Redisson的比较 

Jedis提供了比Redisson更丰富的操作;

Redisson底层多使用 lua 脚本实现,对原子性的操作封装较好,尤其是在分布式锁上的封装;

 

Redis实现的分布式锁还会出现一点问题:

线程1加了锁去执行业务了,此时Redis的 master 挂掉了,还没有将数据同步到 slave 上。因为集群会选举一个新的 master 出来,但是新的 master 上并没有这个锁;线程2可以在新选举产生的 master 上去加锁,然后处理业务。

(1)针对以上问题,我们可以使用 zookeeper 去实现分布式锁,因为它是强一致性的。但是zookeeper的性能是低于Redis,使用Redis是完全够了。

(2)当然,对于以上的问题,我们也可以使用 RedLock 去解决Redis上的那个问题,RedLock 实现的原理:给多个Redis节点发送加锁的消息,只有超过一半以上的节点加锁成功才算加锁成功。

但是不推荐使用RedLock,当前的 RedLock 是有bug的,它的实现原理和 zookeeper 是差不多的。

 

高并发的高性能的Redis

怎么在高并发的场景去实现一个高性能的分布式锁呢?

电商网站在大促的时候并发量很大:

(1)若抢购不是同一个商品,则可以增加Redis集群的cluster来实现,因为不是同一个商品,所以通过计算 key 的hash会落到不同的 cluster上;

(2)若抢购的是同一个商品,则计算key的hash值会落同一个cluster上,所以加机器也是么有用的。

我们可以使用库存分段锁的方式去实现。

分段锁

  假如产品1有200个库存,我们可以将这200个库存分为10个段存储(每段20个),每段存储到一个cluster上;将key使用hash计算,使这些key最后落在不同的cluster上。

  每个下单请求锁了一个库存分段,然后在业务逻辑里面,就对数据库或者是Redis中的那个分段库存进行操作即可,包括查库存 -> 判断库存是否充足 -> 扣减库存。

可以参照 ConcurrentHashMap 的源码去实现,它使用的就是分段锁。

高性能分布式锁参考链接:https://blog.csdn.net/eluanshi12/article/details/84616173

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM