Implementing component high availability in Kubernetes with the leaderelection package


1. Overview

In Kubernetes, making a component highly available means running multiple replicas of it, e.g. multiple apiserver, scheduler, and controller-manager instances. The apiserver is stateless, so every replica can serve requests; the scheduler and controller-manager, however, are stateful, and only one instance may be active at any moment, so a leader must be elected.

Kubernetes implements this high availability through leaderelection. Among Kubernetes' own components, kube-scheduler and kube-controller-manager use leader election, and this mechanism is Kubernetes' high-availability guarantee for them. Under normal operation only one of the replicas of kube-scheduler or kube-controller-manager runs the business logic, while the other replicas keep trying to grab the lock, competing for leadership, until one becomes the leader. If the running leader exits for some reason, or loses the lock, the remaining replicas compete for a new leader, and whichever wins takes over the business logic.

This election strategy is not limited to Kubernetes' own components: our own services can use the same algorithm to elect a leader. The Kubernetes client-go package exposes an interface for exactly this, under client-go/tools/leaderelection.

2. A leaderelection usage example

Below is a simple example (taken from the example package in client-go). After compiling, start several processes at once: only one of them does any work. When the leader process is killed, a new leader is elected and takes over the work, i.e. runs the run function:

// Source: client-go/examples/leader-election/main.go

package main

import (
	"context"
	"flag"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/google/uuid"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
	"k8s.io/klog/v2"
)

func buildConfig(kubeconfig string) (*rest.Config, error) {
	if kubeconfig != "" {
		cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
		if err != nil {
			return nil, err
		}
		return cfg, nil
	}

	cfg, err := rest.InClusterConfig()
	if err != nil {
		return nil, err
	}
	return cfg, nil
}

func main() {
	klog.InitFlags(nil)

	var kubeconfig string
	var leaseLockName string
	var leaseLockNamespace string
	var id string

	flag.StringVar(&kubeconfig, "kubeconfig", "", "absolute path to the kubeconfig file")
	flag.StringVar(&id, "id", uuid.New().String(), "the holder identity name")
	flag.StringVar(&leaseLockName, "lease-lock-name", "", "the lease lock resource name")
	flag.StringVar(&leaseLockNamespace, "lease-lock-namespace", "", "the lease lock resource namespace")
	flag.Parse()

	if leaseLockName == "" {
		klog.Fatal("unable to get lease lock resource name (missing lease-lock-name flag).")
	}
	if leaseLockNamespace == "" {
		klog.Fatal("unable to get lease lock resource namespace (missing lease-lock-namespace flag).")
	}

	// leader election uses the Kubernetes API by writing to a
	// lock object, which can be a LeaseLock object (preferred),
	// a ConfigMap, or an Endpoints (deprecated) object.
	// Conflicting writes are detected and each client handles those actions
	// independently.
	config, err := buildConfig(kubeconfig)
	if err != nil {
		klog.Fatal(err)
	}

	client := clientset.NewForConfigOrDie(config)

	// business logic
	run := func(ctx context.Context) {
		// complete your controller loop here
		klog.Info("Controller loop...")

		select {}
	}

	// use a Go context so we can tell the leaderelection code when we
	// want to step down
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// listen for interrupts or the Linux SIGTERM signal and cancel
	// our context, which the leader election code will observe and
	// step down
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
	go func() {
		<-ch
		klog.Info("Received termination, signaling shutdown")
		cancel()
	}()

	// we use the Lease lock type since edits to Leases are less common
	// and fewer objects in the cluster watch "all Leases".
	// the lock's backing resource: a Lease is used here, but ConfigMap, Endpoints, or a multilock (a combination) are also supported
	lock := &resourcelock.LeaseLock{
		LeaseMeta: metav1.ObjectMeta{
			Name:      leaseLockName,
			Namespace: leaseLockNamespace,
		},
		Client: client.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{
			Identity: id,
		},
	}

	// run the election
	// start the leader election code loop
	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock: lock,
		// IMPORTANT: you MUST ensure that any code you have that
		// is protected by the lease must terminate **before**
		// you call cancel. Otherwise, you could have a background
		// loop still running and another process could
		// get elected before your background loop finished, violating
		// the stated goal of the lease.
		ReleaseOnCancel: true,
		LeaseDuration:   60 * time.Second, // lease duration; non-leader candidates use it to judge whether the lock has expired
		RenewDeadline:   15 * time.Second, // timeout for the leader to renew the lock
		RetryPeriod:     5 * time.Second,  // interval between lock operations
		// callbacks triggered by the different election events
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) {
				// business code run once we become the leader
				// we're notified when we start - this is where you would
				// usually put your code
				run(ctx)
			},
			OnStoppedLeading: func() {
				// the process exits
				// we can do cleanup here
				klog.Infof("leader lost: %s", id)
				os.Exit(0)
			},
			OnNewLeader: func(identity string) {
				// invoked after a new leader is elected
				// we're notified when new leader elected
				if identity == id {
					// I just got the lock
					return
				}
				klog.Infof("new leader elected: %s", identity)
			},
		},
	})
}

Key startup flags:

kubeconfig: path to the kubeconfig file
lease-lock-name: name of the lock
lease-lock-namespace: namespace of the lock
id: identity used by the example to distinguish instances
logtostderr: klog flag that sends logs to the console
v: log verbosity level

2.1 Start three processes at the same time

Start process 1:

go run main.go -kubeconfig=/Users/apple/.kube/config148 -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=1 -v=4

Output:

apple@appledeMacBook-Pro test$ go run main.go -kubeconfig=/Users/apple/.kube/config148 -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=1 -v=4
I0126 13:46:59.753974   35080 leaderelection.go:243] attempting to acquire leader lease default/example...
I0126 13:47:00.660260   35080 leaderelection.go:253] successfully acquired lease default/example
I0126 13:47:00.660368   35080 main.go:75] Controller loop...

As shown, the process with id=1 holds the lock and runs the program.

Start process 2:

go run main.go -kubeconfig=/Users/apple/.kube/config148 -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=2 -v=4

Output:

apple@appledeMacBook-Pro test$ go run main.go -kubeconfig=/Users/apple/.kube/config148 -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=2 -v=4
I0126 13:47:05.066516   35096 leaderelection.go:243] attempting to acquire leader lease default/example...
I0126 13:47:05.451887   35096 leaderelection.go:346] lock is held by 1 and has not yet expired
I0126 13:47:05.451909   35096 leaderelection.go:248] failed to acquire lease default/example
I0126 13:47:05.451918   35096 main.go:145] new leader elected: 1
I0126 13:47:14.188160   35096 leaderelection.go:346] lock is held by 1 and has not yet expired
I0126 13:47:14.188188   35096 leaderelection.go:248] failed to acquire lease default/example
I0126 13:47:24.929607   35096 leaderelection.go:346] lock is held by 1 and has not yet expired
I0126 13:47:24.929636   35096 leaderelection.go:248] failed to acquire lease default/example
.......

As shown, the process with id=1 holds the lock and runs the program, while the process with id=2 cannot acquire the lock and keeps retrying.

Start process 3:

go run main.go -kubeconfig=/Users/apple/.kube/config148 -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=3 -v=4

Output:

apple@appledeMacBook-Pro test$ go run main.go -kubeconfig=/Users/apple/.kube/config148 -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=3 -v=4
I0126 13:47:12.431518   35112 leaderelection.go:243] attempting to acquire leader lease default/example...
I0126 13:47:12.776614   35112 leaderelection.go:346] lock is held by 1 and has not yet expired
I0126 13:47:12.776649   35112 leaderelection.go:248] failed to acquire lease default/example
I0126 13:47:12.776663   35112 main.go:145] new leader elected: 1
I0126 13:47:21.499295   35112 leaderelection.go:346] lock is held by 1 and has not yet expired
I0126 13:47:21.499325   35112 leaderelection.go:248] failed to acquire lease default/example
I0126 13:47:32.241544   35112 leaderelection.go:346] lock is held by 1 and has not yet expired
I0126 13:47:32.241572   35112 leaderelection.go:248] failed to acquire lease default/example
.......

As shown, the process with id=1 holds the lock and runs the program, while the process with id=3 cannot acquire the lock and keeps retrying.

2.2 Kill process 1 and watch processes 2 and 3 compete for a new leader

apple@appledeMacBook-Pro test$ go run main.go -kubeconfig=/Users/apple/.kube/config148 -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=1 -v=4
I0126 13:46:59.753974   35080 leaderelection.go:243] attempting to acquire leader lease default/example...
I0126 13:47:00.660260   35080 leaderelection.go:253] successfully acquired lease default/example
I0126 13:47:00.660368   35080 main.go:75] Controller loop...
^CI0126 13:53:16.629114   35080 main.go:92] Received termination, signaling shutdown
I0126 13:53:17.057999   35080 main.go:135] leader lost: 1

Now kill the id=1 process. After the lock is released (this takes up to LeaseDuration), watch the output of processes 2 and 3 to see which one becomes the new leader.

Output of the id=2 process:

......
I0126 13:53:11.208487   35096 leaderelection.go:346] lock is held by 1 and has not yet expired
I0126 13:53:11.208512   35096 leaderelection.go:248] failed to acquire lease default/example
I0126 13:53:18.189514   35096 leaderelection.go:253] successfully acquired lease default/example
I0126 13:53:18.189587   35096 main.go:75] Controller loop...

As shown, the process with id=2 now holds the lock and runs the program.

Output of the id=3 process:

......
I0126 13:53:04.675216   35112 leaderelection.go:248] failed to acquire lease default/example
I0126 13:53:12.918706   35112 leaderelection.go:346] lock is held by 1 and has not yet expired
I0126 13:53:12.918736   35112 leaderelection.go:248] failed to acquire lease default/example
I0126 13:53:19.544314   35112 leaderelection.go:346] lock is held by 2 and has not yet expired
I0126 13:53:19.544372   35112 leaderelection.go:248] failed to acquire lease default/example
I0126 13:53:19.544387   35112 main.go:145] new leader elected: 2
I0126 13:53:26.346561   35112 leaderelection.go:346] lock is held by 2 and has not yet expired
I0126 13:53:26.346591   35112 leaderelection.go:248] failed to acquire lease default/example
......

As shown, the process with id=2 holds the lock and runs the program, while the process with id=3 cannot acquire the lock and keeps retrying.

2.3 Inspect the resource lock object

[root@master1 ~]# kubectl get leases.coordination.k8s.io example -o yaml
apiVersion: coordination.k8s.io/v1
kind: Lease
metadata:
  creationTimestamp: "2022-01-26T05:46:38Z"
  managedFields:
  .......
    manager: main
    operation: Update
    time: "2022-01-26T06:05:43Z"
  name: example
  namespace: default
  resourceVersion: "314956587"
  selfLink: /apis/coordination.k8s.io/v1/namespaces/default/leases/example
  uid: 5ce63489-c754-42b4-9e6c-a0a0a8442c3f
spec:
  acquireTime: "2022-01-26T05:53:17.905076Z" # when the lock was acquired
  holderIdentity: "2" # identity of the lock-holding process
  leaseDurationSeconds: 60 # lease duration
  leaseTransitions: 1 # number of leader changes
  renewTime: "2022-01-26T06:06:06.248393Z" # when the lease was last renewed

The lock is now held by process 2. If process 1 is restarted at this point, it too can only keep trying to acquire the lock.

3. leaderelection source code analysis

The basic principle of leaderelection is to implement a distributed lock on top of a Kubernetes Lease, ConfigMap, or Endpoints resource. The process that acquires the lock becomes the leader and renews its lease periodically. The other processes keep trying to take the lock over; when an attempt fails they wait for the next cycle. When the leader dies and its lease expires, one of the other processes becomes the new leader.

The code lives under client-go/tools/leaderelection. Its logical structure is shown in the diagram below:

Note: mind your client-go version; the LeaderElection logical architecture differs slightly between versions.

3.1 The Interface

Interface defines a set of methods for creating, updating, and getting a LeaderElectionRecord. In short, it is a client — and every client instance must have its own globally unique id.

// tools/leaderelection/resourcelock/interface.go
 
// LeaderElectionRecord describes the holder of the lock
type LeaderElectionRecord struct {
    // identity of the lock-holding process, i.e. the leader's id
    HolderIdentity       string      `json:"holderIdentity"`
    // how long a lease lasts
    LeaseDurationSeconds int         `json:"leaseDurationSeconds"`
    // when leadership was acquired
    AcquireTime          metav1.Time `json:"acquireTime"`
    // when the lease was last renewed
    RenewTime            metav1.Time `json:"renewTime"`
    // number of leader changes
    LeaderTransitions    int         `json:"leaderTransitions"`
}
 
type Interface interface {
    // Get returns the current LeaderElectionRecord
    Get() (*LeaderElectionRecord, error)
    // Create creates a LeaderElectionRecord
    Create(ler LeaderElectionRecord) error
    // Update updates the record
    Update(ler LeaderElectionRecord) error
    // RecordEvent records an event
    RecordEvent(string)
    // Identity returns this client's id
    Identity() string
    // Describe returns a description (namespace/name)
    Describe() string
}

Interface has four implementations: EndpointsLock, ConfigMapLock, LeaseLock, and MultiLock (rarely used), which operate on the Kubernetes Endpoints, ConfigMap, and Lease resources respectively. LeaseLock is used as the example here.

// tools/leaderelection/resourcelock/leaselock.go

type LeaseLock struct {
	// LeaseMeta should contain a Name and a Namespace of a
	// LeaseMeta object that the LeaderElector will attempt to lead.
	LeaseMeta  metav1.ObjectMeta
	// client used to talk to the apiserver
	Client     coordinationv1client.LeasesGetter
	// the globally unique identity of this LeaseLock
	LockConfig ResourceLockConfig
	// the Lease resource object backing this lock
	lease      *coordinationv1.Lease
}

// tools/leaderelection/resourcelock/interface.go
type ResourceLockConfig struct {
    // globally unique id
    Identity string
    EventRecorder EventRecorder
}

LeaseLock's methods in detail: Create, Update, and Get all use the client to talk to the Kubernetes apiserver.

// tools/leaderelection/resourcelock/leaselock.go

// Get fetches the current lock object ll.lease from the apiserver and returns the corresponding LeaderElectionRecord together with its serialized form
// Get returns the election record from a Lease spec
func (ll *LeaseLock) Get(ctx context.Context) (*LeaderElectionRecord, []byte, error) {
	var err error
	// fetch the Lease object ll.lease backing this lock
	ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Get(ctx, ll.LeaseMeta.Name, metav1.GetOptions{})
	if err != nil {
		return nil, nil, err
	}
	// build a LeaderElectionRecord from the lease spec
	record := LeaseSpecToLeaderElectionRecord(&ll.lease.Spec)
	// serialize the LeaderElectionRecord into []byte
	recordByte, err := json.Marshal(*record)
	if err != nil {
		return nil, nil, err
	}
	return record, recordByte, nil
}

// creates the lock object ll.lease from the given LeaderElectionRecord
// Create attempts to create a Lease
func (ll *LeaseLock) Create(ctx context.Context, ler LeaderElectionRecord) error {
	var err error
	ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Create(ctx, &coordinationv1.Lease{
		ObjectMeta: metav1.ObjectMeta{
			Name:      ll.LeaseMeta.Name,
			Namespace: ll.LeaseMeta.Namespace,
		},
		// build the lease spec from the LeaderElectionRecord
		Spec: LeaderElectionRecordToLeaseSpec(&ler),
	}, metav1.CreateOptions{})
	return err
}

// Update will update an existing Lease spec.
func (ll *LeaseLock) Update(ctx context.Context, ler LeaderElectionRecord) error {
	if ll.lease == nil {
		return errors.New("lease not initialized, call get or create first")
	}
	// build the lease spec from the LeaderElectionRecord
	ll.lease.Spec = LeaderElectionRecordToLeaseSpec(&ler)

	lease, err := ll.Client.Leases(ll.LeaseMeta.Namespace).Update(ctx, ll.lease, metav1.UpdateOptions{})
	if err != nil {
		return err
	}

	ll.lease = lease
	return nil
}

// RecordEvent in leader election while adding meta-data
func (ll *LeaseLock) RecordEvent(s string) {
	if ll.LockConfig.EventRecorder == nil {
		return
	}
	events := fmt.Sprintf("%v %v", ll.LockConfig.Identity, s)
	ll.LockConfig.EventRecorder.Eventf(&coordinationv1.Lease{ObjectMeta: ll.lease.ObjectMeta}, corev1.EventTypeNormal, "LeaderElection", events)
}

// Describe is used to convert details on current resource lock
// into a string
func (ll *LeaseLock) Describe() string {
	return fmt.Sprintf("%v/%v", ll.LeaseMeta.Namespace, ll.LeaseMeta.Name)
}

// Identity returns the Identity of the lock
func (ll *LeaseLock) Identity() string {
	return ll.LockConfig.Identity
}


// LeaseSpecToLeaderElectionRecord builds a LeaderElectionRecord from a lease spec
func LeaseSpecToLeaderElectionRecord(spec *coordinationv1.LeaseSpec) *LeaderElectionRecord {
	var r LeaderElectionRecord
	if spec.HolderIdentity != nil {
		r.HolderIdentity = *spec.HolderIdentity
	}
	if spec.LeaseDurationSeconds != nil {
		r.LeaseDurationSeconds = int(*spec.LeaseDurationSeconds)
	}
	if spec.LeaseTransitions != nil {
		r.LeaderTransitions = int(*spec.LeaseTransitions)
	}
	if spec.AcquireTime != nil {
		r.AcquireTime = metav1.Time{spec.AcquireTime.Time}
	}
	if spec.RenewTime != nil {
		r.RenewTime = metav1.Time{spec.RenewTime.Time}
	}
	return &r

}

// LeaderElectionRecordToLeaseSpec builds a lease spec from a LeaderElectionRecord
func LeaderElectionRecordToLeaseSpec(ler *LeaderElectionRecord) coordinationv1.LeaseSpec {
	leaseDurationSeconds := int32(ler.LeaseDurationSeconds)
	leaseTransitions := int32(ler.LeaderTransitions)
	return coordinationv1.LeaseSpec{
		HolderIdentity:       &ler.HolderIdentity,
		LeaseDurationSeconds: &leaseDurationSeconds,
		AcquireTime:          &metav1.MicroTime{ler.AcquireTime.Time},
		RenewTime:            &metav1.MicroTime{ler.RenewTime.Time},
		LeaseTransitions:     &leaseTransitions,
	}
}

3.2 LeaderElector

LeaderElectionConfig:

It defines the parameters for competing for the lock and holds the current application's configuration, including the resource lock, how long the lock is held, and so on. LeaderElectionConfig.Lock can be backed by any of these three resources:

  • configmap
  • endpoint
  • lease

The package also provides a multilock, which lets you pick two of these types; when saving to the first one fails, the second is used.

//client-go/tools/leaderelection/leaderelection.go

type LeaderElectionConfig struct {
	// the type of lock
	Lock rl.Interface
	// how long the lock is held
	LeaseDuration time.Duration
	// timeout for renewing the lease
	RenewDeadline time.Duration
	// interval between attempts to grab the lock
	RetryPeriod time.Duration
	// user-supplied callbacks run on state changes; three are supported:
	// 1. OnStartedLeading: business code run when leadership starts
	// 2. OnStoppedLeading: run when the leader steps down
	// 3. OnNewLeader: run when a new leader is elected
	Callbacks LeaderCallbacks

	// health checking
	// WatchDog is the associated health checker
	// WatchDog may be null if its not needed/configured.
	WatchDog *HealthzAdaptor
	// whether the leader should run release when it exits
	ReleaseOnCancel bool

	// Name is the name of the resource lock for debugging
	Name string
}

LeaderElector:

It is the entity that competes for the lock.

//client-go/tools/leaderelection/leaderelection.go
// LeaderElector is a leader election client.
type LeaderElector struct {
	// configuration of the current application
	config LeaderElectionConfig
	// the lock record fetched remotely from the apiserver (we are not necessarily the leader);
	// every application competing for this lock sees the same copy
	// internal bookkeeping
	observedRecord    rl.LeaderElectionRecord
	// serialized lock record, compared against the remotely fetched value
	observedRawRecord []byte
	// when the record was fetched
	observedTime      time.Time
	// used to implement OnNewLeader(), may lag slightly from the
	// value observedRecord.HolderIdentity if the transition has
	// not yet been reported.
	reportedLeader string

	// clock is wrapper around time to allow for less flaky testing
	clock clock.Clock

	metrics leaderMetricsAdapter
}

The important fields here:

config: the LeaderElectionConfig that configures this application's client, including the client's unique id.
observedRecord: the LeaderElectionRecord holding the leader information obtained from the apiserver.
observedTime: when it was obtained.

Clearly, deciding whether the current process is the leader only requires comparing the id in config with the id in observedRecord:

func (le *LeaderElector) GetLeader() string {
    return le.observedRecord.HolderIdentity
}
 
// IsLeader returns true if the last observed leader was this client else returns false.
func (le *LeaderElector) IsLeader() bool {
    return le.observedRecord.HolderIdentity == le.config.Lock.Identity()
}

3.3 How LeaderElector runs

3.3.1 run

func (le *LeaderElector) Run(ctx context.Context) {
	defer func() {
		runtime.HandleCrash()
		le.config.Callbacks.OnStoppedLeading()
	}()
	// acquire only returns false when ctx is signalled done;
	// otherwise the client keeps trying until it becomes the leader
	if !le.acquire(ctx) {
		return // ctx signalled done
	}
	// once we have leadership, start the user's own OnStartedLeading logic as a goroutine via the callback
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	go le.config.Callbacks.OnStartedLeading(ctx)
	// renew keeps renewing the lease in a loop;
	// it only returns once leadership is lost,
	// and when it returns the whole Run method returns
	le.renew(ctx)
} 

1. The client (the le instance) first calls acquire and keeps competing for leadership (on failure it keeps competing and never reaches step 2; on success it proceeds to step 2).
2. It starts the user's own logic (OnStartedLeading) asynchronously (then proceeds to step 3).
3. It renews its leadership by calling renew. While renewal succeeds it keeps renewing; once renewal fails, the whole Run returns.

3.3.2 acquire

// maybeReportTransition checks whether a newly elected leader needs to be announced
func (le *LeaderElector) maybeReportTransition() {
    // nothing changed, nothing to report
    if le.observedRecord.HolderIdentity == le.reportedLeader {
        return
    }
    // record the id of the newest leader in reportedLeader
    le.reportedLeader = le.observedRecord.HolderIdentity
    if le.config.Callbacks.OnNewLeader != nil {
        // invoke the application's OnNewLeader callback to announce the new leader
        go le.config.Callbacks.OnNewLeader(le.reportedLeader)
    }
}
 
// acquire returns true as soon as leadership is obtained, and false when ctx is signalled done;
// until then the client keeps trying to become the leader
func (le *LeaderElector) acquire(ctx context.Context) bool {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    succeeded := false
    desc := le.config.Lock.Describe()
    klog.Infof("attempting to acquire leader lease  %v...", desc)
    wait.JitterUntil(func() {
        // try to acquire or renew the lock
        succeeded = le.tryAcquireOrRenew()
        // a new leader may have been elected,
        // so call maybeReportTransition to announce it if necessary
        le.maybeReportTransition()
        if !succeeded {
            // failed to obtain leadership; return and keep competing
            klog.V(4).Infof("failed to acquire lease %v", desc)
            return
        }
        // we became the leader;
        // call cancel to exit JitterUntil and thus return from acquire
        le.config.Lock.RecordEvent("became leader")
        le.metrics.leaderOn(le.config.Name)
        klog.Infof("successfully acquired lease %v", desc)
        cancel()
    }, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
    return succeeded
}

acquire does the following: it returns true as soon as leadership is obtained; otherwise it retries every RetryPeriod.

The logic here is straightforward; the real work happens in tryAcquireOrRenew.

3.3.3 renew and release

// RenewDeadline=15s RetryPeriod=5s
// renew loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew fails or ctx signals done.
func (le *LeaderElector) renew(ctx context.Context) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	// runs every RetryPeriod, and only exits once cancel() is called
	wait.Until(func() {
		timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline)
		defer timeoutCancel()
		// poll every RetryPeriod (5s) until the function returns true,
		// or until the deadline expires, in which case err carries the error
		err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) {
			return le.tryAcquireOrRenew(timeoutCtx), nil
		}, timeoutCtx.Done())

		// a new leader may have been elected; if so, announce it
		le.maybeReportTransition()
		desc := le.config.Lock.Describe()
		if err == nil {
			// err == nil means PollImmediateUntil returned true: the renewal succeeded and we still hold leadership;
			// return here and let wait.Until run the loop again
			klog.V(4).Infof("successfully renewed lease %v", desc)
			return
		}
		// err != nil means we timed out: the retries took longer than RenewDeadline, so renewal failed and leadership is lost;
		// call cancel to exit wait.Until
		le.config.Lock.RecordEvent("stopped leading")
		le.metrics.leaderOff(le.config.Name)
		klog.Infof("failed to renew lease %v: %v", desc, err)
		cancel()
	}, le.config.RetryPeriod, ctx.Done())

	// if we hold the lease, give it up
	if le.config.ReleaseOnCancel {
		le.release()
	}
}

// release clears the holderIdentity field of the lock object when the leader's renewal is cancelled
// release attempts to release the leader lease if we have acquired it.
func (le *LeaderElector) release() bool {
	if !le.IsLeader() {
		return true
	}
	now := metav1.Now()
	leaderElectionRecord := rl.LeaderElectionRecord{
		LeaderTransitions:    le.observedRecord.LeaderTransitions,
		LeaseDurationSeconds: 1,
		RenewTime:            now,
		AcquireTime:          now,
	}
	if err := le.config.Lock.Update(context.TODO(), leaderElectionRecord); err != nil {
		klog.Errorf("Failed to release lock: %v", err)
		return false
	}
	le.observedRecord = leaderElectionRecord
	le.observedTime = le.clock.Now()
	return true
}

Note that the precondition here is that this client is the current leader; only then does it renew.

A word on RenewDeadline and RetryPeriod:
tryAcquireOrRenew attempts a renewal every RetryPeriod; if a renewal fails, it tries again, until the total time spent exceeds RenewDeadline, at which point the client loses its leadership.

3.3.4 tryAcquireOrRenew

// tryAcquireOrRenew competes for or renews leadership;
// returns true on success, false on failure
func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
	now := metav1.Now()
	leaderElectionRecord := rl.LeaderElectionRecord{
		HolderIdentity:       le.config.Lock.Identity(),
		LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
		RenewTime:            now,
		AcquireTime:          now,
	}

	// 1. obtain or create the ElectionRecord
	// the client fetches the ElectionRecord and its serialized form from the apiserver
	oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
	if err != nil {
		if !errors.IsNotFound(err) {
			// a real error: bail out
			klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
			return false
		}
		// the record was not found, so create a new one
		if err = le.config.Lock.Create(ctx, leaderElectionRecord); err != nil {
			klog.Errorf("error initially creating leader election record: %v", err)
			return false
		}
		// then set observedRecord to the leaderElectionRecord we just created
		le.observedRecord = leaderElectionRecord
		le.observedTime = le.clock.Now()
		return true
	}

	// 2. Record obtained, check the Identity & Time
	// the record fetched from the remote side is stored in oldLeaderElectionRecord;
	// if oldLeaderElectionRecord differs from observedRecord, update observedRecord,
	// since observedRecord mirrors the remotely stored record

	// note that every client competes for leadership while the leader keeps renewing (updating its RenewTime field),
	// so whenever the leader renews successfully, every non-leader candidate must refresh its observedTime and observedRecord
	if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
		le.observedRecord = *oldLeaderElectionRecord
		le.observedRawRecord = oldLeaderElectionRawRecord
		le.observedTime = le.clock.Now()
	}
	// the lock is held by someone else and has not yet expired,
	// so return false: there is no chance to take over until the lease expires
	if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
		le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
		!le.IsLeader() {
		klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
		return false
	}

	// 3. We're going to try to update. The leaderElectionRecord is set to it's default
	// here. Let's correct it before updating.
	if le.IsLeader() {
		// we were already the holder: keep the old AcquireTime and LeaderTransitions
		leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
	} else {
		// we were not the previous holder, so increment LeaderTransitions
		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
	}

	// update the lock itself
	// take the lock: this client becomes the leader
	if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil {
		klog.Errorf("Failed to update lock: %v", err)
		return false
	}

	le.observedRecord = leaderElectionRecord
	le.observedTime = le.clock.Now()
	return true
} 

Note: when the current client is not the leader, how does it decide whether the leader has expired?

     Via le.observedTime.Add(le.config.LeaseDuration).After(now.Time):

  • le.observedTime: when the leader's record (i.e. its last observed renewal) was obtained;
  • le.config.LeaseDuration: how long the current process must wait before it may claim leadership;
  • le.observedTime.Add(le.config.LeaseDuration): the earliest time at which this process is allowed to claim leadership.

If le.observedTime.Add(le.config.LeaseDuration).Before(now.Time) is true, the leader has expired. In plain words: more than le.config.LeaseDuration has passed since the leader's last renewal, so the leader is considered expired and a non-leader may take over.

4. Summary

leaderelection uses the atomicity of Kubernetes API operations to implement a distributed lock, electing a leader through continuous competition. Only the process elected as leader executes the business code. This pattern is very common in Kubernetes, and the package makes it easy to write highly available components: for example, deploy the component as a multi-replica Deployment, and when the leader pod exits and restarts, the lock may be picked up by another pod, which carries on the work.

When an application is deployed on Kubernetes, its resource locks make high availability easy to achieve, but note:

  • Prefer lease or configmap as the resource lock. Some components (such as kube-proxy) watch endpoints to update each node's iptables rules, so a large number of Endpoints-based locks would inevitably hurt performance.
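As a sketch of the Deployment pattern described above (every name and the image are placeholders; the ServiceAccount and RoleBinding are omitted for brevity), a multi-replica Deployment whose pods are allowed to manage the Lease lock might look like this:

```yaml
# Illustrative manifest only -- names are placeholders.
# Each pod needs RBAC permission to get/create/update Lease objects
# in the lock's namespace.
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: leader-election-role
  namespace: default
rules:
- apiGroups: ["coordination.k8s.io"]
  resources: ["leases"]
  verbs: ["get", "create", "update"]
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: leader-election-example
spec:
  replicas: 3
  selector:
    matchLabels:
      app: leader-election-example
  template:
    metadata:
      labels:
        app: leader-election-example
    spec:
      serviceAccountName: leader-election-sa   # must be bound to the Role above
      containers:
      - name: example
        image: example/leader-election:latest  # placeholder image
        args:
        - -lease-lock-name=example
        - -lease-lock-namespace=default
```

With three replicas, one pod holds the Lease and runs the business logic while the other two keep retrying, exactly as in the three-process demo in section 2.1.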

Reference: https://www.jianshu.com/p/6e6f1d97d635 (Endpoints-type resource lock)

Reference: https://tangqing.blog.csdn.net/article/details/110729620?spm=1001.2014.3001.5502

Reference: https://silenceper.com/blog/202002/kubernetes-leaderelection/

