利用Kubernetes中的leaderelection實現組件高可用


在Kubernetes中,通常kube-schduler和kube-controller-manager都是多副本進行部署的來保證高可用,而真正在工作的實例其實只有一個。這里就利用到 leaderelection 的選主機制,保證leader是處於工作狀態,並且在leader掛掉之后,從其他節點選取新的leader保證組件正常工作。

不單單只是k8s中的這兩個組件用到,在其他服務中也可以看到這個包的使用,比如cluster-autoscaler等都能看得到這個包的,今天就來看看這個包的使用以及它內部是如何實現的。

使用

以下是一個簡單使用的例子,編譯完成之后同時啟動多個進程,但是只有一個進程在工作,當把leader進程kill掉之后,會重新選舉出一個leader進行工作,即執行其中的 run 方法:

/*
例子來源於client-go中的example包中
*/

package main

import (
	"context"
	"flag"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/google/uuid"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
	"k8s.io/klog"
)

func buildConfig(kubeconfig string) (*rest.Config, error) {
	if kubeconfig != "" {
		cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
		if err != nil {
			return nil, err
		}
		return cfg, nil
	}

	cfg, err := rest.InClusterConfig()
	if err != nil {
		return nil, err
	}
	return cfg, nil
}

func main() {
	klog.InitFlags(nil)

	var kubeconfig string
	var leaseLockName string
	var leaseLockNamespace string
	var id string

	flag.StringVar(&kubeconfig, "kubeconfig", "", "absolute path to the kubeconfig file")
	flag.StringVar(&id, "id", uuid.New().String(), "the holder identity name")
	flag.StringVar(&leaseLockName, "lease-lock-name", "", "the lease lock resource name")
	flag.StringVar(&leaseLockNamespace, "lease-lock-namespace", "", "the lease lock resource namespace")
	flag.Parse()

	if leaseLockName == "" {
		klog.Fatal("unable to get lease lock resource name (missing lease-lock-name flag).")
	}
	if leaseLockNamespace == "" {
		klog.Fatal("unable to get lease lock resource namespace (missing lease-lock-namespace flag).")
	}

	// leader election uses the Kubernetes API by writing to a
	// lock object, which can be a LeaseLock object (preferred),
	// a ConfigMap, or an Endpoints (deprecated) object.
	// Conflicting writes are detected and each client handles those actions
	// independently.
	config, err := buildConfig(kubeconfig)
	if err != nil {
		klog.Fatal(err)
	}
	client := clientset.NewForConfigOrDie(config)

	run := func(ctx context.Context) {
		// complete your controller loop here
		klog.Info("Controller loop...")

		select {}
	}

	// use a Go context so we can tell the leaderelection code when we
	// want to step down
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// listen for interrupts or the Linux SIGTERM signal and cancel
	// our context, which the leader election code will observe and
	// step down
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
	go func() {
		<-ch
		klog.Info("Received termination, signaling shutdown")
		cancel()
	}()

	// we use the Lease lock type since edits to Leases are less common
	// and fewer objects in the cluster watch "all Leases".
    // 指定鎖的資源對象,這里使用了Lease資源,還支持configmap,endpoint,或者multilock(即多種配合使用)
	lock := &resourcelock.LeaseLock{
		LeaseMeta: metav1.ObjectMeta{
			Name:      leaseLockName,
			Namespace: leaseLockNamespace,
		},
		Client: client.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{
			Identity: id,
		},
	}

	// start the leader election code loop
	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock: lock,
		// IMPORTANT: you MUST ensure that any code you have that
		// is protected by the lease must terminate **before**
		// you call cancel. Otherwise, you could have a background
		// loop still running and another process could
		// get elected before your background loop finished, violating
		// the stated goal of the lease.
		ReleaseOnCancel: true,
		LeaseDuration:   60 * time.Second,//租約時間
		RenewDeadline:   15 * time.Second,//更新租約的
		RetryPeriod:     5 * time.Second,//非leader節點重試時間
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) {
                //變為leader執行的業務代碼
				// we're notified when we start - this is where you would
				// usually put your code
				run(ctx)
			},
			OnStoppedLeading: func() {
                 // 進程退出
				// we can do cleanup here
				klog.Infof("leader lost: %s", id)
				os.Exit(0)
			},
			OnNewLeader: func(identity string) {
                //當產生新的leader后執行的方法
				// we're notified when new leader elected
				if identity == id {
					// I just got the lock
					return
				}
				klog.Infof("new leader elected: %s", identity)
			},
		},
	})
}

關鍵啟動參數說明:

kubeconfig: 指定kubeconfig文件地址
lease-lock-name:指定lock的名稱
lease-lock-namespace:指定lock存儲的namespace
id: 例子中提供的區別參數,用於區分實例
logtostderr:klog提供的參數,指定log輸出到控制台
v: 指定日志輸出級別

同時啟動兩個進程:
啟動進程1

go run main.go -kubeconfig=/Users/silenceper/.kube/config -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=1 -v=4
I0215 19:56:37.049658   48045 leaderelection.go:242] attempting to acquire leader lease  default/example...
I0215 19:56:37.080368   48045 leaderelection.go:252] successfully acquired lease default/example
I0215 19:56:37.080437   48045 main.go:87] Controller loop...

啟動進程2:

➜  leaderelection git:(master) ✗ go run main.go -kubeconfig=/Users/silenceper/.kube/config -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=2 -v=4
I0215 19:57:35.870051   48791 leaderelection.go:242] attempting to acquire leader lease  default/example...
I0215 19:57:35.894735   48791 leaderelection.go:352] lock is held by 1 and has not yet expired
I0215 19:57:35.894769   48791 leaderelection.go:247] failed to acquire lease default/example
I0215 19:57:35.894790   48791 main.go:151] new leader elected: 1
I0215 19:57:44.532991   48791 leaderelection.go:352] lock is held by 1 and has not yet expired
I0215 19:57:44.533028   48791 leaderelection.go:247] failed to acquire lease default/example

這里可以看出來id=1的進程持有鎖,並且運行的程序,而id=2的進程表示無法獲取到鎖,在不斷的進程嘗試。

現在kill掉id=1進程,在等待lock釋放之后(有個LeaseDuration時間),leader變為id=2的進程執行工作

I0215 20:01:41.489300   48791 leaderelection.go:252] successfully acquired lease default/example
I0215 20:01:41.489577   48791 main.go:87] Controller loop...

深入理解

基本原理其實就是利用通過Kubernetes中 configmap , endpoints 或者 lease 資源實現一個分布式鎖,搶(acqure)到鎖的節點成為leader,並且定期更新(renew)。其他進程也在不斷的嘗試進行搶占,搶占不到則繼續等待下次循環。當leader節點掛掉之后,租約到期,其他節點就成為新的leader。

入口

通過 leaderelection.RunOrDie 啟動,

func RunOrDie(ctx context.Context, lec LeaderElectionConfig) {
	le, err := NewLeaderElector(lec)
	if err != nil {
		panic(err)
	}
	if lec.WatchDog != nil {
		lec.WatchDog.SetLeaderElection(le)
	}
	le.Run(ctx)
}

傳入參數 LeaderElectionConfig :

type LeaderElectionConfig struct {
	// Lock 的類型
	Lock rl.Interface
	//持有鎖的時間
	LeaseDuration time.Duration
	//在更新租約的超時時間
	RenewDeadline time.Duration
    //競爭獲取鎖的時間
	RetryPeriod time.Duration
    //狀態變化時執行的函數,支持三種:
    //1、OnStartedLeading 啟動是執行的業務代碼
    //2、OnStoppedLeading leader停止執行的方法
    //3、OnNewLeader 當產生新的leader后執行的方法
	Callbacks LeaderCallbacks

    //進行監控檢查
	// WatchDog is the associated health checker
	// WatchDog may be null if its not needed/configured.
	WatchDog *HealthzAdaptor
    //leader退出時,是否執行release方法
	ReleaseOnCancel bool
    
	// Name is the name of the resource lock for debugging
	Name string
}

LeaderElectionConfig.lock 支持保存在以下三種資源中:
configmap 
endpoint 
lease 
包中還提供了一個 multilock ,即可以進行選擇兩種,當其中一種保存失敗時,選擇第二張
可以在interface.go中看到:

	switch lockType {
	case EndpointsResourceLock://保存在endpoints
		return endpointsLock, nil
	case ConfigMapsResourceLock://保存在configmaps
		return configmapLock, nil
	case LeasesResourceLock://保存在leases
		return leaseLock, nil
	case EndpointsLeasesResourceLock://優先嘗試保存在endpoint失敗時保存在lease
		return &MultiLock{
			Primary:   endpointsLock,
			Secondary: leaseLock,
		}, nil
	case ConfigMapsLeasesResourceLock://優先嘗試保存在configmap,失敗時保存在lease
		return &MultiLock{
			Primary:   configmapLock,
			Secondary: leaseLock,
		}, nil
	default:
		return nil, fmt.Errorf("Invalid lock-type %s", lockType)
	}

以lease資源對象為例,可以在查看到保存的內容:

➜  ~ kubectl get lease example -n default -o yaml
apiVersion: coordination.k8s.io/v1
kind: Lease
metadata:
  creationTimestamp: "2020-02-15T11:56:37Z"
  name: example
  namespace: default
  resourceVersion: "210675"
  selfLink: /apis/coordination.k8s.io/v1/namespaces/default/leases/example
  uid: a3470a06-6fc3-42dc-8242-9d6cebdf5315
spec:
  acquireTime: "2020-02-15T12:01:41.476971Z"//獲得鎖時間
  holderIdentity: "2"//持有鎖進程的標識
  leaseDurationSeconds: 60//lease租約
  leaseTransitions: 1//leader更換次數
  renewTime: "2020-02-15T12:05:37.134655Z"//更新租約的時間

關注其spec中的字段,分別進行標注,對應結構體如下:

type LeaderElectionRecord struct {
	HolderIdentity       string      `json:"holderIdentity"`//持有鎖進程的標識,一般可以利用主機名
	LeaseDurationSeconds int         `json:"leaseDurationSeconds"`//  lock的租約
	AcquireTime          metav1.Time `json:"acquireTime"`//持有鎖的時間
	RenewTime            metav1.Time `json:"renewTime"`//更新時間
	LeaderTransitions    int         `json:"leaderTransitions"`//leader更換的次數
}

獲取的鎖以及更新鎖

Run方法中包含了獲取鎖以及更新鎖的入口

// Run starts the leader election loop
func (le *LeaderElector) Run(ctx context.Context) {
	defer func() {
        //進行退出執行
		runtime.HandleCrash()
        //停止時執行回調方法
		le.config.Callbacks.OnStoppedLeading()
	}()
    //不斷的進行獲得鎖,如果獲得鎖成功則執行后面的方法,否則不斷的進行重試
	if !le.acquire(ctx) {
		return // ctx signalled done
	}
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
    //獲取鎖成功,當前進程變為leader,執行回調函數中的業務代碼
	go le.config.Callbacks.OnStartedLeading(ctx)
    //不斷的循環進行進行租約的更新,保證鎖一直被當前進行持有
	le.renew(ctx)
}

le.acquirele.renew 內部都是調用了 le.tryAcquireOrRenew 函數,只是對於返回結果的處理不一樣。

le.acquire 對於 le.tryAcquireOrRenew 返回成功則退出,失敗則繼續。

le.renew 則相反,成功則繼續,失敗則退出。

我們來看看 tryAcquireOrRenew 方法:

func (le *LeaderElector) tryAcquireOrRenew() bool {
	now := metav1.Now()
    //鎖資源對象內容
	leaderElectionRecord := rl.LeaderElectionRecord{
		HolderIdentity:       le.config.Lock.Identity(),//唯一標識
		LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
		RenewTime:            now,
		AcquireTime:          now,
	}

	// 1. obtain or create the ElectionRecord
    // 第一步:從k8s資源中獲取原有的鎖
	oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get()
	if err != nil {
		if !errors.IsNotFound(err) {
			klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
			return false
		}
        //資源對象不存在,進行鎖資源創建
		if err = le.config.Lock.Create(leaderElectionRecord); err != nil {
			klog.Errorf("error initially creating leader election record: %v", err)
			return false
		}
		le.observedRecord = leaderElectionRecord
		le.observedTime = le.clock.Now()
		return true
	}

	// 2. Record obtained, check the Identity & Time
    // 第二步,對比存儲在k8s中的鎖資源與上一次獲取的鎖資源是否一致
	if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
		le.observedRecord = *oldLeaderElectionRecord
		le.observedRawRecord = oldLeaderElectionRawRecord
		le.observedTime = le.clock.Now()
	}
    //判斷持有的鎖是否到期以及是否被自己持有
	if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
		le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
		!le.IsLeader() {
		klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
		return false
	}

	// 3. We're going to try to update. The leaderElectionRecord is set to it's default
	// here. Let's correct it before updating.
    //第三步:自己現在是leader,但是分兩組情況,上一次也是leader和首次變為leader
	if le.IsLeader() {
        //自己本身就是leader則不需要更新AcquireTime和LeaderTransitions
		leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
	} else {
        //首次自己變為leader則更新leader的更換次數
		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
	}

    //更新鎖資源,這里如果在 Get 和 Update 之間有變化,將會更新失敗
	// update the lock itself
	if err = le.config.Lock.Update(leaderElectionRecord); err != nil {
		klog.Errorf("Failed to update lock: %v", err)
		return false
	}

	le.observedRecord = leaderElectionRecord
	le.observedTime = le.clock.Now()
	return true
}

在這一步如果發生並發操作怎么樣?

這里很重要一點就是利用到了k8s api操作的原子性:

le.config.Lock.Get() 中會獲取到鎖的對象,其中有一個 resourceVersion 字段用於標識一個資源對象的內部版本,每次更新操作都會更新其值。如果一個更新操作附加上了 resourceVersion 字段,那么 apiserver 就會通過驗證當前 resourceVersion 的值與指定的值是否相匹配來確保在此次更新操作周期內沒有其他的更新操作,從而保證了更新操作的原子性。

總結

leaderelection 主要是利用了k8s API操作的原子性實現了一個分布式鎖,在不斷的競爭中進行選舉。選中為leader的進行才會執行具體的業務代碼,這在k8s中非常的常見,而且我們很方便的利用這個包完成組件的編寫,從而實現組件的高可用,比如部署為一個多副本的Deployment,當leader的pod退出后會重新啟動,可能鎖就被其他pod獲取繼續執行。

完整代碼:https://github.com/go-demo/leaderelection

關注"學點程序"公眾號,了解更多干貨內容 學點程序


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM