k8s controller組件分析


image

上圖可見Controller 由五個部分組成,分別為Informer、Callback、workqueue、worker、以及Clients,其中Informer分為三部分分別是:Reflector、DeltaFIFO、LocalStore。

1. Reflector

Reflector 的核心作用有兩個:一是獲取到所有Reflector中定義的資源然后 sync 到DeltaFIFO中,二是對資源進行監控,當監控的資源發生變化時,sync給后面的DeltaFIFO。

func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {

   options := metav1.ListOptions{ResourceVersion: "0"}
   list, err := r.listerWatcher.List(options)
   listMetaInterface, err := meta.ListAccessor(list)
   resourceVersion = listMetaInterface.GetResourceVersion()
   items, err := meta.ExtractList(list)
   ...

上述代碼中可見:默認會從資源的ResourceVersion為0處開始list,將大於0的版本都sync到DeltaFIFO,也就是說第一次sync是一次全量同步,然后獲取最新的ResourceVersion,用於后面的watch操作。每當 controller 重啟時,都會進行一次全量的同步,對性能會有影響,在 k8s 1.17 GA版本增加了 watch bookmark 功能,可以指定 ResourceVersion 進行list watch。

	for {

		timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
		options = metav1.ListOptions{
			ResourceVersion: resourceVersion,
			TimeoutSeconds: &timeoutSeconds,
		}

		r.metrics.numberOfWatches.Inc()
		w, err := r.listerWatcher.Watch(options)
		。。。

		if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
			if err != errorStopRequested {
				glog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
			}
			return nil
		}
	}

獲取到的最新resourceVersion會傳遞給options,Reflector會從這個最新版本的resourceVersion開始watch,當有資源變化時會調用watchHandler函數來sync到DeltaFIFO中。

2. DeltaFIFO

DeltaFIFO是用來記錄對資源的處理動作,如Added,Updated,Deleted,Sync,在資源進入到DeltaFIFO時,會先將其包裝成Delta對象,Delta其中包含兩個字段,一個是對資源的處理動作,一個是資源對象本身,Delta類型可以理解為記錄着要對哪個對象進行怎樣的操作。

func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
	id, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}

	if actionType == Sync && f.willObjectBeDeletedLocked(id) {
		return nil
	}

	newDeltas := append(f.items[id], Delta{actionType, obj})
	newDeltas = dedupDeltas(newDeltas)

	_, exists := f.items[id]
	if len(newDeltas) > 0 {
		if !exists {
			f.queue = append(f.queue, id)
		}
		f.items[id] = newDeltas
		f.cond.Broadcast()
	} else if exists {
		delete(f.items, id)
	}
	return nil
}

代碼中actionType表示對該資源的處理動作,f.queue中存放的是不同資源的key,f.items的結構是map[string]Deltas 就是存放着對同一個對象的多個操作。代碼中的id(f.queue中的元素)默認是該資源的 " /name", id可以自定義算法來生成key。其中dedupDeltas方法是對多個Delta進行合並,只支持對兩個連續的delete操作進行合並。DeltaFIFO中重點的方法是如下的pop方法:

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
	f.lock.Lock()
	defer f.lock.Unlock()
	for {
		for len(f.queue) == 0 {
			if f.IsClosed() {
				return nil, FIFOClosedError
			}
			f.cond.Wait()
		}
		id := f.queue[0]
		f.queue = f.queue[1:]
		item, ok := f.items[id]
		if f.initialPopulationCount > 0 {
			f.initialPopulationCount--
		}
		if !ok {
			continue
		}
		delete(f.items, id)
		err := process(item)
		if e, ok := err.(ErrRequeue); ok {
			f.addIfNotPresent(id, item)
			err = e.Err
		}
		return item, err
	}

如果隊列中沒有元素,也就是沒有元素可以彈出,那就會阻塞在這里,當有新元素添加到隊列里時會下發一個通知給這個阻塞的操作,將阻塞解除讓程序繼續進行。f.queue中的元素會一個一個的彈出,根據隊列中彈出的元素(f.items中的key)取出對應 f.items中的Delta數組,傳入的PopProcessFunc函數會對數組中的內容進行逐個處理。典型的生產者消費者模式。下面的HandleDeltas函數會被傳遞給pop函數用來處理 f.items中的元素

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	for _, d := range obj.(Deltas) {
		switch d.Type {
		case Sync, Added, Updated:
			isSync := d.Type == Sync
			s.cacheMutationDetector.AddObject(d.Object)
			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
				if err := s.indexer.Update(d.Object); err != nil {
					return err
				}
				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
			} else {
				if err := s.indexer.Add(d.Object); err != nil {
					return err
				}
				s.processor.distribute(addNotification{newObj: d.Object}, isSync)
			}
		case Deleted:
			if err := s.indexer.Delete(d.Object); err != nil {
				return err
			}
			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
		}
	}
	return nil
}

代碼中可見,HandleDeltas方法會根據不同的處理動作包裝成不同的Notification,然后將notification加入到processorListener的addCh中

func (p *processorListener) add(notification interface{}) {
	p.addCh <- notification
}

func (p *processorListener) run() {
	stopCh := make(chan struct{})
	wait.Until(func() {
		err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
			for next := range p.nextCh {
				switch notification := next.(type) {
				case updateNotification:
					p.handler.OnUpdate(notification.oldObj, notification.newObj)
				case addNotification:
					p.handler.OnAdd(notification.newObj)
				case deleteNotification:
					p.handler.OnDelete(notification.oldObj)
				default:
					utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
				}
			}
			return true, nil
		})

		if err == nil {
			close(stopCh)
		}
	}, 1*time.Minute, stopCh)
}

最后會調用processorListener的run方法,將各種notification添加到各個自定義的Callback進行處理。run方法中是從p.nextCh(提示:注意區分nextCh和p.nextCh)中取出信息,上面只是將信息存入到p.addCh,二者的關聯是什么呢?下面介紹的就是processorListener的pop方法,通過這個方法將addCh和nextCh關聯起來,這個方法也充分體現了go語言“應該以通信作為手段來共享內存”的特性。

func (p *processorListener) pop() {
	defer utilruntime.HandleCrash()
	defer close(p.nextCh) // Tell .run() to stop

	var nextCh chan<- interface{}
	var notification interface{}
	for {
		select {
		case nextCh <- notification:
			var ok bool
			notification, ok = p.pendingNotifications.ReadOne()
			if !ok { // Nothing to pop
				nextCh = nil // Disable this select case
			}
		case notificationToAdd, ok := <-p.addCh:
			if !ok {
				return
			}
			if notification == nil { // No notification to pop (and pendingNotifications is empty)
				notification = notificationToAdd
				nextCh = p.nextCh
			} else { // There is already a notification waiting to be dispatched
				p.pendingNotifications.WriteOne(notificationToAdd)
			}
		}
	}
}

當addCh中有數據時,會取出來賦值給notificationToAdd,然后判斷notification是否為空,如果為空,那么就把notificationToAdd賦值給notification,將nextCh賦值為p.nextCh一方面激活了netxCh,另一方面也間接的將信息存入到p.nextCh,這時就可以通過上面的add函數傳遞給各個Callbacks。notification不為空意味着這個notification還沒有被處理,但是p.addCh是一個雙向無緩存chan,如果不取出信息p.addCh會被阻塞,因此就引入了一個緩存(p.pendingNotifications)先把信息寫入到緩存,然后從緩存中去信息。

注意: DeltaFIFO和上面Reflector的sync是不同的,DeltaFIFO中的sync是同步到LocalStore,Reflector中的sync是同步到DeltaFIFO

3. LocalStore

type cache struct {
	// cacheStorage bears the burden of thread 
	cacheStorage ThreadSafeStore
	keyFunc KeyFunc
}
type threadSafeMap struct {
	lock  sync.RWMutex
	items map[string]interface{}

	// indexers maps a name to an IndexFunc
	indexers Indexers
	// indices maps a name to an Index
	indices Indices
}

LocalStore是一個緩存,主要作用緩解apiserver 的壓力。緩存中存放的是全量信息,第一次會進行全量的sync,之后發生變化都是通過watch來sync到Delta,最后同步到LocalStore。LocalStore的具體實現是cache,cache聲明了兩個字段cacheStorage和keyFunc。cacheStorage是存放信息的,keyFunc是用來根據傳入的對象生成一個key的函數,它的主要作用是用來檢索,默認是生成的key是“namespace/name”,用戶可以自定義算法來生成key。

4. Workqueue

Workqueue 就是一個隊列,通過這個隊列,可以異步處理請求,從而緩解系統的壓力,client-go提供的隊列有延時隊列,限速隊列,可自己選擇使用。

5. Callback and Worker

Callback是需要自己定義的,當資源發生變化時,通過callback將上面KeyFunc生成的key放入到workqueue中,這時worker會從workqueue中取出key,通過Lister獲取到當前的資源情況然后和目標資源情況進行比對,如果不匹配會通過clients向apiserver發送請求來調整。為什么沒有在handler中直接處理而是先放入隊列再異步處理呢?最主要的原因是處理的過程復雜耗時,避免阻塞影響性能。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM