Phenomenon
While testing, a user configured the nodeName field in a Deployment's pod template to bind the pods directly to a particular node. The Deployment's pods were then created over and over: thousands of pods appeared within a short time, all with status OutOfpods.
Environment: Kubernetes release-1.16
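For reference, a minimal reproduction sketch of such a Deployment, built from client-go API types and printed as YAML. The name, namespace, node name (node-1) and image are made-up placeholders, not values from the incident:

package main

import (
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/yaml"
)

func main() {
	replicas := int32(1)
	labels := map[string]string{"app": "nodename-test"}
	deploy := &appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{Name: "nodename-test", Namespace: "default"},
		Spec: appsv1.DeploymentSpec{
			Replicas: &replicas,
			Selector: &metav1.LabelSelector{MatchLabels: labels},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{Labels: labels},
				Spec: corev1.PodSpec{
					// spec.nodeName pins every pod of this Deployment to one node
					// and bypasses the default scheduler entirely.
					NodeName:   "node-1",
					Containers: []corev1.Container{{Name: "app", Image: "nginx:1.17"}},
				},
			},
		},
	}
	out, err := yaml.Marshal(deploy)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // the manifest the user would apply
}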
Summary
The user set the nodeName field directly in the Deployment to pin the pods to a particular node, but the default scheduler ignores any pod whose nodeName is already set. The kubelet lists and watches pods, picks up those whose nodeName equals its own hostname, and feeds them into its syncLoop. That loop includes an Admit step which runs the scheduler's GeneralPredicates against the (node, pod) pair; if the check fails and the pod is not a critical pod, the pod is rejected, its status.phase is set to Failed, and an event is recorded. The ReplicaSet controller's reconcile logic in turn ignores pods whose status.phase is Failed and therefore creates a new pod, so the whole cycle keeps repeating.
Analysis
default scheduler
If a pod specifies nodeName directly, the default scheduler simply ignores it, as shown below. Run creates a Scheduler object via scheduler.New, and New calls AddAllEventHandlers to register EventHandlers on the scheduler's podInformer; there, assignedPod checks whether the pod's nodeName is non-empty, and responsibleForPod checks whether the pod is configured to be scheduled by a different scheduler.
Two EventHandlers are registered on the pod informer: the first maintains the scheduler's pod cache, the second adds pods to the scheduling queue.
The informer's sharedProcessor holds multiple processorListeners, which is what makes it possible to register several EventHandlers; see https://www.cnblogs.com/orchidzjl/p/14768781.html for details.
# pkg/scheduler/eventhandlers.go (reached from cmd/kube-scheduler/app/server.go @Run via scheduler.New)
func AddAllEventHandlers(
sched *Scheduler,
schedulerName string,
nodeInformer coreinformers.NodeInformer,
podInformer coreinformers.PodInformer,
pvInformer coreinformers.PersistentVolumeInformer,
pvcInformer coreinformers.PersistentVolumeClaimInformer,
serviceInformer coreinformers.ServiceInformer,
storageClassInformer storageinformersv1.StorageClassInformer,
csiNodeInformer storageinformersv1beta1.CSINodeInformer,
) {
// scheduled pod cache
podInformer.Informer().AddEventHandler(
cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
switch t := obj.(type) {
case *v1.Pod:
return assignedPod(t)
			case cache.DeletedFinalStateUnknown:
				if pod, ok := t.Obj.(*v1.Pod); ok {
					return assignedPod(pod)
				}
				return false
			default:
				return false
			}
		},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: sched.addPodToCache,
UpdateFunc: sched.updatePodInCache,
DeleteFunc: sched.deletePodFromCache,
},
},
)
// unscheduled pod queue
podInformer.Informer().AddEventHandler(
cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
switch t := obj.(type) {
case *v1.Pod:
				//A pod whose spec.nodeName is already set, or whose spec.schedulerName names another scheduler, is filtered out here and never enters the scheduling queue
return !assignedPod(t) && responsibleForPod(t, schedulerName)
			case cache.DeletedFinalStateUnknown:
				if pod, ok := t.Obj.(*v1.Pod); ok {
					return !assignedPod(pod) && responsibleForPod(pod, schedulerName)
				}
				return false
			default:
				return false
			}
		},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: sched.addPodToSchedulingQueue,
UpdateFunc: sched.updatePodInSchedulingQueue,
DeleteFunc: sched.deletePodFromSchedulingQueue,
},
		},
	)
}
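To see the two-handlers-on-one-informer pattern in isolation, here is a small self-contained sketch against a fake clientset. The pod and node names are invented, and this only illustrates the pattern, it is not scheduler code:

package main

import (
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// Two pods seeded into a fake clientset: one already bound to a node, one not.
	bound := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bound", Namespace: "default"},
		Spec: v1.PodSpec{NodeName: "node-1"}}
	pending := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pending", Namespace: "default"}}
	client := fake.NewSimpleClientset(bound, pending)

	factory := informers.NewSharedInformerFactory(client, 0)
	podInformer := factory.Core().V1().Pods().Informer()

	// Handler 1: mirrors the "scheduled pod cache" handler (spec.nodeName is set).
	podInformer.AddEventHandler(cache.FilteringResourceEventHandler{
		FilterFunc: func(obj interface{}) bool {
			p, ok := obj.(*v1.Pod)
			return ok && p.Spec.NodeName != ""
		},
		Handler: cache.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) { fmt.Println("cache handler saw:", obj.(*v1.Pod).Name) },
		},
	})

	// Handler 2: mirrors the "unscheduled pod queue" handler (spec.nodeName is empty).
	podInformer.AddEventHandler(cache.FilteringResourceEventHandler{
		FilterFunc: func(obj interface{}) bool {
			p, ok := obj.(*v1.Pod)
			return ok && p.Spec.NodeName == ""
		},
		Handler: cache.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) { fmt.Println("queue handler saw:", obj.(*v1.Pod).Name) },
		},
	})

	stop := make(chan struct{})
	defer close(stop)
	factory.Start(stop)
	cache.WaitForCacheSync(stop, podInformer.HasSynced)
	time.Sleep(500 * time.Millisecond) // give the handlers a moment to print in this toy example
}

Only the first handler fires for the bound pod and only the second for the pending one, which is exactly how a pod with a pre-set nodeName never reaches the scheduling queue.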
kubelet
The syncLoopIteration method of the Kubelet struct receives updates from several channel sources and dispatches them to the matching handler. configCh is the source from which the kubelet receives pods; for example, when a new pod arrives from the apiserver via the informer, the handler invoked is HandlePodAdditions.
configCh itself is fed by multiple sources. makePodSourceConfig, shown below, wires them up; one of them is the apiserver, queried with ListOptions that carry a spec.nodeName FieldSelector so that only pods (in any namespace) bound to this node are returned.
# pkg/kubelet/kubelet.go@makePodSourceConfig
if kubeDeps.KubeClient != nil {
klog.Infof("Watching apiserver")
if updatechannel == nil {
updatechannel = cfg.Channel(kubetypes.ApiserverSource)
}
config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, updatechannel)
}
// NewSourceApiserver creates a config source that watches and pulls from the apiserver.
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, updates chan<- interface{}) {
lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector(api.PodHostField, string(nodeName)))
newSourceApiserverFromLW(lw, updates)
}
// NewListWatchFromClient creates a new ListWatch from the specified client, resource, namespace and field selector.
func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSelector fields.Selector) *ListWatch {
optionsModifier := func(options *metav1.ListOptions) {
options.FieldSelector = fieldSelector.String()
}
return NewFilteredListWatchFromClient(c, resource, namespace, optionsModifier)
}
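The effect of that field selector is easy to reproduce from any client. The sketch below assumes a recent client-go (in the 1.16-era client the List call takes no context argument) and a hypothetical node name node-1; it is roughly equivalent to kubectl get pods -A --field-selector spec.nodeName=node-1:

package main

import (
	"context"
	"fmt"
	"os"
	"path/filepath"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Hypothetical node name; replace with a real node in your cluster.
	nodeName := "node-1"

	kubeconfig := filepath.Join(os.Getenv("HOME"), ".kube", "config")
	cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
	if err != nil {
		panic(err)
	}
	clientset := kubernetes.NewForConfigOrDie(cfg)

	// Same field selector the kubelet source uses: spec.nodeName=<node>.
	pods, err := clientset.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), metav1.ListOptions{
		FieldSelector: fields.OneTermEqualSelector("spec.nodeName", nodeName).String(),
	})
	if err != nil {
		panic(err)
	}
	for _, p := range pods.Items {
		fmt.Printf("%s/%s\n", p.Namespace, p.Name)
	}
}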
The syncLoop then processes these pods:
# pkg/kubelet/kubelet.go
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
select {
case u, open := <-configCh:
switch u.Op {
case kubetypes.ADD:
klog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))
// After restarting, kubelet will get all existing pods through
// ADD as if they are new pods. These pods will then go through the
// admission process and *may* be rejected. This can be resolved
// once we have checkpointing.
handler.HandlePodAdditions(u.Pods)
case kubetypes.UPDATE:
klog.V(2).Infof("SyncLoop (UPDATE, %q): %q", u.Source, format.PodsWithDeletionTimestamps(u.Pods))
handler.HandlePodUpdates(u.Pods)
case kubetypes.REMOVE:
klog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods))
handler.HandlePodRemoves(u.Pods)
case kubetypes.RECONCILE:
klog.V(4).Infof("SyncLoop (RECONCILE, %q): %q", u.Source, format.Pods(u.Pods))
handler.HandlePodReconcile(u.Pods)
case kubetypes.DELETE:
klog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods))
// DELETE is treated as a UPDATE because of graceful deletion.
handler.HandlePodUpdates(u.Pods)
case kubetypes.RESTORE:
klog.V(2).Infof("SyncLoop (RESTORE, %q): %q", u.Source, format.Pods(u.Pods))
// These are pods restored from the checkpoint. Treat them as new
// pods.
handler.HandlePodAdditions(u.Pods)
case kubetypes.SET:
// TODO: Do we want to support this?
klog.Errorf("Kubelet does not support snapshot update")
}
return true
}
HandlePodAdditions sorts the incoming pods by CreationTime and processes them one by one. podIsTerminated decides whether a pod still needs handling; if it does, canAdmitPod calls each registered handler's Admit method, and if any of them rejects the pod, rejectPod marks it Failed.
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
sort.Sort(sliceutils.PodsByCreationTime(pods))
for _, pod := range pods {
existingPods := kl.podManager.GetPods()
if !kl.podIsTerminated(pod) {
// Only go through the admission process if the pod is not terminated.
// We failed pods that we rejected, so activePods include all admitted pods that are alive.
activePods := kl.filterOutTerminatedPods(existingPods)
// Check if we can admit the pod; if not, reject it.
// canAdmitPod decides whether the kubelet may run this pod; if Admit fails, the pod is handed to rejectPod
if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
kl.rejectPod(pod, reason, message)
continue
}
}
}
}
// rejectPod records an event about the pod with the given reason and message,
// and updates the pod to the failed phase in the status manage.
func (kl *Kubelet) rejectPod(pod *v1.Pod, reason, message string) {
kl.recorder.Eventf(pod, v1.EventTypeWarning, reason, message)
kl.statusManager.SetPodStatus(pod, v1.PodStatus{
Phase: v1.PodFailed,
Reason: reason,
Message: "Pod " + message})
}
func (kl *Kubelet) podIsTerminated(pod *v1.Pod) bool {
// Check the cached pod status which was set after the last sync.
	status, ok := kl.statusManager.GetPodStatus(pod.UID)
	if !ok {
		status = pod.Status // no cached status: fall back to the pod status from the apiserver
	}
	// notRunning checks every ContainerStatus; if any container has State.Terminated == nil && State.Waiting == nil, the pod is considered (at least one container is) running
return status.Phase == v1.PodFailed || status.Phase == v1.PodSucceeded || (pod.DeletionTimestamp != nil && notRunning(status.ContainerStatuses))
}
HandlePodAdditions then calls canAdmitPod to decide whether the kubelet may create the pod: it takes the handlers registered in the kubelet's admitHandlers and calls each one's Admit method against the pod.
// canAdmitPod determines if a pod can be admitted, and gives a reason if it
// cannot. "pod" is new pod, while "pods" are all admitted pods
// The function returns a boolean value indicating whether the pod
// can be admitted, a brief single-word reason and a message explaining why
// the pod cannot be admitted.
// pods are the pods on this kubelet that are running, already admitted, and not terminated
func (kl *Kubelet) canAdmitPod(pods []*v1.Pod, pod *v1.Pod) (bool, string, string) {
// the kubelet will invoke each pod admit handler in sequence
// if any handler rejects, the pod is rejected.
attrs := &lifecycle.PodAdmitAttributes{Pod: pod, OtherPods: pods}
for _, podAdmitHandler := range kl.admitHandlers {
if result := podAdmitHandler.Admit(attrs); !result.Admit {
return false, result.Reason, result.Message
}
}
return true, "", ""
}
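The chain itself is just a first-rejection-wins loop. Below is a toy, self-contained version of the same control flow; the types are simplified stand-ins invented for illustration, not the real lifecycle.PodAdmitHandler interface:

package main

import "fmt"

// simplified stand-ins for the kubelet's admit types
type admitResult struct {
	Admit   bool
	Reason  string
	Message string
}

type admitHandler interface {
	Admit(podName string) admitResult
}

// admitFunc lets a plain function satisfy the admitHandler interface
type admitFunc func(podName string) admitResult

func (f admitFunc) Admit(podName string) admitResult { return f(podName) }

// canAdmit mirrors the control flow of kl.canAdmitPod: the first handler that rejects wins
func canAdmit(handlers []admitHandler, podName string) admitResult {
	for _, h := range handlers {
		if r := h.Admit(podName); !r.Admit {
			return r
		}
	}
	return admitResult{Admit: true}
}

func main() {
	handlers := []admitHandler{
		admitFunc(func(string) admitResult { return admitResult{Admit: true} }),
		admitFunc(func(pod string) admitResult {
			return admitResult{Admit: false, Reason: "OutOfpods", Message: pod + ": too many pods"}
		}),
	}
	fmt.Printf("%+v\n", canAdmit(handlers, "nodename-test-xxxxx"))
}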
The kubelet's admitHandlers are registered in NewMainKubelet; the one relevant here is the predicate admit handler:
# pkg/kubelet/kubelet.go @NewMainKubelet
criticalPodAdmissionHandler := preemption.NewCriticalPodAdmissionHandler(klet.GetActivePods, killPodNow(klet.podWorkers, kubeDeps.Recorder), kubeDeps.Recorder)
klet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(klet.getNodeAnyWay, criticalPodAdmissionHandler, klet.containerManager.UpdatePluginResources))
lifecycle.NewPredicateAdmitHandler builds an admit handler around criticalPodAdmissionHandler:
- This handler admits a pod against node information, for example whether the node's allocatable resources satisfy the pod's resource requests and whether the node's labels match the pod's nodeSelector.
- klet.getNodeAnyWay: a method that always returns a Node object. It first fetches the node from the apiserver (actually the informer cache); if that fails it returns an initial node built from the kubelet's startup parameters, whose labels contain only Hostname, OS, Arch and so on.
// getNodeAnyWay() must return a *v1.Node which is required by RunGeneralPredicates().
// The *v1.Node is obtained as follows:
// Return kubelet's nodeInfo for this node, except on error or if in standalone mode,
// in which case return a manufactured nodeInfo representing a node with no pods,
// zero capacity, and the default labels.
func (kl *Kubelet) getNodeAnyWay() (*v1.Node, error) {
if kl.kubeClient != nil {
if n, err := kl.nodeInfo.GetNodeInfo(string(kl.nodeName)); err == nil {
return n, nil
}
}
return kl.initialNode()
}
- When this handler fails to admit a pod, if the pod is a critical pod it defers to the AdmissionFailureHandler to decide what to do with it; otherwise the pod is rejected outright (its status.phase is set to Failed).
- preemption.NewCriticalPodAdmissionHandler creates a CriticalPodAdmissionHandler object whose HandleAdmissionFailure method knows what to do when a critical pod fails admission.
The Admit method of predicateAdmitHandler contains the actual admission logic:
func (w *predicateAdmitHandler) Admit(attrs *PodAdmitAttributes) PodAdmitResult {
//fetch a Node via the kubelet's getNodeAnyWay
node, err := w.getNodeAnyWayFunc()
//the pod being admitted
admitPod := attrs.Pod
//other non-terminated pods on the node that have already been admitted
pods := attrs.OtherPods
//build the scheduler's NodeInfo object, which tracks the node's requestedResource, allocatableResource and so on
nodeInfo := schedulernodeinfo.NewNodeInfo(pods...)
//initialize allocatableResource etc. in the scheduler NodeInfo from the node's status
nodeInfo.SetNode(node)
// Remove the requests of the extended resources that are missing in the
// node info. This is required to support cluster-level resources, which
// are extended resources unknown to nodes.
//
// Caveat: If a pod was manually bound to a node (e.g., static pod) where a
// node-level extended resource it requires is not found, then kubelet will
// not fail admission while it should. This issue will be addressed with
// the Resource Class API in the future.
//strip extended resources that exist at cluster level but that the node does not know about
podWithoutMissingExtendedResources := removeMissingExtendedResources(admitPod, nodeInfo)
//run the scheduler's GeneralPredicates to check whether the pod passes every predicate
fit, reasons, err := predicates.GeneralPredicates(podWithoutMissingExtendedResources, nil, nodeInfo)
if !fit {
//HandleAdmissionFailure: if the pod is a critical pod and the only failure reasons are insufficient resources, try to evict other pods to free them
fit, reasons, err = w.admissionFailureHandler.HandleAdmissionFailure(admitPod, reasons)
if err != nil {
message := fmt.Sprintf("Unexpected error while attempting to recover from admission failure: %v", err)
klog.Warningf("Failed to admit pod %v - %s", format.Pod(admitPod), message)
return PodAdmitResult{
Admit: fit,
Reason: "UnexpectedAdmissionError",
Message: message,
}
}
}
if !fit {
// If there are failed predicates, we only return the first one as a reason.
		r := reasons[0]
		var reason, message string
		switch re := r.(type) {
case *predicates.PredicateFailureError:
reason = re.PredicateName
message = re.Error()
klog.V(2).Infof("Predicate failed on Pod: %v, for reason: %v", format.Pod(admitPod), message)
		//if GeneralPredicates failed because of insufficient resources, a non-critical pod gets the corresponding PodAdmitResult with reason OutOf<resource>
case *predicates.InsufficientResourceError:
reason = fmt.Sprintf("OutOf%s", re.ResourceName)
message = re.Error()
klog.V(2).Infof("Predicate failed on Pod: %v, for reason: %v", format.Pod(admitPod), message)
case *predicates.FailureReason:
reason = re.GetReason()
message = fmt.Sprintf("Failure: %s", re.GetReason())
			klog.V(2).Infof("Predicate failed on Pod: %v, for reason: %v", format.Pod(admitPod), message)
		}
		return PodAdmitResult{
			Admit:   fit,
			Reason:  reason,
			Message: message,
		}
	}
	return PodAdmitResult{
		Admit: true,
	}
}
GeneralPredicates evaluates two groups of predicates, noncriticalPredicates and EssentialPredicates, to decide whether the pod fits. noncriticalPredicates contains PodFitsResources, while EssentialPredicates contains PodFitsHost, PodFitsHostPorts and PodMatchNodeSelector.
// GeneralPredicates checks whether noncriticalPredicates and EssentialPredicates pass. noncriticalPredicates are the predicates
// that only non-critical pods need and EssentialPredicates are the predicates that all pods, including critical pods, need
func GeneralPredicates(pod *v1.Pod, meta PredicateMetadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
var predicateFails []PredicateFailureReason
	for _, predicate := range []FitPredicate{noncriticalPredicates, EssentialPredicates} {
fit, reasons, err := predicate(pod, meta, nodeInfo)
if err != nil {
return false, predicateFails, err
}
if !fit {
predicateFails = append(predicateFails, reasons...)
}
}
return len(predicateFails) == 0, predicateFails, nil
}
PodFitsResources checks whether the NodeInfo can satisfy the resources the pod requests:
// PodFitsResources checks if a node has sufficient resources, such as cpu, memory, gpu, opaque int resources etc to run a pod.
// First return value indicates whether a node has sufficient resources to run a pod while the second return value indicates the
// predicate failure reasons if the node has insufficient resources to run the pod.
func PodFitsResources(pod *v1.Pod, meta PredicateMetadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
	node := nodeInfo.Node()
	var predicateFails []PredicateFailureReason
	// if the pods already on the node plus this one exceed the node's allowed pod count, record a failure reason
	allowedPodNumber := nodeInfo.AllowedPodNumber()
if len(nodeInfo.Pods())+1 > allowedPodNumber {
predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourcePods, 1, int64(len(nodeInfo.Pods())), int64(allowedPodNumber)))
}
	// extended resources, e.g. aliyun gpu share
var podRequest *schedulernodeinfo.Resource
if predicateMeta, ok := meta.(*predicateMetadata); ok {
podRequest = predicateMeta.podRequest
if predicateMeta.ignoredExtendedResources != nil {
ignoredExtendedResources = predicateMeta.ignoredExtendedResources
}
} else {
// We couldn't parse metadata - fallback to computing it.
podRequest = GetResourceRequest(pod)
}
// cpu
allocatable := nodeInfo.AllocatableResource()
if allocatable.MilliCPU < podRequest.MilliCPU+nodeInfo.RequestedResource().MilliCPU {
predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceCPU, podRequest.MilliCPU, nodeInfo.RequestedResource().MilliCPU, allocatable.MilliCPU))
}
// memory
if allocatable.Memory < podRequest.Memory+nodeInfo.RequestedResource().Memory {
predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceMemory, podRequest.Memory, nodeInfo.RequestedResource().Memory, allocatable.Memory))
}
// ephemeralStorage
if allocatable.EphemeralStorage < podRequest.EphemeralStorage+nodeInfo.RequestedResource().EphemeralStorage {
predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceEphemeralStorage, podRequest.EphemeralStorage, nodeInfo.RequestedResource().EphemeralStorage, allocatable.EphemeralStorage))
}
return len(predicateFails) == 0, predicateFails, nil
}
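This pod-count check is exactly what produced the status observed in the phenomenon: PodFitsResources returns an InsufficientResourceError for v1.ResourcePods, and the Admit method shown earlier formats the rejection reason as "OutOf" plus the resource name. A one-line check of the resulting string:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

func main() {
	// same formatting as predicateAdmitHandler.Admit: reason = fmt.Sprintf("OutOf%s", re.ResourceName)
	fmt.Println(fmt.Sprintf("OutOf%s", v1.ResourcePods)) // prints: OutOfpods
}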
HandleAdmissionFailure checks whether the pod is a critical pod; if it is, and the only admission failures are insufficient resources, it tries to evict other pods to free them. IsCriticalPod decides whether a pod is critical:
// IsCriticalPod returns true if pod's priority is greater than or equal to SystemCriticalPriority.
func IsCriticalPod(pod *v1.Pod) bool {
//static pods
if IsStaticPod(pod) {
return true
}
//the pod has a priority set and it is at least SystemCriticalPriority (twice the highest user-definable priority)
if pod.Spec.Priority != nil && IsCriticalPodBasedOnPriority(*pod.Spec.Priority) {
return true
}
return false
}
// HandleAdmissionFailure gracefully handles admission rejection, and, in some cases,
// to allow admission of the pod despite its previous failure.
func (c *CriticalPodAdmissionHandler) HandleAdmissionFailure(admitPod *v1.Pod, failureReasons []predicates.PredicateFailureReason) (bool, []predicates.PredicateFailureReason, error) {
//is this a critical pod?
if !kubetypes.IsCriticalPod(admitPod) {
return false, failureReasons, nil
}
// InsufficientResourceError is not a reason to reject a critical pod.
// Instead of rejecting, we free up resources to admit it, if no other reasons for rejection exist.
nonResourceReasons := []predicates.PredicateFailureReason{}
resourceReasons := []*admissionRequirement{}
for _, reason := range failureReasons {
if r, ok := reason.(*predicates.InsufficientResourceError); ok {
resourceReasons = append(resourceReasons, &admissionRequirement{
resourceName: r.ResourceName,
quantity: r.GetInsufficientAmount(),
})
} else {
nonResourceReasons = append(nonResourceReasons, reason)
}
}
if len(nonResourceReasons) > 0 {
// Return only reasons that are not resource related, since critical pods cannot fail admission for resource reasons.
return false, nonResourceReasons, nil
}
//if every admission failure reason is an InsufficientResourceError, try to evict pods to free those resources
err := c.evictPodsToFreeRequests(admitPod, admissionRequirementList(resourceReasons))
// if no error is returned, preemption succeeded and the pod is safe to admit.
return err == nil, nil, err
}
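A side note on the priority threshold used by IsCriticalPodBasedOnPriority above: a pod is critical when its priority is at least SystemCriticalPriority, which is defined as twice HighestUserDefinablePriority. The small sketch below copies those constants from pkg/apis/scheduling (values as of release-1.16) so it is self-contained:

package main

import "fmt"

// Constants mirrored from k8s.io/kubernetes/pkg/apis/scheduling (release-1.16).
const (
	HighestUserDefinablePriority = int32(1000000000)
	SystemCriticalPriority       = 2 * HighestUserDefinablePriority
)

// isCriticalPodBasedOnPriority mirrors the upstream check: a pod is critical
// when its priority is at least SystemCriticalPriority.
func isCriticalPodBasedOnPriority(priority int32) bool {
	return priority >= SystemCriticalPriority
}

func main() {
	fmt.Println(isCriticalPodBasedOnPriority(1000000000)) // false: highest user-definable priority
	fmt.Println(isCriticalPodBasedOnPriority(2000000000)) // true: system-critical threshold
}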
rs controller
So far we have seen that the kubelet uses PodFitsResources (via the scheduler's GeneralPredicates) to decide whether a pod fits on the node, and that when the predicate fails and the pod is not critical, the kubelet updates the pod's status.phase to Failed in the apiserver. In the observed incident the pods were being created over and over by the rs controller, as follows:
The controller-manager starts all registered controllers:
# cmd/kube-controller-manager/app/controllermanager.go @Run
// Run starts every registered controller via StartControllers
if err := StartControllers(controllerContext, saTokenControllerInitFunc, NewControllerInitializers(controllerContext.LoopMode), unsecuredMux); err != nil {
klog.Fatalf("error starting controllers: %v", err)
}
//NewControllerInitializers returns a map from controller name to its start function
// NewControllerInitializers is a public map of named controller groups (you can start more than one in an init func)
// paired to their InitFunc. This allows for structured downstream composition and subdivision.
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
controllers := map[string]InitFunc{}
......
controllers["endpoint"] = startEndpointController
controllers["endpointslice"] = startEndpointSliceController
controllers["job"] = startJobController
controllers["deployment"] = startDeploymentController
controllers["replicaset"] = startReplicaSetController
controllers["horizontalpodautoscaling"] = startHPAController
controllers["disruption"] = startDisruptionController
controllers["statefulset"] = startStatefulSetController
......
return controllers
}
Starting the rs controller:
func startReplicaSetController(ctx ControllerContext) (http.Handler, bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"}] {
return nil, false, nil
}
go replicaset.NewReplicaSetController(
	//register the rs and pod informers with the informer factory
ctx.InformerFactory.Apps().V1().ReplicaSets(),
ctx.InformerFactory.Core().V1().Pods(),
ctx.ClientBuilder.ClientOrDie("replicaset-controller"),
replicaset.BurstReplicas,
).Run(int(ctx.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs), ctx.Stop)
return nil, true, nil
}
The rs controller's reconcile method lists all pods and filters out the inactive ones, i.e. pods whose Status.Phase is Succeeded or Failed, or that are marked for deletion (DeletionTimestamp != nil). A pod whose status.phase is Failed is therefore ignored, and the ReplicaSet creates a new pod in its place.
// syncReplicaSet will sync the ReplicaSet with the given key if it has had its expectations fulfilled,
// meaning it did not expect to see any more of its pods created or deleted. This function is not meant to be
// invoked concurrently with the same key.
func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
	//fetch the rs from the informer cache
namespace, name, err := cache.SplitMetaNamespaceKey(key)
rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
	//decide whether this rs currently needs to be synced
rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)
selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
// list all pods to include the pods that don't match the rs`s selector
// anymore but has the stale controller ref.
// TODO: Do the List and Filter in a single pass, or use an index.
	//list all pods, not only those matching the rs selector
allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
// Ignore inactive pods.
	//filter out inactive pods
filteredPods := controller.FilterActivePods(allPods)
// NOTE: filteredPods are pointing to objects from cache - if you need to
// modify them, you need to copy it first.
	//the rs tries to adopt pods that may match it
filteredPods, err = rsc.claimPods(rs, selector, filteredPods)
var manageReplicasErr error
	//adjust the rs replica count
if rsNeedsSync && rs.DeletionTimestamp == nil {
manageReplicasErr = rsc.manageReplicas(filteredPods, rs)
}
rs = rs.DeepCopy()
	// update the rs status
newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)
// Always updates status as pods come up or die.
updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
}
// FilterActivePods returns pods that have not terminated.
func FilterActivePods(pods []*v1.Pod) []*v1.Pod {
var result []*v1.Pod
for _, p := range pods {
if IsPodActive(p) {
result = append(result, p)
} else {
klog.V(4).Infof("Ignoring inactive pod %v/%v in state %v, deletion time %v",
p.Namespace, p.Name, p.Status.Phase, p.DeletionTimestamp)
}
}
return result
}
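For completeness, the IsPodActive check it relies on (pkg/controller/controller_utils.go) is essentially the function below; this is what makes a Failed pod invisible to the ReplicaSet and triggers the creation of a replacement, closing the loop described in the summary:

// IsPodActive treats a pod as active only if it has not reached a terminal
// phase and is not being deleted; a pod rejected by the kubelet with
// status.phase == Failed therefore no longer counts toward the replica count.
func IsPodActive(p *v1.Pod) bool {
	return v1.PodSucceeded != p.Status.Phase &&
		v1.PodFailed != p.Status.Phase &&
		p.DeletionTimestamp == nil
}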