etcd實現分布式鎖


前言

分布式鎖要解決兩個問題:

1、鎖競爭

2、死鎖

以redis為例,redis提供了setnx來保證原子寫入,只有一個客戶端能寫入成功,也就能成功獲得鎖。同時為了防止客戶端異常導致鎖沒有及時釋放,可以對這個鎖設置過期s時間,命令如下:

SET lock_name my_random_value NX PX 30000

除了鎖自動過期以外,還需要能手動釋放鎖,命令如下:

del lock_name

etcd的實現方式

etcd提供了以下幾種特性來實現分布式鎖:

  • Lease機制

    租約機制(TTL,Time To Live),etcd 可以為存儲的 key-value 對設置租約,當租約到期,key-value 將失效刪除;

    同時也支持續約,通過客戶端可以在租約到期之前續約,以避免 key-value 對過期失效。

    Lease機制可以保證分布式鎖的安全性,為鎖對應的 key 配置租約,即使鎖的持有者因故障而不能主動釋放鎖,鎖也會因租約到期而自動釋放。

  • Revision機制

    每個 key 帶有一個 Revision 號,每進行一次事務便+1,它是全局唯一的,通過 Revision 的大小就可以知道進行寫操作的順序。

    在實現分布式鎖時,多個客戶端同時搶鎖,根據 Revision 號大小依次獲得鎖,可以避免 “羊群效應” ,實現公平鎖。這和zookeeper的臨時順序節點+監聽機制可避免羊群效應的原理是一致的。

  • Prefix機制

    即前綴機制。例如,一個名為 /etcd/lock 的鎖,兩個爭搶它的客戶端進行寫操作,實際寫入的 key 分別為:key1="/etcd/lock/UUID1",key2="/etcd/lock/UUID2"。其中,UUID 表示全局唯一的 ID,確保兩個 key 的唯一性。

    寫操作都會成功,但返回的 Revision 不一樣,那么,如何判斷誰獲得了鎖呢?通過前綴 /etcd/lock 查詢,返回包含兩個 key-value 對的的 KeyValue 列表,同時也包含它們的 Revision,通過 Revision 大小,客戶端可以判斷自己是否獲得鎖 。

  • Watch機制

    即監聽機制。Watch 機制支持 Watch 某個固定的 key,也支持 Watch 一個范圍(前綴機制)。當被 Watch 的 key 或范圍發生變化,客戶端將收到通知;在實現分布式鎖時,如果搶鎖失敗,可通過 Prefix 機制返回的 Key-Value 列表獲得 Revision 比自己小且相差最小的 key(稱為 pre-key),對 pre-key 進行監聽,因為只有它釋放鎖,自己才能獲得鎖,如果 Watch 到 pre-key 的 DELETE 事件,則說明 pre-key 已經釋放,自己將持有鎖。

實現流程如下:

  1. 建立連接

    客戶端連接 etcd,以 /etcd/lock 為前綴創建全局唯一的 key,假設第一個客戶端對應的 key="/etcd/lock/UUID1",第二個為 key="/etcd/lock/UUID2";客戶端分別為自己的 key 創建租約 - Lease,租約的長度根據業務耗時確定。

  2. 創建定時任務作為租約的“心跳”
    當一個客戶端持有鎖期間,其它客戶端只能等待,為了避免等待期間租約失效,客戶端需創建一個定時任務作為“心跳”進行續約。此外,如果持有鎖期間客戶端崩潰,心跳停止,key 將因租約到期而被刪除,從而鎖釋放,避免死鎖。

  3. 客戶端將自己全局唯一的 key 寫入 etcd
    執行 put 操作,將步驟 1 中創建的 key 綁定租約寫入 Etcd,根據 Etcd 的 Revision 機制,
    假設兩個客戶端 put 操作返回的 Revision 分別為 1、2,客戶端需記錄 Revision 用來判斷自己是否獲得鎖。

  4. 客戶端判斷是否獲得鎖
    客戶端以前綴 /etcd/lock/ 讀取 key-Value 列表,判斷自己 key 的 Revision 是否為當前列表中最小的,如果是則認為獲得鎖;否則監聽列表中前一個 Revision 比自己小的 key 的刪除事件,一旦監聽到刪除事件或者因租約失效而刪除的事件,則自己獲得鎖。

  5. 執行業務

    獲得鎖后,操作共享資源,執行業務代碼。

Demo

官方包里(github.com/coreos/etcd/clientv3/concurrency)已經實現了上述流程,我們只需要做下簡單的調用就可以實現分布式鎖了。

package main

import (
    "context"
    "fmt"
    "github.com/coreos/etcd/clientv3"
    "github.com/coreos/etcd/clientv3/concurrency"
    "log"
    "os"
    "os/signal"
    "time"
)

func main() {
    c := make(chan os.Signal)
    signal.Notify(c)

    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   []string{"localhost:2379"},
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer cli.Close()

    prefix := "/lock"

    go func () {
        session, err := concurrency.NewSession(cli)
        if err != nil {
            log.Fatal(err)
        }
        m := concurrency.NewMutex(session, prefix)
        if err := m.Lock(context.TODO()); err != nil {
            log.Fatal("go1 get mutex failed " + err.Error())
        }
        fmt.Printf("go1 get mutex sucess\n")
        fmt.Println(m)
        time.Sleep(time.Duration(10) * time.Second)
        m.Unlock(context.TODO())
        fmt.Printf("go1 release lock\n")
    }()

    go func() {
        time.Sleep(time.Duration(2) * time.Second)
        session, err := concurrency.NewSession(cli)
        if err != nil {
            log.Fatal(err)
        }
        m := concurrency.NewMutex(session, prefix)
        if err := m.Lock(context.TODO()); err != nil {
            log.Fatal("go2 get mutex failed " + err.Error())
        }
        fmt.Printf("go2 get mutex sucess\n")
        fmt.Println(m)
        time.Sleep(time.Duration(2) * time.Second)
        m.Unlock(context.TODO())
        fmt.Printf("go2 release lock\n")
    }()

    <-c
}

上述代碼里起了兩個協程,分別實例化兩個session及Mutex對象,並執行加鎖和釋放鎖的操作,下面看看源碼包里是怎么進行加鎖的。

func (m *Mutex) Lock(ctx context.Context) error {
    s := m.s
    client := m.s.Client()

    m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
    cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
    // put self in lock waiters via myKey; oldest waiter holds lock
    put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
    // reuse key in case this session already holds the lock
    get := v3.OpGet(m.myKey)
    // fetch current holder to complete uncontended path with only one RPC
    getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
    resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
    if err != nil {
        return err
    }
    m.myRev = resp.Header.Revision
    if !resp.Succeeded {
        m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
    }
    // if no key on prefix / the minimum rev is key, already hold the lock
    ownerKey := resp.Responses[1].GetResponseRange().Kvs
    if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
        m.hdr = resp.Header
        return nil
    }

    // wait for deletion revisions prior to myKey
    hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
    // release lock key if wait failed
    if werr != nil {
        m.Unlock(client.Ctx())
    } else {
        m.hdr = hdr
    }
    return werr
}

首先通過一個事務來嘗試加鎖,這個事務主要包含了4個操作: cmp、put、get、getOwner。需要注意的是,key是由pfx和Lease()組成的。

  • cmp: 比較加鎖的key的修訂版本是否是0。如果是0就代表這個鎖不存在。
  • put: 向加鎖的key中存儲一個空值,這個操作就是一個加鎖的操作,但是這把鎖是有超時時間的,超時的時間是session的默認時長。超時是為了防止鎖沒有被正常釋放導致死鎖。
  • get: get就是通過key來查詢
  • getOwner: 注意這里是用m.pfx來查詢的,並且帶了查詢參數WithFirstCreate(),它以m.pfx為前綴去查詢所有key,根據創建version正排序,取最前面的一個值,即最早的那個key ,即Revsion最小的那個。

接下來才是通過判斷來檢查是否持有鎖

m.myRev = resp.Header.Revision
if !resp.Succeeded {
    m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
}
// if no key on prefix / the minimum rev is key, already hold the lock
ownerKey := resp.Responses[1].GetResponseRange().Kvs
if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
    m.hdr = resp.Header
    return nil
}

m.myRev是當前的版本號,resp.Succeeded是cmp為true時值為true,否則是false。這里的判斷表明當同一個session非第一次嘗試加鎖,當前的版本號應該取這個key的最新的版本號。

然后判斷getOwner返回的最小key,如如果沒有這個key或者,或者這個key版本號和當前的版本號一致,則獲取到這個鎖。

繼續往下走:

// wait for deletion revisions prior to myKey
    hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
    // release lock key if wait failed
    if werr != nil {
        m.Unlock(client.Ctx())
    } else {
        m.hdr = hdr
    }
    return werr

走到這里說明沒有獲取到鎖,那么這里等待鎖的刪除。

waitDeletes方法會監聽比當前會話版本號更低的key的刪除事件。一旦這個key刪除了,自己也就拿到鎖了。

總結

redis的實現方式會競爭同一個鎖,但在etcd里客戶端是拿自己key(鎖)的版本號和前綴里最小的版本號的key比較,如果相同則獲得鎖。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM