深入理解k8s中的informer機制


k8s是典型的server-client架構。etcd存儲集群的數據信息,apiserver作為統一的操作入口,任何對數據的操作都必須經過apiserver。

客戶端通過ListAndWatch機制查詢apiserver,而informer模塊則封裝了List-watch。

《kubernetes源碼剖析》一書中的informer機制架構圖:
 
整個架構大體分為以下幾個部分:
一、Index
tools/cache/thread_safe_store.go中,定義了實現了線程安全的存儲接口ThreadSafeStore:
type ThreadSafeStore interface {
   Add(key string, obj interface{})
   Update(key string, obj interface{})
   Delete(key string)
   Get(key string) (item interface{}, exists bool)
   List() []interface{}
   ListKeys() []string
   Replace(map[string]interface{}, string)
   Index(indexName string, obj interface{}) ([]interface{}, error)  //傳入indexName和obj,返回所有和obj有相同index key的obj
   IndexKeys(indexName, indexKey string) ([]string, error)          // 傳入indexName和index key,返回index key指定的所有obj key
   ListIndexFuncValues(name string) []string                        //獲取indexName對應的index內的所有index key
   ByIndex(indexName, indexKey string) ([]interface{}, error)       //和IndexKeys方法類似,只是返回的是index key指定的所有obj
   GetIndexers() Indexers                                           //返回目前所有的indexers
   AddIndexers(newIndexers Indexers) error                         //存儲數據前調用,添加indexer
   Resync() error    // Resync is a no-op and is deprecated
}

結構體threadSafeMap將資源對象數據存儲於一個內存中的map數據結構中:

type threadSafeMap struct {
   lock  sync.RWMutex
   items map[string]interface{}    //實際存所有資源對象的地方
   indexers Indexers     
   indices Indices       
}

每次的增、刪、改、查操作都會都會加鎖,以保證數據的一致性。

k8s.io/client-go/tools/cache/store.go中,定義了存儲接口Store:
type Store interface {
    Add(obj interface{}) error
    Update(obj interface{}) error    
    Delete(obj interface{}) error
    List() []interface{}
    ListKeys() []string
    Get(obj interface{}) (item interface{}, exists bool, err error)
    GetByKey(key string) (item interface{}, exists bool, err error)
    Replace([]interface{}, string) error
    Resync() error
}
在tools/cache/index.go中,定義了Indexer接口:
type Indexer interface {
   Store    // 繼承接口Store
   Index(indexName string, obj interface{}) ([]interface{}, error)   
   IndexKeys(indexName, indexedValue string) ([]string, error)   
   ListIndexFuncValues(indexName string) []string  
   ByIndex(indexName, indexedValue string) ([]interface{}, error)   
   GetIndexers() Indexers     
   AddIndexers(newIndexers Indexers) error    
}
還定義了一些數據結構:
type IndexFunc func(obj interface{}) ([]string, error) // 計算資源對象的index key的函數類型,值得注意的是,返回的是多個index key組成的列表
type Indexers map[string]IndexFunc // 計算索引的方法不止一個,通過給他們命名來加以區別,存儲索引名(name)與索引方法(IndexFunc)的映射
type Indices map[string]Index      // 索引名(name)與索引(index)的映射
type Index map[string]sets.String  // 索引鍵(index key)與值(Obj Key)列表的映射
它們間的關系如圖所示:
具體實現在tools/cache/store.go中的cache結構體:
type cache struct {   
   cacheStorage ThreadSafeStore   //cacheStorage是一個ThreadSafeStore類型的對象,實際使用的是threadSafeMap類型
   keyFunc KeyFunc   //用於計算資源對象的index key
}
type KeyFunc func(obj interface{}) (string, error)
cache結構體封裝了threadSafeMap的很多方法,對外提供了Add、Update、Delete等方法;Indexer接口中規定需要實現的那些方法都是調用的threadSafeMap的實現
通過cache.NewIndexer(keyFunc, Indexers)初始化Indexer對象
    keyFunc:k8s內部目前使用的自定義的indexFunc有PodPVCIndexFunc 、indexByPodNodeName 、MetaNamespaceIndexFunc
    默認使用MetaNamespaceKeyFunc:根據資源對象計算出<namespace>/<name>格式的key,如果資源對象的<namespace>為空,則<name>作為key
    Indexers:通過NewThreadSafeStore(indexers, Indices{})得到結構體內的cacheStorage
 
示例:
// 定義一個IndexFunc,功能為:根據Annotations的users字段返回index key
func UsersIndexFunc(obj interface{}) ([]string, error){
   pod := obj.(*v1.Pod)
   usersString := pod.Annotations["users"]
   return strings.Split(usersString, ","), nil
}
 
func main() {
   index := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"byUser":UsersIndexFunc})
 
   pod1 := &v1.Pod{ObjectMeta:metav1.ObjectMeta{Name:"one",Annotations:map[string]string{"users":"ernie,bert"}}}
   pod2 := &v1.Pod{ObjectMeta:metav1.ObjectMeta{Name:"two",Annotations:map[string]string{"users":"bert,oscar"}}}
   pod3 := &v1.Pod{ObjectMeta:metav1.ObjectMeta{Name:"three",Annotations:map[string]string{"users":"ernie,elmo"}}}
   
   //添加3個Pod資源對象
   index.Add(pod1)
   index.Add(pod2)
   index.Add(pod3)
 
   //通過index.ByIndex函數(通過執行索引器函數得到索引結果)查詢byUser索引器下匹配ernie字段的Pod列表
   erniePods, err := index.ByIndex("byUser","ernie")
   if err != nil{
      panic(err)
   }
 
   for _, erniePods := range erniePods{
      fmt.Println(erniePods.(*v1.Pod).Name)
   }
}
 
二、DeltaFIFO
tools/cache/delta_fifo.go中定義了DeltaFIFO。Delta代表變化, FIFO則是先入先出的隊列。
DeltaFIFO將接受來的資源event,轉化為特定的變化類型,存儲在隊列中,周期性的POP出去,分發到事件處理器,並更新Indexer中的本地緩存。
 
DeltaType是string的別名,代表一種變化:
type DeltaType string   
類型定義:
const (
   Added   DeltaType = "Added"
   Updated DeltaType = "Updated"
   Deleted DeltaType = "Deleted"
   Replaced DeltaType = “Replaced”  // 替換,list出錯時,會觸發relist,此時會替換
   Sync DeltaType = “Sync”   // 周期性的同步,底層會當作一個update類型處理
)
Delta由變化類型+資源對象組成:
type Delta struct {
   Type   DeltaType
   Object interface{}
}
Deltas是[]delta切片:
type Deltas []Delta
DeltaFIFO的定義:
type DeltaFIFO struct {
   lock sync.RWMutex  //讀寫鎖
   cond sync.Cond    //條件變量
   items map[string]Deltas   //通過map數據結構的方式存儲,value存儲的是對象的Deltas數組
   queue []string     //存儲資源對象的key,該key通過KeyOf(obj)函數計算得到
   populated bool   //通過Replace()接口將第一批對象放入隊列,或者第一次調用增、刪、改接口時標記為true
   initialPopulationCount int   //通過Replace()接口將第一批對象放入隊列,或者第一次調用增、刪、改接口時標記為true
   keyFunc KeyFunc
   knownObjects KeyListerGetter   //indexer
   closed     bool
   closedLock sync.Mutex
   emitDeltaTypeReplaced bool
}

 

向隊列里添加元素:
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj  interface{}) error {
     id, err := f.KeyOf(obj)  //獲取obj key
     if err != nil {
           return KeyError{obj, err}
     }
     //向items中添加delta,並對操作進行去重,目前來看,只有連續兩次操作都是刪除操作的情況下,才可以合並,其他操作不會合並
     newDeltas := append(f.items[id], Delta{actionType, obj})  
     newDeltas = dedupDeltas(newDeltas)
     if len(newDeltas) > 0 {
           //向queue和items中添加元素,添加以后,條件變量發出消息,通知可能正在阻塞的POP方法有事件進隊列了
           if _, exists := f.items[id]; !exists {
                f.queue = append(f.queue, id)
           }
           f.items[id] = newDeltas
           f.cond.Broadcast()
     } else {
           // 冗余判斷,其實是不會走到這個分支的,去重后的delta list長度怎么也不可能小於1
           delete(f.items, id)
     }
     return nil
}
從隊列里Pop元素:
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{},  error) {
     f.lock.Lock()
     defer f.lock.Unlock()
     for {
           for len(f.queue) == 0 {   // 如果隊列是空的,利用條件變量阻塞住,直到有新的delta
                if f.IsClosed() {    // 如果Close()被調用,則退出
                     return nil, ErrFIFOClosed
                }
                f.cond.Wait()
           }
           id := f.queue[0]
           f.queue = f.queue[1:]
           if f.initialPopulationCount > 0 {
                f.initialPopulationCount--
           }
           item, ok := f.items[id]
           if !ok {
                // Item may have been deleted subsequently.
                continue
           }
           delete(f.items, id)
           err := process(item)
           // 如果處理失敗了,調用addIfNotPresent:如果queue中沒有則添加。本身剛剛從queue和items中取出對象,應該不會存在重復的對象,這里調用addIfNotPresent應該只是為了保險起見
           if e, ok := err.(ErrRequeue); ok {
                f.addIfNotPresent(id, item)
                err = e.Err
           }
           // Don't need to copyDeltas here, because we're  transferring
           // ownership to the caller.
           return item, err
     }
}
 
三、Reflector
tools/cache/reflector.go中定義了Reflector:
type Reflector struct {
   name string
   expectedTypeName string     //被監控的資源的類型名
   expectedType reflect.Type   // 監控的對象類型
   expectedGVK *schema.GroupVersionKind
   store Store    // 存儲,就是Delta_FIFO,這里的Store類型實際是Delta_FIFO的父類
   listerWatcher ListerWatcher  // 用來進行list&watch的接口對象
   backoffManager wait.BackoffManager
   resyncPeriod time.Duration   //重新同步的周期
   ShouldResync func() bool    //周期性的判斷是否需要重新同步
   clock clock.Clock     //時鍾對象,主要是為了給測試留后門,方便修改時間
   ……
}
同一類資源Informer共享一個Reflector。Reflector通過ListAndWatch函數來ListAndWatch apiserver來獲取資源的數據。
獲取時需要基於ResourceVersion(Etcd生成的全局唯一且遞增的資源版本號)。通過此序號,客戶端可以知道目前與服務端信息同步的狀態,每次只取大於等於本地序號的事件。好處是可以實現事件的全局唯一,實現”斷點續傳“功能,不用擔心本地客戶端偶爾出現的網絡異常
ListAndwatch是k8s統一的異步消息處理機制,保證了消息的實時性、可靠性、順序性、性能等,為聲明式風格的API奠定了良好的基礎,是k8s架構的精髓。
    List在Controller重啟或Watch中斷的情況下,調用資源的list API羅列資源對象以進行全量更新,基於HTTP短鏈接實現
(1)r.listerWatcher.List用於獲取資源下的所有對象的數據,例如,獲取所有Pod的資源數據。獲取資源數據是由options的ResourceVersion控制的。如果ResourceVersion為0,則表示獲取所有Pod的資源數據;如果ResourceVersion非0,則表示根據資源版本號繼續獲取。
(2)listMetaInterface.GetResourceVersion用於獲取資源版本號。
(3)meta.ExtractList用於將資源數據(runtime.Object對象)轉換成資源對象列表([]runtime.Object對象)。
因為r.listerWatcher.List獲取的是資源下的所有對象的數據,例如所有的Pod資源數據,所以它是一個資源列表。
(4)r.syncWith用於將資源對象列表中的資源對象和資源版本號存儲至DeltaFIFO中,並會替換已存在的對象。
(5)r.setLastSyncResourceVersion用於設置最新的資源版本號。
    Watch則在多次List之間進行,調用資源的watch API,基於當前的資源版本號監聽資源變更(如Added、Updated、Deleted)事件。
通過在Http請求中帶上watch=true,表示采用Http長連接持續監聽apiserver發來的資源變更事件。
apiserver在response的HTTP Header中設置Transfer-Encoding的值為chunked,表示采用分塊傳輸編碼。每當有事件來臨,返回一個WatchEvent。
 
Reflector在獲取新的資源數據后,調用的Add方法將資源對象的Delta記錄存放到本地緩存DeltaFIFO中。
 
四、Controller
在tool/cache/controller.go中定義了Controller接口:
type Controller interface {
   Run(stopCh <-chan struct{})
   HasSynced() bool
   LastSyncResourceVersion() string
}
controller結構體實現了此接口:
type controller struct {
   config         Config
   reflector      *Reflector
   reflectorMutex sync.RWMutex
   clock          clock.Clock
}
config結構體中是所有配置:
type Config struct {
   Queue     //DeltaFIFO
   ListerWatcher
   Process ProcessFunc   //從DeltaFIFO Pop調用時,調用的回調
   ObjectType runtime.Object  //期待處理的資源對象的類型
   FullResyncPeriod time.Duration   //全量resync的周期
   ShouldResync ShouldResyncFunc  //delta fifo周期性同步判斷時使用
   RetryOnError bool
}
Controller的processLoop方法會不斷地調用的Pop方法從Delta隊列中消費彈出delta記錄(隊列中沒有數據時阻塞等待數據):
func (c *controller) processLoop() {
   for {
      obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
      if err != nil {
         if err == ErrFIFOClosed {
            return
         }
         if c.config.RetryOnError {
            // This is the safe way to re-enqueue.
            c.config.Queue.AddIfNotPresent(obj)
         }
      }
   }
}
Pop方法須傳入Process函數——用於接收並處理對象的回調方法,默認的Process函數是Informer模塊中的HandleDeltas
 
五、informer
Kubernetes的其他組件都是通過client-go的Informer機制與Kubernetes API Server進行通信的。
Informer也被稱為Shared Informer,它是可以共享使用的。
在clientgo的informer/factory.go中,有接口定義:
type SharedInformerFactory interface {
   internalinterfaces.SharedInformerFactory
   ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
   WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
   // 所有已知資源的shared informer
   Admissionregistration() admissionregistration.Interface
   Apps() apps.Interface
   Auditregistration() auditregistration.Interface
   Autoscaling() autoscaling.Interface
   Batch() batch.Interface
   Certificates() certificates.Interface
   Coordination() coordination.Interface
   Core() core.Interface
   Discovery() discovery.Interface
   Events() events.Interface
   Extensions() extensions.Interface
   Flowcontrol() flowcontrol.Interface
   Networking() networking.Interface
   Node() node.Interface
   Policy() policy.Interface
   Rbac() rbac.Interface
   Scheduling() scheduling.Interface
   Settings() settings.Interface
   Storage() storage.Interface
}
sharedInformerFactory結構體實現了此接口:
type sharedInformerFactory struct {
   client           kubernetes.Interface
   namespace        string
   tweakListOptions internalinterfaces.TweakListOptionsFunc
   lock             sync.Mutex
   defaultResync    time.Duration
   customResync     map[reflect.Type]time.Duration
   informers map[reflect.Type]cache.SharedIndexInformer
   startedInformers map[reflect.Type]bool   //用於追蹤哪種informer被啟動了,避免同一資源的Informer被實例化多次,運行過多相同的ListAndWatch
}
新建一個sharedInformerFactory結構體:
sharedInformers := informers.NewSharedInformerFactory(clientset, time.Minute)  

第1個參數是用於與Kubernetes API Server交互的客戶端;第2個參數用於設置多久進行一次resync(周期性的List操作),如果該參數為0,則禁用resync功能。

sharedInformerFactory結構體實現了所有已知資源的shared informer,例如在clientgo的informer/core/vi/pod.go中,定義了如下接口:

type PodInformer interface{
  Informer() cache.SharedIndexInformer
  Listen() v1.PodLister
}
podInformer結構體實現了Informer方法和Listen方法:
func (f *podInformer) Informer() cache.SharedIndexInformer {
   return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)  // 如果已經存在同類型的資源Informer,則返回當前Informer,不再繼續添加
}
func (f *podInformer) Lister() v1.PodLister {
   return v1.NewPodLister(f.Informer().GetIndexer())
}
通過調用sharedInformers.Core().V1().Pods()獲得podInformer結構體
得到具體Pod資源的informer對象:
informer := sharedInformers.Core().V1().Pods().Informer()     
最終獲得的,是clientgo/tool/cache/shared_informer.go中的sharedIndexInformer結構體,它實現的接口為:
type SharedIndexInformer interface {
   SharedInformer
   AddIndexers(indexers Indexers) error   //啟動informer前為其添加indexers
   GetIndexer() Indexer
}
它的定義為:
type sharedIndexInformer struct {
   indexer    Indexer
   controller Controller
   processor             *sharedProcessor    
   cacheMutationDetector MutationDetector     
   listerWatcher ListerWatcher
   objectType runtime.Object        
   resyncCheckPeriod time.Duration  
   defaultEventHandlerResyncPeriod time.Duration
   clock clock.Clock
   started, stopped bool
   startedLock      sync.Mutex
   blockDeltas sync.Mutex
}
通過informer.AddEventHandler函數可以為資源對象添加資源事件回調方法,支持3種資源事件回調方法:AddFunc、UpdateFunc、DeleteFunc
sharedIndexInformer結構體定義了HandleDeltas函數,作為process回調函數(通過Config結構體傳給controller)
當資源對象的操作類型為Added、Updated、Deleted時,會將該資源對象存儲至Indexer,並通過distribute函數將資源對象分發至用戶自定義的事件處理函數(通過informer.AddEventHandler添加)中
 
通過informer.Run(stopCH)運行該informer,它是一個持久化的goroutine,通過clientset對象與apiserver交互。
它會啟動controller,啟動時傳入的Config結構體包含了
stopCH用於在程序進程退出前通知Informer退出
 
調用Pod的Informer的示例:
stopCH := make(chan struct{})
defer close(stopCH)
sharedInformers := informers.NewSharedInformerFactory(clientset, time.Minute)   
informer := sharedInformers.Core().V1().Pods().Informer()     
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{     //為Pod資源添加資源事件回調方法
   AddFunc: func(obj interface{}){
      mObj := obj.(v1.Object)
      log.Print("創建新Pod:",mObj.GetName())
   },
   UpdateFunc: func(oldObj, newObj interface{}){
      oObj := oldObj.(v1.Object)
      nObj := newObj.(v1.Object)
      log.Print(oObj.GetName(),",",nObj.GetName())
   },
   DeleteFunc: func(obj interface{}) {
      mObj :=obj.(v1.Object)
      log.Print("刪除舊Pod:",mObj.GetName())
   },
})
informer.Run(stopCH)  

 

 
六、Work Queue(選用)
在開發並行程序時,需要頻繁的進行數據同步,本身golang擁有channel 機制,但不能滿足一些復雜場景的需求。例如:延時隊列、限速隊列。
client-go中提供了多種隊列以供選擇,可以勝任更多的場景。工作隊列會對存儲的對象進行去重,從而避免多個woker 處理同一個資源的情況。
用戶可以在回調函數里,將資源對象推送到WorkQueue(或其他隊列)中,也可以直接處理。
 

參考資料:

[1] https://kubernetes.io/docs/home/

[2] https://edu.aliyun.com/roadmap/cloudnative

[3] 鄭東旭《Kubernetes源碼剖析》

 
代碼示例:通過informer采集event並存入ES
package main

import (
   "bytes"
   "context"
   "fmt"
   "github.com/elastic/go-elasticsearch/v7"
   "github.com/elastic/go-elasticsearch/v7/esapi"
   "k8s.io/api/events/v1beta1"
   "k8s.io/apimachinery/pkg/runtime"
   "k8s.io/apimachinery/pkg/util/json"
   "k8s.io/client-go/informers"
   "k8s.io/client-go/kubernetes"
   "k8s.io/client-go/tools/cache"
   "k8s.io/client-go/tools/clientcmd"
   "math/rand"
   "strconv"
   "time"
)

func mustSuccess(err error) {
   if err != nil {
      panic(err)
   }
}

func main() {
   rand.Seed(time.Now().UnixNano())
   config, err := clientcmd.BuildConfigFromFlags("", "/Users/qiulingwei/Projects/kube-goclient/kubeconfig")
   mustSuccess(err)

   clientset, err := kubernetes.NewForConfig(config)
   mustSuccess(err)
   sharedInformers := informers.NewSharedInformerFactory(clientset, 0)
   stopChan := make(chan struct{})
   defer close(stopChan)

   eventInformer := sharedInformers.Events().V1beta1().Events().Informer()
   addChan := make(chan v1beta1.Event)
   deleteChan := make(chan v1beta1.Event)
   eventInformer.AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
      AddFunc: func(obj interface{}) {
         unstructObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
         mustSuccess(err)
         event := &v1beta1.Event{}
         err = runtime.DefaultUnstructuredConverter.FromUnstructured(unstructObj, event)
         mustSuccess(err)
         addChan <- *event
      },
      UpdateFunc: func(oldObj, newObj interface{}) {
      },
      DeleteFunc: func(obj interface{}) {
      },
   }, 0)

   go func() {
      for  {
         select {
         case event := <- addChan:
            str, err := json.Marshal(&event)
            mustSuccess(err)
            esinsert(str)
            break
         case <-deleteChan:
            break
         }
      }
   }()
   eventInformer.Run(stopChan)
}

func esinsert(str []byte){
   cfg := elasticsearch.Config{
      Addresses: []string{
         "xxxxxx",
         "xxxxx",
         "xxxxx",
      },
      Username: "xxx",
      Password: "xxxxxx",
   }
   es, _ := elasticsearch.NewClient(cfg)
   req := esapi.CreateRequest{
      Index:        "qlw-index",
      DocumentType: "_doc",
      DocumentID:   strconv.FormatInt(time.Now().Unix(),10) + strconv.Itoa(rand.Int()),
      Body:         bytes.NewReader(str),
   }
   res, err := req.Do(context.Background(), es)
   defer res.Body.Close()
   if err!=nil  {
      fmt.Println(res.String())
   }
}

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM