K8S list&watch機制


一、流程圖如下

client-go 組件

  • Reflector: 定義在 cache 包的 Reflector 類中,它監聽特定資源類型(Kind)的 Kubernetes API,在ListAndWatch方法中執行。監聽的對象可以是 Kubernetes 的內置資源類型或者是自定義資源類型。當 reflector 通過 watch API 發現新的資源實例被創建,它將通過對應的 list API 獲取到新創建的對象並在watchHandler方法中將其加入到Delta Fifo隊列中。

  • Informer: 定義在 cache 包的 base controller 中,它從Delta Fifo隊列中 pop 出對象,在processLoop方法中執行。base controller 的工作是將對象保存一遍后續獲取,並調用 controller 將對象傳給 controller。

  • Indexer: 提供對象的 indexing 方法,定義在 cache 包的 Indexer中。一個典型的 indexing 的應用場景是基於對象的 label 創建索引。Indexer 基於幾個 indexing 方法維護索引,它使用線程安全的 data store 來存儲對象和他們的key。在 cache 包的 Store 類中定義了一個名為MetaNamespaceKeyFunc的默認方法,可以為對象生成一個<namespace>/<name>形式的key。

自定義 controller 組件

  • Informer reference: 它是對 Informer 實例的引用,知道如何使用自定義資源對象。你編寫的自定義 controller 需要創建正確的 Informer。
  • Indexer reference: 它是對 Indexer 實例的引用,你編寫的自定義 controller 代碼中需要創建它,在獲取對象供后續使用時你會用到這個引用。

client-go 中的 base controller 提供了NewIndexerInformer來創建 Informer 和 Indexer。在你的代碼中,你可以直接使用 此方法,或者使用 工廠方法 創建 informer。

  • Resource Event Handlers: 一些回調方法,當 Informer 想要發送一個對象給 controller 時,會調用這些方法。典型的編寫回調方法的模式,是獲取資源對象的 key 並放入一個 work queue隊列,等待進一步的處理(Proceess item)。
  • Work queue: 在 controller 代碼中創建的隊列,用來解耦對象的傳遞和對應的處理。Resource Event Handlers 的方法就是用來接收對象並將其加入 work queue
  • Process Item: 在 controller 代碼中創建的方法,用來對work queue中的對象做對應處理,可以有一個或多個其他的方法實際做處理,這些方法一般會使用Indexer reference,或者 list 方法來獲取 key 對應的對象。

編寫自定義 controller

以 sample-controller 為例,整體流程如下:

/*
*** main.go
*/
// 創建 clientset
kubeClient, err := kubernetes.NewForConfig(cfg)		// k8s clientset, "k8s.io/client-go/kubernetes"
exampleClient, err := clientset.NewForConfig(cfg)	// sample clientset, "k8s.io/sample-controller/pkg/generated/clientset/versioned"

// 創建 Informer
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)		// k8s informer, "k8s.io/client-go/informers"
exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30)		// sample informer, "k8s.io/sample-controller/pkg/generated/informers/externalversions"

// 創建 controller,傳入 clientset 和 informer
controller := NewController(kubeClient, exampleClient,
		kubeInformerFactory.Apps().V1().Deployments(),
		exampleInformerFactory.Samplecontroller().V1alpha1().Foos())

// 運行 Informer,Start 方法為非阻塞,會運行在單獨的 goroutine 中
kubeInformerFactory.Start(stopCh)	
exampleInformerFactory.Start(stopCh)

// 運行 controller
controller.Run(2, stopCh)

/*
*** controller.go 
*/
NewController() *Controller {}
	// 將 CRD 資源類型定義加入到 Kubernetes 的 Scheme 中,以便 Events 可以記錄 CRD 的事件
	utilruntime.Must(samplescheme.AddToScheme(scheme.Scheme))

	// 創建 Broadcaster
	eventBroadcaster := record.NewBroadcaster()
	// ... ...

	// 監聽 CRD 類型'Foo'並注冊 ResourceEventHandler 方法,當'Foo'的實例變化時進行處理
	fooInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: controller.enqueueFoo,
		UpdateFunc: func(old, new interface{}) {
			controller.enqueueFoo(new)
		},
	})

	// 監聽 Deployment 變化並注冊 ResourceEventHandler 方法,
	// 當它的 ownerReferences 為 Foo 類型實例時,將該 Foo 資源加入 work queue
	deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: controller.handleObject,
		UpdateFunc: func(old, new interface{}) {
			newDepl := new.(*appsv1.Deployment)
			oldDepl := old.(*appsv1.Deployment)
			if newDepl.ResourceVersion == oldDepl.ResourceVersion {
				return
			}
			controller.handleObject(new)
		},
		DeleteFunc: controller.handleObject,
	})

func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {}
	// 在啟動 worker 前等待緩存同步
	if ok := cache.WaitForCacheSync(stopCh, c.deploymentsSynced, c.foosSynced); !ok {
		return fmt.Errorf("failed to wait for caches to sync")
	}
	// 運行兩個 worker 來處理資源
	for i := 0; i < threadiness; i++ {
		go wait.Until(c.runWorker, time.Second, stopCh)
	}
	// 無限循環,不斷的調用 processNextWorkItem 處理下一個對象
	func (c *Controller) runWorker() {
		for c.processNextWorkItem() {
		}
	}
	// 從workqueue中獲取下一個對象並進行處理,通過調用 syncHandler
	func (c *Controller) processNextWorkItem() bool {
		obj, shutdown := c.workqueue.Get()
		if shutdown {
			return false
		}
		err := func(obj interface{}) error {
			// 調用 workqueue.Done(obj) 方法告訴 workqueue 當前項已經處理完畢,
			// 如果我們不想讓當前項重新入隊,一定要調用 workqueue.Forget(obj)。
			// 當我們沒有調用Forget時,當前項會重新入隊 workqueue 並在一段時間后重新被獲取。
			defer c.workqueue.Done(obj)
			var key string
			var ok bool
			// 我們期望的是 key 'namespace/name' 格式的 string
			if key, ok = obj.(string); !ok {
				// 無效的項調用Forget方法,避免重新入隊。
				c.workqueue.Forget(obj)
				utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
				return nil
			}
			if err := c.syncHandler(key); err != nil {
				// 放回workqueue避免偶發的異常
				c.workqueue.AddRateLimited(key)
				return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
			}
			// 如果沒有異常,Forget當前項,同步成功
			c.workqueue.Forget(obj)
			klog.Infof("Successfully synced '%s'", key)
			return nil
		}(obj)
		if err != nil {
			utilruntime.HandleError(err)
			return true
		}

		return true
	}
	// 對比真實的狀態和期望的狀態並嘗試合並,然后更新Foo類型實例的狀態信息
	func (c *Controller) syncHandler(key string) error {
		// 通過 workqueue 中的 key 解析出 namespace 和 name
		namespace, name, err := cache.SplitMetaNamespaceKey(key)
		// 調用 lister 接口通過 namespace 和 name 獲取 Foo 實例
		foo, err := c.foosLister.Foos(namespace).Get(name)
		deploymentName := foo.Spec.DeploymentName
		// 獲取 Foo 實例中定義的 deploymentname
		deployment, err := c.deploymentsLister.Deployments(foo.Namespace).Get(deploymentName)
		// 沒有發現對應的 deployment,新建一個
		if errors.IsNotFound(err) {
			deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Create(newDeployment(foo))
		}
		// OwnerReferences 不是 Foo 實例,warning並返回錯誤
		if !metav1.IsControlledBy(deployment, foo) {
			msg := fmt.Sprintf(MessageResourceExists, deployment.Name)
			c.recorder.Event(foo, corev1.EventTypeWarning, ErrResourceExists, msg)
			return fmt.Errorf(msg)
		}
		// deployment 中 的配置和 Foo 實例中 Spec 的配置不一致,即更新 deployment
		if foo.Spec.Replicas != nil && *foo.Spec.Replicas != *deployment.Spec.Replicas {
			deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Update(newDeployment(foo))
		}
		// 更新 Foo 實例狀態
		err = c.updateFooStatus(foo, deployment)
		c.recorder.Event(foo, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
	}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM