client-go系列之5---Informer


1. 寫在前面

個人主頁: https://gzh.readthedocs.io

關注容器技術、關注Kubernetes

問題或建議,請公眾號(double12gzh)留言。

依然秉承本系列的傳統,在文章開始都會再次上一下下面這經經典的圖(足見其重要性,哈哈哈)。

client-go系列之1---client-go代碼結構講解中簡單介紹一個client-go中的相關的模塊(即圖中上半部分),其實,在client-go中不只是有前面提到的模塊,還包括上圖中下半部分的內容,即自定義控制器部分,如:

  • informer reference: 是一個Informer的實例,主要用於處理與CRD(自定義資源)對象相關的。當我們開發自定義控制器(custom controller)時,需要這個控制器開創建相匹配的Informer
  • indexer reference:是一個Indexer的實例,主要用於處理與CRD(自定義資源)對象相關的。當我們開發自定義控制器時,需要創建Indexer的實例,這個實例主要作用是實現存儲+索引
  • WorkQueue:工作者隊列。前面我們提到過Informer,它除了更新本地緩存之外,還要將數據同步給相應控制器,WorkQueue就是為了數據同步的問題而產生的。當有資源被添加、修改或刪除,Informer/SharedInformer就會將相應的事件加入到WorkQueue中。其它所有的控制器需要排隊對這個queue進行讀取,如果某個控制器發現這個事件與自己相關,就執行相應的操作。如果操作失敗,就會把剛才取出的事件再放回到WorkQueue中,等再輪到自己執行時會再去重試這次失敗的操作。如果操作成功,就將該事件從隊列中刪除。
  • Resource Event Handler:這是一個回調函數,當一個Informer/SharedInformer要分發一個對象到控制器時,會調用此函數。例如:將對象的Key放在WorkQueue中並等待后續的處理。
  • Process Item:用戶自定義的處理WorkQueue中的相應Item的函數。 如,我們可以在這里面使用IndexerListing wrapper來根據相應的Key檢索對象。

Item的內容如下:

queue中的內容如下:

在client-go的controller中給出了如何定義IndexerInformer的方法,代碼位置:client-go/tools/cache/controller.go,代碼如下:

  344 func NewIndexerInformer(
  345     lw ListerWatcher,                 // 用於獲取/監控需要Informer處理的資源
  346     objType runtime.Object,           // 訂閱的對象類型
  347     resyncPeriod time.Duration,       // 非0時,將會一直list我們所關心的對象; 0時,‘重新list’將會被推遲
  348     h ResourceEventHandler,           // 用於處理與resources相關的事件
  349     indexers Indexers,
  350 ) (Indexer, Controller) {
  351     // This will hold the client state, as we know it.
  352     clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)
  353
  354     return clientState, newInformer(lw, objType, resyncPeriod, h, clientState)
  355 }

ResourceEventHandler的定義如下。其代碼位置: client-go/tools/cache/controller.go。這里的Evnet只是具有通知的作用,因為,我們不應該對這里收到的對象進行任何修改。

212 type ResourceEventHandler interface {
213     OnAdd(obj interface{})                  // 當有新的對象被創建時,將會調用這個函數
214     OnUpdate(oldObj, newObj interface{})    // 當對象被修改時,將會調用這個函數。除此之外,當有`re-list`操作時,這個函數也會被再次調用
215     OnDelete(obj interface{})               // 當對象被刪除時,將會調用這個函數。
216 }

2. Informer簡介

一句話背景介紹:為了減少當多個控制器對k8s-api-server進行大量訪問時對api-server造成壓力。

2.1 產生的背景

隨着Controller越來越多,如果Controller直接訪問k8s-apiserver,那么將會導致其壓力過大,於是在這樣的背景下就有了Informer的概念。其發展到今天這個架構,大概可以總結出以下迭代思路:

第一階段,Controller直接訪問k8s-api-server。存在的問題:多個控制器大量訪問k8s-apiserver時會對其造成巨大的壓力。

第二階段,Informer代替Controller去訪問k8s-apiserver。而Controller的所有操作操作(如:查狀態、對資源進行伸縮等)都和Informer進行交互。但Informer沒有必要每次都去訪問k8s-apiserver,它只要在需要的時候通過ListAndWatch(即通過k8s List API獲取所有資源的最新狀態;通過Wath API去監聽這些資源狀態的變化)與k8s-apiserver交互即可。

ListAndWatch的代碼位置: client-go/tools/cache/reflector.go

func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error{
...
}

第三階段, Informer並沒有直接訪問k8s-api-server,而是通過一個叫Reflector的對象進行api-server的訪問。上面所說的 ListAndWatch 事實上是由Reflector`實現的。

第四階段, 通過指定資源類型來Watch特定資源。

// 代碼位置: client-go/tools/cache/listwatch.go
36 // Watcher is any object that knows how to start a watch on a resource.
37 type Watcher interface {
38     // Watch should begin a watch at the specified version.
39     Watch(options metav1.ListOptions) (watch.Interface, error)
40 }

第五階段,定義SharedInformer。如果ControllerInformer是一一對應的關系,那么k8s-api-server的壓力也還是挺大的。但是類似於Pod這樣的資源來說,DeploymentStatefulSet都能對它進行管理,當多個控制器同時想查Pod的狀態時,實現上,只需要有一個Informer就能滿足需求了,即: SharedInformered

第六階段,解決多個不同的控制器排除與重試問題,引入DeltaFIFOQueue。每當資源被修改時,Reflector就會收到事件通知,並將對應的事件放入DeltaFIFOQueue中。另外,SharedInformer會不斷從DeltaFIFOQueue中讀取事件並更新本地緩存(ThreadSafeStore)的狀態。

2.2 主要功能

  • 通過List()/Get()(代碼位置:client-go/tools/cache/listwatch.go)獲取資源對象
  • 監聽事件(OnAdd, OnUpdate, OnDelete),並觸發回調(ResourceEventHandler)
  • 支持二級緩存(DeltaFIFOQueueThreadSafeStore,前者用於存儲Watch返回的事件,后者是一個LocalStore,能夠被ListerList()GetterGet()方法訪問)

需要注意的是,Informerk8s-apiserver之間沒有同步機制,但是Informer內部的二級緩存之間是有同步機制的。

上面提到的List()/Get()的定義位於:client-go/tools/cache/listwatch.go

29 // Lister is any object that knows how to perform an initial list.
30 type Lister interface {
31     // List should return a list type object; the Items field will be extracted, and the
32     // ResourceVersion field will be used to start the watch in the right place.
33     List(options metav1.ListOptions) (runtime.Object, error)
34 }
... 
64 // Getter interface knows how to access Get method from RESTClient.
65 type Getter interface {
66     Get() *restclient.Request
67 }

2.3 主要模塊

  • Reflector
  • DeltaFIFO
  • ThreadSafeStore(LocalStore)
  • Controller
  • Lister
  • Processor

需要注意的是:

  • 這里提到的Controller不是Kubernetes Controller(前幾篇文章中我們實際上已經提到過,再次重審一下,這兩個Controller並沒有任何聯系)
  • Reflector主要用於監聽與指定資源類型相關的事件
  • DeltaFIFOThreadSafeStore(LocalStore)Informer的二級緩存
  • Lister主要是被調用List/Get方法
  • Processor中記錄了所有的回調函數(即 ResourceEventHandler)的實例,並負責觸發之

2.4 類圖

出處

2.5 SharedInformer實現機制

當同一個資源(如:pdod)的Informer被實例化多次后,將會產生多個Reflector,如果這些Reflector都去調用ListAndWatch來獲取資源時,將會對k8s-apiserver造成巨大的壓力。

SharedInformer的意思是指:對於屬於同一類型的資源來說,他們將會共享同一個InformerReflector,其實現代碼如下:

// 代碼位置: client-go/informers/factory.go
55 type sharedInformerFactory struct {
56     client           kubernetes.Interface
57     namespace        string
58     tweakListOptions internalinterfaces.TweakListOptionsFunc
59     lock             sync.Mutex
60     defaultResync    time.Duration
61     customResync     map[reflect.Type]time.Duration
62
63     informers map[reflect.Type]cache.SharedIndexInformer     // sharedInformer的數據都存放在這個map中。
64     // startedInformers is used for tracking which informers have been started.
65     // This allows Start() to be called multiple times safely.
66     startedInformers map[reflect.Type]bool
67 }

2.6 不同資源的Informer定義

對於不同的資源(如:pods, deployments, ...)都有一個與之相對應的xxxInformer存在,他們的位置為(以pod為例):

// 代碼位置:clent-go/informers/core/v1/pod.go
   35 // PodInformer 提供了與pod相關的shared informer及lister進行交互的途徑 
   36 // 每個k8s resource都會有這樣一個Informer,且Informer中都有以下兩個方法:Informer(), Lister()
   37 type PodInformer interface {
   38     Informer() cache.SharedIndexInformer
   39     Lister() v1.PodLister
   40 }
   41
   42 type podInformer struct {
   43     factory          internalinterfaces.SharedInformerFactory
   44     tweakListOptions internalinterfaces.TweakListOptionsFunc
   45     namespace        string
   46 }

調用不同資源的Informer的使用示例如下:

deployInformer = sharedInformer.
               apps().              // client-go/informers/apps
               V1().                // client-go/informers/apps/v1
               Deployments().       // client-go/informers/apps/v1/deployment.go
               Informer()           // client-go/informers/apps/v1/deployment.go: type DeploymentInformer interface{}

3. Controller

3.1 Controller定義

代碼位置: client-go/tools/cache/controller.go

96 // 這是一個Base Controller, 是其它所有控制器的`基類`。在`SharedInformer`中將會被用到。
97 // 
98 type Controller interface {
99      // Run 只做兩件事情:
100     // 1. 構建並啟動一個`Reflector`。把從`Config.ListerWatcher`中拋出的對象或通知發送到`Config.Queue`中,另外也可能會觸發隊列同步`Resync`
102     // 2. 不斷的從queue中彈出item,並使用Config.ProcessFunc進行處理。
103     // 
104     // 當channel `stopCh`被關閉時,上面兩個操作會停止。
105     Run(stopCh <-chan struct{})
106
107     // HasSynced Config的隊列是否已經同步過了
108     HasSynced() bool
109
110     // LastSyncResourceVersion delegates to the Reflector when there
111     // is one, otherwise returns the empty string
112     LastSyncResourceVersion() string
113 }

Controller的具體實現如下:

   89 type controller struct {
   90     config         Config
   91     reflector      *Reflector
   92     reflectorMutex sync.RWMutex
   93     clock          clock.Clock
   94 }

從上面的實現中, 我們可以看到, 一個Controller實際上是一個以Config為參數,並將會被Informer使用到的一個low-level的控制器。

3.2 Controller關鍵方法

// 代碼位置: client-go/tools/cache/controller.go
// Run 開始處理items,當stopCh被關閉或stopCh收到某個值是將會停止執行。
127 func (c *controller) Run(stopCh <-chan struct{}) {
128     defer utilruntime.HandleCrash()
129     go func() {
130         <-stopCh
131         c.config.Queue.Close()
132     }()
133     r := NewReflector(             // 創建一個Reflector, 用於ListAndWatch, 從而獲取指定資源的當前狀態
134         c.config.ListerWatcher,    // List/Watch指定的資源對象
135         c.config.ObjectType,
136         c.config.Queue,
137         c.config.FullResyncPeriod,
138     )
139     r.ShouldResync = c.config.ShouldResync
140     r.WatchListPageSize = c.config.WatchListPageSize
141     r.clock = c.clock
142     if c.config.WatchErrorHandler != nil {
143         r.watchErrorHandler = c.config.WatchErrorHandler
144     }
145
146     c.reflectorMutex.Lock()
147     c.reflector = r
148     c.reflectorMutex.Unlock()
149
150     var wg wait.Group
151
152     wg.StartWithChannel(stopCh, r.Run) // r.Run中的最主要的方法就是執行Reflector的ListAndWatch()獲取資源對象的值
153     // c.ProcessLoop(client-go/tools/cache/controller.go)會從queue中取出資源Delta並進行處理
154     wait.Until(c.processLoop, time.Second, stopCh)
155     wg.Wait()
156 }

c.processLoop的實現如下:

181 func (c *controller) processLoop() {
182     for {
183         obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
184         if err != nil {
185             if err == ErrFIFOClosed {
186                 return
187             }
188             if c.config.RetryOnError {
189                 // This is the safe way to re-enqueue.
190                 c.config.Queue.AddIfNotPresent(obj)  // Queue是一個DeltaFIFO
191             }
192         }
193     }
194 }

3.3 Controller小結

  • 有一個FIFO Queue的索引
  • 有一個ListWatcher的索引
  • 通過Pop從FIFO queue中消費資源對象(即: items)
  • 創建Reflector
  • 提供一個proccessLoop處理資源對象以達到期望的狀態

另外需要注意的是,不要把tools/cache/controller.go中的ControllerCumstom Controller混為一談,兩者是完全不同的兩個東西。

3.4 Controller示例

前面從“理論”方面對Controller/Informer做了簡單的介紹,俗話說的好“紙上得來終覺淺,絕知此事要耕行”,建議學完上面的理論后,還是結合下面例子在實踐中再體會一下。

Controller的工作流程:
請參考: client-go/tools/cache/controller_test.go: Example()


歡迎關注我的微信公眾號:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM