
上圖可見Controller 由五個部分組成,分別為Informer、Callback、workqueue、worker、以及Clients,其中Informer分為三部分分別是:Reflector、DeltaFIFO、LocalStore。
1. Reflector
Reflector 的核心作用有兩個:一是獲取到所有Reflector中定義的資源然后 sync 到DeltaFIFO中,二是對資源進行監控,當監控的資源發生變化時,sync給后面的DeltaFIFO。
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
options := metav1.ListOptions{ResourceVersion: "0"}
list, err := r.listerWatcher.List(options)
listMetaInterface, err := meta.ListAccessor(list)
resourceVersion = listMetaInterface.GetResourceVersion()
items, err := meta.ExtractList(list)
...
上述代碼中可見:默認會從資源的ResourceVersion為0處開始list,將大於0的版本都sync到DeltaFIFO,也就是說第一次sync是一次全量同步,然后獲取最新的ResourceVersion,用於后面的watch操作。每當 controller 重啟時,都會進行一次全量的同步,對性能會有影響,在 k8s 1.17 GA版本增加了 watch bookmark 功能,可以指定 ResourceVersion 進行list watch。
for {
timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
options = metav1.ListOptions{
ResourceVersion: resourceVersion,
TimeoutSeconds: &timeoutSeconds,
}
r.metrics.numberOfWatches.Inc()
w, err := r.listerWatcher.Watch(options)
。。。
if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
if err != errorStopRequested {
glog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
}
return nil
}
}
獲取到的最新resourceVersion會傳遞給options,Reflector會從這個最新版本的resourceVersion開始watch,當有資源變化時會調用watchHandler函數來sync到DeltaFIFO中。
2. DeltaFIFO
DeltaFIFO是用來記錄對資源的處理動作,如Added,Updated,Deleted,Sync,在資源進入到DeltaFIFO時,會先將其包裝成Delta對象,Delta其中包含兩個字段,一個是對資源的處理動作,一個是資源對象本身,Delta類型可以理解為記錄着要對哪個對象進行怎樣的操作。
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}
if actionType == Sync && f.willObjectBeDeletedLocked(id) {
return nil
}
newDeltas := append(f.items[id], Delta{actionType, obj})
newDeltas = dedupDeltas(newDeltas)
_, exists := f.items[id]
if len(newDeltas) > 0 {
if !exists {
f.queue = append(f.queue, id)
}
f.items[id] = newDeltas
f.cond.Broadcast()
} else if exists {
delete(f.items, id)
}
return nil
}
代碼中actionType表示對該資源的處理動作,f.queue中存放的是不同資源的key,f.items的結構是map[string]Deltas 就是存放着對同一個對象的多個操作。代碼中的id(f.queue中的元素)默認是該資源的 "
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for {
for len(f.queue) == 0 {
if f.IsClosed() {
return nil, FIFOClosedError
}
f.cond.Wait()
}
id := f.queue[0]
f.queue = f.queue[1:]
item, ok := f.items[id]
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
}
if !ok {
continue
}
delete(f.items, id)
err := process(item)
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err
}
return item, err
}
如果隊列中沒有元素,也就是沒有元素可以彈出,那就會阻塞在這里,當有新元素添加到隊列里時會下發一個通知給這個阻塞的操作,將阻塞解除讓程序繼續進行。f.queue中的元素會一個一個的彈出,根據隊列中彈出的元素(f.items中的key)取出對應 f.items中的Delta數組,傳入的PopProcessFunc函數會對數組中的內容進行逐個處理。典型的生產者消費者模式。下面的HandleDeltas函數會被傳遞給pop函數用來處理 f.items中的元素
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
for _, d := range obj.(Deltas) {
switch d.Type {
case Sync, Added, Updated:
isSync := d.Type == Sync
s.cacheMutationDetector.AddObject(d.Object)
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
if err := s.indexer.Update(d.Object); err != nil {
return err
}
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else {
if err := s.indexer.Add(d.Object); err != nil {
return err
}
s.processor.distribute(addNotification{newObj: d.Object}, isSync)
}
case Deleted:
if err := s.indexer.Delete(d.Object); err != nil {
return err
}
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
}
}
return nil
}
代碼中可見,HandleDeltas方法會根據不同的處理動作包裝成不同的Notification,然后將notification加入到processorListener的addCh中
func (p *processorListener) add(notification interface{}) {
p.addCh <- notification
}
func (p *processorListener) run() {
stopCh := make(chan struct{})
wait.Until(func() {
err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
for next := range p.nextCh {
switch notification := next.(type) {
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:
utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
}
}
return true, nil
})
if err == nil {
close(stopCh)
}
}, 1*time.Minute, stopCh)
}
最后會調用processorListener的run方法,將各種notification添加到各個自定義的Callback進行處理。run方法中是從p.nextCh(提示:注意區分nextCh和p.nextCh)中取出信息,上面只是將信息存入到p.addCh,二者的關聯是什么呢?下面介紹的就是processorListener的pop方法,通過這個方法將addCh和nextCh關聯起來,這個方法也充分體現了go語言“應該以通信作為手段來共享內存”的特性。
func (p *processorListener) pop() {
defer utilruntime.HandleCrash()
defer close(p.nextCh) // Tell .run() to stop
var nextCh chan<- interface{}
var notification interface{}
for {
select {
case nextCh <- notification:
var ok bool
notification, ok = p.pendingNotifications.ReadOne()
if !ok { // Nothing to pop
nextCh = nil // Disable this select case
}
case notificationToAdd, ok := <-p.addCh:
if !ok {
return
}
if notification == nil { // No notification to pop (and pendingNotifications is empty)
notification = notificationToAdd
nextCh = p.nextCh
} else { // There is already a notification waiting to be dispatched
p.pendingNotifications.WriteOne(notificationToAdd)
}
}
}
}
當addCh中有數據時,會取出來賦值給notificationToAdd,然后判斷notification是否為空,如果為空,那么就把notificationToAdd賦值給notification,將nextCh賦值為p.nextCh一方面激活了netxCh,另一方面也間接的將信息存入到p.nextCh,這時就可以通過上面的add函數傳遞給各個Callbacks。notification不為空意味着這個notification還沒有被處理,但是p.addCh是一個雙向無緩存chan,如果不取出信息p.addCh會被阻塞,因此就引入了一個緩存(p.pendingNotifications)先把信息寫入到緩存,然后從緩存中去信息。
注意: DeltaFIFO和上面Reflector的sync是不同的,DeltaFIFO中的sync是同步到LocalStore,Reflector中的sync是同步到DeltaFIFO
3. LocalStore
type cache struct {
// cacheStorage bears the burden of thread
cacheStorage ThreadSafeStore
keyFunc KeyFunc
}
type threadSafeMap struct {
lock sync.RWMutex
items map[string]interface{}
// indexers maps a name to an IndexFunc
indexers Indexers
// indices maps a name to an Index
indices Indices
}
LocalStore是一個緩存,主要作用緩解apiserver 的壓力。緩存中存放的是全量信息,第一次會進行全量的sync,之后發生變化都是通過watch來sync到Delta,最后同步到LocalStore。LocalStore的具體實現是cache,cache聲明了兩個字段cacheStorage和keyFunc。cacheStorage是存放信息的,keyFunc是用來根據傳入的對象生成一個key的函數,它的主要作用是用來檢索,默認是生成的key是“namespace/name”,用戶可以自定義算法來生成key。
4. Workqueue
Workqueue 就是一個隊列,通過這個隊列,可以異步處理請求,從而緩解系統的壓力,client-go提供的隊列有延時隊列,限速隊列,可自己選擇使用。
5. Callback and Worker
Callback是需要自己定義的,當資源發生變化時,通過callback將上面KeyFunc生成的key放入到workqueue中,這時worker會從workqueue中取出key,通過Lister獲取到當前的資源情況然后和目標資源情況進行比對,如果不匹配會通過clients向apiserver發送請求來調整。為什么沒有在handler中直接處理而是先放入隊列再異步處理呢?最主要的原因是處理的過程復雜耗時,避免阻塞影響性能。
