1. 分布式鎖的特點
- 在分布式環境下,一個資源在同一時間只能被一個機器上的一個線程獲取
- 高可用的獲取鎖和釋放鎖
- 高性能的獲取鎖和釋放鎖
- 具備可重入特性
- 具備鎖實現機制,防止死鎖
- 具備非阻塞鎖特性,獲取不到值直接返回
2. 分布式鎖的實現方式
分布式主要有三種主流的實現方式:
(1)基於數據庫實現的分布式鎖:采用樂觀鎖、悲觀鎖或者基於主鍵唯一約束實現
(2)基於分布式緩存實現的分布式鎖:redis和基於redis的redlock
(3)基於分布式一致性算法實現的分布式鎖:zookeeper、etcd
每種分布式鎖都有其所適用的生產環境,同時特各有利弊:
- 數據庫實現的分布式鎖性能較差,而且不支持過期,但是不會引入更多的中間件
- 緩存實現的分布式鎖高性能,支持非阻塞,適用大並發的場景
- etcd實現的分布式鎖具備阻塞特性,適用於服務發現和注冊、任務調度等
3. 基於etcd的分布式鎖實現機制
etcd 支持以下功能,正是依賴這些功能來實現分布式鎖的:
- Lease機制:即租約機制(TTL,Time To Live),etcd可以為存儲的kv對設置租約,當租約到期,kv將失效刪除;同時也支持續約,keepalive
- Revision機制:每個key帶有一個Revision屬性值,etcd每進行一次事務對應的全局Revision值都會+1,因此每個key對應的Revision屬性值都是全局唯一的。通過比較Revision的大小就可以知道進行寫操作的順序
- 在實現分布式鎖時,多個程序同時搶鎖,根據Revision值大小依次獲得鎖,避免“驚群效應”,實現公平鎖
- Prefix機制:也稱為目錄機制,可以根據前綴獲得該目錄下所有的key及其對應的屬性值
- watch機制:watch支持watch某個固定的key或者一個前綴目錄,當watch的key發生變化,客戶端將收到通知
4. 基於etcd的分布式鎖的實現過程
- 步驟 1: 准備
客戶端連接 Etcd,以 /lock/mylock 為前綴創建全局唯一的 key,假設第一個客戶端對應的 key="/lock/mylock/UUID1",第二個為 key="/lock/mylock/UUID2";客戶端分別為自己的 key 創建租約 - Lease,租約的長度根據業務耗時確定,假設為 15s;
- 步驟 2: 創建定時任務作為租約的“心跳”
當一個客戶端持有鎖期間,其它客戶端只能等待,為了避免等待期間租約失效,客戶端需創建一個定時任務作為“心跳”進行續約。此外,如果持有鎖期間客戶端崩潰,心跳停止,key 將因租約到期而被刪除,從而鎖釋放,避免死鎖。
- 步驟 3: 客戶端將自己全局唯一的 key 寫入 Etcd
進行 put 操作,將步驟 1 中創建的 key 綁定租約寫入 Etcd,根據 Etcd 的 Revision 機制,假設兩個客戶端 put 操作返回的 Revision 分別為 1、2,客戶端需記錄 Revision 用以接下來判斷自己是否獲得鎖。
- 步驟 4: 客戶端判斷是否獲得鎖
客戶端以前綴 /lock/mylock 讀取 keyValue 列表(keyValue 中帶有 key 對應的 Revision),判斷自己 key 的 Revision 是否為當前列表中最小的,如果是則認為獲得鎖;否則監聽列表中前一個 Revision 比自己小的 key 的刪除事件,一旦監聽到刪除事件或者因租約失效而刪除的事件,則自己獲得鎖。
- 步驟 5: 執行業務
獲得鎖后,操作共享資源,執行業務代碼。
- 步驟 6: 釋放鎖
完成業務流程后,刪除對應的key釋放鎖。
5. go實現etcd分布式鎖
func main() {
config := clientv3.Config{
Endpoints: []string{"xxx.xxx.xxx.xxx:2379"},
DialTimeout: 5 * time.Second,
}
// 獲取客戶端連接
client, err := clientv3.New(config)
if err != nil {
fmt.Println(err)
return
}
// 1. 上鎖(創建租約,自動續租,拿着租約去搶占一個key )
// 用於申請租約
lease := clientv3.NewLease(client)
// 申請一個10s的租約
leaseGrantResp, err := lease.Grant(context.TODO(), 10) //10s
if err != nil {
fmt.Println(err)
return
}
// 拿到租約的id
leaseID := leaseGrantResp.ID
// 准備一個用於取消續租的context
ctx, cancelFunc := context.WithCancel(context.TODO())
// 確保函數退出后,自動續租會停止
defer cancelFunc()
// 確保函數退出后,租約會失效
defer lease.Revoke(context.TODO(), leaseID)
// 自動續租
keepRespChan, err := lease.KeepAlive(ctx, leaseID)
if err != nil {
fmt.Println(err)
return
}
// 處理續租應答的協程
go func() {
select {
case keepResp := <-keepRespChan:
if keepRespChan == nil {
fmt.Println("lease has expired")
goto END
} else {
// 每秒會續租一次
fmt.Println("收到自動續租應答", keepResp.ID)
}
}
END:
}()
// if key 不存在,then設置它,else搶鎖失敗
kv := clientv3.NewKV(client)
// 創建事務
txn := kv.Txn(context.TODO())
// 如果key不存在
txn.If(clientv3.Compare(clientv3.CreateRevision("/cron/lock/job7"), "=", 0)).
Then(clientv3.OpPut("/cron/jobs/job7", "", clientv3.WithLease(leaseID))).
Else(clientv3.OpGet("/cron/jobs/job7")) //如果key存在
// 提交事務
txnResp, err := txn.Commit()
if err != nil {
fmt.Println(err)
return
}
// 判斷是否搶到了鎖
if !txnResp.Succeeded {
fmt.Println("鎖被占用了:", string(txnResp.Responses[0].GetResponseRange().Kvs[0].Value))
return
}
// 2. 處理業務(鎖內,很安全)
fmt.Println("處理任務")
time.Sleep(5 * time.Second)
// 3. 釋放鎖(取消自動續租,釋放租約)
// defer會取消續租,釋放鎖
}