kubernetes垃圾回收器GarbageCollector源碼分析(一)


kubernetes版本:1.13.2

背景

由於operator創建的redis集群,在kubernetes apiserver重啟后,redis集群被異常刪除(包括redis exporter statefulset、redis statefulset)。刪除后operator將其重建,重新組建集群,實例IP發生變更(中間件容器化,我們開發了固定IP,當statefulset刪除后,IP會被回收),導致創建集群失敗,最終集群不可用。

經多次復現,apiserver重啟后,通過查詢redis operator日志,並沒有發現主動去刪除redis集群(redis statefulset)、監控實例(redis exporter)。進一步去查看kube-controller-manager的日志,將其日志級別設置--v=5,繼續復現,最終在kube-controller-manager日志中發現如下日志:
在這里插入圖片描述
可以看到是garbage collector觸發刪除操作的。這個問題在apiserver正常的時候是不存在,要想弄其究竟,就得看看kube-controller-manager內置組件garbage collector這個控制器的邏輯。

由於內容偏長,分為多節來講:
①、monitors作為生產者將變化的資源放入graphChanges隊列;同時restMapper定期檢測集群內資源類型,刷新monitors
②、runProcessGraphChangesgraphChanges隊列中取出變化的item,根據情況放入attemptToDelete隊列;runAttemptToDeleteWorker取出處理垃圾資源;
③、runProcessGraphChangesgraphChanges隊列中取出變化的item,根據情況放入attemptToOrphan隊列;runAttemptToOrphanWorker取出處理該該孤立的資源;
在這里插入圖片描述


正文

想要啟用GC,需要在kube-apiserverkube-controller-manager的啟動參數中都設置--enable-garbage-collectortrue,1.13.2版本中默認開啟GC

需要注意:兩組件該參數必須保持同步。


kube-controller-manager啟動入口,app.NewControllerManagerCommand()中加載controller manager默認啟動參數,創建* cobra.Command對象:

func main() {
    	rand.Seed(time.Now().UnixNano())
    	//加載controller manager默認啟動參數,創建* cobra.Command對象
    	command := app.NewControllerManagerCommand()
    	//......省略.......
    	//執行cobra.command,並啟動controller-manager
    	if err := command.Execute(); err != nil {
    		fmt.Fprintf(os.Stderr, "%v\n", err)
    		os.Exit(1)
    	}
}

以下代碼處去啟動kube-controller-manager
在這里插入圖片描述
NewDefaultComponentConfig(ports.InsecureKubeControllerManagerPort)加載各個控制器的配置:

//NewKubeControllerManagerOptions使用默認配置創建一個新的KubeControllerManagerOptions
func NewKubeControllerManagerOptions() (*KubeControllerManagerOptions, error) {
	//加載各個控制器的默認配置
	componentConfig, err := NewDefaultComponentConfig(ports.InsecureKubeControllerManagerPort)
	if err != nil {
		return nil, err
	}

	s := KubeControllerManagerOptions{
		Generic:         cmoptions.NewGenericControllerManagerConfigurationOptions(componentConfig.Generic),
		//.....省略
		GarbageCollectorController: &GarbageCollectorControllerOptions{
			ConcurrentGCSyncs:      componentConfig.GarbageCollectorController.ConcurrentGCSyncs,
			EnableGarbageCollector: componentConfig.GarbageCollectorController.EnableGarbageCollector,
		},
		//.....省略
	}
	//gc忽略的資源對象列表
	gcIgnoredResources := make([]kubectrlmgrconfig.GroupResource, 0, len(garbagecollector.DefaultIgnoredResources()))
	for r := range garbagecollector.DefaultIgnoredResources() {
		gcIgnoredResources = append(gcIgnoredResources, kubectrlmgrconfig.GroupResource{Group: r.Group, Resource: r.Resource})
	}
	s.GarbageCollectorController.GCIgnoredResources = gcIgnoredResources
	return &s, nil
}
// NewDefaultComponentConfig返回kube-controller管理器配置對象
func NewDefaultComponentConfig(insecurePort int32) (kubectrlmgrconfig.KubeControllerManagerConfiguration, error) {
	scheme := runtime.NewScheme()
	if err := kubectrlmgrschemev1alpha1.AddToScheme(scheme); err != nil {
		return kubectrlmgrconfig.KubeControllerManagerConfiguration{}, err
	}
	if err := kubectrlmgrconfig.AddToScheme(scheme); err != nil {
		return kubectrlmgrconfig.KubeControllerManagerConfiguration{}, err
	}

	versioned := kubectrlmgrconfigv1alpha1.KubeControllerManagerConfiguration{}
	//加載默認參數
	scheme.Default(&versioned)

	internal := kubectrlmgrconfig.KubeControllerManagerConfiguration{}
	if err := scheme.Convert(&versioned, &internal, nil); err != nil {
		return internal, err
	}
	internal.Generic.Port = insecurePort
	return internal, nil
}
// 根據Object,獲取提供的默認參數
func (s *Scheme) Default(src Object) {
	if fn, ok := s.defaulterFuncs[reflect.TypeOf(src)]; ok {
		fn(src)
	}
}

s.defaulterFuncs類型為map[reflect.Type]func(interface{}),用於根據指針類型獲取默認值函數。該map中的數據從哪里來的呢?

代碼位於src\k8s.io\kubernetes\pkg\controller\apis\config\v1alpha1\zz_generated.defaults.go
在這里插入圖片描述
可以看到默認參數中garbage collector中默認開啟gc(EnableGarbageCollector),並發數為20(ConcurrentGCSyncs)

func SetDefaults_GarbageCollectorControllerConfiguration(obj *kubectrlmgrconfigv1alpha1.GarbageCollectorControllerConfiguration) {
	if obj.EnableGarbageCollector == nil {
		obj.EnableGarbageCollector = utilpointer.BoolPtr(true)
	}
	if obj.ConcurrentGCSyncs == 0 {
		obj.ConcurrentGCSyncs = 20
	}
}

回到Run函數,里面調用了NewControllerInitializers啟動所有控制器:
在這里插入圖片描述
重點來到啟動garbage collector的startGarbageCollectorController函數:

func startGarbageCollectorController(ctx ControllerContext) (http.Handler, bool, error) {
	//k8s 1.13.2中默認為true,可在kube-apiserver和kube-controller-manager的啟動參數中加--enable-garbage-conllector=false設置
	//需保證這兩個組件中參數值一致
	if !ctx.ComponentConfig.GarbageCollectorController.EnableGarbageCollector {
		return nil, false, nil
	}

	//k8s各種原生資源對象客戶端集合(默認啟動參數中用SimpleControllerClientBuilder構建)
	gcClientset := ctx.ClientBuilder.ClientOrDie("generic-garbage-collector")
	discoveryClient := cacheddiscovery.NewMemCacheClient(gcClientset.Discovery())

	//生成rest config
	config := ctx.ClientBuilder.ConfigOrDie("generic-garbage-collector")
	dynamicClient, err := dynamic.NewForConfig(config)
	if err != nil {
		return nil, true, err
	}

	// Get an initial set of deletable resources to prime the garbage collector.
	//獲取一組初始可刪除資源以填充垃圾收集器。
	deletableResources := garbagecollector.GetDeletableResources(discoveryClient)
	ignoredResources := make(map[schema.GroupResource]struct{})

	//忽略gc的資源類型
	for _, r := range ctx.ComponentConfig.GarbageCollectorController.GCIgnoredResources {
		ignoredResources[schema.GroupResource{Group: r.Group, Resource: r.Resource}] = struct{}{}
	}
	garbageCollector, err := garbagecollector.NewGarbageCollector(
		dynamicClient,
		ctx.RESTMapper,
		deletableResources,
		ignoredResources,
		ctx.InformerFactory,
		ctx.InformersStarted,
	)
	if err != nil {
		return nil, true, fmt.Errorf("Failed to start the generic garbage collector: %v", err)
	}

	// Start the garbage collector.
	//啟動參數中默認是20個協程
	workers := int(ctx.ComponentConfig.GarbageCollectorController.ConcurrentGCSyncs)
	//啟動monitors和deleteWorkers、orphanWorkers
	go garbageCollector.Run(workers, ctx.Stop)

	// Periodically refresh the RESTMapper with new discovery information and sync
	// the garbage collector.
	//使用新的發現信息定期刷新RESTMapper並同步垃圾收集器。
	go garbageCollector.Sync(gcClientset.Discovery(), 30*time.Second, ctx.Stop)

	//gc提供debug dot grap依賴關系圖接口
	return garbagecollector.NewDebugHandler(garbageCollector), true, nil
}

該函數主要作用有:
1、deletableResources := garbagecollector.GetDeletableResources(discoveryClient)獲取集群內所有可刪除的資源對象;排除掉忽略的資源對象。
2、構建garbageCollector結構體對象;
3、garbageCollector.Run(workers, ctx.Stop)啟動一個monitors用來監聽資源對象的變化(對應的由runProcessGraphChanges死循環處理),和默認20個deleteWorkers協程處理可刪除的資源對象、20個orphanWorkers協程處理孤兒對象。
4、garbageCollector.Sync(gcClientset.Discovery(), 30*time.Second, ctx.Stop) 定時去獲取一個集群內是否有新類型的資源對象的加入,並重新刷新monitors,以監聽新類型的資源對象。
5、garbagecollector.NewDebugHandler(garbageCollector)注冊debug接口,用來提供獲取dot流程圖接口:

curl http://127.0.0.1:10252/debug/controllers/garbagecollector/graph?uid=11211212edsaddkqedmk12

使用graphviz提供的dot.exe可以生成svg格式的圖,可用google瀏覽器查看如下:
在這里插入圖片描述

// curl http://127.0.0.1:10252/debug/controllers/garbagecollector/graph?uid=11211212edsaddkqedmk12
func (h *debugHTTPHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	if req.URL.Path != "/graph" {
		http.Error(w, "", http.StatusNotFound)
		return
	}

	var graph graph.Directed
	if uidStrings := req.URL.Query()["uid"]; len(uidStrings) > 0 {
		uids := []types.UID{}
		for _, uidString := range uidStrings {
			uids = append(uids, types.UID(uidString))
		}
		graph = h.controller.dependencyGraphBuilder.uidToNode.ToGonumGraphForObj(uids...)

	} else {
		graph = h.controller.dependencyGraphBuilder.uidToNode.ToGonumGraph()
	}

	//生成dot流程圖數據,用graphviz工具中的dot.exe工具轉換為svg圖(用google瀏覽器打開)或者png圖
	//API參考:https://godoc.org/gonum.org/v1/gonum/graph
	//graphviz下載地址:https://graphviz.gitlab.io/_pages/Download/Download_windows.html
	//dot.exe test.dot -T svg -o test.svg
	data, err := dot.Marshal(graph, "full", "", "  ", false)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	w.Write(data)
	w.WriteHeader(http.StatusOK)
}

在這里插入圖片描述
GarbageCollector通過restMapper定期重置可刪除的資源類型,更新GraphBuilder中的monitors,monitors將創建所有資源類型的變更通知回調函數,將變化的資源對象加入到GraphBuilder的graphChanges隊列,GraphBuilder的runProcessGraphChanges()會一直從隊列中獲取變化,構建一個緩存對象之間依賴關系的圖形,以及觸發dependencyGraphBuilder將可能被垃圾收集的對象排隊到attemptToDelete隊列,並將其依賴項需要孤立的對象排隊到attemptToOrphan隊列。GarbageCollector具有使用這兩個隊列的工作人員runAttemptToDeleteWorker和runAttemptToOrphanWorker死循環,分別從attemptToDelete隊列和attemptToOrphan隊列取出,向API服務器發送請求以相應地刪除更新對象。

// GarbageCollector運行反射器來監視托管API對象的更改,將結果匯總到單線程dependencyGraphBuilder,
// 構建一個緩存對象之間依賴關系的圖形。由圖變化觸發,dependencyGraphBuilder將可能被垃圾收集的對象
// 排隊到`attemptToDelete`隊列,並將其依賴項需要孤立的對象排隊到`attemptToOrphan`隊列。
// GarbageCollector具有使用這兩個隊列的工作人員,向API服務器發送請求以相應地刪除更新對象。
// 請注意,讓dependencyGraphBuilder通知垃圾收集器確保垃圾收集器使用至少與發送通知一樣最新的圖形進行操作。
type GarbageCollector struct {
	// resettableRESTMapper是一個RESTMapper,它能夠在discovery資源類型時重置自己
	restMapper    resettableRESTMapper
	// dynamicClient提供操作集群內所有資源對象的接口方法,包括k8s內置、CRD生成的自定義資源
	dynamicClient dynamic.Interface
	//垃圾收集器嘗試在時間成熟時刪除attemptToDelete隊列中的item
	attemptToDelete workqueue.RateLimitingInterface
	//垃圾收集器嘗試孤立attemptToOrphan隊列中item的依賴項,然后刪除item
	attemptToOrphan        workqueue.RateLimitingInterface
	dependencyGraphBuilder *GraphBuilder
	// 有owner的資源對象,才會給absentOwnerCache填充不存在的Owner信息
	absentOwnerCache *UIDCache
	sharedInformers  informers.SharedInformerFactory

	workerLock sync.RWMutex
}
// GraphBuilder:基於informers提供的事件,GraphBuilder更新
// uidToNode,一個緩存我們所知的依賴關系的圖,並將
// 項放入attemptToDelete和attemptToOrphan隊列
type GraphBuilder struct {
	restMapper meta.RESTMapper

	//每個監視器列表/監視資源,結果匯集到dependencyGraphBuilder
	monitors    monitors
	monitorLock sync.RWMutex
	// informersStarted is closed after after all of the controllers have been initialized and are running.
	// After that it is safe to start them here, before that it is not.
	// informersStarted在所有控制器初始化並運行后關閉。之后在這里啟動它們是安全的,在此之前它不是。
	informersStarted <-chan struct{}

	// stopCh drives shutdown. When a receive from it unblocks, monitors will shut down.
	// This channel is also protected by monitorLock.
	// stopCh驅動器關閉當來自它的接收解除阻塞時,監視器將關閉。 此channel也受monitorLock保護。
	stopCh <-chan struct{}

	// running tracks whether Run() has been called.
	// it is protected by monitorLock.
	//運行軌道是否已調用Run()它受monitorLock保護。
	running bool

	dynamicClient dynamic.Interface
	// monitors are the producer of the graphChanges queue, graphBuilder alters
	// the in-memory graph according to the changes.
	// monitor是graphChanges隊列的生成者,graphBuilder根據更改改變了內存中的圖形。
	graphChanges workqueue.RateLimitingInterface

	// uidToNode doesn't require a lock to protect, because only the
	// single-threaded GraphBuilder.processGraphChanges() reads/writes it.
	//uidToNode不需要鎖保護,因為只有單線程GraphBuilder.processGraphChanges()讀寫它。
	uidToNode *concurrentUIDToNode

	// GraphBuilder is the producer of attemptToDelete and attemptToOrphan, GC is the consumer.
	// GraphBuilder是attemptToDelete和attemptToOrphan的生產者,GC是消費者。
	attemptToDelete workqueue.RateLimitingInterface
	attemptToOrphan workqueue.RateLimitingInterface

	// GraphBuilder and GC share the absentOwnerCache. Objects that are known to
	// be non-existent are added to the cached.
	// GraphBuilder和GC共享absentOwnerCache。已知不存在的對象將添加到緩存中。
	absentOwnerCache *UIDCache

	//所有k8s資源對象集的informer
	sharedInformers  informers.SharedInformerFactory

	//監視器忽略的資源對象集
	ignoredResources map[schema.GroupResource]struct{}
}

創建NewGarbageCollector結構體:

func NewGarbageCollector(
	dynamicClient dynamic.Interface,
	mapper resettableRESTMapper,
	deletableResources map[schema.GroupVersionResource]struct{},
	ignoredResources map[schema.GroupResource]struct{},
	sharedInformers informers.SharedInformerFactory,
	informersStarted <-chan struct{},
) (*GarbageCollector, error) {
	attemptToDelete := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_attempt_to_delete")
	attemptToOrphan := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_attempt_to_orphan")
	absentOwnerCache := NewUIDCache(500)
	gc := &GarbageCollector{
		dynamicClient:    dynamicClient,
		restMapper:       mapper,
		attemptToDelete:  attemptToDelete,
		attemptToOrphan:  attemptToOrphan,
		absentOwnerCache: absentOwnerCache,
	}
	gb := &GraphBuilder{
		dynamicClient:    dynamicClient,
		informersStarted: informersStarted,
		restMapper:       mapper,
		graphChanges:     workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_graph_changes"),
		uidToNode: &concurrentUIDToNode{
			uidToNode: make(map[types.UID]*node),
		},
		attemptToDelete:  attemptToDelete,
		attemptToOrphan:  attemptToOrphan,
		absentOwnerCache: absentOwnerCache,
		sharedInformers:  sharedInformers,
		ignoredResources: ignoredResources,
	}
	//初始化各個資源對象的monitors,啟動各資源對象的監聽器,變化時觸發回調,將其加入graphChanges 隊列
	if err := gb.syncMonitors(deletableResources); err != nil {
		utilruntime.HandleError(fmt.Errorf("failed to sync all monitors: %v", err))
	}
	gc.dependencyGraphBuilder = gb

	return gc, nil
}

主要功能:
1、構建GarbageCollector結構體;
2、構建依賴結構圖維護結構體GraphBuilder,和GarbageCollector共用attemptToDelete和attemptToOrphan隊列,GraphBuilder作為生產着將適當資源放到attemptToDelete或者attemptToOrphan隊列,供GarbageCollector中的worker進行消費;
3、初始化各個資源對象的monitors,啟動各資源對象的監聽器,變化時觸發回調,將其加入graphChanges 隊列。

gb.syncMonitors(deletableResources)方法中最主要的是c, s, err := gb.controllerFor(resource, kind)

func (gb *GraphBuilder) controllerFor(resource schema.GroupVersionResource, kind schema.GroupVersionKind) (cache.Controller, cache.Store, error) {
	handlers := cache.ResourceEventHandlerFuncs{
		// add the event to the dependencyGraphBuilder's graphChanges.
		// 將事件添加到dependencyGraphBuilder的graphChanges中。
		AddFunc: func(obj interface{}) {
			event := &event{
				eventType: addEvent,
				obj:       obj,
				gvk:       kind,
			}
			gb.graphChanges.Add(event)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			// TODO: check if there are differences in the ownerRefs,
			// finalizers, and DeletionTimestamp; if not, ignore the update.
			//TODO:檢查ownerRefs, finalizers和DeletionTimestamp是否存在差異;如果沒有,請忽略更新。
			event := &event{
				eventType: updateEvent,
				obj:       newObj,
				oldObj:    oldObj,
				gvk:       kind,
			}
			gb.graphChanges.Add(event)
		},
		DeleteFunc: func(obj interface{}) {
			// delta fifo may wrap the object in a cache.DeletedFinalStateUnknown, unwrap it
			// delta fifo可以將對象包裝在cache.DeletedFinalStateUnknown中,解包它
			if deletedFinalStateUnknown, ok := obj.(cache.DeletedFinalStateUnknown); ok {
				obj = deletedFinalStateUnknown.Obj
			}
			event := &event{
				eventType: deleteEvent,
				obj:       obj,
				gvk:       kind,
			}
			gb.graphChanges.Add(event)
		},
	}
	shared, err := gb.sharedInformers.ForResource(resource)
	if err == nil {
		klog.V(4).Infof("using a shared informer for resource %q, kind %q", resource.String(), kind.String())
		// need to clone because it's from a shared cache
		shared.Informer().AddEventHandlerWithResyncPeriod(handlers, ResourceResyncTime)
		return shared.Informer().GetController(), shared.Informer().GetStore(), nil
	} else {
		//獲取資源對象時出錯會到這里,比如非k8s內置RedisCluster、clusterbases、clusters、esclusters、volumeproviders、stsmasters、appapps、mysqlclusters、brokerclusters、clustertemplates;
		//內置的networkPolicies、apiservices、customresourcedefinitions
		klog.V(4).Infof("unable to use a shared informer for resource %q, kind %q: %v", resource.String(), kind.String(), err)
	}

	// TODO: consider store in one storage.
	// TODO: 考慮存儲在一個存儲中。
	klog.V(5).Infof("create storage for resource %s", resource)
	//上面失敗的資源對象的store和controller
	store, monitor := cache.NewInformer(
		listWatcher(gb.dynamicClient, resource),
		nil,
		ResourceResyncTime,
		// don't need to clone because it's not from shared cache
		//不需要克隆,因為它不是來自共享緩存
		handlers,
	)
	return monitor, store, nil
}

該方法主要功能是:
1、將新增、更改、刪除的資源對象構建為event結構體,放入GraphBuilder的graphChanges隊列里,最終被runProcessGraphChanges這個worker消費;
2、構建大多數內置資源的SharedInformerFactory,構建失敗的用cache.NewInformer構建(通過CRD定義的對象以及部分k8s內置對象)

代碼繼續回到k8s.io\kubernetes\cmd\kube-controller-manager\app\core.go中的startGarbageCollectorController中,看 garbageCollector.Run(workers, ctx.Stop)方法:

func (gc *GarbageCollector) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer gc.attemptToDelete.ShutDown()
	defer gc.attemptToOrphan.ShutDown()
	defer gc.dependencyGraphBuilder.graphChanges.ShutDown()

	klog.Infof("Starting garbage collector controller")
	defer klog.Infof("Shutting down garbage collector controller")

	//協程運行生產者monitors
	go gc.dependencyGraphBuilder.Run(stopCh)

	//等待dependencyGraphBuilder緩存開始同步
	if !controller.WaitForCacheSync("garbage collector", stopCh, gc.dependencyGraphBuilder.IsSynced) {
		return
	}

	//垃圾收集器:所有資源監視器都已同步。繼續收集垃圾
	klog.Infof("Garbage collector: all resource monitors have synced. Proceeding to collect garbage")

	// gc workers
	//協程運行消費者DeleteWorkers和OrphanWorkers
	for i := 0; i < workers; i++ {
		//默認參數為20個並發協程嘗試delete worker
		go wait.Until(gc.runAttemptToDeleteWorker, 1*time.Second, stopCh)
		//默認參數為20個並發協程嘗試orphan worker
		go wait.Until(gc.runAttemptToOrphanWorker, 1*time.Second, stopCh)
	}

	<-stopCh
}

gc.dependencyGraphBuilder.Run(stopCh)主要功能:
1、gb.startMonitors()啟動監聽資源變化的informer;
2、wait.Until(gb.runProcessGraphChanges, 1*time.Second, stopCh)開啟從隊列GraphBuilder.graphChanges中消費的worker

啟動20個runAttemptToDeleteWorker和20個runAttemptToOrphanWorker

參考:

k8s官方文檔garbage-collection英文版:
https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/

依賴圖標生成庫gonum Api文檔:
https://godoc.org/gonum.org/v1/gonum/graph

graphviz下載:
https://graphviz.gitlab.io/_pages/Download/Download_windows.html



本公眾號免費提供csdn下載服務,海量IT學習資源,如果你准備入IT坑,勵志成為優秀的程序猿,那么這些資源很適合你,包括但不限於java、go、python、springcloud、elk、嵌入式 、大數據、面試資料、前端 等資源。同時我們組建了一個技術交流群,里面有很多大佬,會不定時分享技術文章,如果你想來一起學習提高,可以公眾號后台回復【2】,免費邀請加技術交流群互相學習提高,會不定期分享編程IT相關資源。


掃碼關注,精彩內容第一時間推給你

image


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM