1. 寫在前面
個人主頁: https://gzh.readthedocs.io
關注容器技術、關注
Kubernetes
。問題或建議,請公眾號(
double12gzh
)留言。
依然秉承本系列的傳統,在文章開始都會再次上一下下面這經經典的圖(足見其重要性,哈哈哈)。
在client-go系列之1---client-go代碼結構講解中簡單介紹一個client-go中的相關的模塊(即圖中上半部分),其實,在client-go中不只是有前面提到的模塊,還包括上圖中下半部分的內容,即自定義控制器部分,如:
- informer reference: 是一個
Informer
的實例,主要用於處理與CRD(自定義資源)對象相關的。當我們開發自定義控制器(custom controller)時,需要這個控制器開創建相匹配的Informer
。 - indexer reference:是一個
Indexer
的實例,主要用於處理與CRD(自定義資源)對象相關的。當我們開發自定義控制器時,需要創建Indexer
的實例,這個實例主要作用是實現存儲+索引
。 - WorkQueue:工作者隊列。前面我們提到過
Informer
,它除了更新本地緩存之外,還要將數據同步給相應控制器,WorkQueue
就是為了數據同步的問題而產生的。當有資源被添加、修改或刪除,Informer
/SharedInformer
就會將相應的事件加入到WorkQueue
中。其它所有的控制器需要排隊對這個queue進行讀取,如果某個控制器發現這個事件與自己相關,就執行相應的操作。如果操作失敗,就會把剛才取出的事件再放回到WorkQueue
中,等再輪到自己執行時會再去重試這次失敗的操作。如果操作成功,就將該事件從隊列中刪除。 - Resource Event Handler:這是一個回調函數,當一個
Informer
/SharedInformer
要分發一個對象到控制器時,會調用此函數。例如:將對象的Key
放在WorkQueue
中並等待后續的處理。 - Process Item:用戶自定義的處理
WorkQueue
中的相應Item
的函數。 如,我們可以在這里面使用Indexer
或Listing wrapper
來根據相應的Key
檢索對象。
Item的內容如下:
queue中的內容如下:
在client-go的controller中給出了如何定義
Indexer
和Informer
的方法,代碼位置:client-go/tools/cache/controller.go,代碼如下:
344 func NewIndexerInformer(
345 lw ListerWatcher, // 用於獲取/監控需要Informer處理的資源
346 objType runtime.Object, // 訂閱的對象類型
347 resyncPeriod time.Duration, // 非0時,將會一直list我們所關心的對象; 0時,‘重新list’將會被推遲
348 h ResourceEventHandler, // 用於處理與resources相關的事件
349 indexers Indexers,
350 ) (Indexer, Controller) {
351 // This will hold the client state, as we know it.
352 clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)
353
354 return clientState, newInformer(lw, objType, resyncPeriod, h, clientState)
355 }
ResourceEventHandler的定義如下。其代碼位置: client-go/tools/cache/controller.go。這里的
Evnet
只是具有通知
的作用,因為,我們不應該對這里收到的對象進行任何修改。
212 type ResourceEventHandler interface {
213 OnAdd(obj interface{}) // 當有新的對象被創建時,將會調用這個函數
214 OnUpdate(oldObj, newObj interface{}) // 當對象被修改時,將會調用這個函數。除此之外,當有`re-list`操作時,這個函數也會被再次調用
215 OnDelete(obj interface{}) // 當對象被刪除時,將會調用這個函數。
216 }
2. Informer簡介
一句話背景介紹:為了減少當多個控制器對k8s-api-server進行大量訪問時對api-server造成壓力。
2.1 產生的背景
隨着Controller越來越多,如果Controller直接訪問k8s-apiserver,那么將會導致其壓力過大,於是在這樣的背景下就有了Informer
的概念。其發展到今天這個架構,大概可以總結出以下迭代思路:
第一階段,Controller
直接訪問k8s-api-server。存在的問題:多個控制器大量訪問k8s-apiserver時會對其造成巨大的壓力。
第二階段,Informer
代替Controller
去訪問k8s-apiserver。而Controller
的所有操作操作(如:查狀態、對資源進行伸縮等)都和Informer
進行交互。但Informer
沒有必要每次都去訪問k8s-apiserver,它只要在需要的時候通過ListAndWatch
(即通過k8s List API獲取所有資源的最新狀態;通過Wath API去監聽這些資源狀態的變化)與k8s-apiserver交互即可。
ListAndWatch的代碼位置: client-go/tools/cache/reflector.go
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error{
...
}
第三階段, Informer
並沒有直接訪問k8s-api-server,而是通過一個叫Reflector
的對象進行api-server的訪問。上面所說的 ListAndWatch
事實上是由Reflector`實現的。
第四階段, 通過指定資源類型來Watch
特定資源。
// 代碼位置: client-go/tools/cache/listwatch.go
36 // Watcher is any object that knows how to start a watch on a resource.
37 type Watcher interface {
38 // Watch should begin a watch at the specified version.
39 Watch(options metav1.ListOptions) (watch.Interface, error)
40 }
第五階段,定義SharedInformer
。如果Controller
與Informer
是一一對應的關系,那么k8s-api-server的壓力也還是挺大的。但是類似於Pod這樣的資源來說,Deployment
和StatefulSet
都能對它進行管理,當多個控制器同時想查Pod的狀態時,實現上,只需要有一個Informer
就能滿足需求了,即: SharedInformered
。
第六階段,解決多個不同的控制器排除與重試問題,引入DeltaFIFOQueue
。每當資源被修改時,Reflector
就會收到事件通知,並將對應的事件放入DeltaFIFOQueue
中。另外,SharedInformer
會不斷從DeltaFIFOQueue
中讀取事件並更新本地緩存(ThreadSafeStore
)的狀態。
2.2 主要功能
- 通過
List()
/Get()
(代碼位置:client-go/tools/cache/listwatch.go)獲取資源對象 - 監聽事件(
OnAdd
,OnUpdate
,OnDelete
),並觸發回調(ResourceEventHandler
) - 支持二級緩存(
DeltaFIFOQueue
和ThreadSafeStore
,前者用於存儲Watch
返回的事件,后者是一個LocalStore,能夠被Lister
的List()
和Getter
的Get()
方法訪問)
需要注意的是,
Informer
與k8s-apiserver
之間沒有同步機制,但是Informer
內部的二級緩存之間是有同步機制的。
上面提到的List()
/Get()
的定義位於:client-go/tools/cache/listwatch.go
29 // Lister is any object that knows how to perform an initial list.
30 type Lister interface {
31 // List should return a list type object; the Items field will be extracted, and the
32 // ResourceVersion field will be used to start the watch in the right place.
33 List(options metav1.ListOptions) (runtime.Object, error)
34 }
...
64 // Getter interface knows how to access Get method from RESTClient.
65 type Getter interface {
66 Get() *restclient.Request
67 }
2.3 主要模塊
- Reflector
- DeltaFIFO
- ThreadSafeStore(LocalStore)
- Controller
- Lister
- Processor
需要注意的是:
- 這里提到的
Controller
不是Kubernetes Controller
(前幾篇文章中我們實際上已經提到過,再次重審一下,這兩個Controller並沒有任何聯系)Reflector
主要用於監聽與指定資源類型相關的事件DeltaFIFO
和ThreadSafeStore(LocalStore)
是Informer
的二級緩存Lister
主要是被調用List/Get
方法Processor
中記錄了所有的回調函數(即ResourceEventHandler
)的實例,並負責觸發之
2.4 類圖
2.5 SharedInformer實現機制
當同一個資源(如:pdod)的
Informer
被實例化多次后,將會產生多個Reflector
,如果這些Reflector
都去調用ListAndWatch
來獲取資源時,將會對k8s-apiserver造成巨大的壓力。
SharedInformer
的意思是指:對於屬於同一類型的資源來說,他們將會共享同一個Informer
和Reflector
,其實現代碼如下:
// 代碼位置: client-go/informers/factory.go
55 type sharedInformerFactory struct {
56 client kubernetes.Interface
57 namespace string
58 tweakListOptions internalinterfaces.TweakListOptionsFunc
59 lock sync.Mutex
60 defaultResync time.Duration
61 customResync map[reflect.Type]time.Duration
62
63 informers map[reflect.Type]cache.SharedIndexInformer // sharedInformer的數據都存放在這個map中。
64 // startedInformers is used for tracking which informers have been started.
65 // This allows Start() to be called multiple times safely.
66 startedInformers map[reflect.Type]bool
67 }
2.6 不同資源的Informer定義
對於不同的資源(如:pods, deployments, ...)都有一個與之相對應的xxxInformer
存在,他們的位置為(以pod為例):
// 代碼位置:clent-go/informers/core/v1/pod.go
35 // PodInformer 提供了與pod相關的shared informer及lister進行交互的途徑
36 // 每個k8s resource都會有這樣一個Informer,且Informer中都有以下兩個方法:Informer(), Lister()
37 type PodInformer interface {
38 Informer() cache.SharedIndexInformer
39 Lister() v1.PodLister
40 }
41
42 type podInformer struct {
43 factory internalinterfaces.SharedInformerFactory
44 tweakListOptions internalinterfaces.TweakListOptionsFunc
45 namespace string
46 }
調用不同資源的Informer的使用示例如下:
deployInformer = sharedInformer.
apps(). // client-go/informers/apps
V1(). // client-go/informers/apps/v1
Deployments(). // client-go/informers/apps/v1/deployment.go
Informer() // client-go/informers/apps/v1/deployment.go: type DeploymentInformer interface{}
3. Controller
3.1 Controller定義
代碼位置: client-go/tools/cache/controller.go
96 // 這是一個Base Controller, 是其它所有控制器的`基類`。在`SharedInformer`中將會被用到。
97 //
98 type Controller interface {
99 // Run 只做兩件事情:
100 // 1. 構建並啟動一個`Reflector`。把從`Config.ListerWatcher`中拋出的對象或通知發送到`Config.Queue`中,另外也可能會觸發隊列同步`Resync`
102 // 2. 不斷的從queue中彈出item,並使用Config.ProcessFunc進行處理。
103 //
104 // 當channel `stopCh`被關閉時,上面兩個操作會停止。
105 Run(stopCh <-chan struct{})
106
107 // HasSynced Config的隊列是否已經同步過了
108 HasSynced() bool
109
110 // LastSyncResourceVersion delegates to the Reflector when there
111 // is one, otherwise returns the empty string
112 LastSyncResourceVersion() string
113 }
Controller
的具體實現如下:
89 type controller struct {
90 config Config
91 reflector *Reflector
92 reflectorMutex sync.RWMutex
93 clock clock.Clock
94 }
從上面的實現中, 我們可以看到, 一個Controller
實際上是一個以Config
為參數,並將會被Informer
使用到的一個low-level的控制器。
3.2 Controller關鍵方法
// 代碼位置: client-go/tools/cache/controller.go
// Run 開始處理items,當stopCh被關閉或stopCh收到某個值是將會停止執行。
127 func (c *controller) Run(stopCh <-chan struct{}) {
128 defer utilruntime.HandleCrash()
129 go func() {
130 <-stopCh
131 c.config.Queue.Close()
132 }()
133 r := NewReflector( // 創建一個Reflector, 用於ListAndWatch, 從而獲取指定資源的當前狀態
134 c.config.ListerWatcher, // List/Watch指定的資源對象
135 c.config.ObjectType,
136 c.config.Queue,
137 c.config.FullResyncPeriod,
138 )
139 r.ShouldResync = c.config.ShouldResync
140 r.WatchListPageSize = c.config.WatchListPageSize
141 r.clock = c.clock
142 if c.config.WatchErrorHandler != nil {
143 r.watchErrorHandler = c.config.WatchErrorHandler
144 }
145
146 c.reflectorMutex.Lock()
147 c.reflector = r
148 c.reflectorMutex.Unlock()
149
150 var wg wait.Group
151
152 wg.StartWithChannel(stopCh, r.Run) // r.Run中的最主要的方法就是執行Reflector的ListAndWatch()獲取資源對象的值
153 // c.ProcessLoop(client-go/tools/cache/controller.go)會從queue中取出資源Delta並進行處理
154 wait.Until(c.processLoop, time.Second, stopCh)
155 wg.Wait()
156 }
c.processLoop的實現如下:
181 func (c *controller) processLoop() {
182 for {
183 obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
184 if err != nil {
185 if err == ErrFIFOClosed {
186 return
187 }
188 if c.config.RetryOnError {
189 // This is the safe way to re-enqueue.
190 c.config.Queue.AddIfNotPresent(obj) // Queue是一個DeltaFIFO
191 }
192 }
193 }
194 }
3.3 Controller小結
- 有一個FIFO Queue的索引
- 有一個ListWatcher的索引
- 通過Pop從FIFO queue中消費資源對象(即: items)
- 創建Reflector
- 提供一個proccessLoop處理資源對象以達到期望的狀態
另外需要注意的是,不要把tools/cache/controller.go中的
Controller
與Cumstom Controller
混為一談,兩者是完全不同的兩個東西。
3.4 Controller示例
前面從“理論”方面對Controller/Informer做了簡單的介紹,俗話說的好“紙上得來終覺淺,絕知此事要耕行”,建議學完上面的理論后,還是結合下面例子在實踐中再體會一下。
Controller的工作流程:
請參考: client-go/tools/cache/controller_test.go: Example()
歡迎關注我的微信公眾號: