Kubernetes client-go 源碼分析 - Reflector


概述入口 - Reflector.Run()核心 - Reflector.ListAndWatch()Reflector.watchHandler()NewReflector()小結

概述

源碼版本:kubernetes master 分支 commit-fe62fc(2021年10月14日)

回顧一下 Reflector 在整個自定義控制器工作流中的位置:

《Kubernetes client-go 源碼分析 - 開篇》中我們提到過 Reflector 的任務就是向 apiserver watch 特定類型的資源,拿到變更通知后將其丟到 DeltaFIFO 隊列中。另外前面已經在 《Kubernetes client-go 源碼分析 - ListWatcher》中分析過 ListWatcher 是如何從 apiserver 中 list-watch 資源的,今天我們繼續來看 Reflector 的實現。

入口 - Reflector.Run()

Reflector 的啟動入口是 Run() 方法:

  • client-go/tools/cache/reflector.go:218
1func (r *Reflector) Run(stopCh <-chan struct{}) {
2   klog.V(3).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
3   wait.BackoffUntil(func() {
4      if err := r.ListAndWatch(stopCh); err != nil {
5         r.watchErrorHandler(r, err)
6      }
7   }, r.backoffManager, true, stopCh)
8   klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
9}

這里有一些健壯性機制,用於處理 apiserver 短暫失聯的場景。我們直接來看主要邏輯先,也就是 Reflector.ListAndWatch() 方法的內容。

核心 - Reflector.ListAndWatch()

Reflector.ListAndWatch() 方法有將近 200 行,是 Reflector 的核心邏輯之一。ListAndWatch() 方法做的事情是先 list 特定資源的所有對象,然后獲取其資源版本,接着使用這個資源版本來開始 watch 流程。watch 到新版本資源然后將其加入 DeltaFIFO 的動作是在 watchHandler() 方法中具體實現的,后面一節會單獨分析。在此之前 list 到的最新 items 會通過 syncWith() 方法添加一個 Sync 類型的 DeltaType 到 DeltaFIFO 中,所以 list 操作本身也會觸發后面的調諧邏輯運行。具體來看:

  • client-go/tools/cache/reflector.go:254
 1func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
2   klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
3   var resourceVersion string
4
5   // 當 r.lastSyncResourceVersion 為 "" 時這里為 "0",當使用 r.lastSyncResourceVersion 失敗時這里為 ""
6   // 區別是 "" 會直接請求到 etcd,獲取一個最新的版本,而 "0" 訪問的是 cache
7   options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}
8
9   if err := func() error {
10      // trace 是用於記錄操作耗時的,這里的邏輯是超過 10s 的步驟打印出來
11      initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
12      defer initTrace.LogIfLong(10 * time.Second)
13      var list runtime.Object
14      var paginatedResult bool
15      var err error
16      listCh := make(chan struct{}, 1)
17      panicCh := make(chan interface{}, 1)
18      go func() { // 內嵌一個函數,這里會直接調用
19         defer func() {
20            if r := recover(); r != nil { // 收集這個 goroutine panic 的時候將奔潰信息
21               panicCh <- r
22            }
23         }()
24         // 開始嘗試收集 list 的 chunks,我們在 《Kubernetes List-Watch 機制原理與實現 - chunked》中介紹過相關邏輯
25         pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
26            return r.listerWatcher.List(opts)
27         }))
28         switch {
29         case r.WatchListPageSize != 0:
30            pager.PageSize = r.WatchListPageSize
31         case r.paginatedResult:
32         case options.ResourceVersion != "" && options.ResourceVersion != "0":
33            pager.PageSize = 0
34         }
35
36         list, paginatedResult, err = pager.List(context.Background(), options)
37         if isExpiredError(err) || isTooLargeResourceVersionError(err) {
38            // 設置這個屬性后,下一次 list 會從 etcd 里取
39            r.setIsLastSyncResourceVersionUnavailable(true)
40            list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
41         }
42         close(listCh)
43      }()
44      select {
45      case <-stopCh:
46         return nil
47      case r := <-panicCh:
48         panic(r)
49      case <-listCh:
50      }
51      if err != nil {
52         return fmt.Errorf("failed to list %v: %v", r.expectedTypeName, err)
53      }
54
55      if options.ResourceVersion == "0" && paginatedResult {
56         r.paginatedResult = true
57      }
58
59      // list 成功
60      r.setIsLastSyncResourceVersionUnavailable(false)
61      initTrace.Step("Objects listed")
62      listMetaInterface, err := meta.ListAccessor(list)
63      if err != nil {
64         return fmt.Errorf("unable to understand list result %#v: %v", list, err)
65      }
66      resourceVersion = listMetaInterface.GetResourceVersion()
67      initTrace.Step("Resource version extracted")
68      items, err := meta.ExtractList(list)
69      if err != nil {
70         return fmt.Errorf("unable to understand list result %#v (%v)", list, err)
71      }
72      initTrace.Step("Objects extracted")
73      // 將 list 到的 items 添加到 store 里,這里是 store 也就是 DeltaFIFO,也就是添加一個 Sync DeltaType 這里的 resourveVersion 並沒有用到
74      if err := r.syncWith(items, resourceVersion); err != nil {
75         return fmt.Errorf("unable to sync list result: %v", err)
76      }
77      initTrace.Step("SyncWith done")
78      r.setLastSyncResourceVersion(resourceVersion)
79      initTrace.Step("Resource version updated")
80      return nil
81   }(); err != nil {
82      return err
83   }
84
85   resyncerrc := make(chan error, 1)
86   cancelCh := make(chan struct{})
87   defer close(cancelCh)
88   go func() {
89      resyncCh, cleanup := r.resyncChan()
90      defer func() {
91         cleanup()
92      }()
93      for {
94         select {
95         case <-resyncCh:
96         case <-stopCh:
97            return
98         case <-cancelCh:
99            return
100         }
101         if r.ShouldResync == nil || r.ShouldResync() {
102            klog.V(4).Infof("%s: forcing resync", r.name)
103            if err := r.store.Resync(); err != nil {
104               resyncerrc <- err
105               return
106            }
107         }
108         cleanup()
109         resyncCh, cleanup = r.resyncChan()
110      }
111   }()
112
113   for {
114      select {
115      case <-stopCh:
116         return nil
117      default:
118      }
119      // 超時時間是 5-10分鍾
120      timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
121      options = metav1.ListOptions{
122         ResourceVersion: resourceVersion,
123         // 如果超時沒有接收到任何 Event,這時候需要停止 watch,避免一直掛着
124         TimeoutSeconds: &timeoutSeconds,
125         // 用於降低 apiserver 壓力,bookmark 類型響應的對象主要只有 RV 信息
126         AllowWatchBookmarks: true,
127      }
128
129      start := r.clock.Now()
130      // 調用 watch
131      w, err := r.listerWatcher.Watch(options)
132      if err != nil {
133         // 這時候直接 re-list 已經沒有用了,apiserver 暫時拒絕服務
134         if utilnet.IsConnectionRefused(err) || apierrors.IsTooManyRequests(err) {
135            <-r.initConnBackoffManager.Backoff().C()
136            continue
137         }
138         return err
139      }
140      // 核心邏輯之一,后面單獨會講到
141      if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
142         if err != errorStopRequested {
143            switch {
144            case isExpiredError(err):
145               klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
146            case apierrors.IsTooManyRequests(err):
147               klog.V(2).Infof("%s: watch of %v returned 429 - backing off", r.name, r.expectedTypeName)
148               <-r.initConnBackoffManager.Backoff().C()
149               continue
150            default:
151               klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
152            }
153         }
154         return nil
155      }
156   }
157}

Reflector.watchHandler()

watchHandler() 方法中完成了將 watch 到的 Event 根據其 EventType 分別調用 DeltaFIFOAdd()/Update/Delete() 等方法完成對象追加到 DeltaFIFO 隊列的過程。watchHandler() 方法的調用在一個 for 循環中,所以一次 watchHandler() 工作流程完成后,函數退出,新一輪的調用會傳遞進來新的 watch.InterfaceresourceVersion 等,我們具體來看。

  • client-go/tools/cache/reflector.go:459
 1func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
2   eventCount := 0
3
4   // 當前函數返回時需要關閉 watch.Interface,因為新一輪的調用會傳遞新的 watch.Interface 進來
5   defer w.Stop()
6
7loop:
8   for {
9      select {
10      case <-stopCh:
11         return errorStopRequested
12      case err := <-errc:
13         return err
14        // 接收 event
15      case event, ok := <-w.ResultChan():
16         if !ok {
17            break loop
18         }
19         // 如果是 "ERROR"
20         if event.Type == watch.Error {
21            return apierrors.FromObject(event.Object)
22         }
23         // 創建 Reflector 的時候會指定一個 expectedType
24         if r.expectedType != nil {
25            // 類型不匹配
26            if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
27               utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
28               continue
29            }
30         }
31         // 沒有對應 Golang 結構體的對象可以通過這種方式來指定期望類型
32         if r.expectedGVK != nil {
33            if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
34               utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
35               continue
36            }
37         }
38         meta, err := meta.Accessor(event.Object)
39         if err != nil {
40            utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
41            continue
42         }
43         // 新的 ResourceVersion
44         newResourceVersion := meta.GetResourceVersion()
45         switch event.Type {
46         // 調用 DeltaFIFO 的 Add/Update/Delete 等方法完成不同類型 Event 等處理,我們在《Kubernetes client-go 源碼分析 - DeltaFIFO》詳細介紹過 DeltaFIFO 對應的 Add/Update/Delete 是如何實現的
47         case watch.Added:
48            err := r.store.Add(event.Object)
49            if err != nil {
50               utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
51            }
52         case watch.Modified:
53            err := r.store.Update(event.Object)
54            if err != nil {
55               utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
56            }
57         case watch.Deleted:
58            err := r.store.Delete(event.Object)
59            if err != nil {
60               utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
61            }
62         case watch.Bookmark:
63         default:
64            utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
65         }
66         // 更新 resourceVersion
67         *resourceVersion = newResourceVersion
68         r.setLastSyncResourceVersion(newResourceVersion)
69         if rvu, ok := r.store.(ResourceVersionUpdater); ok {
70            rvu.UpdateResourceVersion(newResourceVersion)
71         }
72         eventCount++
73      }
74   }
75   // 耗時
76   watchDuration := r.clock.Since(start)
77   // 1s 就結束了,而且沒有收到 event,屬於異常情況
78   if watchDuration < 1*time.Second && eventCount == 0 {
79      return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
80   }
81   klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedTypeName, eventCount)
82   return nil
83}

NewReflector()

繼續來看下 Reflector 的初始化。NewReflector() 的參數里有一個 ListerWatcher 類型的 lw,還有有一個 expectedType 和 store,lw 就是我們在《Kubernetes client-go 源碼分析 - ListWatcher》中介紹的那個 ListerWatcher,expectedType指定期望關注的類型,而 store 是一個 DeltaFIFO,我們在《Kubernetes client-go 源碼分析 - DeltaFIFO》中也有詳細的介紹過。加在一起大致可以預想到 Reflector 通過 ListWatcher 提供的能力去 list-watch apiserver,然后將 Event 加到 DeltaFIFO 中。

  • client-go/tools/cache/reflector.go:166
 1func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
2   // 直接調用下面的 NewNamedReflector
3   return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
4}
5
6func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
7   realClock := &clock.RealClock{}
8   r := &Reflector{
9      name:          name,
10      listerWatcher: lw,
11      store:         store,
12      // 重試機制,這里可以有效降低 apiserver 的負載,也就是重試間隔會越來越長
13      backoffManager:         wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.01.0, realClock),
14      initConnBackoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.01.0, realClock),
15      resyncPeriod:           resyncPeriod,
16      clock:                  realClock,
17      watchErrorHandler:      WatchErrorHandler(DefaultWatchErrorHandler),
18   }
19   r.setExpectedType(expectedType)
20   return r
21}

小結

如文章開頭的圖中所示,Reflector 的職責很清晰,要做的事情是保持 DeltaFIFO 中的 items 持續更新,具體實現是通過 ListWatcher 提供的 list-watch 能力來 list 指定類型的資源,這時候會產生一系列 Sync 事件,然后通過 list 到的 ResourceVersion 來開啟 watch 過程,而 watch 到新的事件后,會和前面提到的 Sync 事件一樣,都通過 DeltaFIFO 提供的方法構造相應的 DeltaType 添加到 DeltaFIFO 中。當然前面提到的更新也並不是直接修改 DeltaFIFO 中已經存在的 items,而是添加一個新的 DeltaType 到隊列中。另外 DeltaFIFO 中添加新 DeltaType 的時候也會有一定的去重機制,我們以前在 ListWatcher 和 DeltaFIFO 中分別介紹過這兩個組件的工作邏輯,有了這個基礎后再看 Reflector 的工作流就相對輕松很多了。這里還有一個細節就是 watch 過程不是一勞永逸的,watch 到新的 event 后,會拿着對象的新 ResourceVersion 重新開啟一輪新的 watch 過程。當然這里的 watch 調用也有超時機制,一系列的健壯性措施,所以我們脫離 Reflector(Informer) 直接使用 list-watch 還是很難手撕一套健壯的代碼出來。

(轉載請保留本文原始鏈接 https://www.danielhu.cn)

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM