《k8s-1.13版本源碼分析》- Informer 機制


源碼分析系列文章已經開源到github,地址如下:

      • github:
        https://github.com/farmer-hutao/k8s-source-code-analysis

      • gitbook:
        https://farmer-hutao.github.io/k8s-source-code-analysis


 

本文已經重寫,請看《Custom Controller 之 Informer

本文大綱

1. 概述

講 Informer 還是比較有壓力的,client-go 中的邏輯確實有點復雜,我甚至懷疑有“炫技”的成分。Informer 在很多組件的源碼中可以看到,尤其是 kube-controller-manager (寫這篇文章時我已經基本寫完 kube-scheduler 的源碼分析,准備着手寫 kube-controller-manager 了,鑒於 controlelr 和 client-go 關聯太大,跳過來先講講 Informer).

Informer 是 client-go 中一個比較核心的工具,通過 Informer 我們可以輕松 List/Get 某個資源對象,可以監聽資源對象的各種事件(比如創建和刪除)然后觸發回調函數,讓我們能夠在各種事件發生的時候能夠作出相應的邏輯處理。舉個例字,當 pod 數量變化的時候 deployment 是不是需要判斷自己名下的 pod 數量是否還和預期的一樣?如果少了是不是要考慮創建?

2. 架構概覽

1555502518252

如上圖,Informer 可以 watch API Server,監聽各種事件,然后回調事件 handler。這些事件 handler 可以做一些簡單的過濾,最終要將 item 放到 workequeue 中,這個 workerqueue 也是 client-go 提供的工具。最終用戶寫的 controller 負責啟動 worker 去消費這 workqueue 中的 item.

3. SharedInformerFactory

SharedInformerFactory 提供所有 API group 資源的 shared informers,也就是說通過這個 factory 可以使用 DeploymentInformer、ConfigMapInformer 等等各種 Informer,從而能夠實現針對各種資源的邏輯處理。

informers/factory.go:185

type SharedInformerFactory interface { internalinterfaces.SharedInformerFactory ForResource(resource schema.GroupVersionResource) (GenericInformer, error) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool Admissionregistration() admissionregistration.Interface Apps() apps.Interface Auditregistration() auditregistration.Interface Autoscaling() autoscaling.Interface Batch() batch.Interface Certificates() certificates.Interface Coordination() coordination.Interface Core() core.Interface // …… }

這個 interface 我們關注3個點:
  1. internalinterfaces.SharedInformerFactory接口
  2. ForResource()方法
  3. 其他方法的類型

3.1. 同質的方法

我們先看第三點,找個特例,從這個接口的一個方法往里面看一下類型含義,比如Apps() apps.Interface吧:

informers/apps/interface.go:29

type Interface interface { // V1 provides access to shared informers for resources in V1. V1() v1.Interface // V1beta1 provides access to shared informers for resources in V1beta1. V1beta1() v1beta1.Interface // V1beta2 provides access to shared informers for resources in V1beta2. V1beta2() v1beta2.Interface } 

很自然我們想到要繼續看v1.Interface

informers/apps/v1/interface.go:26

type Interface interface { // ControllerRevisions returns a ControllerRevisionInformer. ControllerRevisions() ControllerRevisionInformer // DaemonSets returns a DaemonSetInformer. DaemonSets() DaemonSetInformer // Deployments returns a DeploymentInformer. Deployments() DeploymentInformer // ReplicaSets returns a ReplicaSetInformer. ReplicaSets() ReplicaSetInformer // StatefulSets returns a StatefulSetInformer. StatefulSets() StatefulSetInformer } 

DeploymentInformer 又是什么類型呢?

informers/apps/v1/deployment.go:36

type DeploymentInformer interface { Informer() cache.SharedIndexInformer Lister() v1.DeploymentLister } 

可以看到這個 interface 的兩個方法的特點,這個接口要提供的是針對 Deployments 的 shared informer 和 lister. 我們先不糾結細節,到這里我們先理解SharedInformerFactory 提供所有 API group 資源的 shared informers這句話。

3.2. ForResource()方法

這個方法返回指定類型的 shared informer 的通用訪問方式,從實現中可以看到一些端倪:

informers/generic.go:80

func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource) (GenericInformer, error) { switch resource { // Group=admissionregistration.k8s.io, Version=v1alpha1 case v1alpha1.SchemeGroupVersion.WithResource("initializerconfigurations"): return &genericInformer{resource: resource.GroupResource(), informer: f.Admissionregistration().V1alpha1().InitializerConfigurations().Informer()}, nil // …… } 

這里的返回值是 GenericInformer 類型,很簡潔:

informers/generic.go:58

type GenericInformer interface { Informer() cache.SharedIndexInformer Lister() cache.GenericLister } 

3.3. internalinterfaces.SharedInformerFactory

informers/internalinterfaces/factory_interfaces.go:34

type SharedInformerFactory interface { Start(stopCh <-chan struct{}) InformerFor(obj runtime.Object, newFunc NewInformerFunc) cache.SharedIndexInformer } 

這里的 InformerFor() 方法和前面的 ForResource() 有點像,這里的返回值是 SharedIndexInformer,GenericInformer 的 Informer() 方法返回值也是 SharedIndexInformer:

tools/cache/shared_informer.go:66

type SharedIndexInformer interface { SharedInformer // AddIndexers add indexers to the informer before it starts. AddIndexers(indexers Indexers) error GetIndexer() Indexer } 

3.4. sharedInformerFactory

sharedInformerFactory 對象是 SharedInformerFactory 接口的具體實現,從這個 struct 的屬性中我們可以看到一些有用的信息:

informers/factory.go:53

type sharedInformerFactory struct { client kubernetes.Interface namespace string tweakListOptions internalinterfaces.TweakListOptionsFunc lock sync.Mutex defaultResync time.Duration customResync map[reflect.Type]time.Duration informers map[reflect.Type]cache.SharedIndexInformer startedInformers map[reflect.Type]bool } 

這里主要注意 client 和 informers,client 先不細說,大家從字面理解,當作一個可以和 api server 交互(CURD)的工具先就行。informers map[reflect.Type]cache.SharedIndexInformer明顯是存放了多個不同類型的 informers,這個 map 的 key 表達一種 obj 的類型,value 是 SharedIndexInformer,后面我們會講。

4. SharedIndexInformer

看 client-go 的過程中我一直在想到底哪個對象最能代表 Informer,后來覺得 SharedIndexInformer 應該可以被認為就是廣義的 Informer 了。

我們在前面看到 GenericInformer 的代碼,再附加對應 struct 貼一份:

type GenericInformer interface { Informer() cache.SharedIndexInformer Lister() cache.GenericLister } type genericInformer struct { informer cache.SharedIndexInformer resource schema.GroupResource } 

我們編碼的時候直接使用的都是 SharedInformerFactory,往里面跟可以認為 GenericInformer 是第一層,這個接口的方法很清晰表達了意圖。這里涉及到 informer+lister,我們一一來看。

SharedIndexInformer 的定義如下:

tools/cache/shared_informer.go:66

type SharedIndexInformer interface { SharedInformer AddIndexers(indexers Indexers) error GetIndexer() Indexer } 

這里包了一個 Interface:

tools/cache/shared_informer.go:43

type SharedInformer interface { // 留意這個方法 AddEventHandler(handler ResourceEventHandler) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) GetStore() Store GetController() Controller Run(stopCh <-chan struct{}) HasSynced() bool LastSyncResourceVersion() string } 

從函數名得不到太多直觀的信息,我們從 SharedIndexInformer 的實現 sharedIndexInformer 入手:

tools/cache/shared_informer.go:127

type sharedIndexInformer struct { indexer Indexer controller Controller processor *sharedProcessor cacheMutationDetector CacheMutationDetector listerWatcher ListerWatcher objectType runtime.Object resyncCheckPeriod time.Duration defaultEventHandlerResyncPeriod time.Duration clock clock.Clock started, stopped bool startedLock sync.Mutex blockDeltas sync.Mutex } 

從 sharedIndexInformer 的屬性中可以看到幾個實實在在的對象:

  • indexer
  • controller
  • processor
  • listerWatcher

4.1. indexer

Indexer 接口提供了各種 index 函數,讓我們在 list 一個對象時可以使用這些索引函數:

tools/cache/index.go:27

type Indexer interface { Store Index(indexName string, obj interface{}) ([]interface{}, error) IndexKeys(indexName, indexKey string) ([]string, error) ListIndexFuncValues(indexName string) []string ByIndex(indexName, indexKey string) ([]interface{}, error) GetIndexers() Indexers AddIndexers(newIndexers Indexers) error } 

這個接口的實現是 cache:

tools/cache/store.go:112

type cache struct { cacheStorage ThreadSafeStore keyFunc KeyFunc } 

另外我們注意到包了一個接口 Store:

type Store interface { Add(obj interface{}) error Update(obj interface{}) error Delete(obj interface{}) error List() []interface{} ListKeys() []string Get(obj interface{}) (item interface{}, exists bool, err error) GetByKey(key string) (item interface{}, exists bool, err error) Replace([]interface{}, string) error Resync() error } 

Store 是一個一般對象的存儲接口,Reflector(后面介紹)知道怎樣 watch server 然后更新 store. Reflector 能夠將 store 當作一個本地緩存系統,進而以類似隊列的方式工作(隊列中存的是等待被處理的對象)。

我們來看 Store 接口的一個實現:

type DeltaFIFO struct { items map[string]Deltas queue []string //…… } 

4.2. reflector

前面說到 Store 要給 Reflector 服務,我們看一下 Reflector 的定義:

tools/cache/reflector.go:47

type Reflector struct { name string metrics *reflectorMetrics expectedType reflect.Type // The destination to sync up with the watch source store Store // listerWatcher is used to perform lists and watches. listerWatcher ListerWatcher // …… } 

Reflector 要做的事情是 watch 一個指定的資源,然后將這個資源的變化反射到給定的store中。很明顯這里的兩個屬性 listerWatcher 和 store 就是這些邏輯的關鍵。

我們簡單看一下往 store 中添加數據的代碼:

tools/cache/reflector.go:324

switch event.Type { case watch.Added: err := r.store.Add(event.Object) // …… case watch.Modified: err := r.store.Update(event.Object) // …… case watch.Deleted: // …… err := r.store.Delete(event.Object) 

這個 store 一般用的是 DeltaFIFO,到這里大概就知道 Refactor 從 API Server watch 資源,然后寫入 DeltaFIFO 的過程了,大概長這個樣子:

1555420426565

然后我們關注一下 DeltaFIFO 的 knownObjects 屬性,在創建一個 DeltaFIFO 實例的時候有這樣的邏輯:

tools/cache/delta_fifo.go:59

func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO { f := &DeltaFIFO{ items: map[string]Deltas{}, queue: []string{}, keyFunc: keyFunc, knownObjects: knownObjects, } f.cond.L = &f.lock return f } 

這里接收了 KeyListerGetter 類型的 knownObjects,繼續往前跟可以看到我們前面提到的 SharedIndexInformer 的初始化邏輯中將 indexer 對象當作了這里的 knownObjects 的實參:

tools/cache/shared_informer.go:192

fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer) 

s.indexer 來自於:NewSharedIndexInformer() 函數的邏輯:

func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer { realClock := &clock.RealClock{} sharedIndexInformer := &sharedIndexInformer{ processor: &sharedProcessor{clock: realClock}, indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers), listerWatcher: lw, objectType: objType, resyncCheckPeriod: defaultEventHandlerResyncPeriod, defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod, cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", objType)), clock: realClock, } return sharedIndexInformer } 

這里的 NewIndexer() 函數中就可以看到我們前面提到的 Indexer 接口的實現 cache 對象了:

!FILENMAE tools/cache/store.go:239

func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer { return &cache{ cacheStorage: NewThreadSafeStore(indexers, Indices{}), keyFunc: keyFunc, } } 

Ok,我們可以基於前面的圖加一個框框了:

1555420360544

4.3. ResourceEventHandler

在 SharedInformer 接口中有一個方法AddEventHandler(handler ResourceEventHandler),我們看一下這個方法的一些細節。先來看 ResourceEventHandler 接口的定義:

tools/cache/controller.go:177

type ResourceEventHandler interface { OnAdd(obj interface{}) OnUpdate(oldObj, newObj interface{}) OnDelete(obj interface{}) } // adaptor type ResourceEventHandlerFuncs struct { AddFunc func(obj interface{}) UpdateFunc func(oldObj, newObj interface{}) DeleteFunc func(obj interface{}) } 

ResourceEventHandler 要做的事情是 handle 一個資源對象的事件通知,在這個資源對象發生增加、修改、刪除的時候分別對應上面3個方法的邏輯。下面在 processor 部分我們繼續看 ResourceEventHandler.

4.4. controller

controller 對應這里的 Controller 接口:

tools/cache/controller.go:82

type Controller interface { Run(stopCh <-chan struct{}) HasSynced() bool LastSyncResourceVersion() string } 

這里有個Run()方法比較顯眼,我們看一下 sharedIndexInformer 對 Run() 方法的實現:

tools/cache/shared_informer.go:189

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) { // …… cfg := &Config{ Queue: fifo, ListerWatcher: s.listerWatcher, ObjectType: s.objectType, FullResyncPeriod: s.resyncCheckPeriod, RetryOnError: false, ShouldResync: s.processor.shouldResync, Process: s.HandleDeltas, } func() { // …… s.controller = New(cfg) // …… }() // …… s.controller.Run(stopCh) } 

關注這里基於 Config 創建了一個 Controller 賦值給 s.controller,然后調用了這個 s.controller.Run() 方法。我們看一下 New 里面是什么:

tools/cache/controller.go:89

// New makes a new Controller from the given Config. func New(c *Config) Controller { ctlr := &controller{ config: *c, clock: &clock.RealClock{}, } return ctlr } 

這里的 controller 類型是:

tools/cache/controller.go:75

type controller struct { config Config reflector *Reflector reflectorMutex sync.RWMutex clock clock.Clock } 

4.4.1. controller.Run()

我們接着關注這個 controller 是怎么實現 Run() 方法的:

tools/cache/controller.go:100

func (c *controller) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() go func() { <-stopCh c.config.Queue.Close() }() // listerWatcher 和 queue 等都用於創建這里的r eflector 了 r := NewReflector( c.config.ListerWatcher, c.config.ObjectType, c.config.Queue, c.config.FullResyncPeriod, ) r.ShouldResync = c.config.ShouldResync r.clock = c.clock c.reflectorMutex.Lock() // reflector 是 controller 的一個關鍵屬性 c.reflector = r c.reflectorMutex.Unlock() var wg wait.Group defer wg.Wait() wg.StartWithChannel(stopCh, r.Run) // 邏輯到了 processLoop 里面 wait.Until(c.processLoop, time.Second, stopCh) } 

這個 loop 是用來消費 queue 的:

tools/cache/controller.go:148

func (c *controller) processLoop() { for { // 這里的 Pop() 明顯是阻塞式的 // type PopProcessFunc func(interface{}) error // PopProcessFunc 用於處理 queue 中 pop 出來的 element obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) if err != nil { if err == FIFOClosedError { return } if c.config.RetryOnError { // This is the safe way to re-enqueue. c.config.Queue.AddIfNotPresent(obj) } } } } 

這里的 PopProcessFunc 可能會讓人一時摸不着頭腦,其實這這值是一個函數類型func(interface{}) error,這里PopProcessFunc(c.config.Process)也就是把c.config.Process轉為了PopProcessFunc類型而已。

我們在前面有貼 sharedIndexInformer.Run() 這個函數,里面的Process: s.HandleDeltas,這一行其實就交代了這里的 PopProcessFunc 類型實例來源。

4.4.2. sharedIndexInformer.HandleDeltas()

tools/cache/shared_informer.go:344

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { s.blockDeltas.Lock() defer s.blockDeltas.Unlock() // from oldest to newest // 循環處理這個對象的一系列狀態 for _, d := range obj.(Deltas) { switch d.Type { case Sync, Added, Updated: isSync := d.Type == Sync s.cacheMutationDetector.AddObject(d.Object) if old, exists, err := s.indexer.Get(d.Object); err == nil && exists { if err := s.indexer.Update(d.Object); err != nil { return err } // distribute s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync) } else { if err := s.indexer.Add(d.Object); err != nil { return err } // distribute s.processor.distribute(addNotification{newObj: d.Object}, isSync) } case Deleted: if err := s.indexer.Delete(d.Object); err != nil { return err } // distribute s.processor.distribute(deleteNotification{oldObj: d.Object}, false) } } return nil } 

先關注這里的 distribute 過程,注意到這個 distribute 的參數是 xxxNotification,下面 processor 部分會講到這些信號被處理的邏輯。

tools/cache/shared_informer.go:400

func (p *sharedProcessor) distribute(obj interface{}, sync bool) { p.listenersLock.RLock() defer p.listenersLock.RUnlock() if sync { for _, listener := range p.syncingListeners { // add listener.add(obj) } } else { for _, listener := range p.listeners { // add listener.add(obj) } } } 

tools/cache/shared_informer.go:506

func (p *processorListener) add(notification interface{}) { p.addCh <- notification } 

這里的 p.addCh 接收到信號,也就是下面 processor 部分的邏輯processorListener.pop()邏輯的起點。

4.5. processor

在 sharedIndexInformer 對象中有一個屬性processor *sharedProcessor,這個 sharedProcessor 類型定義如下:

tools/cache/shared_informer.go:375

type sharedProcessor struct { listenersStarted bool listenersLock sync.RWMutex listeners []*processorListener syncingListeners []*processorListener clock clock.Clock wg wait.Group } 

這里的重點明顯是 listeners 屬性了,我們繼續看 listeners 的類型中 processorListener 的定義:

tools/cache/shared_informer.go:466

type processorListener struct { nextCh chan interface{} addCh chan interface{} handler ResourceEventHandler // …… } 

這里有一個我們前面提到的 handler,下面結合在一起跟一下handler 方法調用邏輯。

4.5.1. sharedProcessor.run()

從 processor 的 run() 方法開始看:

tools/cache/shared_informer.go:415

func (p *sharedProcessor) run(stopCh <-chan struct{}) { func() { p.listenersLock.RLock() defer p.listenersLock.RUnlock() for _, listener := range p.listeners { p.wg.Start(listener.run) p.wg.Start(listener.pop) } p.listenersStarted = true }() <-stopCh // …… } 

撇開細節,可以看到這里調用了內部所有 listener 的 run() 和 pop() 方法。

4.5.2. sharedIndexInformer.Run()

我們前面寫 controller 時提到過這個Run() ,現在只關注一點,sharedIndexInformer 的 run 會調用到s.processor.run,也就是上面寫的 sharedProcessor.run().

4.5.3. processorListener.run()

sharedProcessor.run()往里調到了 processorListener.run() 和 processorListener.pop(),先看一下這個 run 做了什么:

tools/cache/shared_informer.go:540

func (p *processorListener) run() { stopCh := make(chan struct{}) wait.Until(func() { // 一分鍾執行一次這個 func() // 一分鍾內的又有幾次重試 err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) { // 等待信號 nextCh for next := range p.nextCh { // notification 是 next 的實際類型 switch notification := next.(type) { // update case updateNotification: p.handler.OnUpdate(notification.oldObj, notification.newObj) // add case addNotification: p.handler.OnAdd(notification.newObj) // delete case deleteNotification: p.handler.OnDelete(notification.oldObj) default: utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next)) } } return true, nil }) if err == nil { close(stopCh) } }, 1*time.Minute, stopCh) } 

這個 run 過程不復雜,等待信號然后調用 handler 的增刪改方法做對應的處理邏輯。case 里的 Notification 再看一眼:

tools/cache/shared_informer.go:176

type updateNotification struct { oldObj interface{} newObj interface{} } type addNotification struct { newObj interface{} } type deleteNotification struct { oldObj interface{} } 

另外注意到for next := range p.nextCh是下面的 case 執行的前提,也就是說觸發點是 p.nextCh,我們接着看 pop 過程( pod的代碼花了我不少時間,這里的邏輯不簡單):

tools/cache/shared_informer.go:510

func (p *processorListener) pop() { defer utilruntime.HandleCrash() defer close(p.nextCh) // Tell .run() to stop // 這個 chan 是沒有初始化的 var nextCh chan<- interface{} // 可以接收任意類型,其實是對應前面提到的 addNotification 等 var notification interface{} // for 循環套 select 是比較常規的寫法 for { select { //第一遍執行到這里的時候由於 nexth 沒有初始化,所以這里會阻塞(和notification有沒有值沒有關系,notification哪怕是nil也可以寫入 chan interface{} 類型的 channel) case nextCh <- notification: var ok bool // 第二次循環,下面一個case運行過之后才有這里的邏輯 notification, ok = p.pendingNotifications.ReadOne() if !ok { // 將 channel 指向 nil 相當於初始化的逆操作,會使得這個 case 條件阻塞 nextCh = nil } // 這里是 for 首次執行邏輯的入口 case notificationToAdd, ok := <-p.addCh: if !ok { return } // 如果是 nil,也就是第一個通知過來的時候,這時不需要用到緩存(和下面else相對) if notification == nil { // 賦值給 notification,這樣上面一個 case 在接下來的一輪循化中就可以讀到了 notification = notificationToAdd // 相當於復制引用,nextCh 就指向了 p.nextCh,使得上面 case 寫 channel 的時候本質上操作了 p.nextCh,從而 run 能夠讀到 p.nextCh 中的信號 nextCh = p.nextCh } else { // 處理到這里的時候,其實第一個 case 已經有了首個 notification,這里的邏輯是一下子來了太多 notification 就往 pendingNotifications 緩存,在第一個 case 中 有對應的 ReadOne()操作 p.pendingNotifications.WriteOne(notificationToAdd) } } } } 

這里的 pop 邏輯的入口是<-p.addCh,我們前面 controller 部分講到了這個 addCh 的來源。繼續看其他邏輯。

4.6. listerwatcher

ListerWatcher 的出鏡率還是挺高的,大家應該在很多文章里都有看到過這個詞。我們先看一下接口定義:

tools/cache/listwatch.go:31

type ListerWatcher interface { // List should return a list type object; List(options metav1.ListOptions) (runtime.Object, error) // Watch should begin a watch at the specified version. Watch(options metav1.ListOptions) (watch.Interface, error) } type ListFunc func(options metav1.ListOptions) (runtime.Object, error) type WatchFunc func(options metav1.ListOptions) (watch.Interface, error) type ListWatch struct { ListFunc ListFunc WatchFunc WatchFunc // DisableChunking requests no chunking for this list watcher. DisableChunking bool } 

從這些代碼中我們能夠體會到一些 ListerWatcher 的用意,但心里應該還是糾結的。我們看一下 deployment 的 list-watch.

我們是從 sharedIndexInformer 中看到有個屬性 listerWatcher,DeploymentInformer 的創建代碼如下:

informers/apps/v1beta2/deployment.go:50

// 注意到返回值類型是 SharedIndexInformer,也就是說這里的初始化肯定需要給 listerWatcher 屬性賦值 func NewDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer { return NewFilteredDeploymentInformer(client, namespace, resyncPeriod, indexers, nil) } func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { return cache.NewSharedIndexInformer( // 這里初始化一個 ListWatch 類型實例 &cache.ListWatch{ // ListFunc 和 WatchFunc 的賦值 ListFunc: func(options v1.ListOptions) (runtime.Object, error) { if tweakListOptions != nil { tweakListOptions(&options) } // 邏輯是通過client的 xxx 實現的,這個 client 其實就是 Clientset return client.AppsV1beta2().Deployments(namespace).List(options) }, WatchFunc: func(options v1.ListOptions) (watch.Interface, error) { if tweakListOptions != nil { tweakListOptions(&options) } return client.AppsV1beta2().Deployments(namespace).Watch(options) }, }, &appsv1beta2.Deployment{}, resyncPeriod, indexers, ) } 

以 list 為例,client.AppsV1beta2().Deployments(namespace).List(options)其實是 client 提供的邏輯了,我們可以看一下 List() 方法對應的接口:

// DeploymentInterface has methods to work with Deployment resources. type DeploymentInterface interface { Create(*v1beta2.Deployment) (*v1beta2.Deployment, error) Update(*v1beta2.Deployment) (*v1beta2.Deployment, error) UpdateStatus(*v1beta2.Deployment) (*v1beta2.Deployment, error) Delete(name string, options *v1.DeleteOptions) error DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error Get(name string, options v1.GetOptions) (*v1beta2.Deployment, error) List(opts v1.ListOptions) (*v1beta2.DeploymentList, error) Watch(opts v1.ListOptions) (watch.Interface, error) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1beta2.Deployment, err error) DeploymentExpansion } 

順着這個接口再往里跟很快就到 http 協議層了,要了然整個 list-watch 的原理還得結合 API Server 的代碼,我們今天先不講。

5. 小結

Informer 的實現還是有點復雜的,啃的過程中很容易一個不小心就被繞暈了。今天我們以開頭的那張圖結尾。以后講Operator 的時候會基於這個圖增加幾個框框。

1555502518252


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM