k8s controller组件分析


image

上图可见Controller 由五个部分组成,分别为Informer、Callback、workqueue、worker、以及Clients,其中Informer分为三部分分别是:Reflector、DeltaFIFO、LocalStore。

1. Reflector

Reflector 的核心作用有两个:一是获取到所有Reflector中定义的资源然后 sync 到DeltaFIFO中,二是对资源进行监控,当监控的资源发生变化时,sync给后面的DeltaFIFO。

func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {

   options := metav1.ListOptions{ResourceVersion: "0"}
   list, err := r.listerWatcher.List(options)
   listMetaInterface, err := meta.ListAccessor(list)
   resourceVersion = listMetaInterface.GetResourceVersion()
   items, err := meta.ExtractList(list)
   ...

上述代码中可见:默认会从资源的ResourceVersion为0处开始list,将大于0的版本都sync到DeltaFIFO,也就是说第一次sync是一次全量同步,然后获取最新的ResourceVersion,用于后面的watch操作。每当 controller 重启时,都会进行一次全量的同步,对性能会有影响,在 k8s 1.17 GA版本增加了 watch bookmark 功能,可以指定 ResourceVersion 进行list watch。

	for {

		timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
		options = metav1.ListOptions{
			ResourceVersion: resourceVersion,
			TimeoutSeconds: &timeoutSeconds,
		}

		r.metrics.numberOfWatches.Inc()
		w, err := r.listerWatcher.Watch(options)
		。。。

		if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
			if err != errorStopRequested {
				glog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
			}
			return nil
		}
	}

获取到的最新resourceVersion会传递给options,Reflector会从这个最新版本的resourceVersion开始watch,当有资源变化时会调用watchHandler函数来sync到DeltaFIFO中。

2. DeltaFIFO

DeltaFIFO是用来记录对资源的处理动作,如Added,Updated,Deleted,Sync,在资源进入到DeltaFIFO时,会先将其包装成Delta对象,Delta其中包含两个字段,一个是对资源的处理动作,一个是资源对象本身,Delta类型可以理解为记录着要对哪个对象进行怎样的操作。

func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
	id, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}

	if actionType == Sync && f.willObjectBeDeletedLocked(id) {
		return nil
	}

	newDeltas := append(f.items[id], Delta{actionType, obj})
	newDeltas = dedupDeltas(newDeltas)

	_, exists := f.items[id]
	if len(newDeltas) > 0 {
		if !exists {
			f.queue = append(f.queue, id)
		}
		f.items[id] = newDeltas
		f.cond.Broadcast()
	} else if exists {
		delete(f.items, id)
	}
	return nil
}

代码中actionType表示对该资源的处理动作,f.queue中存放的是不同资源的key,f.items的结构是map[string]Deltas 就是存放着对同一个对象的多个操作。代码中的id(f.queue中的元素)默认是该资源的 " /name", id可以自定义算法来生成key。其中dedupDeltas方法是对多个Delta进行合并,只支持对两个连续的delete操作进行合并。DeltaFIFO中重点的方法是如下的pop方法:

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
	f.lock.Lock()
	defer f.lock.Unlock()
	for {
		for len(f.queue) == 0 {
			if f.IsClosed() {
				return nil, FIFOClosedError
			}
			f.cond.Wait()
		}
		id := f.queue[0]
		f.queue = f.queue[1:]
		item, ok := f.items[id]
		if f.initialPopulationCount > 0 {
			f.initialPopulationCount--
		}
		if !ok {
			continue
		}
		delete(f.items, id)
		err := process(item)
		if e, ok := err.(ErrRequeue); ok {
			f.addIfNotPresent(id, item)
			err = e.Err
		}
		return item, err
	}

如果队列中没有元素,也就是没有元素可以弹出,那就会阻塞在这里,当有新元素添加到队列里时会下发一个通知给这个阻塞的操作,将阻塞解除让程序继续进行。f.queue中的元素会一个一个的弹出,根据队列中弹出的元素(f.items中的key)取出对应 f.items中的Delta数组,传入的PopProcessFunc函数会对数组中的内容进行逐个处理。典型的生产者消费者模式。下面的HandleDeltas函数会被传递给pop函数用来处理 f.items中的元素

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	for _, d := range obj.(Deltas) {
		switch d.Type {
		case Sync, Added, Updated:
			isSync := d.Type == Sync
			s.cacheMutationDetector.AddObject(d.Object)
			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
				if err := s.indexer.Update(d.Object); err != nil {
					return err
				}
				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
			} else {
				if err := s.indexer.Add(d.Object); err != nil {
					return err
				}
				s.processor.distribute(addNotification{newObj: d.Object}, isSync)
			}
		case Deleted:
			if err := s.indexer.Delete(d.Object); err != nil {
				return err
			}
			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
		}
	}
	return nil
}

代码中可见,HandleDeltas方法会根据不同的处理动作包装成不同的Notification,然后将notification加入到processorListener的addCh中

func (p *processorListener) add(notification interface{}) {
	p.addCh <- notification
}

func (p *processorListener) run() {
	stopCh := make(chan struct{})
	wait.Until(func() {
		err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
			for next := range p.nextCh {
				switch notification := next.(type) {
				case updateNotification:
					p.handler.OnUpdate(notification.oldObj, notification.newObj)
				case addNotification:
					p.handler.OnAdd(notification.newObj)
				case deleteNotification:
					p.handler.OnDelete(notification.oldObj)
				default:
					utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
				}
			}
			return true, nil
		})

		if err == nil {
			close(stopCh)
		}
	}, 1*time.Minute, stopCh)
}

最后会调用processorListener的run方法,将各种notification添加到各个自定义的Callback进行处理。run方法中是从p.nextCh(提示:注意区分nextCh和p.nextCh)中取出信息,上面只是将信息存入到p.addCh,二者的关联是什么呢?下面介绍的就是processorListener的pop方法,通过这个方法将addCh和nextCh关联起来,这个方法也充分体现了go语言“应该以通信作为手段来共享内存”的特性。

func (p *processorListener) pop() {
	defer utilruntime.HandleCrash()
	defer close(p.nextCh) // Tell .run() to stop

	var nextCh chan<- interface{}
	var notification interface{}
	for {
		select {
		case nextCh <- notification:
			var ok bool
			notification, ok = p.pendingNotifications.ReadOne()
			if !ok { // Nothing to pop
				nextCh = nil // Disable this select case
			}
		case notificationToAdd, ok := <-p.addCh:
			if !ok {
				return
			}
			if notification == nil { // No notification to pop (and pendingNotifications is empty)
				notification = notificationToAdd
				nextCh = p.nextCh
			} else { // There is already a notification waiting to be dispatched
				p.pendingNotifications.WriteOne(notificationToAdd)
			}
		}
	}
}

当addCh中有数据时,会取出来赋值给notificationToAdd,然后判断notification是否为空,如果为空,那么就把notificationToAdd赋值给notification,将nextCh赋值为p.nextCh一方面激活了netxCh,另一方面也间接的将信息存入到p.nextCh,这时就可以通过上面的add函数传递给各个Callbacks。notification不为空意味着这个notification还没有被处理,但是p.addCh是一个双向无缓存chan,如果不取出信息p.addCh会被阻塞,因此就引入了一个缓存(p.pendingNotifications)先把信息写入到缓存,然后从缓存中去信息。

注意: DeltaFIFO和上面Reflector的sync是不同的,DeltaFIFO中的sync是同步到LocalStore,Reflector中的sync是同步到DeltaFIFO

3. LocalStore

type cache struct {
	// cacheStorage bears the burden of thread 
	cacheStorage ThreadSafeStore
	keyFunc KeyFunc
}
type threadSafeMap struct {
	lock  sync.RWMutex
	items map[string]interface{}

	// indexers maps a name to an IndexFunc
	indexers Indexers
	// indices maps a name to an Index
	indices Indices
}

LocalStore是一个缓存,主要作用缓解apiserver 的压力。缓存中存放的是全量信息,第一次会进行全量的sync,之后发生变化都是通过watch来sync到Delta,最后同步到LocalStore。LocalStore的具体实现是cache,cache声明了两个字段cacheStorage和keyFunc。cacheStorage是存放信息的,keyFunc是用来根据传入的对象生成一个key的函数,它的主要作用是用来检索,默认是生成的key是“namespace/name”,用户可以自定义算法来生成key。

4. Workqueue

Workqueue 就是一个队列,通过这个队列,可以异步处理请求,从而缓解系统的压力,client-go提供的队列有延时队列,限速队列,可自己选择使用。

5. Callback and Worker

Callback是需要自己定义的,当资源发生变化时,通过callback将上面KeyFunc生成的key放入到workqueue中,这时worker会从workqueue中取出key,通过Lister获取到当前的资源情况然后和目标资源情况进行比对,如果不匹配会通过clients向apiserver发送请求来调整。为什么没有在handler中直接处理而是先放入队列再异步处理呢?最主要的原因是处理的过程复杂耗时,避免阻塞影响性能。


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM