3篇關於分布式鎖的文章,可以結合看:
consul實現分布式鎖:https://www.cnblogs.com/jiujuan/p/10527786.html
redis實現分布式鎖:https://www.cnblogs.com/jiujuan/p/10595838.html
etcd實現分布式鎖:https://www.cnblogs.com/jiujuan/p/12147809.html
分布式一致性問題:
分布式的CAP理論告訴我們“任何一個分布式系統都無法同時滿足一致性(Consistency)、可用性(Availability)和分區容錯性(Partition tolerance),最多只能同時滿足兩項。”所以,很多系統在設計之初就要對這三者做出取舍。在互聯網領域的絕大多數的場景中,都需要犧牲強一致性來換取系統的高可用性,系統往往只需要保證“最終一致性”,只要這個最終時間是在用戶可以接受的范圍內即可
在很多場景中,我們為了保證數據的最終一致性,需要很多的技術方案來支持,比如分布式事務
、分布式鎖
等。有的時候,我們需要保證一個方法在同一時間內只能被同一個線程執行
分布式鎖:
是在分布式系統之間同步訪問共享資源的一種方式。
不同的系統或者同一個系統不同的主機共享了一個或一組資源,那么訪問這些資源的時候,往往需要互斥來防止彼此干擾,從而保證數據的一致性,怎么保證數據的一致性,就用到了分布式鎖
那么用鎖來解決資源搶占時,又有哪些問題:
1、死鎖
死鎖是在並發編程中理論上都會出現的問題
什么是死鎖:
搶占資源的各方,彼此都在等待對方釋放資源,以便自己能獲取系統資源,但是沒有哪一方退出,這時候就死鎖了
產生死鎖的4個條件:
- 互斥條件
- 不可搶占條件
- 占用並申請條件
- 循環等待條件
解決方法:
解決死鎖的問題只要解決了上面的4個條件之一即可。那怎么解決呢?
一般用 session + TTL 打破循環等待條件
當一個客戶端嘗試操作一把分布式鎖的時候,我們必須校驗其 session 是否為鎖的擁有者,不是則無法進行操作。
當一個客戶端已經持有一把分布式鎖后,發生了掉線**,在超出了 TTL 時間后無法連接上,則回收其鎖的擁有權。
2、驚群效應
什么是驚群效應?
簡單說來,多線程/多進程等待同一個socket事件,當這個事件發生時,這些線程/進程被同時喚醒,就是驚群。可以想見,效率很低下,許多進程被內核重新調度喚醒,同時去響應這一個事件,當然只有一個進程能處理事件成功,其他的進程在處理該事件失敗后重新休眠(也有其他選擇)。這種性能浪費現象就是驚群。
為了更好的理解何為驚群,舉一個很簡單的例子,當你往一群鴿子中間扔一粒谷子,所有的各自都被驚動前來搶奪這粒食物,但是最終注定只可能有一個鴿子滿意的搶到食物,沒有搶到的鴿子只好回去繼續睡覺,等待下一粒谷子的到來。這里鴿子表示進程(線程),那粒谷子就是等待處理的事件
解決方法:
為了避免發生驚群效應, Nginx 和 ZooKeeper 分別使用了不同的方案解決,但是他們的核心解決思路都是一致的,
下面我們來看看 ZooKeeper 是怎么解決 驚群效應 的。
我們都知道,在 ZooKeeper 內部會使用臨時目錄節點的形式創建分布式鎖,其中每個節點對應一個客戶端的申請鎖請求。
當客戶端來請求該鎖的時候, ZooKeeper 會生成一個
${lock-name}-${i}
的臨時目錄,此后每次請求該鎖的時候,就會生成${lock-name}-${i+1}
的目錄,如果此時在${lock-name}
中擁有最小的i
的客戶端會獲得該鎖,而該客戶端使用結束之后,就會刪除掉自己的臨時目錄,並通知后續節點進行鎖的獲取。
沒錯,這個 i
是 ZooKeeper 解決驚群效應的利器,它被稱為 順序節點
Nginx怎么解決驚群
Nginx中處理epoll驚群問題的思路很簡單,多個子進程有一個鎖,誰拿到鎖,誰才將accept的fd加入到epoll隊列中,其他的子進程拿不到鎖,也就不會將fd加入到epoll中,連接到來也就不會導致所有子進程的epoll被喚醒返回
3、腦裂(brain-split)
什么是腦裂?
腦裂主要是仲裁之間網絡中斷或不穩定導致
當集群中出現 腦裂 的時候,往往會出現多個 master 的情況,這樣數據的一致性會無法得到保障,從而導致整個服務無法正常運行
解決方法:
- 可以將集群中的服務作為 P2P 節點,避免 Leader 與 Salve 的切換
- 向客戶端發起重試,如果一段時間后依然無法連接上,再讓下一個順序客戶端獲取鎖
consul怎么解決上面3個問題
Consul 是 Go 實現的一個輕量級 服務發現 、KV存儲 的工具,它通過強一致性的KV存儲實現了簡易的 分布式鎖 ,下面我們根據源碼看下 Consul 是怎么解決以上分布式鎖的難點的
// api/lock.go
// Lock 分布式鎖數據結構
type Lock struct {
c *Client // 提供訪問consul的API客戶端
opts *LockOptions // 分布式鎖的可選項
isHeld bool // 該鎖當前是否已經被持有
sessionRenew chan struct{} // 通知鎖持有者需要更新session
locksession string // 鎖持有者的session
l sync.Mutex // 鎖變量的互斥鎖
}
// LockOptions 提供分布式鎖的可選項參數
type LockOptions struct {
Key string // 鎖的 Key,必填項,且必須有 KV 的寫權限
Value []byte // 鎖的內容,以下皆為選填項
Session string // 鎖的session,用於判斷鎖是否被創建
SessionOpt *SessionEntry // 自定義創建session條目,用於創建session,避免驚群
SessionName string // 自定義鎖的session名稱,默認為 "Consul API Lock"
SessionTTL string // 自定義鎖的TTL時間,默認為 "15s"
MonitorRetries int // 自定義監控的重試次數,避免腦裂問題
MonitorRetryTime time.Duration // 自定義監控的重試時長,避免腦裂問題
LockWaitTime time.Duration // 自定義鎖的等待市場,避免死鎖問題
LockTryOnce bool // 是否只重試一次,默認為false,則為無限重試
}
驚群
SessionOpt *SessionEntry // 自定義創建session條目,用於創建session,避免驚群
死鎖
LockWaitTime time.Duration // 自定義鎖的等待市場,避免死鎖問題
腦裂
MonitorRetries int // 自定義監控的重試次數,避免腦裂問題
MonitorRetryTime time.Duration // 自定義監控的重試時長,避免腦裂問題
LockTryOnce bool // 是否只重試一次,默認為false,則為無限重試
從 LockOptions
中帶有 session / TTL / monitor / wait 等字眼的成員變量可以看出,consul 已經考慮到解決我們上一節提到的三個難點,下面來看看實現代碼中是如何使用的
先來看看生成可用的分布式鎖的函數 LockOpts
:
// api/lock.go
// LockOpts 通過傳入鎖的參數,返回一個可用的鎖
// 必須注意的是 opts.Key 必須在 KV 中有寫權限
func (c *Client) LockOpts(opts *LockOptions) (*Lock, error) {
if opts.Key == "" {
return nil, fmt.Errorf("missing key")
}
if opts.SessionName == "" {
opts.SessionName = DefaultLockSessionName // "Consul API Lock"
}
if opts.SessionTTL == "" {
opts.SessionTTL = DefaultLockSessionTTL // "15s"
} else {
if _, err := time.ParseDuration(opts.SessionTTL); err != nil {
return nil, fmt.Errorf("invalid SessionTTL: %v", err)
}
}
if opts.MonitorRetryTime == 0 {
opts.MonitorRetryTime = DefaultMonitorRetryTime // 2 * time.Second
}
if opts.LockWaitTime == 0 {
opts.LockWaitTime = DefaultLockWaitTime // 15 * time.Second
}
l := &Lock{
c: c,
opts: opts,
}
return l, nil
}
我們可以在這個函數中可以注意到:
- 15s 的 SessionTTL 用於解決死鎖、腦裂問題。
- 2s 的 MonitorRetryTime 是一個長期運行的協程用於監聽當前鎖持有者,用於解決腦裂問題。
- 15s 的 LockWaitTime 用於設置嘗試獲取鎖的超時時間,用於解決死鎖問題。
Lock
有3個可供其他包訪問的函數,分別為 Lock
/ Unlock
/ Destroy
,下面按照順序展開細說
Lock()函數
// Lock嘗試獲取一個可用的鎖,可以通過一個非空的 stopCh 來提前終止獲取
// 如果返回的鎖發生異常,則返回一個被關閉的 chan struct ,應用程序必須要處理該情況
func (l *Lock) Lock(stopCh <-chan struct) (<-chan struct{}, error) {
// 先鎖定本地互斥鎖
l.l.Lock()
defer l.l.Unlock()
// 本地已經獲取到分布式鎖了
if l.isHeld {
return nil, ErrLockHeld
}
// 檢查是否需要創建session
l.lockSession = l.opts.Session
if l.lockSession == "" {
s, err := l.createSession()
if err != nil {
return nil, fmt.Errorf("failed to create session: %v", err)
}
l.sessionRenew = make(chan struct{})
l.lockSession = s
session := l.c.Session()
go session.RenewPeriodic(l.opts.SessionTTL, s, nil, l.sessionRenew)
// 如果我們無法鎖定該分布式鎖,清除本地session
defer func() {
if !l.isHeld {
close(l.sessionRenew)
l.sessionRenew = nil
}
}()
// 准備向consul KV發送查詢鎖操作的參數
kv := l.c.KV()
qOpts := &QueryOptions{
WaitTime: l.opts.LockWaitTime,
}
start := time.Now()
attempts := 0
WAIT:
// 判斷是否需要退出鎖爭奪的循環
select {
case <-stopCh:
return nil, nil
default:
}
// 處理只重試一次的邏輯
if l.opts.LockTryOnce && attempts > 0 { // 配置該鎖只重試一次且已經重試至少一次了
elapsed := time.Since(start) // 獲取當前時間偏移量
if elapsed > qOpts.WaitTime { // 當超過設置中的剩余等待時間
return nil, nil // 返回空結果
}
qOpts.WaitTime -= elapsed // 重設剩余等待時間
}
attempts++ // 已嘗試次數自增1
// 阻塞查詢該存在的分布式鎖,直至無法獲取成功
pair, meta, err := kv.Get(l.opts.Key, qOpts)
if err != nil {
return nil, fmt.Errorf("failed to read lock: %v", err)
}
}
}
Unlock()函數
// Unlock 嘗試釋放 consul 分布式鎖,如果發生異常則返回 error
func (l *Lock) Unlock() error {
// 在釋放鎖之前必須先把 Lock 結構鎖住
l.l.Lock()
defer l.l.Unlock()
// 確認我們依然持有該鎖
if !isHeld {
return ErrLockNotHeld
}
// 提前先將鎖的持有權釋放
l.isHeld = false
// 清除刷新 session 通道
if l.sessionRenew != nil {
defer func() {
close(l.sessionRenew)
l.sessionRenew = nil
}()
}
// 獲取當前 session 持有的鎖信息
lockEnt := l.lockEntry(l.lockSession)
l.lockSession = ""
kv := l.c.KV()
_, _, err := kv.Release(lockEnt, nil) // 將持有的鎖嘗試釋放
if err != nil {
return fmt.Errorf("failed to release lock: %v", err)
}
return nil
}
Destry()函數
// Destroy 嘗試銷毀分布式鎖,雖然這不是必要的操作。
// 如果該鎖正在被使用,則返回error
func (l *Lock) Destroy() error {
// 在釋放鎖之前必須先把 Lock 結構鎖住
l.l.Lock()
defer l.l.Unlock()
// 確認我們依然持有該鎖
if !isHeld {
return ErrLockNotHeld
}
// 獲取鎖
kv := l.c.KV()
pair, _, err := kv.Get(l.opts.Key, nil)
if err != nil {
return fmt.Errorf("failed to read lock: %v", err)
}
if pair == nil {
return nil
}
// 檢查是否有可能狀態沖突
if pair.Flags != LockFlagValue {
return ErrLockConflict
}
// 如果鎖正在被持有,則返回異常
if pair.Session != "" {
return ErrLockUse
}
// 嘗試通過 CAS 刪除分布式鎖
didRemove, _, err := kv.DeleteCAS(pair, nil)
if err != nil {
return fmt.Errorf("failed to remove lock: %v", err)
}
if !didRemove { // 如果沒有刪除成功,則返回異常
return ErrLockInUse
}
return nil
}
用golang實現的小demo
package main
import (
api "github.com/hashicorp/consul/api"
"github.com/satori/go.uuid"
"log"
)
func main() {
client, err := api.NewClient(&api.Config{
Address: "127.0.0.1:8500",
})
lockKey := "demo-lock-key"
lock, err := client.lockOpts(&api.LockOptions{
Key: lockKey,
Value: []byte("sender 1"),
SessionTTL: "10s",
SessionOpts: &spi.SessionEntry{
Checks: []string{"check1", "check2"},
Behavior: "release",
},
SessionName: uuid.Must(uuid.NewV4()),
})
if err != nil {
log.Fatalf("failed to created lock %v", err)
}
result, err := lock.Lock(nil)
if err != nil {
log.Fatalf("failed to accquired lock")
}
}