使用场景
首先,我们看这样一个场景:客户下单的时候,我们调用库存中心进行减库存,那我们一般的操作都是:
update store set num = $num where id = $id
这种通过设置库存的修改方式,我们知道在并发量高的时候会存在数据库的丢失更新,比如 a, b 当前两个事务,查询出来的库存都是 5,a 买了 3 个单子要把库存设置为 2,而 b 买了 1 个单子要把库存设置为 4,那这个时候就会出现 a 会覆盖 b 的更新,所以我们更多的都是会加个条件:
update store set num = $num where id = $id and num = $query_num
即乐观锁的方式来处理,当然也可以通过版本号来处理乐观锁,都是一样的,但是这是更新一个表,如果我们牵扯到多个表呢,我们希望和这个单子关联的所有的表同一时间只能被一个线程来处理更新,多个线程按照不同的顺序去更新同一个单子关联的不同数据,出现死锁的概率比较大。对于非敏感的数据,我们也没有必要去都加乐观锁处理,我们的服务都是多机器部署的,要保证多进程多线程同时只能有一个进程的一个线程去处理,这个时候我们就需要用到分布式锁。分布式锁的实现方式有很多,我们今天分别通过数据库,Zookeeper, Redis 以及 Tair 的实现逻辑。
(一)数据库实现
加 xx 锁
更新一个单子关联的所有的数据,先查询出这个单子,并加上排他锁,在进行一系列的更新操作
begin transaction; select ...for update; doSomething(); commit();
这种处理主要依靠排他锁来阻塞其他线程,不过这个需要注意几点:
- 查询的数据一定要在数据库里存在,如果不存在的话,数据库会加 gap 锁,而 gap 锁之间是兼容的,这种如果两个线程都加了gap 锁,另一个再更新的话会出现死锁。不过一般能更新的数据都是存在的
- 后续的处理流程需要尽可能的时间短,即在更新的时候提前准备好数据,保证事务处理的时间足够的短,流程足够的短,因为开启事务是一直占着连接的,如果流程比较长会消耗过多的数据库连接的
唯一键
通过在一张表里创建唯一键来获取锁,比如执行 saveStore 这个方法
insert table lock_store ('method_name') values($method_name)
其中 method_name
是个唯一键,通过这种方式也可以做到,解锁的时候直接删除改行记录就行。不过这种方式,锁就不会是阻塞式的,因为插入数据是立马可以得到返回结果的。
那针对以上数据库实现的两种分布式锁,存在什么样的优缺点呢?
优点
简单,方便,快速实现
缺点
- 基于数据库,开销比较大,性能可能会存在影响
- 基于数据库的当前读来实现,数据库会在底层做优化,可能用到索引,可能不用到索引,这个依赖于查询计划的分析
(二)Zookeeper 实现
获取锁
- 先有一个锁跟节点,lockRootNode,这可以是一个永久的节点
- 客户端获取锁,先在 lockRootNode 下创建一个顺序的瞬时节点,保证客户端断开连接,节点也自动删除
- 调用 lockRootNode 父节点的 getChildren() 方法,获取所有的节点,并从小到大排序,如果创建的最小的节点是当前节点,则返回 true,获取锁成功,否则,关注比自己序号小的节点的释放动作(exist watch),这样可以保证每一个客户端只需要关注一个节点,不需要关注所有的节点,避免羊群效应。
- 如果有节点释放操作,重复步骤 3
释放锁
只需要删除步骤 2 中创建的节点即可
使用 Zookeeper 的分布式锁存在什么样的优缺点呢?
优点
- 客户端如果出现宕机故障的话,锁可以马上释放
- 可以实现阻塞式锁,通过 watcher 监听,实现起来也比较简单
- 集群模式,稳定性比较高
缺点
- 一旦网络有任何的抖动,Zookeeper 就会认为客户端已经宕机,就会断掉连接,其他客户端就可以获取到锁。当然 Zookeeper 有重试机制,这个就比较依赖于其重试机制的策略了
- 性能上不如缓存
(三)Redis 实现
我们先举个例子,比如现在我要更新产品的信息,产品的唯一键就是 productId
#简单实现 1
public boolean lock(String key, V v, int expireTime){ int retry = 0; //获取锁失败最多尝试10次 while (retry < failRetryTimes){ //获取锁 Boolean result = redis.setNx(key, v, expireTime); if (result){ return true; } try { //获取锁失败间隔一段时间重试 TimeUnit.MILLISECONDS.sleep(sleepInterval); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return false; } } return false; } public boolean unlock(String key){ return redis.delete(key); } public static void main(String[] args) { Integer productId = 324324; RedisLock<Integer> redisLock = new RedisLock<Integer>(); redisLock.lock(productId+"", productId, 1000); } }
这是一个简单的实现,存在的问题:
- 可能会导致当前线程的锁误被其他线程释放,比如 a 线程获取到了锁正在执行,但是由于内部流程处理超时或者 gc 导致锁过期,这个时候b线程获取到了锁,a 和 b 线程处理的是同一个 productId,b还在处理的过程中,这个时候 a 处理完了,a 去释放锁,可能就会导致 a 把 b 获取的锁释放了。
- 不能实现可重入
- 客户端如果第一次已经设置成功,但是由于超时返回失败,此后客户端尝试会一直失败
针对以上问题我们改进下:
- v 传 requestId,然后我们在释放锁的时候判断一下,如果是当前 requestId,那就可以释放,否则不允许释放
- 加入 count 的锁计数,在获取锁的时候查询一次,如果是当前线程已经持有的锁,那锁技术加 1,直接返回 true
#简单实现 2
private static volatile int count = 0; public boolean lock(String key, V v, int expireTime){ int retry = 0; //获取锁失败最多尝试10次 while (retry < failRetryTimes){ //1.先获取锁,如果是当前线程已经持有,则直接返回 //2.防止后面设置锁超时,其实是设置成功,而网络超时导致客户端返回失败,所以获取锁之前需要查询一下 V value = redis.get(key); //如果当前锁存在,并且属于当前线程持有,则锁计数+1,直接返回 if (null != value && value.equals(v)){ count ++; return true; } //如果锁已经被持有了,那需要等待锁的释放 if (value == null || count <= 0){ //获取锁 Boolean result = redis.setNx(key, v, expireTime); if (result){ count = 1; return true; } } try { //获取锁失败间隔一段时间重试 TimeUnit.MILLISECONDS.sleep(sleepInterval); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return false; } } return false; } public boolean unlock(String key, String requestId){ String value = redis.get(key); if (Strings.isNullOrEmpty(value)){ count = 0; return true; } //判断当前锁的持有者是否是当前线程,如果是的话释放锁,不是的话返回false if (value.equals(requestId)){ if (count > 1){ count -- ; return true; } boolean delete = redis.delete(key); if (delete){ count = 0; } return delete; } return false; } public static void main(String[] args) { Integer productId = 324324; RedisLock<String> redisLock = new RedisLock<String>(); String requestId = UUID.randomUUID().toString(); redisLock.lock(productId+"", requestId, 1000)