基於etcd的分布式鎖


1. 分布式鎖的特點

鎖是在執行多線程時用於強行限制資源訪問的同步機制,在單機系統上,單機鎖就可以很好地實現臨界資源的共享。而在分布式系統場景下,實例會運行在多台機器上,為了使多進程對共享資源的讀寫同步,保證數據的最終一致性,引入了分布式鎖。
 
分布式鎖應該具備以下特點:
  • 在分布式環境下,一個資源在同一時間只能被一個機器上的一個線程獲取
  • 高可用的獲取鎖和釋放鎖
  • 高性能的獲取鎖和釋放鎖
  • 具備可重入特性
  • 具備鎖實現機制,防止死鎖
  • 具備非阻塞鎖特性,獲取不到值直接返回

 

2. 分布式鎖的實現方式

分布式主要有三種主流的實現方式:

(1)基於數據庫實現的分布式鎖:采用樂觀鎖、悲觀鎖或者基於主鍵唯一約束實現

(2)基於分布式緩存實現的分布式鎖:redis和基於redis的redlock

(3)基於分布式一致性算法實現的分布式鎖:zookeeper、etcd

 

每種分布式鎖都有其所適用的生產環境,同時特各有利弊:

  • 數據庫實現的分布式鎖性能較差,而且不支持過期,但是不會引入更多的中間件
  • 緩存實現的分布式鎖高性能,支持非阻塞,適用大並發的場景
  • etcd實現的分布式鎖具備阻塞特性,適用於服務發現和注冊、任務調度等

 

3. 基於etcd的分布式鎖實現機制

etcd 支持以下功能,正是依賴這些功能來實現分布式鎖的:

  • Lease機制:即租約機制(TTL,Time To Live),etcd可以為存儲的kv對設置租約,當租約到期,kv將失效刪除;同時也支持續約,keepalive
  • Revision機制:每個key帶有一個Revision屬性值,etcd每進行一次事務對應的全局Revision值都會+1,因此每個key對應的Revision屬性值都是全局唯一的。通過比較Revision的大小就可以知道進行寫操作的順序
  • 在實現分布式鎖時,多個程序同時搶鎖,根據Revision值大小依次獲得鎖,避免“驚群效應”,實現公平鎖
  • Prefix機制:也稱為目錄機制,可以根據前綴獲得該目錄下所有的key及其對應的屬性值
  • watch機制:watch支持watch某個固定的key或者一個前綴目錄,當watch的key發生變化,客戶端將收到通知

 

4. 基於etcd的分布式鎖的實現過程

  • 步驟 1: 准備

客戶端連接 Etcd,以 /lock/mylock 為前綴創建全局唯一的 key,假設第一個客戶端對應的 key="/lock/mylock/UUID1",第二個為 key="/lock/mylock/UUID2";客戶端分別為自己的 key 創建租約 - Lease,租約的長度根據業務耗時確定,假設為 15s;

  • 步驟 2: 創建定時任務作為租約的“心跳”

當一個客戶端持有鎖期間,其它客戶端只能等待,為了避免等待期間租約失效,客戶端需創建一個定時任務作為“心跳”進行續約。此外,如果持有鎖期間客戶端崩潰,心跳停止,key 將因租約到期而被刪除,從而鎖釋放,避免死鎖。

  • 步驟 3: 客戶端將自己全局唯一的 key 寫入 Etcd

進行 put 操作,將步驟 1 中創建的 key 綁定租約寫入 Etcd,根據 Etcd 的 Revision 機制,假設兩個客戶端 put 操作返回的 Revision 分別為 1、2,客戶端需記錄 Revision 用以接下來判斷自己是否獲得鎖。

  • 步驟 4: 客戶端判斷是否獲得鎖

客戶端以前綴 /lock/mylock 讀取 keyValue 列表(keyValue 中帶有 key 對應的 Revision),判斷自己 key 的 Revision 是否為當前列表中最小的,如果是則認為獲得鎖;否則監聽列表中前一個 Revision 比自己小的 key 的刪除事件,一旦監聽到刪除事件或者因租約失效而刪除的事件,則自己獲得鎖。

  • 步驟 5: 執行業務

獲得鎖后,操作共享資源,執行業務代碼。

  • 步驟 6: 釋放鎖

完成業務流程后,刪除對應的key釋放鎖。

 

5. go實現etcd分布式鎖

func main() {
    config := clientv3.Config{
        Endpoints:   []string{"xxx.xxx.xxx.xxx:2379"},
        DialTimeout: 5 * time.Second,
    }

    // 獲取客戶端連接
    client, err := clientv3.New(config)
    if err != nil {
        fmt.Println(err)
        return
    }

    // 1. 上鎖(創建租約,自動續租,拿着租約去搶占一個key )
    // 用於申請租約
    lease := clientv3.NewLease(client)

    // 申請一個10s的租約
    leaseGrantResp, err := lease.Grant(context.TODO(), 10) //10s
    if err != nil {
        fmt.Println(err)
        return
    }

    // 拿到租約的id
    leaseID := leaseGrantResp.ID

    // 准備一個用於取消續租的context
    ctx, cancelFunc := context.WithCancel(context.TODO())

    // 確保函數退出后,自動續租會停止
    defer cancelFunc()
        // 確保函數退出后,租約會失效
    defer lease.Revoke(context.TODO(), leaseID)

    // 自動續租
    keepRespChan, err := lease.KeepAlive(ctx, leaseID)
    if err != nil {
        fmt.Println(err)
        return
    }

    // 處理續租應答的協程
    go func() {
        select {
        case keepResp := <-keepRespChan:
            if keepRespChan == nil {
                fmt.Println("lease has expired")
                goto END
            } else {
                // 每秒會續租一次
                fmt.Println("收到自動續租應答", keepResp.ID)
            }
        }
    END:
    }()

    // if key 不存在,then設置它,else搶鎖失敗
    kv := clientv3.NewKV(client)
    // 創建事務
    txn := kv.Txn(context.TODO())
    // 如果key不存在
    txn.If(clientv3.Compare(clientv3.CreateRevision("/cron/lock/job7"), "=", 0)).
        Then(clientv3.OpPut("/cron/jobs/job7", "", clientv3.WithLease(leaseID))).
        Else(clientv3.OpGet("/cron/jobs/job7")) //如果key存在

    // 提交事務
    txnResp, err := txn.Commit()
    if err != nil {
        fmt.Println(err)
        return
    }

    // 判斷是否搶到了鎖
    if !txnResp.Succeeded {
        fmt.Println("鎖被占用了:", string(txnResp.Responses[0].GetResponseRange().Kvs[0].Value))
        return
    }

    // 2. 處理業務(鎖內,很安全)

    fmt.Println("處理任務")
    time.Sleep(5 * time.Second)

    // 3. 釋放鎖(取消自動續租,釋放租約)
    // defer會取消續租,釋放鎖
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM