k8s OutOfPod等類型問題排查


現象

用戶通過在deployment中配置nodeName字段來直接綁定pod到特定的節點做相關測試驗證,此時該deployment對應的pod不斷被創建出來,短時間被創建出數千個pod,且狀態均為Outofpods

Kubernetes release-1.16

總結

用戶通過在deployment中直接配置nodeName字段嘗試綁定特定節點運行pod,而default調度器會忽略nodeName字段不為空的pod,由kubelet listAndWatch到pod之后篩選nodeName為自己的hostname的pod,放到syncLoop循環處理,其中具有一個Admit過程通過把scheduler的generalPredicate算法在(node, pod)上面執行一次,如果失敗且pod不是critical pod,那么這個pod會被reject,更新pod的status.phase為failed並上報event,同時rs controller的reconcile邏輯中會忽略status.phase為failed的pod,因此會創建出一個新pod,導致整個過程不斷循環。

分析

default scheduler

如果pod中直接指定nodeName,default scheduler將直接忽略該pod,如下:Run方法中通過scheduler.New創建一個Scheduler對象,New方法中通過AddAllEventHandlers方法為scheduler的podInformer設置EventHandler,其中通過assignedPod方法判斷的nodeName是否為空以及responsibleForPod方法判斷pod是否配置其他調度器來調度

可以看到為Pod Informer設置了兩次EventHandler,第一個為更新scheduler的pod緩存,第二個則為把pod添加進調度隊列

informer中的sharedProcess通過持有多個processorListener,支持配置多個EventHandler,細節可參考:https://www.cnblogs.com/orchidzjl/p/14768781.html

# cmd/kube-scheduler/app/server.go @Run
func AddAllEventHandlers(
	sched *Scheduler,
	schedulerName string,
	nodeInformer coreinformers.NodeInformer,
	podInformer coreinformers.PodInformer,
	pvInformer coreinformers.PersistentVolumeInformer,
	pvcInformer coreinformers.PersistentVolumeClaimInformer,
	serviceInformer coreinformers.ServiceInformer,
	storageClassInformer storageinformersv1.StorageClassInformer,
	csiNodeInformer storageinformersv1beta1.CSINodeInformer,
) {
	// scheduled pod cache
	podInformer.Informer().AddEventHandler(
		cache.FilteringResourceEventHandler{
			FilterFunc: func(obj interface{}) bool {
				switch t := obj.(type) {
				case *v1.Pod:
					return assignedPod(t)
				case cache.DeletedFinalStateUnknown:
					if pod, ok := t.Obj.(*v1.Pod); ok {
						return assignedPod(pod)
					}
			},
			Handler: cache.ResourceEventHandlerFuncs{
				AddFunc:    sched.addPodToCache,
				UpdateFunc: sched.updatePodInCache,
				DeleteFunc: sched.deletePodFromCache,
			},
		},
	)
	// unscheduled pod queue
	podInformer.Informer().AddEventHandler(
		cache.FilteringResourceEventHandler{
			FilterFunc: func(obj interface{}) bool {
				switch t := obj.(type) {
				case *v1.Pod:
          //如果Pod的spec.nodeName字段不為空或者spec.schedulerName不是default
					return !assignedPod(t) && responsibleForPod(t, schedulerName)
				case cache.DeletedFinalStateUnknown:
					if pod, ok := t.Obj.(*v1.Pod); ok {
						return !assignedPod(pod) && responsibleForPod(pod, schedulerName)
					}
			},
			Handler: cache.ResourceEventHandlerFuncs{
				AddFunc:    sched.addPodToSchedulingQueue,
				UpdateFunc: sched.updatePodInSchedulingQueue,
				DeleteFunc: sched.deletePodFromSchedulingQueue,
			},
		},

kubelet

我們知道Kubelet結構體中的syncLoopIteration方法接收多個channel數據源的信息,並調用相應的handler處理,其中configCh是kubelet獲取pod的數據源,比如通過informer從api中拉取到一個新的Pod,對應的handler為HandlePodAdditions

configCh的數據來源有多個,如下makePodSourceConfig方法為configCh添加多個數據源,其中一個就是api,通過配置spec.nodeName FieldSelector的ListOptions來向api獲取所有ns下指定nodeName的pod

# pkg/kubelet/kubelet.go@makePodSourceConfig
	if kubeDeps.KubeClient != nil {
		klog.Infof("Watching apiserver")
		if updatechannel == nil {
			updatechannel = cfg.Channel(kubetypes.ApiserverSource)
		}
		config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, updatechannel)
	}

// NewSourceApiserver creates a config source that watches and pulls from the apiserver.
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, updates chan<- interface{}) {
	lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector(api.PodHostField, string(nodeName)))
	newSourceApiserverFromLW(lw, updates)
}

// NewListWatchFromClient creates a new ListWatch from the specified client, resource, namespace and field selector.
func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSelector fields.Selector) *ListWatch {
	optionsModifier := func(options *metav1.ListOptions) {
		options.FieldSelector = fieldSelector.String()
	}
	return NewFilteredListWatchFromClient(c, resource, namespace, optionsModifier)
}

開啟 syncLoop循環處理目標pod

# pkg/kubelet/kubelet.go
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
	case u, open := <-configCh:
		switch u.Op {
		case kubetypes.ADD:
			klog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))
			// After restarting, kubelet will get all existing pods through
			// ADD as if they are new pods. These pods will then go through the
			// admission process and *may* be rejected. This can be resolved
			// once we have checkpointing.
			handler.HandlePodAdditions(u.Pods)
		case kubetypes.UPDATE:
			klog.V(2).Infof("SyncLoop (UPDATE, %q): %q", u.Source, format.PodsWithDeletionTimestamps(u.Pods))
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.REMOVE:
			klog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods))
			handler.HandlePodRemoves(u.Pods)
		case kubetypes.RECONCILE:
			klog.V(4).Infof("SyncLoop (RECONCILE, %q): %q", u.Source, format.Pods(u.Pods))
			handler.HandlePodReconcile(u.Pods)
		case kubetypes.DELETE:
			klog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods))
			// DELETE is treated as a UPDATE because of graceful deletion.
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.RESTORE:
			klog.V(2).Infof("SyncLoop (RESTORE, %q): %q", u.Source, format.Pods(u.Pods))
			// These are pods restored from the checkpoint. Treat them as new
			// pods.
			handler.HandlePodAdditions(u.Pods)
		case kubetypes.SET:
			// TODO: Do we want to support this?
			klog.Errorf("Kubelet does not support snapshot update")
		}
	return true
}

HandlePodAdditions方法中會根據CreationTime對待處理的pod排序逐個處理,並通過podIsTerminated判斷該pod是否需要處理,如果需要則通過canAdmitPod調用各個handler的Admit方法判斷,如果存在一個失敗,則調用rejectPod直接更新pod為failed

func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
	sort.Sort(sliceutils.PodsByCreationTime(pods))
	for _, pod := range pods {
		existingPods := kl.podManager.GetPods()

		if !kl.podIsTerminated(pod) {
			// Only go through the admission process if the pod is not terminated.

			// We failed pods that we rejected, so activePods include all admitted pods that are alive.
			activePods := kl.filterOutTerminatedPods(existingPods)

			// Check if we can admit the pod; if not, reject it.
      // canAdmitPod將判斷該Pod是否可以被kubelet創建,如果Admit失敗,那么通過rejectPod方法處理
			if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
				kl.rejectPod(pod, reason, message)
				continue
			}
		}
	}
}

// rejectPod records an event about the pod with the given reason and message,
// and updates the pod to the failed phase in the status manage.
func (kl *Kubelet) rejectPod(pod *v1.Pod, reason, message string) {
	kl.recorder.Eventf(pod, v1.EventTypeWarning, reason, message)
	kl.statusManager.SetPodStatus(pod, v1.PodStatus{
		Phase:   v1.PodFailed,
		Reason:  reason,
		Message: "Pod " + message})
}
func (kl *Kubelet) podIsTerminated(pod *v1.Pod) bool {
	// Check the cached pod status which was set after the last sync.
	status, ok := kl.statusManager.GetPodStatus(pod.UID)
  // notRunning方法檢查pod中所有ContainerStatus,如果存在一個podstatus.State.Terminated == nil && status.State.Waiting == nil則認為該pod(至少一個容器)running
	return status.Phase == v1.PodFailed || status.Phase == v1.PodSucceeded || (pod.DeletionTimestamp != nil && notRunning(status.ContainerStatuses))
}

HandlePodAdditions接下來通過canAdmitPod方法判斷該Pod是否可以被kubelet創建,其通過kubelet對象的admitHandlers獲取注冊好的handler對象,並逐一調用這些對象的Admit方法檢查pod

// canAdmitPod determines if a pod can be admitted, and gives a reason if it
// cannot. "pod" is new pod, while "pods" are all admitted pods
// The function returns a boolean value indicating whether the pod
// can be admitted, a brief single-word reason and a message explaining why
// the pod cannot be admitted.
// pods為kubelet中 正在運行的 && 已經被admit確認過沒問題的 && 不是terminated的 pod
func (kl *Kubelet) canAdmitPod(pods []*v1.Pod, pod *v1.Pod) (bool, string, string) {
	// the kubelet will invoke each pod admit handler in sequence
	// if any handler rejects, the pod is rejected.
	attrs := &lifecycle.PodAdmitAttributes{Pod: pod, OtherPods: pods}
	for _, podAdmitHandler := range kl.admitHandlers {
		if result := podAdmitHandler.Admit(attrs); !result.Admit {
			return false, result.Reason, result.Message
		}
	}

	return true, "", ""
}

而kubelet對象的admitHandlers是在如下NewMainKubelet方法中注冊,默認注冊兩個admitHandlers

# pkg/kubelet/kubelet.go @NewMainKubelet
criticalPodAdmissionHandler := preemption.NewCriticalPodAdmissionHandler(klet.GetActivePods, killPodNow(klet.podWorkers, kubeDeps.Recorder), kubeDeps.Recorder)

	klet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(klet.getNodeAnyWay, criticalPodAdmissionHandler, klet.containerManager.UpdatePluginResources))

lifecycle.NewPredicateAdmitHandler方法通過criticalPodAdmissionHandler生成一個admitHandler

  • 在該handler中需要根據node信息來admit pod,比如node的resources allocatable是否滿足pod resources、node的 labels是否match pod的nodeSelector label

klet.getNodeAnyWay:總是能返回一個Node對象的方法,先從apiserver(其實是informer cache)中獲取node,如果獲取不到則直接返回一個initial node:該node的信息由kubelet的啟動參數初始化,比如label只包含Hostname、OS、Arch等

// getNodeAnyWay() must return a *v1.Node which is required by RunGeneralPredicates().
// The *v1.Node is obtained as follows:
// Return kubelet's nodeInfo for this node, except on error or if in standalone mode,
// in which case return a manufactured nodeInfo representing a node with no pods,
// zero capacity, and the default labels.
func (kl *Kubelet) getNodeAnyWay() (*v1.Node, error) {
	if kl.kubeClient != nil {
		if n, err := kl.nodeInfo.GetNodeInfo(string(kl.nodeName)); err == nil {
			return n, nil
		}
	}
	return kl.initialNode()
}
  • 在該handler中admit pod失敗后,如果改pod類型是critical pod,則需要根據AdmissionFailureHandler來決定如何處理該pod,否則直接reject pod(即設置pod的status.phase為failed)

preemption.NewCriticalPodAdmissionHandler創建出一個CriticalPodAdmissionHandler對象,該對象的HandleAdmissionFailure方法知道當一個critical類型的pod被admit失敗之后該如何處理

predicateAdmitHandler的Admit方法是真正admit一個pod的邏輯

func (w *predicateAdmitHandler) Admit(attrs *PodAdmitAttributes) PodAdmitResult {
  //通過kulet的getNodeAnyWay獲取一個Node
	node, err := w.getNodeAnyWayFunc()
  //待處理的pod
	admitPod := attrs.Pod
  //節點上已經被admit過可行的其他非terminated的pod
	pods := attrs.OtherPods
  //獲取一個調度器中用到的NodeInfo對象,該對象知道節點的requestedResource、allocatableResource等信息
	nodeInfo := schedulernodeinfo.NewNodeInfo(pods...)
  //利用node的status來初始化scheduler NodeInfo對象中的allocatableResource等信息
	nodeInfo.SetNode(node)

	// Remove the requests of the extended resources that are missing in the
	// node info. This is required to support cluster-level resources, which
	// are extended resources unknown to nodes.
	//
	// Caveat: If a pod was manually bound to a node (e.g., static pod) where a
	// node-level extended resource it requires is not found, then kubelet will
	// not fail admission while it should. This issue will be addressed with
	// the Resource Class API in the future.
  //去除那些集群層面但是節點並不感知的extended resources
	podWithoutMissingExtendedResources := removeMissingExtendedResources(admitPod, nodeInfo)
	//利用調度器模塊中的GeneralPredicates方法檢查pod是否滿足其中的所有predicate方法
	fit, reasons, err := predicates.GeneralPredicates(podWithoutMissingExtendedResources, nil, nodeInfo)
	if !fit {
    //HandleAdmissionFailure中判斷pod如果是critical pod,且失敗原因僅為相關資源不足,則嘗試驅逐其他pod來釋放資源
		fit, reasons, err = w.admissionFailureHandler.HandleAdmissionFailure(admitPod, reasons)
		if err != nil {
			message := fmt.Sprintf("Unexpected error while attempting to recover from admission failure: %v", err)
			klog.Warningf("Failed to admit pod %v - %s", format.Pod(admitPod), message)
			return PodAdmitResult{
				Admit:   fit,
				Reason:  "UnexpectedAdmissionError",
				Message: message,
			}
		}
	}
	if !fit {
		// If there are failed predicates, we only return the first one as a reason.
		r := reasons[0]
		switch re := r.(type) {
		case *predicates.PredicateFailureError:
			reason = re.PredicateName
			message = re.Error()
			klog.V(2).Infof("Predicate failed on Pod: %v, for reason: %v", format.Pod(admitPod), message)
    //如果GeneralPredicates返回不合適的原因是資源不足,那么對於非critical的pod來說就返回對應的PodAdmitResult
		case *predicates.InsufficientResourceError:
			reason = fmt.Sprintf("OutOf%s", re.ResourceName)
			message = re.Error()
			klog.V(2).Infof("Predicate failed on Pod: %v, for reason: %v", format.Pod(admitPod), message)
		case *predicates.FailureReason:
			reason = re.GetReason()
			message = fmt.Sprintf("Failure: %s", re.GetReason())
			klog.V(2).Infof("Predicate failed on Pod: %v, for reason: %v", format.Pod(admitPod), message)
		return PodAdmitResult{
			Admit:   fit,
			Reason:  reason,
			Message: message,
		}
	}
}

GeneralPredicates中通過noncriticalPredicates和EssentialPredicates兩組預選算法來判斷pod是否合適,其中noncriticalPredicates中就包含PodFitsResources方法,而EssentialPredicates包含PodFitsHost, PodFitsHostPorts, PodMatchNodeSelector

// GeneralPredicates checks whether noncriticalPredicates and EssentialPredicates pass. noncriticalPredicates are the predicates
// that only non-critical pods need and EssentialPredicates are the predicates that all pods, including critical pods, need
func GeneralPredicates(pod *v1.Pod, meta PredicateMetadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
   var predicateFails []PredicateFailureReason
   for _, predicate := range []FitPredicate{noncriticalPredicates, noncriticalPredicates} {
      fit, reasons, err := predicate(pod, meta, nodeInfo)
      if err != nil {
         return false, predicateFails, err
      }
      if !fit {
         predicateFails = append(predicateFails, reasons...)
      }
   }

   return len(predicateFails) == 0, predicateFails, nil
}

PodFitsResources堅持pod需要的資源和NodeInfo是否能夠滿足

// PodFitsResources checks if a node has sufficient resources, such as cpu, memory, gpu, opaque int resources etc to run a pod.
// First return value indicates whether a node has sufficient resources to run a pod while the second return value indicates the
// predicate failure reasons if the node has insufficient resources to run the pod.
func PodFitsResources(pod *v1.Pod, meta PredicateMetadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
	node := nodeInfo.Node()

	// 如果node中當前運行的pod+1>最大允許的pod,則添加一個failed reason
	allowedPodNumber := nodeInfo.AllowedPodNumber()
	if len(nodeInfo.Pods())+1 > allowedPodNumber {
		predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourcePods, 1, int64(len(nodeInfo.Pods())), int64(allowedPodNumber)))
	}
  // aliyun gpu share之類的extended resource
	var podRequest *schedulernodeinfo.Resource
	if predicateMeta, ok := meta.(*predicateMetadata); ok {
		podRequest = predicateMeta.podRequest
		if predicateMeta.ignoredExtendedResources != nil {
			ignoredExtendedResources = predicateMeta.ignoredExtendedResources
		}
	} else {
		// We couldn't parse metadata - fallback to computing it.
		podRequest = GetResourceRequest(pod)
	}
  // cpu
	allocatable := nodeInfo.AllocatableResource()
	if allocatable.MilliCPU < podRequest.MilliCPU+nodeInfo.RequestedResource().MilliCPU {
		predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceCPU, podRequest.MilliCPU, nodeInfo.RequestedResource().MilliCPU, allocatable.MilliCPU))
	}
  // memory
	if allocatable.Memory < podRequest.Memory+nodeInfo.RequestedResource().Memory {
		predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceMemory, podRequest.Memory, nodeInfo.RequestedResource().Memory, allocatable.Memory))
	}
  // ephemeralStorage
	if allocatable.EphemeralStorage < podRequest.EphemeralStorage+nodeInfo.RequestedResource().EphemeralStorage {
		predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceEphemeralStorage, podRequest.EphemeralStorage, nodeInfo.RequestedResource().EphemeralStorage, allocatable.EphemeralStorage))
	}

	return len(predicateFails) == 0, predicateFails, nil
}

HandleAdmissionFailure方法判斷pod如果是critical pod,且失敗原因僅為相關資源不足,則嘗試驅逐其他pod來釋放資源,其中IsCriticalPod方法判斷pod是否critical pod

// IsCriticalPod returns true if pod's priority is greater than or equal to SystemCriticalPriority.
func IsCriticalPod(pod *v1.Pod) bool {
  //靜態pod
	if IsStaticPod(pod) {
		return true
	}
  //pod設置了優先級且大於2倍的用戶配置的Highest User Definable Priority
	if pod.Spec.Priority != nil && IsCriticalPodBasedOnPriority(*pod.Spec.Priority) {
		return true
	}
	return false
}

// HandleAdmissionFailure gracefully handles admission rejection, and, in some cases,
// to allow admission of the pod despite its previous failure.
func (c *CriticalPodAdmissionHandler) HandleAdmissionFailure(admitPod *v1.Pod, failureReasons []predicates.PredicateFailureReason) (bool, []predicates.PredicateFailureReason, error) {
  //判斷是否critical pod
	if !kubetypes.IsCriticalPod(admitPod) {
		return false, failureReasons, nil
	}
	// InsufficientResourceError is not a reason to reject a critical pod.
	// Instead of rejecting, we free up resources to admit it, if no other reasons for rejection exist.
	nonResourceReasons := []predicates.PredicateFailureReason{}
	resourceReasons := []*admissionRequirement{}
	for _, reason := range failureReasons {
		if r, ok := reason.(*predicates.InsufficientResourceError); ok {
			resourceReasons = append(resourceReasons, &admissionRequirement{
				resourceName: r.ResourceName,
				quantity:     r.GetInsufficientAmount(),
			})
		} else {
			nonResourceReasons = append(nonResourceReasons, reason)
		}
	}
	if len(nonResourceReasons) > 0 {
		// Return only reasons that are not resource related, since critical pods cannot fail admission for resource reasons.
		return false, nonResourceReasons, nil
	}
  //如果admit失敗的reason都是因為InsufficientResource,那么嘗試驅逐pod來釋放相關資源
	err := c.evictPodsToFreeRequests(admitPod, admissionRequirementList(resourceReasons))
	// if no error is returned, preemption succeeded and the pod is safe to admit.
	return err == nil, nil, err
}
rs controller

前面分析了kubelet中會通過scheduler的generalPredicate中的PodFitResources方法來判斷pod是否合適在節點中運行,如果predicate失敗且不是critical pod,那么kubelet會向api更新該pod的status.phase為failed,而現象中pod是不斷的被rs controller創建出來,過程如下:

controller-manager啟動所有注冊的controller

# cmd/kube-controller-manager/app/controllermanager.go @Run
// Run方法通過StartControllers啟動所有注冊好的controller
		if err := StartControllers(controllerContext, saTokenControllerInitFunc, NewControllerInitializers(controllerContext.LoopMode), unsecuredMux); err != nil {
			klog.Fatalf("error starting controllers: %v", err)
		}

//NewControllerInitializers方法返回一個map,分別對應每個controller的啟動方法
// NewControllerInitializers is a public map of named controller groups (you can start more than one in an init func)
// paired to their InitFunc.  This allows for structured downstream composition and subdivision.
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
	controllers := map[string]InitFunc{}
  ......
	controllers["endpoint"] = startEndpointController
	controllers["endpointslice"] = startEndpointSliceController
	controllers["job"] = startJobController
	controllers["deployment"] = startDeploymentController
	controllers["replicaset"] = startReplicaSetController
	controllers["horizontalpodautoscaling"] = startHPAController
	controllers["disruption"] = startDisruptionController
	controllers["statefulset"] = startStatefulSetController
  ......
	return controllers
}

啟動rs controller

func startReplicaSetController(ctx ControllerContext) (http.Handler, bool, error) {
	if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"}] {
		return nil, false, nil
	}
	go replicaset.NewReplicaSetController(
    //向factor中添加rs和pod informer
		ctx.InformerFactory.Apps().V1().ReplicaSets(),
		ctx.InformerFactory.Core().V1().Pods(),
		ctx.ClientBuilder.ClientOrDie("replicaset-controller"),
		replicaset.BurstReplicas,
	).Run(int(ctx.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs), ctx.Stop)
	return nil, true, nil
}

rs controller的reconcile方法,其中會list出所有的pod,並過濾掉那些inactive的pod,如pod的Status.Phase為Succeed或者Failed的或者被標記為刪除的(DeletionTimestamp != nil),所以對以一個status.phase為failed的pod,rs會忽略並創建一個新的pod

// syncReplicaSet will sync the ReplicaSet with the given key if it has had its expectations fulfilled,
// meaning it did not expect to see any more of its pods created or deleted. This function is not meant to be
// invoked concurrently with the same key.
func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
  //從informer cache中獲取rs
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
  //判斷當前rs是否需要處理
	rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)
	selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)

	// list all pods to include the pods that don't match the rs`s selector
	// anymore but has the stale controller ref.
	// TODO: Do the List and Filter in a single pass, or use an index.
  //直接列出所有的pod,而不是根據rs的selector
	allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
	// Ignore inactive pods.
  //過濾掉inactive的pod,
	filteredPods := controller.FilterActivePods(allPods)

	// NOTE: filteredPods are pointing to objects from cache - if you need to
	// modify them, you need to copy it first.
  //rs嘗試收養那些可能匹配的pod
	filteredPods, err = rsc.claimPods(rs, selector, filteredPods)

	var manageReplicasErr error
  //調整rs的副本數
	if rsNeedsSync && rs.DeletionTimestamp == nil {
		manageReplicasErr = rsc.manageReplicas(filteredPods, rs)
	}
	rs = rs.DeepCopy()
  // 更新rs status
	newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)

	// Always updates status as pods come up or die.
	updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
}


// FilterActivePods returns pods that have not terminated.
func FilterActivePods(pods []*v1.Pod) []*v1.Pod {
	var result []*v1.Pod
	for _, p := range pods {
		if IsPodActive(p) {
			result = append(result, p)
		} else {
			klog.V(4).Infof("Ignoring inactive pod %v/%v in state %v, deletion time %v",
				p.Namespace, p.Name, p.Status.Phase, p.DeletionTimestamp)
		}
	}
	return result
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM