1. 分布式锁的特点
- 在分布式环境下,一个资源在同一时间只能被一个机器上的一个线程获取
- 高可用的获取锁和释放锁
- 高性能的获取锁和释放锁
- 具备可重入特性
- 具备锁实现机制,防止死锁
- 具备非阻塞锁特性,获取不到值直接返回
2. 分布式锁的实现方式
分布式主要有三种主流的实现方式:
(1)基于数据库实现的分布式锁:采用乐观锁、悲观锁或者基于主键唯一约束实现
(2)基于分布式缓存实现的分布式锁:redis和基于redis的redlock
(3)基于分布式一致性算法实现的分布式锁:zookeeper、etcd
每种分布式锁都有其所适用的生产环境,同时特各有利弊:
- 数据库实现的分布式锁性能较差,而且不支持过期,但是不会引入更多的中间件
- 缓存实现的分布式锁高性能,支持非阻塞,适用大并发的场景
- etcd实现的分布式锁具备阻塞特性,适用于服务发现和注册、任务调度等
3. 基于etcd的分布式锁实现机制
etcd 支持以下功能,正是依赖这些功能来实现分布式锁的:
- Lease机制:即租约机制(TTL,Time To Live),etcd可以为存储的kv对设置租约,当租约到期,kv将失效删除;同时也支持续约,keepalive
- Revision机制:每个key带有一个Revision属性值,etcd每进行一次事务对应的全局Revision值都会+1,因此每个key对应的Revision属性值都是全局唯一的。通过比较Revision的大小就可以知道进行写操作的顺序
- 在实现分布式锁时,多个程序同时抢锁,根据Revision值大小依次获得锁,避免“惊群效应”,实现公平锁
- Prefix机制:也称为目录机制,可以根据前缀获得该目录下所有的key及其对应的属性值
- watch机制:watch支持watch某个固定的key或者一个前缀目录,当watch的key发生变化,客户端将收到通知
4. 基于etcd的分布式锁的实现过程
- 步骤 1: 准备
客户端连接 Etcd,以 /lock/mylock 为前缀创建全局唯一的 key,假设第一个客户端对应的 key="/lock/mylock/UUID1",第二个为 key="/lock/mylock/UUID2";客户端分别为自己的 key 创建租约 - Lease,租约的长度根据业务耗时确定,假设为 15s;
- 步骤 2: 创建定时任务作为租约的“心跳”
当一个客户端持有锁期间,其它客户端只能等待,为了避免等待期间租约失效,客户端需创建一个定时任务作为“心跳”进行续约。此外,如果持有锁期间客户端崩溃,心跳停止,key 将因租约到期而被删除,从而锁释放,避免死锁。
- 步骤 3: 客户端将自己全局唯一的 key 写入 Etcd
进行 put 操作,将步骤 1 中创建的 key 绑定租约写入 Etcd,根据 Etcd 的 Revision 机制,假设两个客户端 put 操作返回的 Revision 分别为 1、2,客户端需记录 Revision 用以接下来判断自己是否获得锁。
- 步骤 4: 客户端判断是否获得锁
客户端以前缀 /lock/mylock 读取 keyValue 列表(keyValue 中带有 key 对应的 Revision),判断自己 key 的 Revision 是否为当前列表中最小的,如果是则认为获得锁;否则监听列表中前一个 Revision 比自己小的 key 的删除事件,一旦监听到删除事件或者因租约失效而删除的事件,则自己获得锁。
- 步骤 5: 执行业务
获得锁后,操作共享资源,执行业务代码。
- 步骤 6: 释放锁
完成业务流程后,删除对应的key释放锁。
5. go实现etcd分布式锁
func main() {
config := clientv3.Config{
Endpoints: []string{"xxx.xxx.xxx.xxx:2379"},
DialTimeout: 5 * time.Second,
}
// 获取客户端连接
client, err := clientv3.New(config)
if err != nil {
fmt.Println(err)
return
}
// 1. 上锁(创建租约,自动续租,拿着租约去抢占一个key )
// 用于申请租约
lease := clientv3.NewLease(client)
// 申请一个10s的租约
leaseGrantResp, err := lease.Grant(context.TODO(), 10) //10s
if err != nil {
fmt.Println(err)
return
}
// 拿到租约的id
leaseID := leaseGrantResp.ID
// 准备一个用于取消续租的context
ctx, cancelFunc := context.WithCancel(context.TODO())
// 确保函数退出后,自动续租会停止
defer cancelFunc()
// 确保函数退出后,租约会失效
defer lease.Revoke(context.TODO(), leaseID)
// 自动续租
keepRespChan, err := lease.KeepAlive(ctx, leaseID)
if err != nil {
fmt.Println(err)
return
}
// 处理续租应答的协程
go func() {
select {
case keepResp := <-keepRespChan:
if keepRespChan == nil {
fmt.Println("lease has expired")
goto END
} else {
// 每秒会续租一次
fmt.Println("收到自动续租应答", keepResp.ID)
}
}
END:
}()
// if key 不存在,then设置它,else抢锁失败
kv := clientv3.NewKV(client)
// 创建事务
txn := kv.Txn(context.TODO())
// 如果key不存在
txn.If(clientv3.Compare(clientv3.CreateRevision("/cron/lock/job7"), "=", 0)).
Then(clientv3.OpPut("/cron/jobs/job7", "", clientv3.WithLease(leaseID))).
Else(clientv3.OpGet("/cron/jobs/job7")) //如果key存在
// 提交事务
txnResp, err := txn.Commit()
if err != nil {
fmt.Println(err)
return
}
// 判断是否抢到了锁
if !txnResp.Succeeded {
fmt.Println("锁被占用了:", string(txnResp.Responses[0].GetResponseRange().Kvs[0].Value))
return
}
// 2. 处理业务(锁内,很安全)
fmt.Println("处理任务")
time.Sleep(5 * time.Second)
// 3. 释放锁(取消自动续租,释放租约)
// defer会取消续租,释放锁
}