ETCD分布式鎖實現選主機制(Golang實現)


ETCD分布式鎖實現選主機制(Golang)

為什么要寫這篇文章

做架構的時候,涉及到系統的一個功能,有一個服務必須在指定的節點執行,並且需要有個節點來做任務分發,想了半天,那就搞個主節點做這事唄,所以就有了這篇文章的誕生,我把踩的坑和收獲記錄下來,方便未來查看和各位兄弟們參考。

選主機制是什么

舉個例子,分布式系統內,好幾台機器,總得分個三六九等,發號施令的時候總得有個帶頭大哥站出來,告訴其他小弟我們今天要干嘛干嘛之類的,這個大哥就是master節點,master節點一般都是做信息處理分發,或者重要服務運行之類的。所以,選主機制就是,選一個master出來,這個master可用,並且可以順利發消息給其他小弟,其他小弟也認為你是master,就可以了。

ETCD的分布式鎖是什么

首先認為一點,它是唯一的全局的,一個key值
為什么一定要強調這個唯一和全局呢,因為分布式鎖就是指定只能讓一個客戶端訪問這個key值,其他的沒法訪問,這樣才能保證它的唯一性。
再一個,認為分布式鎖是一個限時的會過期的的key值
你創建了一個key,要保證訪問它的客戶端時刻online,類似一個“心跳”的機制,如果持有鎖的客戶端崩潰了,那么key值在過期后會被刪除,其他的客戶端也可以繼續搶key,繼續接力,實現高可用。

選主機制怎么設計

其實主要的邏輯前面都說清楚了,我在這里敘述下我該怎么做。
我們假設有三個節點,node1,node2,node3

  1. 三個節點都去創建一個全局的唯一key /dev/lock
  2. 誰先創建成功誰就是master主節點
  3. 其他節點持續待命繼續獲取,主節點繼續續租key值(key值會過期)
  4. 持有key的節點down機,key值過期被刪,其他節點創key成功,繼續接力。

ETCD分布式鎖簡單實現

看一下ETCD的golang代碼,還是給出了如何去實現一個分布式鎖,這個比較簡單,我先寫一個簡單的Demo說下幾個接口的功能

  1. 創建鎖
kv = clientv3.NewKV(client)
txn = kv.Txn(context.TODO())
txn.If(clientv3.Compare(clientv3.CreateRevision("/dev/lock"),"=",0)).Then(
clientv3.OpPut("/dev/lock","占用",clientv3.WithLease(leaseId))).Else(clientv3.OpGet("/dev/lock"))
txnResponse,err = txn.Commit()
if err !=nil{
    fmt.Println(err)
    return
    }
  1. 判斷是否搶到鎖
if txnResponse.Succeeded {
    fmt.Println("搶到鎖了")
    } else {
        fmt.Println("沒搶到鎖",txnResponse.Responses[0].GetResponseRange().Kvs[0].Value)
        }
  1. 續租邏輯
for {
    select {
        case leaseKeepAliveResponse = <-leaseKeepAliveChan:
        if leaseKeepAliveResponse != nil{
            fmt.Println("續租成功,leaseID :",leaseKeepAliveResponse.ID)
            }else {
                fmt.Println("續租失敗")
                }
            }
            time.Sleep(time.Second*1)
        }

我的實現邏輯

首先我的邏輯就是,大家一起搶,誰搶到誰就一直續,要是不續了就另外的老哥上,能者居之嘛!我上一下我的實現代碼

package main

import (
	"fmt"
	"context"
	"time"
	//"reflect"

	"go.etcd.io/etcd/clientv3"
)

var (
	lease                  clientv3.Lease
	ctx                    context.Context
	cancelFunc             context.CancelFunc
	leaseId                clientv3.LeaseID
	leaseGrantResponse     *clientv3.LeaseGrantResponse
	leaseKeepAliveChan     <-chan *clientv3.LeaseKeepAliveResponse
	leaseKeepAliveResponse *clientv3.LeaseKeepAliveResponse
	txn                    clientv3.Txn
	txnResponse            *clientv3.TxnResponse
	kv                     clientv3.KV
)

type ETCD struct {
	client                 *clientv3.Client
	cfg                    clientv3.Config
	err                    error
}

// 創建ETCD連接服務
func New(endpoints ...string) (*ETCD, error) {
	cfg := clientv3.Config{
		Endpoints: endpoints,
		DialTimeout: time.Second * 5,
	}

	client, err := clientv3.New(cfg)
	if err != nil {
		fmt.Println("連接ETCD失敗")
		return nil, err
	}

	etcd := &ETCD{
		cfg: cfg,
		client: client,
	}

	fmt.Println("連接ETCD成功")
	return etcd, nil
}

// 搶鎖邏輯
func (etcd *ETCD) Newleases_lock(ip string) (error) {
	lease := clientv3.NewLease(etcd.client)
	leaseGrantResponse, err := lease.Grant(context.TODO(), 5)
	if err != nil {
		fmt.Println(err)
		return err
	}
	leaseId := leaseGrantResponse.ID
	ctx, cancelFunc := context.WithCancel(context.TODO())
	defer cancelFunc()
	defer lease.Revoke(context.TODO(), leaseId)
	leaseKeepAliveChan, err := lease.KeepAlive(ctx, leaseId)
	if err != nil {
		fmt.Println(err)
		return err
	}

    // 初始化鎖
	kv := clientv3.NewKV(etcd.client)
	txn := kv.Txn(context.TODO())
	txn.If(clientv3.Compare(clientv3.CreateRevision("/dev/lock"), "=", 0)).Then(
		clientv3.OpPut("/dev/lock", ip, clientv3.WithLease(leaseId))).Else(
		clientv3.OpGet("/dev/lock"))
	txnResponse, err := txn.Commit()
	if err != nil {
		fmt.Println(err)
		return err
    }
    // 判斷是否搶鎖成功
	if txnResponse.Succeeded {
		fmt.Println("搶到鎖了")
        fmt.Println("選定主節點", ip)
            // 續租節點
			for {
				select {
				case leaseKeepAliveResponse = <-leaseKeepAliveChan:
					if leaseKeepAliveResponse != nil {
						fmt.Println("續租成功,leaseID :", leaseKeepAliveResponse.ID)
					} else {
						fmt.Println("續租失敗")
					}
	
				}
			}
	} else {
        // 繼續回頭去搶,不停請求
		fmt.Println("沒搶到鎖", txnResponse.Responses[0].GetResponseRange().Kvs[0].Value)
		fmt.Println("繼續搶")
		time.Sleep(time.Second * 1)
	}
	return nil
}

func main(){
    // 連接ETCD
	etcd, err := New("xxxxxxxx:2379")
	if err != nil {
		fmt.Println(err)
    }
    // 設定無限循環
	for {
		etcd.Newleases_lock("node1")
	}
}

總結

相關代碼寫入到github當中,其中的地址是
https://github.com/Alexanderklau/Go_poject/tree/master/Go-Etcd/lock_work
實現這個功能廢了不少功夫,好久沒寫go了,自己太菜了,如果有老哥發現問題請聯系我,好改正。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM