轉載 luozhiyun的博客:https://www.luozhiyun.com
Informer機制#
機制設計#
Informer主要有兩個作用:
- 通過一種叫作 ListAndWatch 的方法,把 APIServer 中的 API 對象緩存在了本地,並負責更新和維護這個緩存。ListAndWatch通過 APIServer 的 LIST API“獲取”所有最新版本的 API 對象;然后,再通過 WATCH API 來“監聽”所有這些 API 對象的變化;
- 注冊相應的事件,之后如果監聽到的事件變化就會調用事件對應的EventHandler,實現回調。
Kubernetes中都是各種controller的實現,各種controller都會用到Informer
Informer運行原理如下:
根據流程圖來解釋一下Informer中幾個組件的作用:
- Reflector:用於監控指定的k8s資源,當資源發生變化時,觸發相應的變更事件,如Added事件、Updated事件、Deleted事件,並將器資源對象放到本地DeltaFIFO Queue中;
- DeltaFIFO:DeltaFIFO是一個先進先出的隊列,可以保存資源對象的操作類型;
- Indexer:用來存儲資源對象並自帶索引功能的本地存儲,Reflector從DeltaFIFO中將消費出來的資源對象存儲至Indexer;
Reflector 包會和 apiServer 建立長連接,並使用 ListAndWatch 方法獲取並監聽某一個資源的變化。List 方法將會獲取某個資源的所有實例,Watch 方法則監聽資源對象的創建、更新以及刪除事件,然后將事件放入到DeltaFIFO Queue中;
然后Informer會不斷的從 Delta FIFO Queue 中 pop 增量事件,並根據事件的類型來決定新增、更新或者是刪除本地緩存;接着Informer 根據事件類型來觸發事先注冊好的 Event Handler觸發回調函數,然后然后將該事件丟到 Work Queue 這個工作隊列中。
實例#
go get -u -v kgo get -u -v k8s.io/client-go/..8s.io/client-go/..
git clone https://github.com/huweihuang/client-go.git cd client-go #保證本地HOME目錄有配置kubernetes集群的配置文件 go run client-go.go
將到了go-client部分的代碼,我們可以直接通過實例來進行上手跑動,Informers Example代碼示例如下:
package main import ( "flag" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/util/homedir" "log" "path/filepath" "time" ) func main() { var kubeconfig *string //如果是windows,那么會讀取C:\Users\xxx\.kube\config 下面的配置文件 //如果是linux,那么會讀取~/.kube/config下面的配置文件 if home := homedir.HomeDir(); home != "" { kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file") } else { kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file") } flag.Parse() config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig) if err != nil { panic(err) } clientset, err := kubernetes.NewForConfig(config) if err != nil { panic(err) } stopCh := make(chan struct{}) defer close(stopCh) //表示每分鍾進行一次resync,resync會周期性地執行List操作 sharedInformers := informers.NewSharedInformerFactory(clientset, time.Minute) informer := sharedInformers.Core().V1().Pods().Informer() informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { mObj := obj.(v1.Object) log.Printf("New Pod Added to Store: %s", mObj.GetName()) }, UpdateFunc: func(oldObj, newObj interface{}) { oObj := oldObj.(v1.Object) nObj := newObj.(v1.Object) log.Printf("%s Pod Updated to %s", oObj.GetName(),nObj.GetName()) }, DeleteFunc: func(obj interface{}) { mObj := obj.(v1.Object) log.Printf("Pod Deleted from Store: %s", mObj.GetName()) }, }) informer.Run(stopCh) }
要運行這段代碼,需要我們將k8s服務器上的~/.kube代碼拷貝到本地,我是win10的機器所以拷貝到C:\Users\xxx\.kube
中。
informers.NewSharedInformerFactory會傳入兩個參數,第1個參數clientset是用於與k8s apiserver交互的客戶端,第2個參數是代表每分鍾會執行一次resync,resync會周期性執行List將所有資源存放再Informer Store中,如果該參數是0,則禁用resync功能。
通過informer.AddEventHandler函數可以為pod資源添加資源事件回調方法,支持3種資源事件回調方法:
- AddFunc
- UpdateFunc
- DeleteFunc
通過名稱我們就可以知道是新增、更新、刪除時會回調這些方法。
在我們初次執行run方法的時候,可以會將監控的k8s上pod存放到本地,並回調AddFunc方法,如下日志:
2020/10/17 15:13:10 New Pod Added to Store: dns-test 2020/10/17 15:13:10 New Pod Added to Store: web-1 2020/10/17 15:13:10 New Pod Added to Store: fluentd-elasticsearch-nwqph 2020/10/17 15:13:10 New Pod Added to Store: kube-flannel-ds-amd64-bjmt2 2020/10/17 15:13:10 New Pod Added to Store: kubernetes-dashboard-65665f84db-jrw6k 2020/10/17 15:13:10 New Pod Added to Store: mongodb 2020/10/17 15:13:10 New Pod Added to Store: web-0 ....
這里我用一張圖總結一下informer的Run方法流程: