前言
分布式鎖要解決兩個問題:
1、鎖競爭
2、死鎖
以redis為例,redis提供了setnx來保證原子寫入,只有一個客戶端能寫入成功,也就能成功獲得鎖。同時為了防止客戶端異常導致鎖沒有及時釋放,可以對這個鎖設置過期s時間,命令如下:
SET lock_name my_random_value NX PX 30000
除了鎖自動過期以外,還需要能手動釋放鎖,命令如下:
del lock_name
etcd的實現方式
etcd提供了以下幾種特性來實現分布式鎖:
-
Lease機制
租約機制(TTL,Time To Live),etcd 可以為存儲的 key-value 對設置租約,當租約到期,key-value 將失效刪除;
同時也支持續約,通過客戶端可以在租約到期之前續約,以避免 key-value 對過期失效。
Lease機制可以保證分布式鎖的安全性,為鎖對應的 key 配置租約,即使鎖的持有者因故障而不能主動釋放鎖,鎖也會因租約到期而自動釋放。
-
Revision機制
每個 key 帶有一個 Revision 號,每進行一次事務便+1,它是全局唯一的,通過 Revision 的大小就可以知道進行寫操作的順序。
在實現分布式鎖時,多個客戶端同時搶鎖,根據 Revision 號大小依次獲得鎖,可以避免 “羊群效應” ,實現公平鎖。這和zookeeper的臨時順序節點+監聽機制可避免羊群效應的原理是一致的。
-
Prefix機制
即前綴機制。例如,一個名為 /etcd/lock 的鎖,兩個爭搶它的客戶端進行寫操作,實際寫入的 key 分別為:key1="/etcd/lock/UUID1",key2="/etcd/lock/UUID2"。其中,UUID 表示全局唯一的 ID,確保兩個 key 的唯一性。
寫操作都會成功,但返回的 Revision 不一樣,那么,如何判斷誰獲得了鎖呢?通過前綴 /etcd/lock 查詢,返回包含兩個 key-value 對的的 KeyValue 列表,同時也包含它們的 Revision,通過 Revision 大小,客戶端可以判斷自己是否獲得鎖 。
-
Watch機制
即監聽機制。Watch 機制支持 Watch 某個固定的 key,也支持 Watch 一個范圍(前綴機制)。當被 Watch 的 key 或范圍發生變化,客戶端將收到通知;在實現分布式鎖時,如果搶鎖失敗,可通過 Prefix 機制返回的 Key-Value 列表獲得 Revision 比自己小且相差最小的 key(稱為 pre-key),對 pre-key 進行監聽,因為只有它釋放鎖,自己才能獲得鎖,如果 Watch 到 pre-key 的 DELETE 事件,則說明 pre-key 已經釋放,自己將持有鎖。
實現流程如下:
-
建立連接
客戶端連接 etcd,以 /etcd/lock 為前綴創建全局唯一的 key,假設第一個客戶端對應的 key="/etcd/lock/UUID1",第二個為 key="/etcd/lock/UUID2";客戶端分別為自己的 key 創建租約 - Lease,租約的長度根據業務耗時確定。
-
創建定時任務作為租約的“心跳”
當一個客戶端持有鎖期間,其它客戶端只能等待,為了避免等待期間租約失效,客戶端需創建一個定時任務作為“心跳”進行續約。此外,如果持有鎖期間客戶端崩潰,心跳停止,key 將因租約到期而被刪除,從而鎖釋放,避免死鎖。 -
客戶端將自己全局唯一的 key 寫入 etcd
執行 put 操作,將步驟 1 中創建的 key 綁定租約寫入 Etcd,根據 Etcd 的 Revision 機制,
假設兩個客戶端 put 操作返回的 Revision 分別為 1、2,客戶端需記錄 Revision 用來判斷自己是否獲得鎖。 -
客戶端判斷是否獲得鎖
客戶端以前綴 /etcd/lock/ 讀取 key-Value 列表,判斷自己 key 的 Revision 是否為當前列表中最小的,如果是則認為獲得鎖;否則監聽列表中前一個 Revision 比自己小的 key 的刪除事件,一旦監聽到刪除事件或者因租約失效而刪除的事件,則自己獲得鎖。 -
執行業務
獲得鎖后,操作共享資源,執行業務代碼。
Demo
官方包里(github.com/coreos/etcd/clientv3/concurrency)已經實現了上述流程,我們只需要做下簡單的調用就可以實現分布式鎖了。
package main
import (
"context"
"fmt"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
"log"
"os"
"os/signal"
"time"
)
func main() {
c := make(chan os.Signal)
signal.Notify(c)
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
prefix := "/lock"
go func () {
session, err := concurrency.NewSession(cli)
if err != nil {
log.Fatal(err)
}
m := concurrency.NewMutex(session, prefix)
if err := m.Lock(context.TODO()); err != nil {
log.Fatal("go1 get mutex failed " + err.Error())
}
fmt.Printf("go1 get mutex sucess\n")
fmt.Println(m)
time.Sleep(time.Duration(10) * time.Second)
m.Unlock(context.TODO())
fmt.Printf("go1 release lock\n")
}()
go func() {
time.Sleep(time.Duration(2) * time.Second)
session, err := concurrency.NewSession(cli)
if err != nil {
log.Fatal(err)
}
m := concurrency.NewMutex(session, prefix)
if err := m.Lock(context.TODO()); err != nil {
log.Fatal("go2 get mutex failed " + err.Error())
}
fmt.Printf("go2 get mutex sucess\n")
fmt.Println(m)
time.Sleep(time.Duration(2) * time.Second)
m.Unlock(context.TODO())
fmt.Printf("go2 release lock\n")
}()
<-c
}
上述代碼里起了兩個協程,分別實例化兩個session及Mutex對象,並執行加鎖和釋放鎖的操作,下面看看源碼包里是怎么進行加鎖的。
func (m *Mutex) Lock(ctx context.Context) error {
s := m.s
client := m.s.Client()
m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
// put self in lock waiters via myKey; oldest waiter holds lock
put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
// reuse key in case this session already holds the lock
get := v3.OpGet(m.myKey)
// fetch current holder to complete uncontended path with only one RPC
getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
if err != nil {
return err
}
m.myRev = resp.Header.Revision
if !resp.Succeeded {
m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
}
// if no key on prefix / the minimum rev is key, already hold the lock
ownerKey := resp.Responses[1].GetResponseRange().Kvs
if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
m.hdr = resp.Header
return nil
}
// wait for deletion revisions prior to myKey
hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
// release lock key if wait failed
if werr != nil {
m.Unlock(client.Ctx())
} else {
m.hdr = hdr
}
return werr
}
首先通過一個事務來嘗試加鎖,這個事務主要包含了4個操作: cmp、put、get、getOwner。需要注意的是,key是由pfx和Lease()組成的。
- cmp: 比較加鎖的key的修訂版本是否是0。如果是0就代表這個鎖不存在。
- put: 向加鎖的key中存儲一個空值,這個操作就是一個加鎖的操作,但是這把鎖是有超時時間的,超時的時間是session的默認時長。超時是為了防止鎖沒有被正常釋放導致死鎖。
- get: get就是通過key來查詢
- getOwner: 注意這里是用m.pfx來查詢的,並且帶了查詢參數WithFirstCreate(),它以m.pfx為前綴去查詢所有key,根據創建version正排序,取最前面的一個值,即最早的那個key ,即Revsion最小的那個。
接下來才是通過判斷來檢查是否持有鎖
m.myRev = resp.Header.Revision
if !resp.Succeeded {
m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
}
// if no key on prefix / the minimum rev is key, already hold the lock
ownerKey := resp.Responses[1].GetResponseRange().Kvs
if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
m.hdr = resp.Header
return nil
}
m.myRev是當前的版本號,resp.Succeeded是cmp為true時值為true,否則是false。這里的判斷表明當同一個session非第一次嘗試加鎖,當前的版本號應該取這個key的最新的版本號。
然后判斷getOwner返回的最小key,如如果沒有這個key或者,或者這個key版本號和當前的版本號一致,則獲取到這個鎖。
繼續往下走:
// wait for deletion revisions prior to myKey
hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
// release lock key if wait failed
if werr != nil {
m.Unlock(client.Ctx())
} else {
m.hdr = hdr
}
return werr
走到這里說明沒有獲取到鎖,那么這里等待鎖的刪除。
waitDeletes方法會監聽比當前會話版本號更低的key的刪除事件。一旦這個key刪除了,自己也就拿到鎖了。
總結
redis的實現方式會競爭同一個鎖,但在etcd里客戶端是拿自己key(鎖)的版本號和前綴里最小的版本號的key比較,如果相同則獲得鎖。