Custom Controller 之 Informer
- 概述
- 架構概覽
- reflector - List & Watch API Server
- watchHandler - add obj to delta fifo
- Informer (controller) - pop obj from delta fifo
- Add obj to Indexer (Thread safe store)
- sharedIndexInformer
1. 概述
本節標題寫的是 Informer,不過我們的內容不局限於狹義的 Informer 部分,只是 Informer 最有代表性,其他的 Reflector 等也不好獨立開來講。
Informer 在很多組件的源碼中可以看到,尤其是 kube-controller-manager (寫這篇文章時我已經基本寫完 kube-scheduler 的源碼分析,着手寫 kube-controller-manager 了,鑒於 controlelr 和 client-go 關聯比較大,跳過來先講講典型的控制器工作流程中涉及到的 client-go 部分).
Informer 是 client-go 中一個比較核心的工具,通過 Informer(實際我們用到的都不是單純的 informer,而是組合了各種工具的 sharedInformerFactory) 我們可以輕松 List/Get 某個資源對象,可以監聽資源對象的各種事件(比如創建和刪除)然后觸發回調函數,讓我們能夠在各種事件發生的時候能夠作出相應的邏輯處理。舉個例字,當 pod 數量變化的時候 deployment 是不是需要判斷自己名下的 pod 數量是否還和預期的一樣?如果少了是不是要考慮創建?
2. 架構概覽
自定義控制器的工作流程基本如下圖所示,我們今天要分析圖中上半部分的邏輯。
我們開發自定義控制器的時候用到的“機制”主要定義在 client-go 的 tool/cache下:
我們根據圖中的9個步驟來跟源碼
3. reflector - List & Watch API Server
Reflector 會監視特定的資源,將變化寫入給定的存儲中,也就是 Delta FIFO queue.
3.1. Reflector 對象
Reflector 的中文含義是反射器,我們先看一下類型定義:
tools/cache/reflector.go:47
type Reflector struct { name string metrics *reflectorMetrics expectedType reflect.Type store Store listerWatcher ListerWatcher period time.Duration resyncPeriod time.Duration ShouldResync func() bool clock clock.Clock lastSyncResourceVersion string lastSyncResourceVersionMutex sync.RWMutex }
reflector.go
中主要就 Reflector 這個 struct 和相關的一些函數:
3.2. ListAndWatch
ListAndWatch 首先 list 所有 items,獲取當前的資源版本信息,然后使用這個版本信息來 watch(也就是從這個版本開始的所有資源變化會被關注)。我們看一下這里的 ListAndWatch 方法主要邏輯:
tools/cache/reflector.go:168
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { // list 資源 list, err := r.listerWatcher.List(options) // 提取 items items, err := meta.ExtractList(list) // 更新存儲(Delta FIFO)中的 items if err := r.syncWith(items, resourceVersion); err != nil { return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err) } r.setLastSyncResourceVersion(resourceVersion) // …… for { select { case <-stopCh: return nil default: } timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0)) options = metav1.ListOptions{ ResourceVersion: resourceVersion, TimeoutSeconds: &timeoutSeconds, } r.metrics.numberOfWatches.Inc() // 開始 watch w, err := r.listerWatcher.Watch(options) // …… // w 交給 watchHandler 處理 if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil { if err != errorStopRequested { klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err) } return nil } } }
4. watchHandler - add obj to delta fifo
前面講到 ListAndWatch 函數的最后一步邏輯是 watchHandler,在 ListAndWatch 中先是更新了 Delta FIFO 中的 item,然后 watch 資源對象,最后交給 watchHandler 處理,所以 watchHandler 基本可以猜到是將有變化的資源添加到 Delta FIFO 中了。
tools/cache/reflector.go:287
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error { // …… loop: // 這里進入一個無限循環 for { select { case <-stopCh: return errorStopRequested case err := <-errc: return err // watch 返回值中的一個 channel case event, ok := <-w.ResultChan(): // …… newResourceVersion := meta.GetResourceVersion() // 根據事件類型處理,有 Added Modified Deleted 3種 // 3 種事件分別對應 store 中的增改刪操作 switch event.Type { case watch.Added: err := r.store.Add(event.Object) case watch.Modified: err := r.store.Update(event.Object) case watch.Deleted: err := r.store.Delete(event.Object) default: utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event)) } *resourceVersion = newResourceVersion r.setLastSyncResourceVersion(newResourceVersion) eventCount++ } } // …… return nil }
5. Informer (controller) - pop obj from delta fifo
5.1. Controller
一個 Informer 需要實現 Controller 接口:
tools/cache/controller.go:82
type Controller interface { Run(stopCh <-chan struct{}) HasSynced() bool LastSyncResourceVersion() string }
一個基礎的 Controller 實現如下:
tools/cache/controller.go:75
type controller struct { config Config reflector *Reflector reflectorMutex sync.RWMutex clock clock.Clock }
controller 類型結構如下:
可以看到主要對外暴露的邏輯是 Run() 方法,我們看一下 Run() 中的邏輯:
tools/cache/controller.go:100
func (c *controller) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() go func() { <-stopCh c.config.Queue.Close() }() // 內部 Reflector 創建 r := NewReflector( c.config.ListerWatcher, c.config.ObjectType, c.config.Queue, c.config.FullResyncPeriod, ) r.ShouldResync = c.config.ShouldResync r.clock = c.clock c.reflectorMutex.Lock() c.reflector = r c.reflectorMutex.Unlock() var wg wait.Group defer wg.Wait() wg.StartWithChannel(stopCh, r.Run) // 循環調用 processLoop wait.Until(c.processLoop, time.Second, stopCh) }
5.2. processLoop
tools/cache/controller.go:148
func (c *controller) processLoop() { for { // 主要邏輯 obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) // 異常處理 } }
這里的 Queue 就是 Delta FIFO,Pop 是個阻塞方法,內部實現時會逐個 pop 隊列中的數據,交給 PopProcessFunc 處理。我們先不看 Pop 的實現,關注一下 PopProcessFunc 是如何處理 Pop 中從隊列拿出來的 item 的。
PopProcessFunc 是一個類型:
type PopProcessFunc func(interface{}) error
所以這里只是一個類型轉換,我們關注c.config.Process
就行:
tools/cache/controller.go:367
Process: func(obj interface{}) error { for _, d := range obj.(Deltas) { switch d.Type { // 更新、添加、同步、刪除等操作 case Sync, Added, Updated: if old, exists, err := clientState.Get(d.Object); err == nil && exists { if err := clientState.Update(d.Object); err != nil { return err } h.OnUpdate(old, d.Object) } else { if err := clientState.Add(d.Object); err != nil { return err } h.OnAdd(d.Object) } case Deleted: if err := clientState.Delete(d.Object); err != nil { return err } h.OnDelete(d.Object) } } return nil },
這里涉及到2個點:
- clientState
- ResourceEventHandler (h)
我們一一來看
6. Add obj to Indexer (Thread safe store)
前面說到 clientState,這個變量的初始化是clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)
NewIndexer 代碼如下:
tools/cache/store.go:239
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer { return &cache{ cacheStorage: NewThreadSafeStore(indexers, Indices{}), keyFunc: keyFunc, } }
tools/cache/index.go:27
type Indexer interface { Store Index(indexName string, obj interface{}) ([]interface{}, error) IndexKeys(indexName, indexKey string) ([]string, error) ListIndexFuncValues(indexName string) []string ByIndex(indexName, indexKey string) ([]interface{}, error) GetIndexers() Indexers AddIndexers(newIndexers Indexers) error }
順帶看一下 NewThreadSafeStore()
tools/cache/thread_safe_store.go:298
func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore { return &threadSafeMap{ items: map[string]interface{}{}, indexers: indexers, indices: indices, } }
然后關注一下 Process 中的err := clientState.Add(d.Object)
的 Add() 方法:
tools/cache/store.go:123
func (c *cache) Add(obj interface{}) error { // 計算key;一般是namespace/name key, err := c.keyFunc(obj) if err != nil { return KeyError{obj, err} } // Add c.cacheStorage.Add(key, obj) return nil }
cacheStorage 是一個 ThreadSafeStore 實例,這個 Add() 代碼如下:
tools/cache/thread_safe_store.go:68
func (c *threadSafeMap) Add(key string, obj interface{}) { c.lock.Lock() defer c.lock.Unlock() // 拿出 old obj oldObject := c.items[key] // 寫入 new obj c.items[key] = obj // 更新索引,有一堆邏輯 c.updateIndices(oldObject, obj, key) }
第四步和第五步的內容先分析到這里,后面關注 threadSafeMap 實現的時候再繼續深入。
7. sharedIndexInformer
第六步是 Dispatch Event Handler functions(Send Object to Custom Controller)
我們先看一個接口 SharedInformer:
tools/cache/shared_informer.go:43
type SharedInformer interface { AddEventHandler(handler ResourceEventHandler) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) GetStore() Store GetController() Controller Run(stopCh <-chan struct{}) HasSynced() bool LastSyncResourceVersion() string }
SharedInformer 有一個共享的 data cache,能夠分發 changes 通知到緩存,到通過 AddEventHandler 注冊了的 listerners. 當你接收到一個通知,緩存的內容能夠保證至少和通知中的一樣新。
再看一下 SharedIndexInformer 接口:
tools/cache/shared_informer.go:66
type SharedIndexInformer interface { SharedInformer // AddIndexers add indexers to the informer before it starts. AddIndexers(indexers Indexers) error GetIndexer() Indexer }
相比 SharedInformer 增加了一個 Indexer. 然后看具體的實現 sharedIndexInformer 吧:
tools/cache/shared_informer.go:127
type sharedIndexInformer struct { indexer Indexer controller Controller processor *sharedProcessor cacheMutationDetector CacheMutationDetector listerWatcher ListerWatcher objectType runtime.Object resyncCheckPeriod time.Duration defaultEventHandlerResyncPeriod time.Duration clock clock.Clock started, stopped bool startedLock sync.Mutex blockDeltas sync.Mutex }
這個類型內包了很多我們前面看到過的對象,indexer、controller、listeratcher 都不陌生,我們看這里的 processor 是做什么的:
7.1. sharedProcessor
類型定義如下:
tools/cache/shared_informer.go:375
type sharedProcessor struct { listenersStarted bool listenersLock sync.RWMutex listeners []*processorListener syncingListeners []*processorListener clock clock.Clock wg wait.Group }
這里的重點明顯是 listeners 屬性了,我們繼續看 listeners 的類型中的 processorListener:
7.1.1. processorListener
tools/cache/shared_informer.go:466
type processorListener struct { nextCh chan interface{} addCh chan interface{} handler ResourceEventHandler // 一個 ring buffer,保存未分發的通知 pendingNotifications buffer.RingGrowing // …… }
processorListener 主要有2個方法:
- run()
- pop()
7.1.2. processorListener.run()
先看一下這個 run 做了什么:
tools/cache/shared_informer.go:540
func (p *processorListener) run() { stopCh := make(chan struct{}) wait.Until(func() { // 一分鍾執行一次這個 func() // 一分鍾內的又有幾次重試 err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) { // 等待信號 nextCh for next := range p.nextCh { // notification 是 next 的實際類型 switch notification := next.(type) { // update case updateNotification: p.handler.OnUpdate(notification.oldObj, notification.newObj) // add case addNotification: p.handler.OnAdd(notification.newObj) // delete case deleteNotification: p.handler.OnDelete(notification.oldObj) default: utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next)) } } return true, nil }) if err == nil { close(stopCh) } }, 1*time.Minute, stopCh) }
這個 run 過程不復雜,等待信號然后調用 handler 的增刪改方法做對應的處理邏輯。case 里的 Notification 再看一眼:
tools/cache/shared_informer.go:176
type updateNotification struct { oldObj interface{} newObj interface{} } type addNotification struct { newObj interface{} } type deleteNotification struct { oldObj interface{} }
另外注意到for next := range p.nextCh
是下面的 case 執行的前提,也就是說觸發點是 p.nextCh,我們接着看 pop 過程(這里的邏輯不簡單,可能得多花點精力)
7.1.3. processorListener.pop()
tools/cache/shared_informer.go:510
func (p *processorListener) pop() { defer utilruntime.HandleCrash() defer close(p.nextCh) // Tell .run() to stop // 這個 chan 是沒有初始化的 var nextCh chan<- interface{} // 可以接收任意類型,其實是對應前面提到的 addNotification 等 var notification interface{} // for 循環套 select 是比較常規的寫法 for { select { //第一遍執行到這里的時候由於 nexth 沒有初始化,所以這里會阻塞(和notification有沒有值沒有關系,notification哪怕是nil也可以寫入 chan interface{} 類型的 channel) case nextCh <- notification: var ok bool // 第二次循環,下面一個case運行過之后才有這里的邏輯 notification, ok = p.pendingNotifications.ReadOne() if !ok { // 將 channel 指向 nil 相當於初始化的逆操作,會使得這個 case 條件阻塞 nextCh = nil } // 這里是 for 首次執行邏輯的入口 case notificationToAdd, ok := <-p.addCh: if !ok { return } // 如果是 nil,也就是第一個通知過來的時候,這時不需要用到緩存(和下面else相對) if notification == nil { // 賦值給 notification,這樣上面一個 case 在接下來的一輪循化中就可以讀到了 notification = notificationToAdd // 相當於復制引用,nextCh 就指向了 p.nextCh,使得上面 case 寫 channel 的時候本質上操作了 p.nextCh,從而 run 能夠讀到 p.nextCh 中的信號 nextCh = p.nextCh } else { // 處理到這里的時候,其實第一個 case 已經有了首個 notification,這里的邏輯是一下子來了太多 notification 就往 pendingNotifications 緩存,在第一個 case 中 有對應的 ReadOne()操作 p.pendingNotifications.WriteOne(notificationToAdd) } } } }
這里的 pop 邏輯的入口是<-p.addCh
,我們繼續向上找一下這個 addCh 的來源:
7.1.4. processorListener.add()
tools/cache/shared_informer.go:506
func (p *processorListener) add(notification interface{}) { p.addCh <- notification }
這個 add() 方法又在哪里被調用呢?
7.1.5. sharedProcessor.distribute()
tools/cache/shared_informer.go:400
func (p *sharedProcessor) distribute(obj interface{}, sync bool) { p.listenersLock.RLock() defer p.listenersLock.RUnlock() if sync { for _, listener := range p.syncingListeners { listener.add(obj) } } else { for _, listener := range p.listeners { listener.add(obj) } } }
這個方法邏輯比較簡潔,分發對象。我們繼續看哪里進入的 distribute:
7.2. sharedIndexInformer.HandleDeltas()
tools/cache/shared_informer.go:344
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { s.blockDeltas.Lock() defer s.blockDeltas.Unlock() // from oldest to newest for _, d := range obj.(Deltas) { switch d.Type { // 根據 DeltaType 選擇 case case Sync, Added, Updated: isSync := d.Type == Sync s.cacheMutationDetector.AddObject(d.Object) if old, exists, err := s.indexer.Get(d.Object); err == nil && exists { // indexer 更新的是本地 store if err := s.indexer.Update(d.Object); err != nil { return err } // 前面分析的 distribute;update s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync) } else { if err := s.indexer.Add(d.Object); err != nil { return err } // 前面分析的 distribute;add s.processor.distribute(addNotification{newObj: d.Object}, isSync) } case Deleted: if err := s.indexer.Delete(d.Object); err != nil { return err } // 前面分析的 distribute;delete s.processor.distribute(deleteNotification{oldObj: d.Object}, false) } } return nil }
繼續往前看代碼邏輯。
7.3. sharedIndexInformer.Run()
tools/cache/shared_informer.go:189
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() // new DeltaFIFO fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer) cfg := &Config{ // DeltaFIFO Queue: fifo, ListerWatcher: s.listerWatcher, ObjectType: s.objectType, FullResyncPeriod: s.resyncCheckPeriod, RetryOnError: false, ShouldResync: s.processor.shouldResync, // 前面分析的 HandleDeltas() Process: s.HandleDeltas, } func() { s.startedLock.Lock() defer s.startedLock.Unlock() // 創建 Informer s.controller = New(cfg) s.controller.(*controller).clock = s.clock s.started = true }() processorStopCh := make(chan struct{}) var wg wait.Group defer wg.Wait() // Wait for Processor to stop defer close(processorStopCh) // Tell Processor to stop wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run) // 關注一下 s.processor.run wg.StartWithChannel(processorStopCh, s.processor.run) defer func() { s.startedLock.Lock() defer s.startedLock.Unlock() s.stopped = true }() // Run informer s.controller.Run(stopCh) }
看到這里已經挺和諧了,在 sharedIndexInformer 的 Run() 方法中先是創建一個 DeltaFIFO,然后和 lw 一起初始化 cfg,利用 cfg 創建 controller,最后 Run 這個 controller,也就是最基礎的 informer.
在這段代碼里我們還注意到有一步是s.processor.run
,我們看一下這個 run 的邏輯。
7.3.1. sharedProcessor.run()
tools/cache/shared_informer.go:415
func (p *sharedProcessor) run(stopCh <-chan struct{}) { func() { p.listenersLock.RLock() defer p.listenersLock.RUnlock() for _, listener := range p.listeners { // 前面詳細講過 listener.run p.wg.Start(listener.run) // 前面詳細講過 listener.pop p.wg.Start(listener.pop) } p.listenersStarted = true }() <-stopCh // …… }
撇開細節,可以看到這里調用了內部所有 listener 的 run() 和 pop() 方法,和前面的分析呼應上了。
到這里,我們基本講完了自定義 controller 的時候 client-go 里相關的邏輯,也就是圖中的上半部分: