Kubernetes client-go Indexer / ThreadSafeStore 源碼分析


概述Indexer 接口ThreadSafeStorethreadSafeMap.Xxx()Index() 等實現Index() 方法ByIndex() 方法IndexKeys() 方法Replace() 方法

概述

源碼版本信息

  • Project: kubernetes
  • Branch: master
  • Last commit id: d25d741c
  • Date: 2021-09-26

我們在《Kubernetes client-go 源碼分析 - 開篇》里提到了自定義控制器涉及到的 client-go 組件整體工作流程,大致如下圖:

Indexer 主要依賴於 ThreadSafeStore 實現,是 client-go 提供的一種緩存機制,通過檢索本地緩存可以有效降低 apiserver 的壓力,今天我們來詳細看下 Indexer 和對應的 ThreadSafeStore 的實現。

Indexer 接口

Indexer 接口主要是在 Store 接口的基礎上拓展了對象的檢索功能

  • client-go/tools/cache/index.go:35
1type Indexer interface {
2   Store
3   Index(indexName string, obj interface{}) ([]interface{}, error) // 根據索引名和給定的對象返回符合條件的所有對象
4   IndexKeys(indexName, indexedValue string) ([]string, error)     // 根據索引名和索引值返回符合條件的所有對象的 key
5   ListIndexFuncValues(indexName string) []string                  // 列出索引函數計算出來的所有索引值
6   ByIndex(indexName, indexedValue string) ([]interface{}, error)  // 根據索引名和索引值返回符合條件的所有對象
7   GetIndexers() Indexers                     // 獲取所有的 Indexers,對應 map[string]IndexFunc 類型
8   AddIndexers(newIndexers Indexers) error    // 這個方法要在數據加入存儲前調用,添加更多的索引方法,默認只通過 namespace 檢索
9}

Indexer 的默認實現是 cache

1type cache struct {
2   cacheStorage ThreadSafeStore
3   keyFunc KeyFunc
4}

cache 對應兩個方法體實現完全一樣的 New 函數:

 1func NewStore(keyFunc KeyFunc) Store {
2   return &cache{
3      cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}),
4      keyFunc:      keyFunc,
5   }
6}
7
8func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
9   return &cache{
10      cacheStorage: NewThreadSafeStore(indexers, Indices{}),
11      keyFunc:      keyFunc,
12   }
13}

這里涉及到兩個類型:

  • KeyFunc
  • ThreadSafeStore

我們先看一下 Indexer 的 Add()、Update() 等方法是怎么實現的:

 1func (c *cache) Add(obj interface{}) error {
2   key, err := c.keyFunc(obj)
3   if err != nil {
4      return KeyError{obj, err}
5   }
6   c.cacheStorage.Add(key, obj)
7   return nil
8}
9
10func (c *cache) Update(obj interface{}) error {
11   key, err := c.keyFunc(obj)
12   if err != nil {
13      return KeyError{obj, err}
14   }
15   c.cacheStorage.Update(key, obj)
16   return nil
17}

可以看到這里的邏輯就是調用 keyFunc() 方法獲取 key,然后調用 cacheStorage.Xxx() 方法完成對應增刪改查過程。KeyFunc 類型是這樣定義的:

1type KeyFunc func(obj interface{}) (string, error)
2

也就是給一個對象,返回一個字符串類型的 key。KeyFunc 的一個默認實現如下:

 1func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
2    if key, ok := obj.(ExplicitKey); ok {
3        return string(key), nil
4    }
5    meta, err := meta.Accessor(obj)
6    if err != nil {
7        return "", fmt.Errorf("object has no meta: %v", err)
8    }
9    if len(meta.GetNamespace()) > 0 {
10        return meta.GetNamespace() + "/" + meta.GetName(), nil
11    }
12    return meta.GetName(), nil
13}

可以看到一般情況下返回值是 <namespace><name> ,如果 namespace 為空則直接返回 name。類似的還有一個叫做 IndexFunc 的類型,定義如下:

1type IndexFunc func(obj interface{}) ([]string, error)
2

這是給一個對象生成 Index 用的,一個通用實現如下,直接返回對象的 namespace 字段作為 Index

1func MetaNamespaceIndexFunc(obj interface{}) ([]string, error) {
2   meta, err := meta.Accessor(obj)
3   if err != nil {
4      return []string{""}, fmt.Errorf("object has no meta: %v", err)
5   }
6   return []string{meta.GetNamespace()}, nil
7}

下面我們直接來看 cacheStorage 是如果實現增刪改查的。

ThreadSafeStore

ThreadSafeStore 是 Indexer 的核心邏輯所在,Indexer 的多數方法是直接調用內部 cacheStorage 屬性的方法實現的,同樣先看接口定義:

  • client-go/tools/cache/thread_safe_store.go:41
1type ThreadSafeStore interface {   Add(key string, obj interface{})   Update(key string, obj interface{})   Delete(key string)   Get(key string) (item interface{}, exists bool)   List() []interface{}   ListKeys() []string   Replace(map[string]interface{}, string)   Index(indexName string, obj interface{}) ([]interface{}, error)   IndexKeys(indexName, indexKey string) ([]string, error)   ListIndexFuncValues(name string) []string   ByIndex(indexName, indexKey string) ([]interface{}, error)   GetIndexers() Indexers   AddIndexers(newIndexers Indexers) error   Resync() error // 過期了,沒有具體代碼邏輯}

對應實現:

1type threadSafeMap struct {   lock  sync.RWMutex   items map[string]interface{}   indexers Indexers   indices Indices}

這里的 Indexers 和 Indices 是:

1type Index map[string]sets.Stringtype Indexers map[string]IndexFunctype Indices map[string]Index

對照圖片理解一下這幾個字段的關系:Indexers 里存的是 Index 函數 map,一個典型的實現是字符串 namespace 作為 key,IndexFunc 類型的實現 MetaNamespaceIndexFunc 函數作為 value,也就是我們希望通過 namespace 來檢索時,通過 Indexers 可以拿到對應的計算 Index 的函數,接着拿着這個函數,把對象穿進去,就可以計算出這個對象對應的 key,在這里也就是具體的 namespace 值,比如 default、kube-system 這種。然后在 Indices 里存的也是一個 map,key 是上面計算出來的 default 這種 namespace 值,value 是一個 set,而 set 表示的是這個 default namespace 下的一些具體 pod 的 <namespace>/<name> 這類字符串。最后拿着這種 key,就可以在 items 里檢索到對應的對象了。

threadSafeMap.Xxx()

比如 Add() 方法代碼如下:

1func (c *threadSafeMap) Add(key string, obj interface{}) {   c.lock.Lock()   defer c.lock.Unlock()   oldObject := c.items[key] // c.items 是 map[string]interface{} 類型   c.items[key] = obj // 在 items map 里添加這個對象   c.updateIndices(oldObject, obj, key) // 下面分析}

可以看到更復雜的邏輯在 updateIndices 方法里,我們繼續來看:

  • client-go/tools/cache/thread_safe_store.go:256
1func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {   // 添加場景這里是 nil,如果是更新,就需要刪除舊對象的索引了   if oldObj != nil {      c.deleteFromIndices(oldObj, key) // 刪除操作后面具體看   }  for name, indexFunc := range c.indexers { // 從 Indexers 里拿到索引函數,比如 "namespace":MetaNamespaceIndexFunc      indexValues, err := indexFunc(newObj) // 通過 MetaNamespaceIndexFunc 計算得到 namespace,比如 "default"      if err != nil {         panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))      }      index := c.indices[name] // 拿到一個 Index,對應類型 map[string]sets.String      if index == nil {         index = Index{}         c.indices[name] = index // 如果 map 不存在則初始化一個      }      for _, indexValue := range indexValues { // "default"         set := index[indexValue] // 檢索 "default" 下的 set,對應一個集合,多個 pod 信息         if set == nil {            set = sets.String{}            index[indexValue] = set // 如果為空則初始化一個         }         set.Insert(key) // key 也就是類似 "default/pod_1" 這樣的字符串,保存到 set 里,也就完成了 key + obj 的 Add 過程      }   }}

上面還提到了一個 deleteFromIndices 方法,前半段和上面邏輯上類似的,最后拿到 set 后不同於上面的 Insert 過程,這里調用了一個 Delete。

1func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string) {   for name, indexFunc := range c.indexers {      indexValues, err := indexFunc(obj)      if err != nil {         panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))      }      index := c.indices[name]      if index == nil {         continue      }      for _, indexValue := range indexValues {         set := index[indexValue]         if set != nil {            set.Delete(key) // set 中刪除這個 key            if len(set) == 0 {               delete(index, indexValue)            }         }      }   }}

Index() 等實現

最后看幾個具體方法等實現

Index() 方法

來看一下 Index() 方法的實現,Index() 方法的作用是給定一個 obj 和 indexName,比如 pod1和 "namespace",然后返回 pod1 所在 namespace 下的所有 pod。

  • client-go/tools/cache/thread_safe_store.go:141
1func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) {   c.lock.RLock()   defer c.lock.RUnlock()   indexFunc := c.indexers[indexName] // 提取索引函數,比如通過 "namespace" 提取到 MetaNamespaceIndexFunc   if indexFunc == nil {      return nil, fmt.Errorf("Index with name %s does not exist", indexName)   }   indexedValues, err := indexFunc(obj) // 對象丟進去拿到索引值,比如 "default"   if err != nil {      return nil, err   }   index := c.indices[indexName] // indexName 例如 "namespace",這里可以查到 Index   var storeKeySet sets.String   if len(indexedValues) == 1 {      // 多數情況對應索引值為1到場景,比如用 namespace 時,值就是唯一的      storeKeySet = index[indexedValues[0]]   } else {      // 對應不為1場景      storeKeySet = sets.String{}      for _, indexedValue := range indexedValues {         for key := range index[indexedValue] {            storeKeySet.Insert(key)         }      }   }   list := make([]interface{}, 0, storeKeySet.Len())   // storeKey 也就是 "default/pod_1" 這種字符串,通過其就可以到 items map 里提取需要的 obj 了   for storeKey := range storeKeySet {      list = append(list, c.items[storeKey])   }   return list, nil}

ByIndex() 方法

相比 Index(),這個函數要簡單的多,直接傳遞 indexedValue,也就不需要通過 obj 去計算 key 了,例如 indexName == namespace & indexValue == default 就是直接檢索 default 下的資源對象。

1func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{}, error) {   c.lock.RLock()   defer c.lock.RUnlock()   indexFunc := c.indexers[indexName]   if indexFunc == nil {      return nil, fmt.Errorf("Index with name %s does not exist", indexName)   }   index := c.indices[indexName]   set := index[indexedValue]   list := make([]interface{}, 0, set.Len())   for key := range set {      list = append(list, c.items[key])   }   return list, nil}

IndexKeys() 方法

和上面返回 obj 列表不同,這里只返回 key 列表,就是 []string{"default/pod_1"} 這種數據

1func (c *threadSafeMap) IndexKeys(indexName, indexedValue string) ([]string, error) {   c.lock.RLock()   defer c.lock.RUnlock()   indexFunc := c.indexers[indexName]   if indexFunc == nil {      return nil, fmt.Errorf("Index with name %s does not exist", indexName)   }   index := c.indices[indexName]   set := index[indexedValue]   return set.List(), nil}

Replace() 方法

Replace() 的實現簡單粗暴,給一個新 items map,直接替換到 threadSafeMap.items 中,然后重建索引。

1func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion string) {   c.lock.Lock()   defer c.lock.Unlock()   c.items = items   // rebuild any index   c.indices = Indices{}   for key, item := range c.items {      c.updateIndices(nil, item, key)   }}

(轉載請保留本文原始鏈接 https://www.danielhu.cn


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM