圖解kubernetes調度器搶占流程與算法設計


搶占調度是分布式調度中一種常見的設計,其核心目標是當不能為高優先級的任務分配資源的時候,會通過搶占低優先級的任務來進行高優先級的調度,本文主要學習k8s的搶占調度以及里面的一些有趣的算法

1. 搶占調度設計

1.1 搶占原理

搶占調度原理其實很簡單就是通過高優先級的pod搶占低優先級的pod資源,從而滿足高優先pod的調度

1.2 中斷預算

在kubernetes中為了保證服務盡可能的高可用,設計PDB(PodDisruptionBudget)其核心目標就是在保證對應pod在指定的數量,主要是為了保證服務的可用性,在進行搶占的過程中,應盡可能遵守該設計,盡量不去搶占有PDB的資源,避免因為搶占導致服務的不可用

1.3 優先級反轉

優先級反轉是信號量里面的一種機制即因為低優先級任務的運行阻塞高優先級的任務運行

在k8s中搶占調度是通過高優先級搶占低優先級pod,如果高優先級pod依賴低優先級pod, 則會因為依賴問題,導致優先級失效,所以應該盡可能減少高優先級pod對低優先級的pod的依賴, 后面進行篩選源碼分析時可以看到

1.4 搶占選擇算法

搶占選擇算法是指的通過搶占部分節點后,如何從被搶占的node數組中篩選出一個node節點,目前k8s中主要實現了5個算法

1.4.1 最少違反PDB

即最少違反PDB規則

1.4.2 最高優先級最小優先

比較所有node的最高優先級的pod,找到優先級最低的node

1.4.3 優先級總和最低優先

計算每個node上面的被搶占的pod優先級之和,選擇優先級和最低的節點

1.4.4 最少搶占數量優先

計算需要搶占的節點數量最少的節點優先

1.4.5 最近更新節點優先

比較每個node中被驅逐的pod中最早啟動的pod的啟動時間,最近啟動的pod的節點,會被選擇

2. 源碼設計

2.1 搶占核心流程

image.png
搶占的流程主要是通過Preempt來實現,其針對預選失敗的節點來進行驅逐某些低優先級的pod來滿足高優先級pod

func (g *genericScheduler) Preempt(pluginContext *framework.PluginContext, pod *v1.Pod, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
    // 只允許預選失敗的pod進行重試
	fitError, ok := scheduleErr.(*FitError)
	if !ok || fitError == nil {
		return nil, nil, nil, nil
	}
    // 是否允許搶占其他提議的pod
	if !podEligibleToPreemptOthers(pod, g.nodeInfoSnapshot.NodeInfoMap, g.enableNonPreempting) {
		klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name)
		return nil, nil, nil, nil
	}
    // 獲取當前集群中的所有node
	allNodes := g.cache.ListNodes()
	if len(allNodes) == 0 {
		return nil, nil, nil, ErrNoNodesAvailable
	}
    // 初步篩選潛在的可以進行搶占操作的node
	potentialNodes := nodesWherePreemptionMightHelp(allNodes, fitError)
	if len(potentialNodes) == 0 {
		klog.V(3).Infof("Preemption will not help schedule pod %v/%v on any node.", pod.Namespace, pod.Name)
		// In this case, we should clean-up any existing nominated node name of the pod.
		return nil, nil, []*v1.Pod{pod}, nil
	}
    // 獲取所有pdb
	pdbs, err := g.pdbLister.List(labels.Everything())
	if err != nil {
		return nil, nil, nil, err
	}
    // 針對之前初步篩選的node嘗試進行搶占和預選操作,返回結果中包含所有可以通過搶占低優先級pod完成pod調度的node節點與搶占的pod
	nodeToVictims, err := g.selectNodesForPreemption(pluginContext, pod, g.nodeInfoSnapshot.NodeInfoMap, potentialNodes, g.predicates,
		g.predicateMetaProducer, g.schedulingQueue, pdbs)
	if err != nil {
		return nil, nil, nil, err
	}

	// 調用extenders進行再一輪的篩選
	nodeToVictims, err = g.processPreemptionWithExtenders(pod, nodeToVictims)
	if err != nil {
		return nil, nil, nil, err
	}

    // 從篩選結果中選擇最適合搶占的node
	candidateNode := pickOneNodeForPreemption(nodeToVictims)
	if candidateNode == nil {
		return nil, nil, nil, nil
	}

	// 如果candidateNode不為nil,則找到一個最優的執行搶占操作的node, 返回低優先的提議的pod
    // 還有搶占的pod和當前節點
	nominatedPods := g.getLowerPriorityNominatedPods(pod, candidateNode.Name)
	if nodeInfo, ok := g.nodeInfoSnapshot.NodeInfoMap[candidateNode.Name]; ok {
		return nodeInfo.Node(), nodeToVictims[candidateNode].Pods, nominatedPods, nil
	}

	return nil, nil, nil, fmt.Errorf(
		"preemption failed: the target node %s has been deleted from scheduler cache",
		candidateNode.Name)
}

2.2 搶占條件檢測

如果發現需要執行搶占的pod有提名的node並且對應node上面存在比自己優先級低的pod正在進行刪除, 則不允許進行搶占

func podEligibleToPreemptOthers(pod *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo, enableNonPreempting bool) bool {
	if enableNonPreempting && pod.Spec.PreemptionPolicy != nil && *pod.Spec.PreemptionPolicy == v1.PreemptNever {
		klog.V(5).Infof("Pod %v/%v is not eligible for preemption because it has a preemptionPolicy of %v", pod.Namespace, pod.Name, v1.PreemptNever)
		return false
	}
	nomNodeName := pod.Status.NominatedNodeName
	if len(nomNodeName) > 0 {
		if nodeInfo, found := nodeNameToInfo[nomNodeName]; found {
			podPriority := util.GetPodPriority(pod)
			for _, p := range nodeInfo.Pods() {
				if p.DeletionTimestamp != nil && util.GetPodPriority(p) < podPriority {
                    // 正在終止的優先級低於當前pod的pod就不會進行搶占
					return false
				}
			}
		}
	}
	return true
}

2.3 篩選潛在節點

image.png
每個node在預選階段都會進行一個標記,標記當前node執行預選失敗的原因,篩選潛在節點主要是根據對應的錯誤來進行篩選,如果不是不可解決的預選錯誤,則該node節點就可以參與接下來的搶占階段

func nodesWherePreemptionMightHelp(nodes []*v1.Node, fitErr *FitError) []*v1.Node {
	potentialNodes := []*v1.Node{}
    // 根據預選階段的錯誤原因,如果不存在無法解決的錯誤,則這些node可能在接下來的搶占流程中被使用
	for _, node := range nodes {
		if fitErr.FilteredNodesStatuses[node.Name].Code() == framework.UnschedulableAndUnresolvable {
			continue
		}
		failedPredicates, _ := fitErr.FailedPredicates[node.Name]
		if !unresolvablePredicateExists(failedPredicates) { 
            // 如果我們發現並不是不可解決的調度錯誤的時候,就講這個節點加入到這里
            // 可能通過后續的調整會讓這些node重新滿足
			klog.V(3).Infof("Node %v is a potential node for preemption.", node.Name)
			potentialNodes = append(potentialNodes, node)
		}
	}
	return potentialNodes
}

不可通過調整的預選失敗原因

var unresolvablePredicateFailureErrors = map[predicates.PredicateFailureReason]struct{}{
	predicates.ErrNodeSelectorNotMatch:      {},
	predicates.ErrPodAffinityRulesNotMatch:  {},
	predicates.ErrPodNotMatchHostName:       {},
	predicates.ErrTaintsTolerationsNotMatch: {},
	predicates.ErrNodeLabelPresenceViolated: {},
	// 省略大部分,感興趣的可以自己關注下
}

2.4 並行篩選節點

image.png
篩選搶占節點主要是並行對之前篩選潛在node進行嘗試,通過驅逐低優先級pod滿足高優先級pod調度,最終會篩選一批可以通過搶占來滿足pod調度需要的節點, 其核心實現時通過selectVictimsOnNode來進行檢測,繼續往下看

func (g *genericScheduler) selectNodesForPreemption(
	pluginContext *framework.PluginContext,
	pod *v1.Pod,
	nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
	potentialNodes []*v1.Node,
	fitPredicates map[string]predicates.FitPredicate,
	metadataProducer predicates.PredicateMetadataProducer,
	queue internalqueue.SchedulingQueue,
	pdbs []*policy.PodDisruptionBudget,
) (map[*v1.Node]*schedulerapi.Victims, error) {
	nodeToVictims := map[*v1.Node]*schedulerapi.Victims{}
	var resultLock sync.Mutex

	// We can use the same metadata producer for all nodes.
	meta := metadataProducer(pod, nodeNameToInfo)
	checkNode := func(i int) {
		nodeName := potentialNodes[i].Name
		var metaCopy predicates.PredicateMetadata
		if meta != nil {
			metaCopy = meta.ShallowCopy()
		}
		pods, numPDBViolations, fits := g.selectVictimsOnNode(pluginContext, pod, metaCopy, nodeNameToInfo[nodeName], fitPredicates, queue, pdbs)
		if fits {
			resultLock.Lock()
			victims := schedulerapi.Victims{
				Pods:             pods,
				NumPDBViolations: numPDBViolations,
			}
			nodeToVictims[potentialNodes[i]] = &victims
			resultLock.Unlock()
		}
	}
	workqueue.ParallelizeUntil(context.TODO(), 16, len(potentialNodes), checkNode)
	return nodeToVictims, nil
}

2.5 單點篩選流程

selectVictimsOnNode即單點篩選流程是針對單個node來指向具體的驅逐搶占決策的流程, 其核心流程如下
image.png

2.5.1 優先級篩選

優先級篩選首先會對當前node上面的所有節點進行優先級排序,移除所有比當前pod低的pod

potentialVictims := util.SortableList{CompFunc: util.MoreImportantPod}
	nodeInfoCopy := nodeInfo.Clone()

	removePod := func(rp *v1.Pod) {
		nodeInfoCopy.RemovePod(rp)
		if meta != nil {
			meta.RemovePod(rp, nodeInfoCopy.Node())
		}
	}
	addPod := func(ap *v1.Pod) {
		nodeInfoCopy.AddPod(ap)
		if meta != nil {
			meta.AddPod(ap, nodeInfoCopy)
		}
	}
	podPriority := util.GetPodPriority(pod)
	for _, p := range nodeInfoCopy.Pods() {
		if util.GetPodPriority(p) < podPriority {
            // 移除所有優先級比自己低的pod
			potentialVictims.Items = append(potentialVictims.Items, p)
			removePod(p)
		}
	}

2.5.2 預選判斷

對移除所有優先級比自己的pod之后,會嘗試進行預選流程,如果發現預選流程失敗,則當前node即使通過移除所有比自己優先級低的pod也不能滿足調度需求,則就進行下一個node判斷

if fits, _, _, err := g.podFitsOnNode(pluginContext, pod, meta, nodeInfoCopy, fitPredicates, queue, false); !fits {
		if err != nil {
			klog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
		}

		return nil, 0, false
	}

2.5.3 PDB分組與分組算法

PDB分組就是對當前節點上篩選出來的低優先級pod按照是否有PDB匹配來進行分組,分為違反PDB和未違反PDB的兩組

violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims.Items, pdbs)

分組算法其實也不難,只需要遍歷所有的pdb和pod就可以得到最終的分組

func filterPodsWithPDBViolation(pods []interface{}, pdbs []*policy.PodDisruptionBudget) (violatingPods, nonViolatingPods []*v1.Pod) {
	for _, obj := range pods {
		pod := obj.(*v1.Pod)
		pdbForPodIsViolated := false
		// A pod with no labels will not match any PDB. So, no need to check.
		if len(pod.Labels) != 0 {
			for _, pdb := range pdbs {
				if pdb.Namespace != pod.Namespace {
					continue
				}
				selector, err := metav1.LabelSelectorAsSelector(pdb.Spec.Selector)
				if err != nil {
					continue
				}
				// A PDB with a nil or empty selector matches nothing.
				if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) {
					continue
				}
				// We have found a matching PDB.
				if pdb.Status.PodDisruptionsAllowed <= 0 {
					pdbForPodIsViolated = true
					break
				}
			}
		}
		if pdbForPodIsViolated {
			violatingPods = append(violatingPods, pod)
		} else {
			nonViolatingPods = append(nonViolatingPods, pod)
		}
	}
	return violatingPods, nonViolatingPods
}

2.5.4 違反PDB計數與最少驅逐匯總

會分別對違反PDB和不違反的pod集合來進行reprievePod檢測,如果加入當前pod后,不能滿足預選篩選流程,則該pod則必須被進行移除加入到victims中, 同時如果是違反PDB的pod則需要進行違反pdb計數numViolatingVictim

  reprievePod := func(p *v1.Pod) bool { 
        // 我們首先將pod加入到meta中
		addPod(p)
		fits, _, _, _ := g.podFitsOnNode(pluginContext, pod, meta, nodeInfoCopy, fitPredicates, queue, false)
		// 
        if !fits {
            // 如果我們加入了pod然后導致了預選不成功,則這個pod必須給移除
			removePod(p)
			victims = append(victims, p) // 添加到我們需要移除的列表里面
			klog.V(5).Infof("Pod %v/%v is a potential preemption victim on node %v.", p.Namespace, p.Name, nodeInfo.Node().Name)
		}
		return fits
	}
	for _, p := range violatingVictims {
		if !reprievePod(p) {
			numViolatingVictim++
		}
	}
	// Now we try to reprieve non-violating victims.
	for _, p := range nonViolatingVictims {
        // 嘗試移除未違反pdb的pod
		reprievePod(p)
	}
	return victims, numViolatingVictim, true

2.6 篩選最優搶占

最優篩選主要是通過pickOneNodeForPreemption實現,其中篩選數據存儲結構主要是通過重用minNodes1和minNodes2兩段內存來進行實現,這兩個node數組分別配有兩個計數器lenNodes1和lenNodes2, 針對具有相同優先級、相同數量的node,每增加一個會進行一次計數器累加, 核心算法流程如下
image.png

2.6.1 最少違反PDB

最少違反PDB是根據前面統計的違反PDB的計數統計,找到最少違反的node,如果是單個node則直接返回篩選結束

	minNumPDBViolatingPods := math.MaxInt32
	var minNodes1 []*v1.Node
	lenNodes1 := 0
	for node, victims := range nodesToVictims {
		if len(victims.Pods) == 0 {
            // 如果發現一個noed不需要任何搶占,則返回它
			return node
		}
		numPDBViolatingPods := victims.NumPDBViolations
		if numPDBViolatingPods < minNumPDBViolatingPods { 
            // 如果小於最小pdb數量, 如果數量發生變化,就重置
			minNumPDBViolatingPods = numPDBViolatingPods
			minNodes1 = nil
			lenNodes1 = 0
		}
		if numPDBViolatingPods == minNumPDBViolatingPods { 
            // 多個相同的node會進行追加,並累加計數器 
			minNodes1 = append(minNodes1, node)
			lenNodes1++
		}
	}
	if lenNodes1 == 1 {
		return minNodes1[0]
	}

2.6.2 最高優先級最小優先

最高優先級最小優先是指通過對比多個node的最高優先級的pod,優先級最低的那個node被選中,如果多個則進行下一個算法

minHighestPriority := int32(math.MaxInt32)
	var minNodes2 = make([]*v1.Node, lenNodes1)
	lenNodes2 := 0
	for i := 0; i < lenNodes1; i++ {
		node := minNodes1[i]
		victims := nodesToVictims[node]
		// highestPodPriority is the highest priority among the victims on this node.
        // 返回優先級最高的pod
		highestPodPriority := util.GetPodPriority(victims.Pods[0])
		if highestPodPriority < minHighestPriority {
            // 重置狀態
			minHighestPriority = highestPodPriority
			lenNodes2 = 0
		}
        
		if highestPodPriority == minHighestPriority {
            // 如果優先級相等則加入進去
			minNodes2[lenNodes2] = node
			lenNodes2++
		}
	}
	if lenNodes2 == 1 {
		return minNodes2[0]
	}

2.6.3 優先級總和最低優先

統計每個node上的所有被搶占的pod的優先級的總和,然后在多個node之間進行比較,優先級總和最低的節點被選中

minSumPriorities := int64(math.MaxInt64)
	lenNodes1 = 0
	for i := 0; i < lenNodes2; i++ {
		var sumPriorities int64
		node := minNodes2[i]
        // 統計所有優先級
		for _, pod := range nodesToVictims[node].Pods {
			
            sumPriorities += int64(util.GetPodPriority(pod)) + int64(math.MaxInt32+1)
		}
		if sumPriorities < minSumPriorities {
			minSumPriorities = sumPriorities
			lenNodes1 = 0
		}
		if sumPriorities == minSumPriorities {
			minNodes1[lenNodes1] = node
			lenNodes1++
		}
	}
    // 最少優先級的node
	if lenNodes1 == 1 {
		return minNodes1[0]
	}

2.6.4 最少搶占數量優先

最少搶占數量優先即統計每個node被搶占的節點數量,數量最少得被選中

	minNumPods := math.MaxInt32
	lenNodes2 = 0
	for i := 0; i < lenNodes1; i++ {
		node := minNodes1[i]
		numPods := len(nodesToVictims[node].Pods)
		if numPods < minNumPods {
			minNumPods = numPods
			lenNodes2 = 0
		}
		if numPods == minNumPods {
			minNodes2[lenNodes2] = node
			lenNodes2++
		}
	}
    // 最少節點數量
	if lenNodes2 == 1 {
		return minNodes2[0]
	}

2.6.5 最近更新節點優先

該算法會篩選每個node驅逐的pod中優先級最高的pod的最早更新時間(其實就是說這個pod早就被創建了),然后在多個node之間進行比較,如果誰上面的時間越新(即這個node上的pod可能是最近被調度上去的),則就選中這個節點

 latestStartTime := util.GetEarliestPodStartTime(nodesToVictims[minNodes2[0]])
	if latestStartTime == nil {
		// If the earliest start time of all pods on the 1st node is nil, just return it,
		// which is not expected to happen.
        // 如果第一個節點上所有pod的最早開始時間為零,那么返回它
		klog.Errorf("earliestStartTime is nil for node %s. Should not reach here.", minNodes2[0])
		return minNodes2[0]
	}
	nodeToReturn := minNodes2[0]
	for i := 1; i < lenNodes2; i++ {
		node := minNodes2[i]
		// Get earliest start time of all pods on the current node.
        // 獲取當前node最早啟動時間
		earliestStartTimeOnNode := util.GetEarliestPodStartTime(nodesToVictims[node])
		if earliestStartTimeOnNode == nil {
			klog.Errorf("earliestStartTime is nil for node %s. Should not reach here.", node)
			continue
		}
		if earliestStartTimeOnNode.After(latestStartTime.Time) {
			latestStartTime = earliestStartTimeOnNode
			nodeToReturn = node
		}
	}

	return nodeToReturn

閱讀總結

因為是純的算法流程,並沒有復雜的數據結構,大家看看就好,調度器的設計可能就看到這了,后面把之前的都串起來,算是一個總結,如果有興趣我就再看看 SchedulerExtender和framework的設計, 其實學習scheduler調度器部分只是因為自己對分布式調度這塊比較好奇,而且自己有運維開發的經驗,這對pod調度類似場景並不陌生,看起來總的來說相對容易一點,而且我只分析了核心的數據結構和算法,還有幾個階段,為了避免陷入對kubenretes一些復雜邏輯的處理,我都盡量簡化邏輯,就是希望即時不去看k8s scheduler的代碼,也能有所收獲

微信號:baxiaoshi2020
關注公告號閱讀更多源碼分析文章 21天大棚
更多文章關注 www.sreguide.com
本文由博客一文多發平台 OpenWrite 發布


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM