上图可见Controller 由五个部分组成,分别为Informer、Callback、workqueue、worker、以及Clients,其中Informer分为三部分分别是:Reflector、DeltaFIFO、LocalStore。
1. Reflector
Reflector 的核心作用有两个:一是获取到所有Reflector中定义的资源然后 sync 到DeltaFIFO中,二是对资源进行监控,当监控的资源发生变化时,sync给后面的DeltaFIFO。
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
options := metav1.ListOptions{ResourceVersion: "0"}
list, err := r.listerWatcher.List(options)
listMetaInterface, err := meta.ListAccessor(list)
resourceVersion = listMetaInterface.GetResourceVersion()
items, err := meta.ExtractList(list)
...
上述代码中可见:默认会从资源的ResourceVersion为0处开始list,将大于0的版本都sync到DeltaFIFO,也就是说第一次sync是一次全量同步,然后获取最新的ResourceVersion,用于后面的watch操作。每当 controller 重启时,都会进行一次全量的同步,对性能会有影响,在 k8s 1.17 GA版本增加了 watch bookmark 功能,可以指定 ResourceVersion 进行list watch。
for {
timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
options = metav1.ListOptions{
ResourceVersion: resourceVersion,
TimeoutSeconds: &timeoutSeconds,
}
r.metrics.numberOfWatches.Inc()
w, err := r.listerWatcher.Watch(options)
。。。
if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
if err != errorStopRequested {
glog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
}
return nil
}
}
获取到的最新resourceVersion会传递给options,Reflector会从这个最新版本的resourceVersion开始watch,当有资源变化时会调用watchHandler函数来sync到DeltaFIFO中。
2. DeltaFIFO
DeltaFIFO是用来记录对资源的处理动作,如Added,Updated,Deleted,Sync,在资源进入到DeltaFIFO时,会先将其包装成Delta对象,Delta其中包含两个字段,一个是对资源的处理动作,一个是资源对象本身,Delta类型可以理解为记录着要对哪个对象进行怎样的操作。
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}
if actionType == Sync && f.willObjectBeDeletedLocked(id) {
return nil
}
newDeltas := append(f.items[id], Delta{actionType, obj})
newDeltas = dedupDeltas(newDeltas)
_, exists := f.items[id]
if len(newDeltas) > 0 {
if !exists {
f.queue = append(f.queue, id)
}
f.items[id] = newDeltas
f.cond.Broadcast()
} else if exists {
delete(f.items, id)
}
return nil
}
代码中actionType表示对该资源的处理动作,f.queue中存放的是不同资源的key,f.items的结构是map[string]Deltas 就是存放着对同一个对象的多个操作。代码中的id(f.queue中的元素)默认是该资源的 "
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for {
for len(f.queue) == 0 {
if f.IsClosed() {
return nil, FIFOClosedError
}
f.cond.Wait()
}
id := f.queue[0]
f.queue = f.queue[1:]
item, ok := f.items[id]
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
}
if !ok {
continue
}
delete(f.items, id)
err := process(item)
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err
}
return item, err
}
如果队列中没有元素,也就是没有元素可以弹出,那就会阻塞在这里,当有新元素添加到队列里时会下发一个通知给这个阻塞的操作,将阻塞解除让程序继续进行。f.queue中的元素会一个一个的弹出,根据队列中弹出的元素(f.items中的key)取出对应 f.items中的Delta数组,传入的PopProcessFunc函数会对数组中的内容进行逐个处理。典型的生产者消费者模式。下面的HandleDeltas函数会被传递给pop函数用来处理 f.items中的元素
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
for _, d := range obj.(Deltas) {
switch d.Type {
case Sync, Added, Updated:
isSync := d.Type == Sync
s.cacheMutationDetector.AddObject(d.Object)
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
if err := s.indexer.Update(d.Object); err != nil {
return err
}
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else {
if err := s.indexer.Add(d.Object); err != nil {
return err
}
s.processor.distribute(addNotification{newObj: d.Object}, isSync)
}
case Deleted:
if err := s.indexer.Delete(d.Object); err != nil {
return err
}
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
}
}
return nil
}
代码中可见,HandleDeltas方法会根据不同的处理动作包装成不同的Notification,然后将notification加入到processorListener的addCh中
func (p *processorListener) add(notification interface{}) {
p.addCh <- notification
}
func (p *processorListener) run() {
stopCh := make(chan struct{})
wait.Until(func() {
err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
for next := range p.nextCh {
switch notification := next.(type) {
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:
utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
}
}
return true, nil
})
if err == nil {
close(stopCh)
}
}, 1*time.Minute, stopCh)
}
最后会调用processorListener的run方法,将各种notification添加到各个自定义的Callback进行处理。run方法中是从p.nextCh(提示:注意区分nextCh和p.nextCh)中取出信息,上面只是将信息存入到p.addCh,二者的关联是什么呢?下面介绍的就是processorListener的pop方法,通过这个方法将addCh和nextCh关联起来,这个方法也充分体现了go语言“应该以通信作为手段来共享内存”的特性。
func (p *processorListener) pop() {
defer utilruntime.HandleCrash()
defer close(p.nextCh) // Tell .run() to stop
var nextCh chan<- interface{}
var notification interface{}
for {
select {
case nextCh <- notification:
var ok bool
notification, ok = p.pendingNotifications.ReadOne()
if !ok { // Nothing to pop
nextCh = nil // Disable this select case
}
case notificationToAdd, ok := <-p.addCh:
if !ok {
return
}
if notification == nil { // No notification to pop (and pendingNotifications is empty)
notification = notificationToAdd
nextCh = p.nextCh
} else { // There is already a notification waiting to be dispatched
p.pendingNotifications.WriteOne(notificationToAdd)
}
}
}
}
当addCh中有数据时,会取出来赋值给notificationToAdd,然后判断notification是否为空,如果为空,那么就把notificationToAdd赋值给notification,将nextCh赋值为p.nextCh一方面激活了netxCh,另一方面也间接的将信息存入到p.nextCh,这时就可以通过上面的add函数传递给各个Callbacks。notification不为空意味着这个notification还没有被处理,但是p.addCh是一个双向无缓存chan,如果不取出信息p.addCh会被阻塞,因此就引入了一个缓存(p.pendingNotifications)先把信息写入到缓存,然后从缓存中去信息。
注意: DeltaFIFO和上面Reflector的sync是不同的,DeltaFIFO中的sync是同步到LocalStore,Reflector中的sync是同步到DeltaFIFO
3. LocalStore
type cache struct {
// cacheStorage bears the burden of thread
cacheStorage ThreadSafeStore
keyFunc KeyFunc
}
type threadSafeMap struct {
lock sync.RWMutex
items map[string]interface{}
// indexers maps a name to an IndexFunc
indexers Indexers
// indices maps a name to an Index
indices Indices
}
LocalStore是一个缓存,主要作用缓解apiserver 的压力。缓存中存放的是全量信息,第一次会进行全量的sync,之后发生变化都是通过watch来sync到Delta,最后同步到LocalStore。LocalStore的具体实现是cache,cache声明了两个字段cacheStorage和keyFunc。cacheStorage是存放信息的,keyFunc是用来根据传入的对象生成一个key的函数,它的主要作用是用来检索,默认是生成的key是“namespace/name”,用户可以自定义算法来生成key。
4. Workqueue
Workqueue 就是一个队列,通过这个队列,可以异步处理请求,从而缓解系统的压力,client-go提供的队列有延时队列,限速队列,可自己选择使用。
5. Callback and Worker
Callback是需要自己定义的,当资源发生变化时,通过callback将上面KeyFunc生成的key放入到workqueue中,这时worker会从workqueue中取出key,通过Lister获取到当前的资源情况然后和目标资源情况进行比对,如果不匹配会通过clients向apiserver发送请求来调整。为什么没有在handler中直接处理而是先放入队列再异步处理呢?最主要的原因是处理的过程复杂耗时,避免阻塞影响性能。