An analysis of Kubernetes client-go


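Note: the client-go version used here is client-go 11.0. This article mainly builds on the client-go series of "深入淺出kubernetes" (Kubernetes explained in depth) on CSDN, which is recommended reading before this one; this article digs deeper into that series and supplements it.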
The Visio diagrams used in this article can be obtained here.

The figure below is the official client-go architecture diagram.

Figure 1.

The following figure can also serve as a reference.

Figure 2.

Indexer

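The Indexer stores resources obtained from the apiServer and keeps up with their incremental changes through the list-watch mechanism. This reduces the number of requests sent to the apiServer and lightens its load.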

The Indexer interface is defined below. It embeds the Store interface, which defines the add, delete, update and query methods for objects.

// client-go/tools/cache/index.go
type Indexer interface {
    Store
    // Retrieve list of objects that match on the named indexing function
    Index(indexName string, obj interface{}) ([]interface{}, error)
    // IndexKeys returns the set of keys that match on the named indexing function.
    IndexKeys(indexName, indexKey string) ([]string, error)
    // ListIndexFuncValues returns the list of generated values of an Index func
    ListIndexFuncValues(indexName string) []string
    // ByIndex lists object that match on the named indexing function with the exact key
    ByIndex(indexName, indexKey string) ([]interface{}, error)
    // GetIndexer return the indexers
    GetIndexers() Indexers

    // AddIndexers adds more indexers to this store.  If you call this after you already have data
    // in the store, the results are undefined.
    AddIndexers(newIndexers Indexers) error
}
// client-go/tools/cache/store.go
type Store interface {
    Add(obj interface{}) error
    Update(obj interface{}) error
    Delete(obj interface{}) error
    List() []interface{}
    ListKeys() []string
    Get(obj interface{}) (item interface{}, exists bool, err error)
    GetByKey(key string) (item interface{}, exists bool, err error)

    // Replace will delete the contents of the store, using instead the
    // given list. Store takes ownership of the list, you should not reference
    // it after calling this function.
    Replace([]interface{}, string) error
    Resync() error
}

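cache implements the Indexer interface, but cache is unexported (its name starts with a lowercase letter), so it can only be used through the constructor functions provided by the package.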

// client-go/tools/cache/store.go
type cache struct {
    // cacheStorage bears the burden of thread safety for the cache
    cacheStorage ThreadSafeStore
    // keyFunc is used to make the key for objects stored in and retrieved from items, and
    // should be deterministic.
    keyFunc KeyFunc
}
// client-go/tools/cache/thread_safe_store.go
type ThreadSafeStore interface {
    Add(key string, obj interface{})
    Update(key string, obj interface{})
    Delete(key string)
    Get(key string) (item interface{}, exists bool)
    List() []interface{}
    ListKeys() []string
    Replace(map[string]interface{}, string)
    Index(indexName string, obj interface{}) ([]interface{}, error)
    IndexKeys(indexName, indexKey string) ([]string, error)
    ListIndexFuncValues(name string) []string
    ByIndex(indexName, indexKey string) ([]interface{}, error)
    GetIndexers() Indexers

    // AddIndexers adds more indexers to this store.  If you call this after you already have data
    // in the store, the results are undefined.
    AddIndexers(newIndexers Indexers) error
    Resync() error
}

NewStore and NewIndexer initialize a cache and return it as a Store or an Indexer (cache implements both interfaces). The data behind the Store and Indexer they return is held in a threadSafeMap, which is created by the NewThreadSafeStore function.

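Note: calling a method through a Go interface value runs the method of the concrete type stored in it. Taking threadSafeMap as an example, when cache.Add executes "c.cacheStorage.Add(key, obj)", what actually runs is "(&threadSafeMap{items: map[string]interface{}{}, indexers: indexers, indices: indices}).Add(key, obj)".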

// client-go/tools/cache/store.go
func (c *cache) Add(obj interface{}) error {
    key, err := c.keyFunc(obj)
    if err != nil {
        return KeyError{obj, err}
    }
    c.cacheStorage.Add(key, obj)
    return nil
}
// client-go/tools/cache/store.go
// NewStore returns a Store implemented simply with a map and a lock.
func NewStore(keyFunc KeyFunc) Store {
    return &cache{
        cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}),
        keyFunc:      keyFunc,
    }
}

// NewIndexer returns an Indexer implemented simply with a map and a lock.
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
    return &cache{
        cacheStorage: NewThreadSafeStore(indexers, Indices{}),
        keyFunc:      keyFunc,
    }
}

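Much of client-go is cleanly organized: index.go defines the indexing operations (interfaces); store.go defines the storage interface and provides one implementation of it, cache (you can of course implement the Store interface yourself); and thread_safe_store.go is the private implementation behind cache.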

Ultimately, client-go's indexer operates on the methods and data of threadSafeMap. The call relationships are as follows:

The figure below shows how the various indexes in threadSafeMap relate to each other:

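  • The objects of an indexer are actually stored in the threadSafeMap structure.
  • indexers defines the index types (indexName, e.g. namespace) and, for each type, an index function (indexFunc, e.g. MetaNamespaceIndexFunc) that computes the index keys of an object (indexKeys, e.g. namespaces). An object may have several index keys under one index type.
  • indices holds one index per index type (index, e.g. the objects under all namespaces); within an index, an index key leads to a set of object keys (keys, e.g. the keys of the objects in one namespace). indices is used to look objects up quickly.
  • items stores the actual objects by object key.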

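Taking namespace as the index type: first, the indexFunc that computes namespaces is fetched from indexers, and it is used to compute all namespaces related to the given object. indices holds the object keys under every namespace, so all object keys in a particular namespace can be retrieved, and looking up a particular object key in items returns the object itself. indexers is therefore used to find resources related to a given object, for example the secrets related to a Pod.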
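To make the relationship between indexers, indices and items concrete, here is a minimal, self-contained sketch. It is not client-go code: miniIndexer, IndexFunc (simplified to return only []string) and pod are hypothetical names, and the sketch omits update, delete and error handling. It only shows how an index function fills indices so that ByIndex can go from an index key to object keys to objects.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// IndexFunc computes the index keys (e.g. namespaces) of an object.
type IndexFunc func(obj interface{}) []string

// miniIndexer is a hypothetical, simplified stand-in for threadSafeMap:
//   items:    object key -> object
//   indexers: index name -> IndexFunc
//   indices:  index name -> index key -> set of object keys
type miniIndexer struct {
	mu       sync.RWMutex
	items    map[string]interface{}
	indexers map[string]IndexFunc
	indices  map[string]map[string]map[string]struct{}
}

func newMiniIndexer(indexers map[string]IndexFunc) *miniIndexer {
	return &miniIndexer{
		items:    map[string]interface{}{},
		indexers: indexers,
		indices:  map[string]map[string]map[string]struct{}{},
	}
}

// Add stores obj under key and records key under every index key of obj.
// Unlike the real store, it never removes stale index entries.
func (m *miniIndexer) Add(key string, obj interface{}) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.items[key] = obj
	for name, fn := range m.indexers {
		index := m.indices[name]
		if index == nil {
			index = map[string]map[string]struct{}{}
			m.indices[name] = index
		}
		for _, indexKey := range fn(obj) {
			if index[indexKey] == nil {
				index[indexKey] = map[string]struct{}{}
			}
			index[indexKey][key] = struct{}{}
		}
	}
}

// ByIndex returns the objects whose index keys under indexName include indexKey.
func (m *miniIndexer) ByIndex(indexName, indexKey string) []interface{} {
	m.mu.RLock()
	defer m.mu.RUnlock()
	keys := make([]string, 0)
	for k := range m.indices[indexName][indexKey] { // nil maps simply yield nothing
		keys = append(keys, k)
	}
	sort.Strings(keys) // deterministic order for the example
	out := make([]interface{}, 0, len(keys))
	for _, k := range keys {
		out = append(out, m.items[k])
	}
	return out
}

type pod struct{ Namespace, Name string }

func main() {
	idx := newMiniIndexer(map[string]IndexFunc{
		"namespace": func(obj interface{}) []string { return []string{obj.(pod).Namespace} },
	})
	idx.Add("default/a", pod{"default", "a"})
	idx.Add("kube-system/b", pod{"kube-system", "b"})
	idx.Add("default/c", pod{"default", "c"})
	fmt.Println(idx.ByIndex("namespace", "default")) // [{default a} {default c}]
}
```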

The default indexFunc is shown below; it classifies objects by namespace.

// client-go/tools/cache/index.go
func MetaNamespaceIndexFunc(obj interface{}) ([]string, error) {
    meta, err := meta.Accessor(obj)
    if err != nil {
        return []string{""}, fmt.Errorf("object has no meta: %v", err)
    }
    return []string{meta.GetNamespace()}, nil
}

The keyFunc in the cache struct generates the object key. The default keyFunc is shown below.

// client-go/tools/cache/store.go
func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
    if key, ok := obj.(ExplicitKey); ok {
        return string(key), nil
    }
    meta, err := meta.Accessor(obj)
    if err != nil {
        return "", fmt.Errorf("object has no meta: %v", err)
    }
    if len(meta.GetNamespace()) > 0 {
        return meta.GetNamespace() + "/" + meta.GetName(), nil
    }
    return meta.GetName(), nil
}
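The key format can be tried out without client-go. The sketch below reproduces only the namespace/name rule of MetaNamespaceKeyFunc, plus the reverse split that client-go offers as SplitMetaNamespaceKey. objectMeta, metaNamespaceKey and splitKey are hypothetical stand-ins, not client-go identifiers.

```go
package main

import (
	"fmt"
	"strings"
)

// objectMeta is a hypothetical stand-in for the metadata that meta.Accessor exposes.
type objectMeta struct{ Namespace, Name string }

// metaNamespaceKey follows the rule of MetaNamespaceKeyFunc:
// "<namespace>/<name>" for namespaced objects, "<name>" for cluster-scoped ones.
func metaNamespaceKey(m objectMeta) string {
	if len(m.Namespace) > 0 {
		return m.Namespace + "/" + m.Name
	}
	return m.Name
}

// splitKey reverses metaNamespaceKey, in the spirit of cache.SplitMetaNamespaceKey.
func splitKey(key string) (namespace, name string, err error) {
	parts := strings.Split(key, "/")
	switch len(parts) {
	case 1:
		return "", parts[0], nil
	case 2:
		return parts[0], parts[1], nil
	}
	return "", "", fmt.Errorf("unexpected key format: %q", key)
}

func main() {
	fmt.Println(metaNamespaceKey(objectMeta{"default", "nginx"})) // default/nginx
	fmt.Println(metaNamespaceKey(objectMeta{"", "node-1"}))       // node-1
	ns, name, _ := splitKey("default/nginx")
	fmt.Println(ns, name) // default nginx
}
```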

 

DeltaFIFO

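The source comments of DeltaFIFO are fairly clear. It is a producer-consumer queue: the producer is the Reflector and the consumer is the Pop() function. As the architecture diagram shows, DeltaFIFO receives its data from the Reflector and its data is consumed through Pop; consumed data is stored in the Indexer on the one hand and can be processed by the informer's handlers on the other (see below). The data processed by the informer's handlers must match the data stored in the Indexer. Note that Pop works on a whole Deltas (all pending changes of one object), not on a single Delta.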

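DeltaFIFO implements both the Queue and the Store interfaces. It uses Deltas to record state changes of objects (Add/Delete/Update, e.g. a Pod being added or deleted). A Deltas buffers several state changes of the same object: for a Pod, Deltas[0] might be a label update and Deltas[1] its deletion. The oldest change is returned by Oldest() (Deltas[0]) and the newest by Newest() (the last element). In practice, both computing the key of an object in DeltaFIFO and looking it up are based on its newest state.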

//client-go/tools/cache/delta_fifo.go
type Delta struct {
    Type   DeltaType
    Object interface{}
}

// Deltas is a list of one or more 'Delta's to an individual object.
// The oldest delta is at index 0, the newest delta is the last one.
type Deltas []Delta
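The ordering convention (index 0 is the oldest change, the last element the newest) can be illustrated with a self-contained copy of the types above plus Oldest/Newest helpers. The method bodies are a sketch that follows the convention stated in the comments rather than a verbatim copy of delta_fifo.go; the DeltaType constants are re-declared only so that the example compiles on its own.

```go
package main

import "fmt"

type DeltaType string

const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
	Sync    DeltaType = "Sync"
)

type Delta struct {
	Type   DeltaType
	Object interface{}
}

// Deltas: index 0 is the oldest change, the last element is the newest.
type Deltas []Delta

// Oldest returns the first delta, or nil if there are none.
func (d Deltas) Oldest() *Delta {
	if len(d) > 0 {
		return &d[0]
	}
	return nil
}

// Newest returns the last delta, or nil if there are none.
func (d Deltas) Newest() *Delta {
	if n := len(d); n > 0 {
		return &d[n-1]
	}
	return nil
}

func main() {
	d := Deltas{
		{Type: Added, Object: "pod v1"},
		{Type: Updated, Object: "pod v2 (labels changed)"},
		{Type: Deleted, Object: "pod v2"},
	}
	fmt.Println(d.Oldest().Type, d.Newest().Type) // Added Deleted
}
```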

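The hardest part of the DeltaFIFO struct to understand is knownObjects, whose type is KeyListerGetter. Its methods, ListKeys and GetByKey, are also methods of the Store interface, so a pointer to any type that implements Store can be assigned to knownObjects; likewise, because Indexer embeds Store, a pointer to any type that implements Indexer can be assigned to it as well.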

DeltaFIFO.knownObjects.GetByKey simply runs the GetByKey function in store.go, which looks up an object in the Indexer by its object key.

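initialPopulationCount indicates whether the initial full sync has completed. It is set by the first call to Replace (to the number of items that call queued) and decremented in Pop. populated is set to true by that first Replace, or by Add/Update/Delete if one of them is called first, so it tells whether DeltaFIFO is still in its initial state (nothing has been put into it yet). When populated is true and initialPopulationCount is 0, every object added by the first Replace has been popped; this is exactly what HasSynced reports.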

//client-go/tools/cache/delta_fifo.go
type DeltaFIFO struct {
    // lock/cond protects access to 'items' and 'queue'.
    lock sync.RWMutex
    cond sync.Cond

    // We depend on the property that items in the set are in
    // the queue and vice versa, and that all Deltas in this
    // map have at least one Delta.
    items map[string]Deltas
    queue []string

    // populated is true if the first batch of items inserted by Replace() has been populated
    // or Delete/Add/Update was called first.
    populated bool
    // initialPopulationCount is the number of items inserted by the first call of Replace()
    initialPopulationCount int

    // keyFunc is used to make the key used for queued item
    // insertion and retrieval, and should be deterministic.
    keyFunc KeyFunc // used to compute the key of a Delta

    // knownObjects list keys that are "known", for the
    // purpose of figuring out which items have been deleted
    // when Replace() or Delete() is called.
    knownObjects KeyListerGetter

    // Indication the queue is closed.
    // Used to indicate a queue is closed so a control loop can exit when a queue is empty.
    // Currently, not used to gate any of CRED operations.
    closed     bool
    closedLock sync.Mutex
}
// A KeyListerGetter is anything that knows how to list its keys and look up by key.
type KeyListerGetter interface {
    KeyLister
    KeyGetter
}

// A KeyLister is anything that knows how to list its keys.
type KeyLister interface {
    ListKeys() []string
}

// A KeyGetter is anything that knows how to get the value stored under a given key.
type KeyGetter interface {
    GetByKey(key string) (interface{}, bool, error)
}

NewSharedIndexInformer (client-go/tools/cache/shared_informer.go) creates the sharedIndexInformer's indexer with DeletionHandlingMetaNamespaceKeyFunc, as shown below. sharedIndexInformer.Run then passes that indexer as the knownObjects argument when it creates the DeltaFIFO.

// in NewSharedIndexInformer
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
// in sharedIndexInformer.Run
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)

DeltaFIFO implements the Queue interface. As shown below, Queue, like Indexer, embeds the Store interface.

//client-go/tools/cache/delta_fifo.go
type Queue interface {
    Store

    // Pop blocks until it has something to process.
    // It returns the object that was process and the result of processing.
    // The PopProcessFunc may return an ErrRequeue{...} to indicate the item
    // should be requeued before releasing the lock on the queue.
    Pop(PopProcessFunc) (interface{}, error)

    // AddIfNotPresent adds a value previously
    // returned by Pop back into the queue as long
    // as nothing else (presumably more recent)
    // has since been added.
    AddIfNotPresent(interface{}) error

    // HasSynced returns true if the first batch of items has been popped
    HasSynced() bool

    // Close queue
    Close()
}

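In practice knownObjects is an Indexer; it corresponds to localStore in Figure 2. The state-change messages held in DeltaFIFO are applied (add/delete/update/sync) to the corresponding objects in knownObjects. Queuing a Sync for an object whose Deltas already end in a deletion is pointless, so such a Sync is skipped (see the willObjectBeDeletedLocked function).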

In the list phase of ListWatch, the Replace function (client-go/tools/cache/delta_fifo.go) is called to perform a full update of DeltaFIFO. It consists of three steps:

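  • Queue a Sync delta for every input object, i.e. add all input objects to DeltaFIFO;
  • If knownObjects is nil, delete from DeltaFIFO the objects that are not among the input objects, so that the valid objects in DeltaFIFO (those that are not DeletedFinalStateUnknown) equal the input objects;
  • If knownObjects is set, find the objects in knownObjects that are not among the input objects and queue their deletion in DeltaFIFO.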

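Step 2 is easy to understand: without knownObjects, only DeltaFIFO itself needs updating. In step 3, when knownObjects is set, deletions must be computed against knownObjects; otherwise the indexer's data would diverge from the apiserver's. For example, if knownObjects holds {obj1, obj2, obj3} and the objects passed to Replace (now pending in DeltaFIFO) are {obj2, obj3, obj4}, handling only step 2 would leave obj1 behind in knownObjects, so a deletion of obj1 has to be queued in DeltaFIFO. As the figure in the SharedInformer section below shows, the data in knownObjects (i.e. the Indexer) can only be changed through DeltaFIFO.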
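The step 3 computation can be sketched as follows, using the obj1 to obj4 example above. keysToDelete is a hypothetical helper and DeletedFinalStateUnknown here is a simplified local copy of the client-go type of the same name; the real Replace goes through knownObjects.ListKeys and GetByKey rather than a plain map.

```go
package main

import (
	"fmt"
	"sort"
)

// DeletedFinalStateUnknown (simplified local copy) carries the last known state
// of an object whose deletion was not observed directly.
type DeletedFinalStateUnknown struct {
	Key string
	Obj interface{}
}

// keysToDelete returns, sorted, the keys that exist in known (the Indexer)
// but are absent from the freshly listed keys.
func keysToDelete(known map[string]interface{}, listedKeys []string) []string {
	listed := make(map[string]bool, len(listedKeys))
	for _, k := range listedKeys {
		listed[k] = true
	}
	var stale []string
	for k := range known {
		if !listed[k] {
			stale = append(stale, k)
		}
	}
	sort.Strings(stale)
	return stale
}

func main() {
	known := map[string]interface{}{"obj1": "v1", "obj2": "v1", "obj3": "v1"}
	listed := []string{"obj2", "obj3", "obj4"}
	for _, k := range keysToDelete(known, listed) {
		// Replace would queue a Deleted delta carrying a tombstone like this one.
		fmt.Printf("Deleted: %+v\n", DeletedFinalStateUnknown{Key: k, Obj: known[k]})
	}
}
```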

 

ListWatch

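A Lister fetches the full set of a resource (e.g. Pods), and a Watcher receives incremental changes to that resource. In practice both get their data from the apiServer: the Lister is generally used to fetch the full state of a resource (e.g. Pods) the first time, and the Watcher then keeps receiving incremental changes to it. The Lister and Watcher interfaces are defined below; the NewListWatchFromClient function is used to create a ListerWatcher.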

// client-go/tools/cache/listwatch.go
type Lister interface {
    // List should return a list type object; the Items field will be extracted, and the
    // ResourceVersion field will be used to start the watch in the right place.
    List(options metav1.ListOptions) (runtime.Object, error)
}

// Watcher is any object that knows how to start a watch on a resource.
type Watcher interface {
    // Watch should begin a watch at the specified version.
    Watch(options metav1.ListOptions) (watch.Interface, error)
}

// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
type ListerWatcher interface {
    Lister
    Watcher
}

The workqueue example shows where NewListWatchFromClient is called; it fetches information about "pods" through clientset.CoreV1().RESTClient().

// client-go/examples/workqueue/main.go
// create the pod watcher
podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceDefault, fields.Everything())

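Resource names for cache.NewListWatchFromClient, such as "pods", can be found in types.go. Note that this const block defines ResourceName values that are mainly used for resource quotas, so entries such as requests.cpu are quota names rather than resources that can be listed and watched.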

// k8s.io/api/core/v1/types.go
const (
    // Pods, number
    ResourcePods ResourceName = "pods"
    // Services, number
    ResourceServices ResourceName = "services"
    // ReplicationControllers, number
    ResourceReplicationControllers ResourceName = "replicationcontrollers"
    // ResourceQuotas, number
    ResourceQuotas ResourceName = "resourcequotas"
    // ResourceSecrets, number
    ResourceSecrets ResourceName = "secrets"
    // ResourceConfigMaps, number
    ResourceConfigMaps ResourceName = "configmaps"
    // ResourcePersistentVolumeClaims, number
    ResourcePersistentVolumeClaims ResourceName = "persistentvolumeclaims"
    // ResourceServicesNodePorts, number
    ResourceServicesNodePorts ResourceName = "services.nodeports"
    // ResourceServicesLoadBalancers, number
    ResourceServicesLoadBalancers ResourceName = "services.loadbalancers"
    // CPU request, in cores. (500m = .5 cores)
    ResourceRequestsCPU ResourceName = "requests.cpu"
    // Memory request, in bytes. (500Gi = 500GiB = 500 * 1024 * 1024 * 1024)
    ResourceRequestsMemory ResourceName = "requests.memory"
    // Storage request, in bytes
    ResourceRequestsStorage ResourceName = "requests.storage"
    // Local ephemeral storage request, in bytes. (500Gi = 500GiB = 500 * 1024 * 1024 * 1024)
    ResourceRequestsEphemeralStorage ResourceName = "requests.ephemeral-storage"
    // CPU limit, in cores. (500m = .5 cores)
    ResourceLimitsCPU ResourceName = "limits.cpu"
    // Memory limit, in bytes. (500Gi = 500GiB = 500 * 1024 * 1024 * 1024)
    ResourceLimitsMemory ResourceName = "limits.memory"
    // Local ephemeral storage limit, in bytes. (500Gi = 500GiB = 500 * 1024 * 1024 * 1024)
    ResourceLimitsEphemeralStorage ResourceName = "limits.ephemeral-storage"
)

Besides the CoreV1 API group, a RESTClient can also be obtained from any of the API groups defined in the Clientset struct below:

// client-go/kubernetes/clientset.go
type Clientset struct {
    *discovery.DiscoveryClient
    admissionregistrationV1beta1 *admissionregistrationv1beta1.AdmissionregistrationV1beta1Client
    appsV1                       *appsv1.AppsV1Client
    appsV1beta1                  *appsv1beta1.AppsV1beta1Client
    appsV1beta2                  *appsv1beta2.AppsV1beta2Client
    auditregistrationV1alpha1    *auditregistrationv1alpha1.AuditregistrationV1alpha1Client
    authenticationV1             *authenticationv1.AuthenticationV1Client
    authenticationV1beta1        *authenticationv1beta1.AuthenticationV1beta1Client
    authorizationV1              *authorizationv1.AuthorizationV1Client
    authorizationV1beta1         *authorizationv1beta1.AuthorizationV1beta1Client
    autoscalingV1                *autoscalingv1.AutoscalingV1Client
    autoscalingV2beta1           *autoscalingv2beta1.AutoscalingV2beta1Client
    autoscalingV2beta2           *autoscalingv2beta2.AutoscalingV2beta2Client
    batchV1                      *batchv1.BatchV1Client
    batchV1beta1                 *batchv1beta1.BatchV1beta1Client
    batchV2alpha1                *batchv2alpha1.BatchV2alpha1Client
    certificatesV1beta1          *certificatesv1beta1.CertificatesV1beta1Client
    coordinationV1beta1          *coordinationv1beta1.CoordinationV1beta1Client
    coordinationV1               *coordinationv1.CoordinationV1Client
    coreV1                       *corev1.CoreV1Client
    eventsV1beta1                *eventsv1beta1.EventsV1beta1Client
    extensionsV1beta1            *extensionsv1beta1.ExtensionsV1beta1Client
    networkingV1                 *networkingv1.NetworkingV1Client
    networkingV1beta1            *networkingv1beta1.NetworkingV1beta1Client
    nodeV1alpha1                 *nodev1alpha1.NodeV1alpha1Client
    nodeV1beta1                  *nodev1beta1.NodeV1beta1Client
    policyV1beta1                *policyv1beta1.PolicyV1beta1Client
    rbacV1                       *rbacv1.RbacV1Client
    rbacV1beta1                  *rbacv1beta1.RbacV1beta1Client
    rbacV1alpha1                 *rbacv1alpha1.RbacV1alpha1Client
    schedulingV1alpha1           *schedulingv1alpha1.SchedulingV1alpha1Client
    schedulingV1beta1            *schedulingv1beta1.SchedulingV1beta1Client
    schedulingV1                 *schedulingv1.SchedulingV1Client
    settingsV1alpha1             *settingsv1alpha1.SettingsV1alpha1Client
    storageV1beta1               *storagev1beta1.StorageV1beta1Client
    storageV1                    *storagev1.StorageV1Client
    storageV1alpha1              *storagev1alpha1.StorageV1alpha1Client
}

RESTClient() returns a value of the rest.Interface type, which provides the request methods below; for example, Get() wraps the HTTP GET method. NewListWatchFromClient uses Get when it builds the ListWatch.

// client-go/rest/client.go
type Interface interface {
    GetRateLimiter() flowcontrol.RateLimiter
    Verb(verb string) *Request
    Post() *Request
    Put() *Request
    Patch(pt types.PatchType) *Request
    Get() *Request
    Delete() *Request
    APIVersion() schema.GroupVersion
}

 

Reflector

The Reflector uses a ListerWatcher to fetch resources and saves them into a store; here the store is the DeltaFIFO. The Reflector's core function is ListAndWatch (client-go/tools/cache/reflector.go).

// client-go/tools/cache/reflector.go
type Reflector struct {
    // name identifies this reflector. By default it will be a file:line if possible.
    name string
    // metrics tracks basic metric information about the reflector
    metrics *reflectorMetrics

    // The type of object we expect to place in the store.
    expectedType reflect.Type
    // The destination to sync up with the watch source
    store Store
    // listerWatcher is used to perform lists and watches.
    listerWatcher ListerWatcher
    // period controls timing between one watch ending and
    // the beginning of the next one.
    period       time.Duration
    resyncPeriod time.Duration
    ShouldResync func() bool
    // clock allows tests to manipulate time
    clock clock.Clock
    // lastSyncResourceVersion is the resource version token last
    // observed when doing a sync with the underlying store
    // it is thread safe, but not synchronized with the underlying store
    lastSyncResourceVersion string
    // lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
    lastSyncResourceVersionMutex sync.RWMutex
    // WatchListPageSize is the requested chunk size of initial and resync watch lists.
    // Defaults to pager.PageSize.
    WatchListPageSize int64
}

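ListAndWatch is started from Reflector.Run and is rerun with a period of Reflector.period whenever it returns. ListAndWatch relies on resourceVersion to receive incremental changes: the List call returns the resourceVersion of the listed state, Watch then starts from that resourceVersion to receive subsequent changes, and the resourceVersion of each received resource is saved as the baseline for the next Watch.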

// client-go/tools/cache/reflector.go
func (r *Reflector) Run(stopCh <-chan struct{}) {
    klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
    wait.Until(func() {
        if err := r.ListAndWatch(stopCh); err != nil {
            utilruntime.HandleError(err)
        }
    }, r.period, stopCh)
}

For example, the following command shows the resourceVersion of a Pod:

# oc get pod $PodName -oyaml|grep resourceVersion:
resourceVersion: "4993804"
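The list-then-watch flow around resourceVersion can be modeled with a toy in-memory server. Everything here (event, fakeServer, reflectorSketch) is hypothetical. resourceVersion is an int only for readability: real resourceVersions are opaque strings that clients should not compare numerically, and a real watch is a long-lived stream rather than a slice.

```go
package main

import "fmt"

// event is a hypothetical watch event carrying the resourceVersion of the change.
type event struct {
	Type            string
	Name            string
	ResourceVersion int
}

// fakeServer is a hypothetical in-memory apiserver: the resourceVersion of a
// list response plus an ordered log of change events.
type fakeServer struct {
	listRV int
	events []event
}

// watchFrom returns the events newer than rv, i.e. what a watch started at rv delivers.
func (s *fakeServer) watchFrom(rv int) []event {
	var out []event
	for _, e := range s.events {
		if e.ResourceVersion > rv {
			out = append(out, e)
		}
	}
	return out
}

// reflectorSketch keeps only the bookkeeping discussed in the text.
type reflectorSketch struct {
	lastSyncResourceVersion int
	seen                    []string
}

// listAndWatch: List records the list's resourceVersion as the baseline; Watch
// starts from it and advances the baseline with every event it receives.
func (r *reflectorSketch) listAndWatch(s *fakeServer) {
	r.lastSyncResourceVersion = s.listRV
	for _, e := range s.watchFrom(r.lastSyncResourceVersion) {
		r.seen = append(r.seen, e.Type+" "+e.Name)
		r.lastSyncResourceVersion = e.ResourceVersion
	}
}

func main() {
	s := &fakeServer{listRV: 100, events: []event{
		{"ADDED", "pod-a", 90}, // already reflected in the list, so not delivered again
		{"MODIFIED", "pod-a", 101},
		{"DELETED", "pod-b", 105},
	}}
	r := &reflectorSketch{}
	r.listAndWatch(s)
	fmt.Println(r.seen, r.lastSyncResourceVersion) // [MODIFIED pod-a DELETED pod-b] 105
}
```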

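The Sync action triggered by Resync in the figure above works, like step 3 of Replace, from knownObjects: it synchronizes the objects in knownObjects with DeltaFIFO. Concretely, DeltaFIFO.Resync walks the keys of knownObjects and queues a Sync delta for every object that has no pending deltas, so the handlers periodically see each cached object again. This operation is necessary.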
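A sketch of the Resync rule just described, with hypothetical names: knownKeys stands in for knownObjects.ListKeys(), and pending maps each key to the delta types already queued for it.

```go
package main

import (
	"fmt"
	"sort"
)

// resyncKeys queues a "Sync" for every known key that has no pending deltas and
// returns the keys it synced, sorted. Keys with pending deltas are skipped,
// because their newer state is already on its way to the handlers.
func resyncKeys(knownKeys []string, pending map[string][]string) []string {
	var synced []string
	for _, k := range knownKeys {
		if len(pending[k]) > 0 {
			continue
		}
		pending[k] = append(pending[k], "Sync")
		synced = append(synced, k)
	}
	sort.Strings(synced)
	return synced
}

func main() {
	pending := map[string][]string{"default/b": {"Updated"}}
	synced := resyncKeys([]string{"default/a", "default/b", "default/c"}, pending)
	fmt.Println(synced)               // [default/a default/c]
	fmt.Println(pending["default/b"]) // [Updated]
}
```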

 

Controller

The controller struct is shown below. It contains a config field, and the comments show that Config.Queue is the DeltaFIFO. The controller defines how the Reflector is run.

// client-go/tools/cache/controller.go
type controller struct {
    config Config
    reflector      *Reflector
    reflectorMutex sync.RWMutex
    clock          clock.Clock
}
// client-go/tools/cache/controller.go
type Config struct {
    // The queue for your objects - has to be a DeltaFIFO due to
    // assumptions in the implementation. Your Process() function
    // should accept the output of this Queue's Pop() method.
    Queue

    // Something that can list and watch your objects.
    ListerWatcher

    // Something that can process your objects.
    Process ProcessFunc

    // The type of your objects.
    ObjectType runtime.Object

    // Reprocess everything at least this often.
    // Note that if it takes longer for you to clear the queue than this
    // period, you will end up processing items in the order determined
    // by FIFO.Replace(). Currently, this is random. If this is a
    // problem, we can change that replacement policy to append new
    // things to the end of the queue instead of replacing the entire
    // queue.
    FullResyncPeriod time.Duration

    // ShouldResync, if specified, is invoked when the controller's reflector determines the next
    // periodic sync should occur. If this returns true, it means the reflector should proceed with
    // the resync.
    ShouldResync ShouldResyncFunc

    // If true, when Process() returns an error, re-enqueue the object.
    // TODO: add interface to let you inject a delay/backoff or drop
    //       the object completely if desired. Pass the object in
    //       question to this interface as a parameter.
    RetryOnError bool
}

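The controller framework is simple. It uses wg.StartWithChannel to start Reflector.Run, which amounts to starting the producer of DeltaFIFO (wg.StartWithChannel(stopCh, r.Run) runs r.Run in its own goroutine and lets stopCh stop it). It uses wait.Until to start the consumer: wait.Until(c.processLoop, time.Second, stopCh) calls c.processLoop and, whenever it returns, calls it again one second later, until stopCh is closed. A call that takes longer than one second is not interrupted; it simply keeps running.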

// client-go/tools/cache/controller.go
func (c *controller) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    go func() {
        <-stopCh
        c.config.Queue.Close()
    }()
    r := NewReflector(
        c.config.ListerWatcher,
        c.config.ObjectType,
        c.config.Queue,
        c.config.FullResyncPeriod,
    )
    r.ShouldResync = c.config.ShouldResync
    r.clock = c.clock

    c.reflectorMutex.Lock()
    c.reflector = r
    c.reflectorMutex.Unlock()

    var wg wait.Group
    defer wg.Wait()

    wg.StartWithChannel(stopCh, r.Run)

    wait.Until(c.processLoop, time.Second, stopCh)
}

processLoop is just as simple: it calls DeltaFIFO.Pop to consume objects from DeltaFIFO and, when processing a popped object fails, may put it back into the queue (AddIfNotPresent).

Note: in the current version, c.config.RetryOnError is initialized to false.

// client-go/tools/cache/controller.go
func (c *controller) processLoop() {
    for {
        obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
        if err != nil {
            if err == FIFOClosedError {
                return
            }
            if c.config.RetryOnError {
                // This is the safe way to re-enqueue.
                c.config.Queue.AddIfNotPresent(obj)
            }
        }
    }
}

// client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()

    fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)

    cfg := &Config{
        Queue:            fifo,
        ListerWatcher:    s.listerWatcher,
        ObjectType:       s.objectType,
        FullResyncPeriod: s.resyncCheckPeriod,
        RetryOnError:     false,
        ShouldResync:     s.processor.shouldResync,

        Process: s.HandleDeltas,
    }
...

 

SharedInformer

The figure below shows how SharedInformer runs. SharedInformer starts the controller and the reflector and ties them together with the Indexer.

Note: different colors denote different goroutines; the same color denotes processing within the same goroutine.

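SharedInformer.Run starts two main loops. s.controller.Run is the entry point of the controller: it pops elements from DeltaFIFO and, depending on each element's type (Sync/Added/Updated/Deleted), handles it in two ways. First, it applies indexer.Update, indexer.Add or indexer.Delete to the data held in the Store. Second, it wraps the element into one of sharedInformer's internal types, updateNotification, addNotification or deleteNotification, and sends it to the addCh of each listener in s.processor.listeners, from where it is later handled by the registered pl.handler.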

s.processor.run runs the registered handlers. processorListener.run receives values from processorListener.nextCh and passes each one to the handler. processorListener.pop buffers the elements arriving on processorListener.addCh in p.pendingNotifications and feeds the buffered elements into processorListener.nextCh. In other words, processorListener.pop manages the data, and processorListener.run processes the data that pop delivers.

// client-go/tools/cache/controller.go
type ResourceEventHandler interface {
    OnAdd(obj interface{})
    OnUpdate(oldObj, newObj interface{})
    OnDelete(obj interface{})
}
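The division of labor between processorListener.pop and processorListener.run can be sketched with plain channels. This is a simplified, hypothetical version: it buffers in an ordinary slice where client-go uses a growing ring buffer, and it drains the buffer when addCh is closed, which is a choice made for this example. The key trick is the nil-channel case in select, which disables sending while nothing is pending, so the sender on addCh is never held up by a slow handler.

```go
package main

import "fmt"

// pop moves items from addCh to nextCh, buffering them in pending (the role of
// pendingNotifications) so that senders on addCh never wait for the consumer.
func pop(addCh <-chan interface{}, nextCh chan<- interface{}) {
	defer close(nextCh)
	var pending []interface{}
	for {
		var out chan<- interface{} // nil: the send case is disabled while nothing is pending
		var next interface{}
		if len(pending) > 0 {
			out = nextCh
			next = pending[0]
		}
		select {
		case item, ok := <-addCh:
			if !ok {
				for _, p := range pending { // flush what is left, then stop
					nextCh <- p
				}
				return
			}
			pending = append(pending, item)
		case out <- next:
			pending = pending[1:]
		}
	}
}

// run hands every notification from nextCh to the handler, like processorListener.run.
func run(nextCh <-chan interface{}, handle func(interface{})) {
	for item := range nextCh {
		handle(item)
	}
}

func main() {
	addCh := make(chan interface{})
	nextCh := make(chan interface{})
	go pop(addCh, nextCh)
	for i := 0; i < 3; i++ {
		addCh <- fmt.Sprintf("notification %d", i) // accepted even though nobody reads nextCh yet
	}
	close(addCh)
	run(nextCh, func(n interface{}) { fmt.Println(n) })
}
```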

sharedIndexInformer has three states, before start, running and stopped, represented by two bools, started and stopped.

stopped=true means the informer no longer runs and no new handler can be added (it would never be run anyway).

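New indexers can be added (sharedIndexInformer.AddIndexers) only before the informer starts: once started is true, AddIndexers returns an error, because list-watch, the handlers and the rest of the pipeline are then operating on sharedIndexInformer.indexer, and allowing AddIndexers at the same time could make the data inconsistent. Since started is never reset, this restriction also holds after the informer has stopped.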

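There is one more state, sharedProcessor.listenersStarted, which records whether the listeners in s.processor.listeners have already been started. If they have, a newly added processorListener must be started right away; otherwise it only needs to be added (it will be started later by sharedProcessor.run).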

// client-go/tools/cache/shared_informer.go
type sharedIndexInformer struct {
    indexer    Indexer
    controller Controller

    processor             *sharedProcessor
    cacheMutationDetector CacheMutationDetector

    // This block is tracked to handle late initialization of the controller
    listerWatcher ListerWatcher
    objectType    runtime.Object

    // resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
    // shouldResync to check if any of our listeners need a resync.
    resyncCheckPeriod time.Duration
    // defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
    // AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default
    // value).
    defaultEventHandlerResyncPeriod time.Duration
    // clock allows for testability
    clock clock.Clock

    started, stopped bool
    startedLock      sync.Mutex

    // blockDeltas gives a way to stop all event distribution so that a late event handler
    // can safely join the shared informer.
    blockDeltas sync.Mutex
}

SharedInformerFactory

The SharedInformerFactory interface is shown below. It organizes informers by API group and version.

// client-go/informers/factory.go
type SharedInformerFactory interface {
    internalinterfaces.SharedInformerFactory
    ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
    WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool

    Admissionregistration() admissionregistration.Interface
    Apps() apps.Interface
    Auditregistration() auditregistration.Interface
    Autoscaling() autoscaling.Interface
    Batch() batch.Interface
    Certificates() certificates.Interface
    Coordination() coordination.Interface
    Core() core.Interface
    Events() events.Interface
    Extensions() extensions.Interface
    Networking() networking.Interface
    Node() node.Interface
    Policy() policy.Interface
    Rbac() rbac.Interface
    Scheduling() scheduling.Interface
    Settings() settings.Interface
    Storage() storage.Interface
}

Note: the figure below is taken from https://blog.csdn.net/weixin_42663840/article/details/81980022

sharedInformerFactory starts each informer (i.e. each shared informer) in its own goroutine:

// client-go/informers/factory.go
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
    f.lock.Lock()
    defer f.lock.Unlock()

    for informerType, informer := range f.informers {
        if !f.startedInformers[informerType] {
            go informer.Run(stopCh)
            f.startedInformers[informerType] = true
        }
    }
}

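How do the informers started by sharedInformerFactory get registered in sharedInformerFactory.informers in the first place? All informers register through the same function, InformerFor, shown below; every informer type calls it to register with sharedInformerFactory.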

// client-go/informers/factory.go
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
    f.lock.Lock()
    defer f.lock.Unlock()

    informerType := reflect.TypeOf(obj)
    informer, exists := f.informers[informerType]
    if exists {
        return informer
    }

    resyncPeriod, exists := f.customResync[informerType]
    if !exists {
        resyncPeriod = f.defaultResync
    }

    informer = newFunc(f.client, resyncPeriod)
    f.informers[informerType] = informer

    return informer
}
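A minimal model of the two factory methods above: one informer per reflect.Type, created once by InformerFor and started at most once by Start. informer, factory, Pod and Service are hypothetical stand-ins; newFunc takes no arguments here instead of (client, resyncPeriod), and per-type resync periods are omitted.

```go
package main

import (
	"fmt"
	"reflect"
	"sync"
)

// informer is a hypothetical minimal stand-in for cache.SharedIndexInformer.
type informer struct{ name string }

func (i *informer) Run(stopCh <-chan struct{}) { <-stopCh }

// factory mimics the bookkeeping of sharedInformerFactory.
type factory struct {
	lock             sync.Mutex
	informers        map[reflect.Type]*informer
	startedInformers map[reflect.Type]bool
}

func newFactory() *factory {
	return &factory{informers: map[reflect.Type]*informer{}, startedInformers: map[reflect.Type]bool{}}
}

// InformerFor returns the cached informer for obj's type, creating it with newFunc only once.
func (f *factory) InformerFor(obj interface{}, newFunc func() *informer) *informer {
	f.lock.Lock()
	defer f.lock.Unlock()
	t := reflect.TypeOf(obj)
	if inf, ok := f.informers[t]; ok {
		return inf
	}
	inf := newFunc()
	f.informers[t] = inf
	return inf
}

// Start runs every registered informer that has not been started yet, each in its own goroutine.
func (f *factory) Start(stopCh <-chan struct{}) {
	f.lock.Lock()
	defer f.lock.Unlock()
	for t, inf := range f.informers {
		if !f.startedInformers[t] {
			go inf.Run(stopCh)
			f.startedInformers[t] = true
		}
	}
}

type Pod struct{}
type Service struct{}

func main() {
	f := newFactory()
	a := f.InformerFor(&Pod{}, func() *informer { return &informer{"pods"} })
	b := f.InformerFor(&Pod{}, func() *informer { return &informer{"pods-again"} })
	fmt.Println(a == b, a.name) // true pods
	f.InformerFor(&Service{}, func() *informer { return &informer{"services"} })
	stop := make(chan struct{})
	f.Start(stop)
	f.Start(stop) // starts nothing new
	close(stop)
}
```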

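Below, (Core, v1, podInformer) is used as an example, based on code shipped with client-go. Calling informers.Core().V1().Pods().Informer() also calls informers.InformerFor to register the informer with sharedInformerFactory; informers.Start is then called to start the registered informers.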

// client-go/examples/fake-client/main_test.go
func TestFakeClient(t *testing.T) {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Create the fake client.
    client := fake.NewSimpleClientset()

    // We will create an informer that writes added pods to a channel.
    pods := make(chan *v1.Pod, 1)
    informers := informers.NewSharedInformerFactory(client, 0)    // create a new sharedInformerFactory
    podInformer := informers.Core().V1().Pods().Informer()        // create a podInformer; this registers it through InformerFor
    podInformer.AddEventHandler(&cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            pod := obj.(*v1.Pod)
            t.Logf("pod added: %s/%s", pod.Namespace, pod.Name)
            pods <- pod
        },
    })

    // Make sure informers are running.
    informers.Start(ctx.Done())                                   // start all registered informers
...

 

workqueue

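The indexer stores the resource information from the apiserver, whereas the workqueue stores the data produced by the informer's handlers. The workqueue interface is defined as follows: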

// client-go/util/workqueue/queue.go
type Interface interface {
    Add(item interface{})
    Len() int
    Get() (item interface{}, shutdown bool)
    Done(item interface{})
    ShutDown()
    ShuttingDown() bool
}

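As the figure above shows, the items that are actually processed come from queue. dirty and queue can disagree: after an item has been taken with Get and before Done is called, Get has already removed it from dirty. If the same item is added again at that point, Add puts it into dirty but, because it is in processing, not into queue, so dirty holds more items than queue. This inconsistency disappears once Done runs: Done checks whether the item is in dirty and, if so, appends it to queue. In short, everything in dirty is eventually appended to queue, and items taken from queue are moved into processing for handling.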

Type implements the Interface interface. It contains the following fields:

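  • queue: a slice that stores the items to be processed, in order;
  • dirty: a hash set of the items that need processing. It contains every item in queue and is used for fast lookup, but it may also contain items that are not in queue. dirty prevents an item from being queued more than once, including an item that is currently being processed;
  • processing: a hash set of the items currently being processed. It contains no items from queue, but may contain items that are also in dirty.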

// client-go/util/workqueue/queue.go

// Type is a work queue (see the package comment).
type Type struct {
    // queue defines the order in which we will work on items. Every
    // element of queue should be in the dirty set and not in the
    // processing set.
    queue []t

    // dirty defines all of the items that need to be processed.
    dirty set

    // Things that are currently being processed are in the processing set.
    // These things may be simultaneously in the dirty set. When we finish
    // processing something and remove it from this set, we'll check if
    // it's in the dirty set, and if so, add it to the queue.
    processing set

    cond *sync.Cond

    shuttingDown bool

    metrics queueMetrics

    unfinishedWorkUpdatePeriod time.Duration
    clock                      clock.Clock
}
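The interplay of queue, dirty and processing can be checked with a single-goroutine model of Add, Get and Done. miniQueue is hypothetical and leaves out locking, the blocking behavior of Get (sync.Cond), shutdown and metrics; its three methods follow the rules described above.

```go
package main

import "fmt"

// miniQueue models only the three bookkeeping fields of workqueue.Type.
type miniQueue struct {
	queue      []string
	dirty      map[string]bool
	processing map[string]bool
}

func newMiniQueue() *miniQueue {
	return &miniQueue{dirty: map[string]bool{}, processing: map[string]bool{}}
}

// Add marks the item dirty; it is queued only if it was not dirty and is not being processed.
func (q *miniQueue) Add(item string) {
	if q.dirty[item] {
		return
	}
	q.dirty[item] = true
	if q.processing[item] {
		return // Done will re-queue it
	}
	q.queue = append(q.queue, item)
}

// Get pops the head of queue and moves it from dirty to processing.
// The real Get blocks on an empty queue; this one reports ok=false instead.
func (q *miniQueue) Get() (item string, ok bool) {
	if len(q.queue) == 0 {
		return "", false
	}
	item, q.queue = q.queue[0], q.queue[1:]
	q.processing[item] = true
	delete(q.dirty, item)
	return item, true
}

// Done finishes processing; an item that was re-added meanwhile goes back onto queue.
func (q *miniQueue) Done(item string) {
	delete(q.processing, item)
	if q.dirty[item] {
		q.queue = append(q.queue, item)
	}
}

func (q *miniQueue) Len() int { return len(q.queue) }

func main() {
	q := newMiniQueue()
	q.Add("a")
	q.Add("a")           // no-op: "a" is already dirty
	fmt.Println(q.Len()) // 1
	item, _ := q.Get()   // "a" moves from dirty to processing
	q.Add("a")           // dirty again, but not queued while it is processing
	fmt.Println(q.Len()) // 0
	q.Done(item)         // still dirty, so Done puts it back on queue
	fmt.Println(q.Len()) // 1
}
```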

For workqueue usage examples, see client-go/util/workqueue/queue_test.go.

Delaying queue

The delaying queue interface embeds the queue's Interface and adds only one method, AddAfter, which adds an item to the queue after the given duration has passed.

// client-go/util/workqueue/delaying_queue.go
type DelayingInterface interface {
    Interface
    // AddAfter adds an item to the workqueue after the indicated duration has passed
    AddAfter(item interface{}, duration time.Duration)
}

delayingType implements DelayingInterface and uses waitingForAddCh to pass along the items that are to be added to the queue:

// client-go/util/workqueue/delaying_queue.go
type delayingType struct {
    Interface

    // clock tracks time for delayed firing
    clock clock.Clock

    // stopCh lets us signal a shutdown to the waiting loop
    stopCh chan struct{}
    // stopOnce guarantees we only signal shutdown a single time
    stopOnce sync.Once

    // heartbeat ensures we wait no more than maxWait before firing
    heartbeat clock.Ticker

    // waitingForAddCh is a buffered channel that feeds waitingForAdd
    waitingForAddCh chan *waitFor

    // metrics counts the number of retries
    metrics           retryMetrics
    deprecatedMetrics retryMetrics
}

When an item arrives on delayingType.waitingForAddCh and its delay has not yet elapsed, it is put into waitForPriorityQueue; otherwise it is added to queue directly.

// client-go/util/workqueue/delaying_queue.go
type waitForPriorityQueue []*waitFor

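The delaying queue logic is fairly simple. Note that waitForPriorityQueue is a heap-based priority queue: its push and pop operations go through heap.Push and heap.Pop from container/heap.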

Rate-limiting queue

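The RateLimiter interface used by the rate-limiting queue has three methods: When returns how long an item should wait before it is retried, Forget clears the item's retry record, and NumRequeues returns the item's retry count.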

//client-go/util/workqueue/default_rate_limiters.go
type RateLimiter interface {
    // When gets an item and gets to decide how long that item should wait
    When(item interface{}) time.Duration
    // Forget indicates that an item is finished being retried.  Doesn't matter whether its for perm failing
    // or for success, we'll stop tracking it
    Forget(item interface{})
    // NumRequeues returns back how many failures the item has had
    NumRequeues(item interface{}) int
}

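ItemExponentialFailureRateLimiter retries failures with exponential backoff: as an item's failure count grows, the next retry delay becomes float64(baseDelay.Nanoseconds()) * math.Pow(2, float64(exp)), where exp is the number of failures recorded for that item so far. maxDelay caps the delay: whenever the computed delay exceeds maxDelay, maxDelay is used instead.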

// client-go/util/workqueue/default_rate_limiters.go
type ItemExponentialFailureRateLimiter struct {
    failuresLock sync.Mutex
    failures     map[interface{}]int

    baseDelay time.Duration
    maxDelay  time.Duration
}

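ItemFastSlowRateLimiter uses two retry delays depending on the failure count: while an item's failure count (including the current failure) is at most maxFastAttempts, the delay is fastDelay; after that it is slowDelay.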

// client-go/util/workqueue/default_rate_limiters.go
type ItemFastSlowRateLimiter struct {
    failuresLock sync.Mutex
    failures     map[interface{}]int

    maxFastAttempts int
    fastDelay       time.Duration
    slowDelay       time.Duration
}

MaxOfRateLimiter holds a list of rate limiters; its When asks every limiter in the list and returns the longest delay among them.

// client-go/util/workqueue/default_rate_limiters.go
type MaxOfRateLimiter struct {
    limiters []RateLimiter
}
func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
    ret := time.Duration(0)
    for _, limiter := range r.limiters {
        curr := limiter.When(item)
        if curr > ret {
            ret = curr
        }
    }

    return ret
}

BucketRateLimiter

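BucketRateLimiter uses a token bucket (the embedded rate.Limiter from golang.org/x/time/rate) to implement a fixed-rate limiter. Its When ignores the item, so a single bucket throttles all items together.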

// client-go/util/workqueue/default_rate_limiters.go
type BucketRateLimiter struct {
    *rate.Limiter
}

Using the rate-limiting queue

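Every rate limiter ultimately just computes a delay according to its particular policy; once that delay has elapsed, the element is added to the queue through AddAfter. queue.go provides the basic workqueue framework; delaying_queue.go extends it with delayed enqueueing; default_rate_limiters.go provides several rate limiters that supply the delay argument for AddAfter in delaying_queue.go; and rate_limiting_queue.go provides the entry point for using the rate-limiting queue.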

RateLimitingInterface is the entry point of the rate-limiting queue; elements are added to it with AddRateLimited.

// client-go/util/workqueue/rate_limiting_queue.go
type RateLimitingInterface interface {
    DelayingInterface

    // AddRateLimited adds an item to the workqueue after the rate limiter says it's ok
    AddRateLimited(item interface{})

    // Forget indicates that an item is finished being retried.  Doesn't matter whether it's for perm failing
    // or for success, we'll stop the rate limiter from tracking it.  This only clears the `rateLimiter`, you
    // still have to call `Done` on the queue.
    Forget(item interface{})

    // NumRequeues returns back how many times the item was requeued
    NumRequeues(item interface{}) int
}

rateLimitingType implements RateLimitingInterface; its second field, rateLimiter, holds the RateLimiter interface.

// client-go/util/workqueue/rate_limiting_queue.go
type rateLimitingType struct {
    DelayingInterface

    rateLimiter RateLimiter
}

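The rate-limiting queue is used as follows:

  • Initialize a rate limiter with NewItemExponentialFailureRateLimiter.
  • Create a rate-limiting queue with NewRateLimitingQueue, passing in the limiter from the previous step.
  • Elements can then be added with AddRateLimited.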
// client-go/util/workqueue/rate_limiting_queue_test.go
func TestRateLimitingQueue(t *testing.T) {
    limiter := NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Second)
    queue := NewRateLimitingQueue(limiter).(*rateLimitingType)
    fakeClock := clock.NewFakeClock(time.Now())
    delayingQueue := &delayingType{
        Interface:         New(),
        clock:             fakeClock,
        heartbeat:         fakeClock.NewTicker(maxWait),
        stopCh:            make(chan struct{}),
        waitingForAddCh:   make(chan *waitFor, 1000),
        metrics:           newRetryMetrics(""),
        deprecatedMetrics: newDeprecatedRetryMetrics(""),
    }
    queue.DelayingInterface = delayingQueue

    queue.AddRateLimited("one")
    waitEntry := <-delayingQueue.waitingForAddCh
    if e, a := 1*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a {
        t.Errorf("expected %v, got %v", e, a)
    }

    queue.Forget("one")
    if e, a := 0, queue.NumRequeues("one"); e != a {
        t.Errorf("expected %v, got %v", e, a)
    }
}

PS: A follow-up post will use client-go to write a simple program.

 TIPS:

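  • When writing programs with client-go, make sure the client-go version matches the Kubernetes cluster it connects to; the compatibility matrix is on GitHub.
  • In practice you first create a SharedIndexInformer; the DeltaFIFO and Reflector are created automatically during SharedIndexInformer.Run. Users work with the informer through the interfaces SharedIndexInformer exposes, typically to read from its indexer, add event handlers, and check whether it has synced. The main interfaces are shown below; GetStore and GetIndexer do the same thing and both return the informer's indexer.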
// client-go/tools/cache/shared_informer.go
type SharedInformer interface {
    // AddEventHandler adds an event handler to the shared informer using the shared informer's resync
    // period. Events to a single handler are delivered sequentially, but there is no coordination
    // between different handlers.
    AddEventHandler(handler ResourceEventHandler)
    // AddEventHandlerWithResyncPeriod adds an event handler to the
    // shared informer using the specified resync period. The resync
    // operation consists of delivering to the handler a create
    // notification for every object in the informer's local cache; it
    // does not add any interactions with the authoritative storage.
    AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
    // GetStore returns the informer's local cache as a Store.
    GetStore() Store
    // GetController gives back a synthetic interface that "votes" to start the informer
    GetController() Controller
    // Run starts and runs the shared informer, returning after it stops.
    // The informer will be stopped when stopCh is closed.
    Run(stopCh <-chan struct{})
    // HasSynced returns true if the shared informer's store has been
    // informed by at least one full LIST of the authoritative state
    // of the informer's object collection. This is unrelated to "resync".
    HasSynced() bool
    // LastSyncResourceVersion is the resource version observed when last synced with the underlying
    // store. The value returned is not synchronized with access to the underlying store and is not
    // thread-safe.
    LastSyncResourceVersion() string
}

type SharedIndexInformer interface {
    SharedInformer
    // AddIndexers add indexers to the informer before it starts.
    AddIndexers(indexers Indexers) error
    GetIndexer() Indexer
}

 


