consul實現分布式鎖


3篇關於分布式鎖的文章,可以結合看:
consul實現分布式鎖:https://www.cnblogs.com/jiujuan/p/10527786.html
redis實現分布式鎖:https://www.cnblogs.com/jiujuan/p/10595838.html
etcd實現分布式鎖:https://www.cnblogs.com/jiujuan/p/12147809.html

分布式一致性問題:

分布式的CAP理論告訴我們“任何一個分布式系統都無法同時滿足一致性(Consistency)、可用性(Availability)和分區容錯性(Partition tolerance),最多只能同時滿足兩項。”所以,很多系統在設計之初就要對這三者做出取舍。在互聯網領域的絕大多數的場景中,都需要犧牲強一致性來換取系統的高可用性,系統往往只需要保證“最終一致性”,只要這個最終時間是在用戶可以接受的范圍內即可
在很多場景中,我們為了保證數據的最終一致性,需要很多的技術方案來支持,比如分布式事務分布式鎖等。有的時候,我們需要保證一個方法在同一時間內只能被同一個線程執行


分布式鎖:

是在分布式系統之間同步訪問共享資源的一種方式。
不同的系統或者同一個系統不同的主機共享了一個或一組資源,那么訪問這些資源的時候,往往需要互斥來防止彼此干擾,從而保證數據的一致性,怎么保證數據的一致性,就用到了分布式鎖


那么用鎖來解決資源搶占時,又有哪些問題:

1、死鎖

死鎖是在並發編程中理論上都會出現的問題

什么是死鎖:
搶占資源的各方,彼此都在等待對方釋放資源,以便自己能獲取系統資源,但是沒有哪一方退出,這時候就死鎖了

產生死鎖的4個條件:

  • 互斥條件
  • 不可搶占條件
  • 占用並申請條件
  • 循環等待條件

解決方法:
解決死鎖的問題只要解決了上面的4個條件之一即可。那怎么解決呢?
一般用 session + TTL 打破循環等待條件

當一個客戶端嘗試操作一把分布式鎖的時候,我們必須校驗其 session 是否為鎖的擁有者,不是則無法進行操作。
當一個客戶端已經持有一把分布式鎖后,發生了
掉線**,在超出了 TTL 時間后無法連接上,則回收其鎖的擁有權。

2、驚群效應

什么是驚群效應?
簡單說來,多線程/多進程等待同一個socket事件,當這個事件發生時,這些線程/進程被同時喚醒,就是驚群。可以想見,效率很低下,許多進程被內核重新調度喚醒,同時去響應這一個事件,當然只有一個進程能處理事件成功,其他的進程在處理該事件失敗后重新休眠(也有其他選擇)。這種性能浪費現象就是驚群。

為了更好的理解何為驚群,舉一個很簡單的例子,當你往一群鴿子中間扔一粒谷子,所有的各自都被驚動前來搶奪這粒食物,但是最終注定只可能有一個鴿子滿意的搶到食物,沒有搶到的鴿子只好回去繼續睡覺,等待下一粒谷子的到來。這里鴿子表示進程(線程),那粒谷子就是等待處理的事件

解決方法:
為了避免發生驚群效應, NginxZooKeeper 分別使用了不同的方案解決,但是他們的核心解決思路都是一致的,

下面我們來看看 ZooKeeper 是怎么解決 驚群效應 的
我們都知道,在 ZooKeeper 內部會使用臨時目錄節點的形式創建分布式鎖,其中每個節點對應一個客戶端的申請鎖請求。

當客戶端來請求該鎖的時候, ZooKeeper 會生成一個 ${lock-name}-${i} 的臨時目錄,此后每次請求該鎖的時候,就會生成 ${lock-name}-${i+1} 的目錄,如果此時在 ${lock-name} 中擁有最小的 i 的客戶端會獲得該鎖,而該客戶端使用結束之后,就會刪除掉自己的臨時目錄,並通知后續節點進行鎖的獲取。

沒錯,這個 iZooKeeper 解決驚群效應的利器,它被稱為 順序節點


Nginx怎么解決驚群
Nginx中處理epoll驚群問題的思路很簡單,多個子進程有一個鎖,誰拿到鎖,誰才將accept的fd加入到epoll隊列中,其他的子進程拿不到鎖,也就不會將fd加入到epoll中,連接到來也就不會導致所有子進程的epoll被喚醒返回

3、腦裂(brain-split)

什么是腦裂?
腦裂主要是仲裁之間網絡中斷或不穩定導致

當集群中出現 腦裂 的時候,往往會出現多個 master 的情況,這樣數據的一致性會無法得到保障,從而導致整個服務無法正常運行

解決方法:

  1. 可以將集群中的服務作為 P2P 節點,避免 Leader 與 Salve 的切換
  2. 向客戶端發起重試,如果一段時間后依然無法連接上,再讓下一個順序客戶端獲取鎖



consul怎么解決上面3個問題

Consul 是 Go 實現的一個輕量級 服務發現 、KV存儲 的工具,它通過強一致性的KV存儲實現了簡易的 分布式鎖 ,下面我們根據源碼看下 Consul 是怎么解決以上分布式鎖的難點的

// api/lock.go

// Lock 分布式鎖數據結構
type Lock struct {
    c    *Client   // 提供訪問consul的API客戶端
    opts *LockOptions // 分布式鎖的可選項
    
    isHeld       bool          // 該鎖當前是否已經被持有
    sessionRenew chan struct{} // 通知鎖持有者需要更新session
    locksession  string        // 鎖持有者的session
    l            sync.Mutex    // 鎖變量的互斥鎖
}

// LockOptions 提供分布式鎖的可選項參數
type LockOptions struct {
    Key              string        // 鎖的 Key,必填項,且必須有 KV 的寫權限
    Value            []byte        // 鎖的內容,以下皆為選填項
    Session          string        // 鎖的session,用於判斷鎖是否被創建
    SessionOpt       *SessionEntry // 自定義創建session條目,用於創建session,避免驚群
    SessionName      string        // 自定義鎖的session名稱,默認為 "Consul API Lock"
    SessionTTL       string        // 自定義鎖的TTL時間,默認為 "15s"
    MonitorRetries   int           // 自定義監控的重試次數,避免腦裂問題
    MonitorRetryTime time.Duration // 自定義監控的重試時長,避免腦裂問題
    LockWaitTime     time.Duration // 自定義鎖的等待市場,避免死鎖問題
    LockTryOnce      bool          // 是否只重試一次,默認為false,則為無限重試
}

驚群

SessionOpt       *SessionEntry // 自定義創建session條目,用於創建session,避免驚群

死鎖

LockWaitTime     time.Duration // 自定義鎖的等待市場,避免死鎖問題

腦裂

MonitorRetries   int           // 自定義監控的重試次數,避免腦裂問題
MonitorRetryTime time.Duration // 自定義監控的重試時長,避免腦裂問題
LockTryOnce      bool               // 是否只重試一次,默認為false,則為無限重試

從 LockOptions 中帶有 session / TTL / monitor / wait 等字眼的成員變量可以看出,consul 已經考慮到解決我們上一節提到的三個難點,下面來看看實現代碼中是如何使用的

先來看看生成可用的分布式鎖的函數 LockOpts :

// api/lock.go

// LockOpts 通過傳入鎖的參數,返回一個可用的鎖
// 必須注意的是 opts.Key 必須在 KV 中有寫權限
func (c *Client) LockOpts(opts *LockOptions) (*Lock, error) {
    if opts.Key == "" {
        return nil, fmt.Errorf("missing key")
    }
    if opts.SessionName == "" {
        opts.SessionName = DefaultLockSessionName // "Consul API Lock"
    }
    if opts.SessionTTL == "" {
        opts.SessionTTL = DefaultLockSessionTTL // "15s"
    } else {
        if _, err := time.ParseDuration(opts.SessionTTL); err != nil {
            return nil, fmt.Errorf("invalid SessionTTL: %v", err)
        }
    }
    if opts.MonitorRetryTime == 0 {
        opts.MonitorRetryTime = DefaultMonitorRetryTime  // 2 * time.Second
    }
    if opts.LockWaitTime == 0 {
        opts.LockWaitTime = DefaultLockWaitTime   // 15 * time.Second
    }
    l := &Lock{
        c:    c,
        opts: opts,
    }
    return l, nil
}



我們可以在這個函數中可以注意到:

  • 15s 的 SessionTTL 用於解決死鎖、腦裂問題。
  • 2s 的 MonitorRetryTime 是一個長期運行的協程用於監聽當前鎖持有者,用於解決腦裂問題。
  • 15s 的 LockWaitTime 用於設置嘗試獲取鎖的超時時間,用於解決死鎖問題。

Lock 有3個可供其他包訪問的函數,分別為 Lock / Unlock / Destroy ,下面按照順序展開細說
Lock()函數

// Lock嘗試獲取一個可用的鎖,可以通過一個非空的 stopCh 來提前終止獲取
// 如果返回的鎖發生異常,則返回一個被關閉的 chan struct ,應用程序必須要處理該情況
func (l *Lock) Lock(stopCh <-chan struct) (<-chan struct{}, error) {
    // 先鎖定本地互斥鎖
    l.l.Lock()
    defer l.l.Unlock()
    
    // 本地已經獲取到分布式鎖了
    if l.isHeld {
        return nil, ErrLockHeld
    }
    
    // 檢查是否需要創建session
    l.lockSession = l.opts.Session
    if l.lockSession == "" {
        s, err := l.createSession()
        if err != nil {
            return nil, fmt.Errorf("failed to create session: %v", err)
        }
        
        l.sessionRenew = make(chan struct{})
        l.lockSession = s
        session := l.c.Session()
        go session.RenewPeriodic(l.opts.SessionTTL, s, nil, l.sessionRenew)
        
        // 如果我們無法鎖定該分布式鎖,清除本地session
        defer func() {
            if !l.isHeld {
                close(l.sessionRenew)
                l.sessionRenew = nil
            } 
        }()
        
        // 准備向consul KV發送查詢鎖操作的參數
        kv := l.c.KV()
        qOpts := &QueryOptions{
            WaitTime: l.opts.LockWaitTime,
        }
        
        start := time.Now()
        attempts := 0
WAIT:
        // 判斷是否需要退出鎖爭奪的循環
        select {
        case <-stopCh:
            return nil, nil
        default:
        }
        
        // 處理只重試一次的邏輯
        if l.opts.LockTryOnce && attempts > 0 { // 配置該鎖只重試一次且已經重試至少一次了
            elapsed := time.Since(start)  // 獲取當前時間偏移量
            if elapsed > qOpts.WaitTime { // 當超過設置中的剩余等待時間
                return nil, nil           // 返回空結果
            }
            
            qOpts.WaitTime -= elapsed  // 重設剩余等待時間
        }
        attempts++  // 已嘗試次數自增1
        
        // 阻塞查詢該存在的分布式鎖,直至無法獲取成功
        pair, meta, err := kv.Get(l.opts.Key, qOpts)
        if err != nil {
            return nil, fmt.Errorf("failed to read lock: %v", err)
        }
    }
}

Unlock()函數

// Unlock 嘗試釋放 consul 分布式鎖,如果發生異常則返回 error
func (l *Lock) Unlock() error {
    // 在釋放鎖之前必須先把 Lock 結構鎖住
    l.l.Lock()
    defer l.l.Unlock()
    
    // 確認我們依然持有該鎖
    if !isHeld {
        return ErrLockNotHeld
    }
    
    // 提前先將鎖的持有權釋放
    l.isHeld = false
    
    // 清除刷新 session 通道
    if l.sessionRenew != nil {
        defer func() {
            close(l.sessionRenew)
            l.sessionRenew = nil
        }()
    }
    
    // 獲取當前 session 持有的鎖信息
    lockEnt := l.lockEntry(l.lockSession)
    l.lockSession = ""
    
    kv := l.c.KV()
    _, _, err := kv.Release(lockEnt, nil) // 將持有的鎖嘗試釋放
    if err != nil {
        return fmt.Errorf("failed to release lock: %v", err)
    }
    return nil
}

Destry()函數

// Destroy 嘗試銷毀分布式鎖,雖然這不是必要的操作。
// 如果該鎖正在被使用,則返回error
func (l *Lock) Destroy() error {
    // 在釋放鎖之前必須先把 Lock 結構鎖住
    l.l.Lock()
    defer l.l.Unlock()
    
    // 確認我們依然持有該鎖
    if !isHeld {
        return ErrLockNotHeld
    }
    
    // 獲取鎖
    kv := l.c.KV()
    pair, _, err := kv.Get(l.opts.Key, nil)
    if err != nil {
        return fmt.Errorf("failed to read lock: %v", err)
    }
    
    if pair == nil {
        return nil
    }
    
    // 檢查是否有可能狀態沖突
    if pair.Flags != LockFlagValue {
        return ErrLockConflict
    }
    
    // 如果鎖正在被持有,則返回異常
    if pair.Session != "" {
        return ErrLockUse
    }
    
    // 嘗試通過 CAS 刪除分布式鎖
    didRemove, _, err := kv.DeleteCAS(pair, nil)
    if err != nil {
        return fmt.Errorf("failed to remove lock: %v", err)
    }
    if !didRemove { // 如果沒有刪除成功,則返回異常
        return ErrLockInUse
    }
    return nil
}

用golang實現的小demo

package main

import (
	api "github.com/hashicorp/consul/api"
	"github.com/satori/go.uuid"
	"log"
)

func main() {
	client, err := api.NewClient(&api.Config{
		Address: "127.0.0.1:8500",
	})

	lockKey := "demo-lock-key"

	lock, err := client.lockOpts(&api.LockOptions{
		Key:        lockKey,
		Value:      []byte("sender 1"),
		SessionTTL: "10s",
		SessionOpts: &spi.SessionEntry{
			Checks:   []string{"check1", "check2"},
			Behavior: "release",
		},
		SessionName: uuid.Must(uuid.NewV4()),
	})

	if err != nil {
		log.Fatalf("failed to created lock %v", err)
	}

	result, err := lock.Lock(nil)
	if err != nil {
		log.Fatalf("failed to accquired lock")
	}
}

參考:  https://www.jianshu.com/p/44307a394fe1,特別感謝


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM