informer使用示例


一、informer機制

 

 

 

根據流程圖來解釋一下 Informer 中幾個組件的作用:

  • Reflector:稱之為反射器,實現對 apiserver 指定類型對象的監控(ListAndWatch),其中反射實現的就是把監控的結果實例化成具體的對象,最終也是調用 Kubernetes 的 List/Watch API;

  • DeltaIFIFO Queue:一個增量隊列,將 Reflector 監控變化的對象形成一個 FIFO 隊列,此處的 Delta 就是變化;

  • LocalStore:就是 informer 的 cache,這里面緩存的是 apiserver 中的對象(其中有一部分可能還在DeltaFIFO 中),此時使用者再查詢對象的時候就直接從 cache 中查找,減少了 apiserver 的壓力,LocalStore 只會被 Lister 的 List/Get 方法訪問。

  • WorkQueue:DeltaIFIFO 收到事件后會先將事件存儲在自己的數據結構中,然后直接操作 Store 中存儲的數據,更新完 store 后 DeltaIFIFO 會將該事件 pop 到 WorkQueue 中,Controller 收到 WorkQueue 中的事件會根據對應的類型觸發對應的回調函數。

2、Informer 的工作流程
    1. Informer 首先會 list/watch apiserver,Informer 所使用的 Reflector 包負責與 apiserver 建立連接,Reflector 使用 ListAndWatch 的方法,會先從 apiserver 中 list 該資源的所有實例,list 會拿到該對象最新的 resourceVersion,然后使用 watch 方法監聽該 resourceVersion 之后的所有變化,若中途出現異常,reflector 則會從斷開的 resourceVersion 處重現嘗試監聽所有變化,一旦該對象的實例有創建、刪除、更新動作,Reflector 都會收到"事件通知",這時,該事件及它對應的 API 對象這個組合,被稱為增量(Delta),它會被放進 DeltaFIFO 中。
    1. Informer 會不斷地從這個 DeltaFIFO 中讀取增量,每拿出一個對象,Informer 就會判斷這個增量的時間類型,然后創建或更新本地的緩存,也就是 store。
    1. 如果事件類型是 Added(添加對象),那么 Informer 會通過 Indexer 的庫把這個增量里的 API 對象保存到本地的緩存中,並為它創建索引,若為刪除操作,則在本地緩存中刪除該對象。
    1. DeltaFIFO 再 pop 這個事件到 controller 中,controller 會調用事先注冊的 ResourceEventHandler 回調函數進行處理。
    1. 在 ResourceEventHandler 回調函數中,其實只是做了一些很簡單的過濾,然后將關心變更的 Object 放到 workqueue 里面。
    1. Controller 從 workqueue 里面取出 Object,啟動一個 worker 來執行自己的業務邏輯,業務邏輯通常是計算目前集群的狀態和用戶希望達到的狀態有多大的區別,然后孜孜不倦地讓 apiserver 將狀態演化到用戶希望達到的狀態,比如為 deployment 創建新的 pods,或者是擴容/縮容 deployment。
    1. 在worker中就可以使用 lister 來獲取 resource,而不用頻繁的訪問 apiserver,因為 apiserver 中 resource 的變更都會反映到本地的 cache 中。

Informer 在使用時需要先初始化一個 InformerFactory,目前主要推薦使用的是 SharedInformerFactory,Shared 指的是在多個 Informer 中共享一個本地 cache。

三、Informer 使用示例

在實際的開發工作中,Informer 主要用在兩處:

  • 在訪問 k8s apiserver 的客戶端作為一個 client 緩存對象使用;
  • 在一些自定義 controller 中使用,比如 operator 的開發;

1、下面是一個作為 client 的使用示例:

package main

import (
    "flag"
    "fmt"
    "log"
    "path/filepath"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/labels"
    "k8s.io/apimachinery/pkg/util/runtime"

    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/util/homedir"
)

func main() {
    var kubeconfig *string
    if home := homedir.HomeDir(); home != "" {
        kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
    } else {
        kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
    }
    flag.Parse()

    config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
    if err != nil {
        panic(err)
    }

    // 初始化 client
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        log.Panic(err.Error())
    }

    stopper := make(chan struct{})
    defer close(stopper)
    
    // 初始化 informer
    factory := informers.NewSharedInformerFactory(clientset, 0)
    nodeInformer := factory.Core().V1().Nodes()
    informer := nodeInformer.Informer()
    defer runtime.HandleCrash()
    
    // 啟動 informer,list & watch
    go factory.Start(stopper)
    
    // 從 apiserver 同步資源,即 list 
    if !cache.WaitForCacheSync(stopper, informer.HasSynced) {
        runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
        return
    }

    // 使用自定義 handler
    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    onAdd,
        UpdateFunc: func(interface{}, interface{}) { fmt.Println("update not implemented") }, // 此處省略 workqueue 的使用
        DeleteFunc: func(interface{}) { fmt.Println("delete not implemented") },
    })
    
    // 創建 lister
    nodeLister := nodeInformer.Lister()
    // 從 lister 中獲取所有 items
    nodeList, err := nodeLister.List(labels.Everything())
    if err != nil {
        fmt.Println(err)
    }
    fmt.Println("nodelist:", nodeList)
    <-stopper
}

func onAdd(obj interface{}) {
    node := obj.(*corev1.Node)
    fmt.Println("add a node:", node.Name)
}
Shared指的是多個 lister 共享同一個cache,而且資源的變化會同時通知到cache和 listers。這個解釋和上面圖所展示的內容的是一致的,cache我們在Indexer的介紹中已經分析過了,lister 指的就是OnAdd、OnUpdate、OnDelete 這些回調函數背后的對象

2、以下是作為 controller 使用的一個整體工作流程

(1) 創建一個控制器

  • 為控制器創建 workqueue
  • 創建 informer, 為 informer 添加 callback 函數,創建 lister

(2) 啟動控制器

  • 啟動 informer
  • 等待本地 cache sync 完成后, 啟動 workers

(3) 當收到變更事件后,執行 callback

  • 等待事件觸發
  • 從事件中獲取變更的 Object
  • 做一些必要的檢查
  • 生成 object key,一般是 namespace/name 的形式
  • 將 key 放入 workqueue 中

(4) worker loop

  • 等待從 workqueue 中獲取到 item,一般為 object key
  • 用 object key 通過 lister 從本地 cache 中獲取到真正的 object 對象
  • 做一些檢查
  • 執行真正的業務邏輯
  • 處理下一個 item

下面是自定義 controller 使用的一個參考:

var (
    masterURL  string
    kubeconfig string
)

func init() {
    flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
    flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
}

func main() {
    flag.Parse()

    stopCh := signals.SetupSignalHandler()

    cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
    if err != nil {
        glog.Fatalf("Error building kubeconfig: %s", err.Error())
    }

    kubeClient, err := kubernetes.NewForConfig(cfg)
    if err != nil {
        glog.Fatalf("Error building kubernetes clientset: %s", err.Error())
    }

    // 所謂 Informer,其實就是一個帶有本地緩存和索引機制的、可以注冊 EventHandler 的 client
    // informer watch apiserver,每隔 30 秒 resync 一次(list)
    kubeInformerFactory := informers.NewSharedInformerFactory(kubeClient, time.Second*30)

    controller := controller.NewController(kubeClient, kubeInformerFactory.Core().V1().Nodes())

    //  啟動 informer
    go kubeInformerFactory.Start(stopCh)

     // start controller 
    if err = controller.Run(2, stopCh); err != nil {
        glog.Fatalf("Error running controller: %s", err.Error())
    }
}


// NewController returns a new network controller
func NewController(
    kubeclientset kubernetes.Interface,
    networkclientset clientset.Interface,
    networkInformer informers.NetworkInformer) *Controller {

    // Create event broadcaster
    // Add sample-controller types to the default Kubernetes Scheme so Events can be
    // logged for sample-controller types.
    utilruntime.Must(networkscheme.AddToScheme(scheme.Scheme))
    glog.V(4).Info("Creating event broadcaster")
    eventBroadcaster := record.NewBroadcaster()
    eventBroadcaster.StartLogging(glog.Infof)
    eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
    recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})

    controller := &Controller{
        kubeclientset:    kubeclientset,
        networkclientset: networkclientset,
        networksLister:   networkInformer.Lister(),
        networksSynced:   networkInformer.Informer().HasSynced,
        workqueue:        workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Networks"),
        recorder:         recorder,
    }

    glog.Info("Setting up event handlers")
    // Set up an event handler for when Network resources change
    networkInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: controller.enqueueNetwork,
        UpdateFunc: func(old, new interface{}) {
            oldNetwork := old.(*samplecrdv1.Network)
            newNetwork := new.(*samplecrdv1.Network)
            if oldNetwork.ResourceVersion == newNetwork.ResourceVersion {
                // Periodic resync will send update events for all known Networks.
                // Two different versions of the same Network will always have different RVs.
                return
            }
            controller.enqueueNetwork(new)
        },
        DeleteFunc: controller.enqueueNetworkForDelete,
    })

    return controller
}

自定義controller參考:https://github.com/resouer/k8s-controller-custom-resource


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM