ETCD分布式鎖實現選主機制(Golang)
為什么要寫這篇文章
做架構的時候,涉及到系統的一個功能,有一個服務必須在指定的節點執行,並且需要有個節點來做任務分發,想了半天,那就搞個主節點做這事唄,所以就有了這篇文章的誕生,我把踩的坑和收獲記錄下來,方便未來查看和各位兄弟們參考。
選主機制是什么
舉個例子,分布式系統內,好幾台機器,總得分個三六九等,發號施令的時候總得有個帶頭大哥站出來,告訴其他小弟我們今天要干嘛干嘛之類的,這個大哥就是master節點,master節點一般都是做信息處理分發,或者重要服務運行之類的。所以,選主機制就是,選一個master出來,這個master可用,並且可以順利發消息給其他小弟,其他小弟也認為你是master,就可以了。
ETCD的分布式鎖是什么
首先認為一點,它是唯一的,全局的,一個key值
為什么一定要強調這個唯一和全局呢,因為分布式鎖就是指定只能讓一個客戶端訪問這個key值,其他的沒法訪問,這樣才能保證它的唯一性。
再一個,認為分布式鎖是一個限時的,會過期的的key值
你創建了一個key,要保證訪問它的客戶端時刻online,類似一個“心跳”的機制,如果持有鎖的客戶端崩潰了,那么key值在過期后會被刪除,其他的客戶端也可以繼續搶key,繼續接力,實現高可用。
選主機制怎么設計
其實主要的邏輯前面都說清楚了,我在這里敘述下我該怎么做。
我們假設有三個節點,node1,node2,node3
- 三個節點都去創建一個全局的唯一key /dev/lock
- 誰先創建成功誰就是master主節點
- 其他節點持續待命繼續獲取,主節點繼續續租key值(key值會過期)
- 持有key的節點down機,key值過期被刪,其他節點創key成功,繼續接力。
ETCD分布式鎖簡單實現
看一下ETCD的golang代碼,還是給出了如何去實現一個分布式鎖,這個比較簡單,我先寫一個簡單的Demo說下幾個接口的功能
- 創建鎖
kv = clientv3.NewKV(client)
txn = kv.Txn(context.TODO())
txn.If(clientv3.Compare(clientv3.CreateRevision("/dev/lock"),"=",0)).Then(
clientv3.OpPut("/dev/lock","占用",clientv3.WithLease(leaseId))).Else(clientv3.OpGet("/dev/lock"))
txnResponse,err = txn.Commit()
if err !=nil{
fmt.Println(err)
return
}
- 判斷是否搶到鎖
if txnResponse.Succeeded {
fmt.Println("搶到鎖了")
} else {
fmt.Println("沒搶到鎖",txnResponse.Responses[0].GetResponseRange().Kvs[0].Value)
}
- 續租邏輯
for {
select {
case leaseKeepAliveResponse = <-leaseKeepAliveChan:
if leaseKeepAliveResponse != nil{
fmt.Println("續租成功,leaseID :",leaseKeepAliveResponse.ID)
}else {
fmt.Println("續租失敗")
}
}
time.Sleep(time.Second*1)
}
我的實現邏輯
首先我的邏輯就是,大家一起搶,誰搶到誰就一直續,要是不續了就另外的老哥上,能者居之嘛!我上一下我的實現代碼
package main
import (
"fmt"
"context"
"time"
//"reflect"
"go.etcd.io/etcd/clientv3"
)
var (
lease clientv3.Lease
ctx context.Context
cancelFunc context.CancelFunc
leaseId clientv3.LeaseID
leaseGrantResponse *clientv3.LeaseGrantResponse
leaseKeepAliveChan <-chan *clientv3.LeaseKeepAliveResponse
leaseKeepAliveResponse *clientv3.LeaseKeepAliveResponse
txn clientv3.Txn
txnResponse *clientv3.TxnResponse
kv clientv3.KV
)
type ETCD struct {
client *clientv3.Client
cfg clientv3.Config
err error
}
// 創建ETCD連接服務
func New(endpoints ...string) (*ETCD, error) {
cfg := clientv3.Config{
Endpoints: endpoints,
DialTimeout: time.Second * 5,
}
client, err := clientv3.New(cfg)
if err != nil {
fmt.Println("連接ETCD失敗")
return nil, err
}
etcd := &ETCD{
cfg: cfg,
client: client,
}
fmt.Println("連接ETCD成功")
return etcd, nil
}
// 搶鎖邏輯
func (etcd *ETCD) Newleases_lock(ip string) (error) {
lease := clientv3.NewLease(etcd.client)
leaseGrantResponse, err := lease.Grant(context.TODO(), 5)
if err != nil {
fmt.Println(err)
return err
}
leaseId := leaseGrantResponse.ID
ctx, cancelFunc := context.WithCancel(context.TODO())
defer cancelFunc()
defer lease.Revoke(context.TODO(), leaseId)
leaseKeepAliveChan, err := lease.KeepAlive(ctx, leaseId)
if err != nil {
fmt.Println(err)
return err
}
// 初始化鎖
kv := clientv3.NewKV(etcd.client)
txn := kv.Txn(context.TODO())
txn.If(clientv3.Compare(clientv3.CreateRevision("/dev/lock"), "=", 0)).Then(
clientv3.OpPut("/dev/lock", ip, clientv3.WithLease(leaseId))).Else(
clientv3.OpGet("/dev/lock"))
txnResponse, err := txn.Commit()
if err != nil {
fmt.Println(err)
return err
}
// 判斷是否搶鎖成功
if txnResponse.Succeeded {
fmt.Println("搶到鎖了")
fmt.Println("選定主節點", ip)
// 續租節點
for {
select {
case leaseKeepAliveResponse = <-leaseKeepAliveChan:
if leaseKeepAliveResponse != nil {
fmt.Println("續租成功,leaseID :", leaseKeepAliveResponse.ID)
} else {
fmt.Println("續租失敗")
}
}
}
} else {
// 繼續回頭去搶,不停請求
fmt.Println("沒搶到鎖", txnResponse.Responses[0].GetResponseRange().Kvs[0].Value)
fmt.Println("繼續搶")
time.Sleep(time.Second * 1)
}
return nil
}
func main(){
// 連接ETCD
etcd, err := New("xxxxxxxx:2379")
if err != nil {
fmt.Println(err)
}
// 設定無限循環
for {
etcd.Newleases_lock("node1")
}
}
總結
相關代碼寫入到github當中,其中的地址是
https://github.com/Alexanderklau/Go_poject/tree/master/Go-Etcd/lock_work
實現這個功能廢了不少功夫,好久沒寫go了,自己太菜了,如果有老哥發現問題請聯系我,好改正。