k8s-scheduler源碼分析


1.調度過程

  • K8S的scheduler的主要作用是將用戶申請的pods調度到合適的node節點上。具體的來說,就是它通過監聽API server提供的watch等接口,獲取到未調度的pods和node的相關信息,通過對node的篩選,選擇出最合適的也就是優先級最高的node節點,將其與pods進行綁定,並將綁定的結果固化到etcd中去。

  • kubernetes Scheduler 運行在 master 節點,它的核心功能是監聽 apiserver 來獲取 PodSpec.NodeName 為空的 pod,然后為每個這樣的 pod 創建一個 binding 指示 pod 應該調度到哪個節點上。

    從哪里讀取還沒有調度的 pod 呢?當然是 apiserver。怎么知道 pod 沒有調度呢?它會向 apiserver 請求 spec.nodeName 字段為空的 pod,然后調度得到結果之后,把結果寫入 apiserver。

  • Scheduler是Kubernetes的調度器,其作用是根據特定的調度算法和策略將Pod調度到指定的計算節點(Node)上,其做為單獨的程序運行,啟動之后會一直監聽API Server,獲取PodSpec.NodeName為空的Pod,對每個Pod都會創建一個綁定(binding)。

    普通用戶可以把Scheduler可理解為一個黑盒,黑盒的輸入為待調度的Pod和全部計算節點的信息,經過黑盒內部的調度算法和策略處理,輸出為最優的節點,而后將Pod調度該節點上 。

  • 如果在預選(Predicates)過程中,如果所有的節點都不滿足條件,Pod 會一直處在Pending 狀態,直到有節點滿足條件,這期間調度器會不斷的重試。經過節點過濾后,如多個節點滿足條件,會按照節點優先級(priorities)大小對節點排序,最后選擇優先級最高的節點部署Pod。

  • 優選(Priorities)

    經過預選策略(Predicates)對節點過濾后,獲取節點列表,再對符合需求的節點列表進行打分,最終選擇Pod調度到一個分值最高的節點。Kubernetes用一組優先級函數處理每一個通過預選的節點(kubernetes/plugin/pkg/scheduler/algorithm/priorities中實現)。每一個優先級函數會返回一個0-10的分數,分數越高表示節點越優, 同時每一個函數也會對應一個表示權重的值。最終主機的得分用以下公式計算得出:

    finalScoreNode = (weight1 * priorityFunc1) + (weight2 * priorityFunc2) + … + (weightn * priorityFuncn)

scheduler作為一個客戶端,從apiserver中讀取到需要分配的pod,和擁有的node,然后進行過濾和算分,最后把這個匹配信息通過apiserver寫入到etcd里面,供下一步的kubelet去拉起pod使用。

調度的過程是這樣的:

客戶端通過 kuberctl 或者 apiserver 提交資源創建的請求,不管是 deployment、replicaset、job 還是 pod,最終都會產生要調度的 pod。
Scheduler它通過監聽API server提供的watch等接口,獲取到未調度的pods和node的相關信息,循環遍歷地為每個 pod 分配節點
調度器會保存集群節點的信息。對每一個 pod,調度器先過濾掉不滿足 pod 運行條件的節點,這個過程是 Predicate
通過過濾的節點,調度器會根據一定的算法給它們打分,確定它們的優先級順序,並選擇分數最高的節點作為結果
調度器根據最終選擇出來的節點,將其與pods進行綁定,並將綁定的結果固化到etcd中去。

2.schedulerone

假設我們的集群通過創建 scheduler 的配置文件,加載默認的調度算法,通過啟動時指定的 policy source 加載 config,已經初始化好了scheduler對象sched。現在我們開始調用run方法,這個run循環執行scheduleOne方法。

//k8s.io/kubernetes/pkg/scheduler/scheduler.go:313
func (sched *Scheduler) Run() {
    if !sched.config.WaitForCacheSync() {
        return
    }

    go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
}

scheduleOne() 每次對一個 pod 進行調度,主要有以下步驟:

  • 從 scheduler 調度隊列中取出一個 pod,如果該 pod 處於刪除狀態則跳過
  • 執行調度邏輯 sched.schedule() 返回通過預算及優選算法過濾后選出的最佳 node
  • 如果過濾算法沒有選出合適的 node,則返回 core.FitError
  • 若沒有合適的 node 會判斷是否啟用了搶占策略,若啟用了則執行搶占機制
  • pod 對應的 spec.NodeName 寫上 scheduler 最終選擇的 node,更新 scheduler cache
  • 請求 apiserver 異步處理最終的綁定操作,寫入到 etcd
//k8s.io/kubernetes/pkg/scheduler/scheduler.go:515
func (sched *Scheduler) scheduleOne() {
    fwk := sched.Framework
//調用NextPod()方法,來獲取下一個還未調度的pods的信息
    pod := sched.NextPod()
    if pod == nil {
        return
    }
    // 1.判斷pods是否已經被刪除,如果已經被刪除,那么直接返回
    if pod.DeletionTimestamp != nil {
        ......
    }

    // 2.執行調度策略選擇 node,來獲取合適的node
    start := time.Now()
    pluginContext := framework.NewPluginContext()
    scheduleResult, err := sched.schedule(pod, pluginContext)
    if err != nil {
        if fitError, ok := err.(*core.FitError); ok {
            // 3.這里是調度失敗的結果,如果調度失敗了,那么他就會在一定的時間內將這個pods在此加載回調度的pods隊列中。若啟用搶占機制則執行
            if sched.DisablePreemption {
                ......
            } else {
                preemptionStartTime := time.Now()
                sched.preempt(pluginContext, fwk, pod, fitError)
                ......
            }
            ......
            metrics.PodScheduleFailures.Inc()
        } else {
            klog.Errorf("error selecting node for pod: %v", err)
            metrics.PodScheduleErrors.Inc()
        }
        return
    }
    ......
    assumedPod := pod.DeepCopy()

    allBound, err := sched.assumeVolumes(assumedPod, scheduleResult.SuggestedHost)
    if err != nil {
        klog.Errorf("error assuming volumes: %v", err)
        metrics.PodScheduleErrors.Inc()
        return
    }


    if sts := fwk.RunReservePlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
        .....
    }

    // 4.為 pod 設置 NodeName 字段,更新 scheduler 緩存
     //在選擇出來合適的node節點的時候,先將這個node 的資源分配給它,但是暫時並不將其綁定,固化到etcd中,而是先將其存起來,但是這里注意,
    //node的資源已經假設分給pods了,node的資源信息就發生了變化,這是為了可以多線程的往etcd中固化信息,可以提高效率。
    //將 host 填入到 pod spec字段的nodename,假定分配到對應的節點上
    err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
    if err != nil {
        ......
    }

    // 5.異步請求 apiserver,異步處理最終的綁定操作,寫入到 etcd
    go func() {
        err := sched.bind(bindingCycleCtx, prof, assumedPod, scheduleResult.SuggestedHost, state)
            if err != nil {
                ......
                return
            }
        }
            ......
            
        }
    }()
}

3.scheduler

scheduleOne() 中通過調用 sched.schedule() 來執行預選與優選算法處理:

//k8s.io/kubernetes/pkg/scheduler/scheduler.go:337
func (sched *Scheduler) schedule(pod *v1.Pod, pluginContext *framework.PluginContext) (core.ScheduleResult, error) {
    result, err := sched.Algorithm.Schedule(pod, pluginContext)
    if err != nil {
    ......
    }
    return result, err
}

sched.Algorithm 是一個 interface,主要包含四個方法,GenericScheduler 是其具體的實現:

//k8s.io/kubernetes/pkg/scheduler/core/generic_scheduler.go:131
type ScheduleAlgorithm interface {
    Schedule(*v1.Pod, *framework.PluginContext) (scheduleResult ScheduleResult, err error)
    Preempt(*framework.PluginContext, *v1.Pod, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, cleanupNominatedPods []*v1.Pod, err error)
    Predicates() map[string]predicates.FitPredicate
    Prioritizers() []priorities.PriorityConfig
}
  • Schedule():正常調度邏輯,包含預算與優選算法的執行
  • Preempt():搶占策略,在 pod 調度發生失敗的時候嘗試搶占低優先級的 pod,函數返回發生搶占的 node,被 搶占的 pods 列表,nominated node name 需要被移除的 pods 列表以及 error
  • Predicates():predicates 算法列表
  • Prioritizers():prioritizers 算法列表

Schedule的執行流程:

  • 執行 g.findNodesThatFit() 預選算法
  • 執行 postfilter plugin
  • 若 node 為 0 直接返回失敗的 error,若 node 數為1 直接返回該 node
  • 執行 g.priorityMetaProducer() 獲取 metaPrioritiesInterface,計算 pod 的metadata,檢查該 node 上是否有相同 meta 的 pod
  • 執行 PrioritizeNodes() 算法
  • 執行 g.selectHost() 通過得分選擇一個最佳的 node
//k8s.io/kubernetes/pkg/scheduler/core/generic_scheduler.go:186
func (g *genericScheduler) Schedule(pod *v1.Pod, pluginContext *framework.PluginContext) (result ScheduleResult, err error) {
    ......
    // 1.獲取 node 數量
    numNodes := g.cache.NodeTree().NumNodes()
    if numNodes == 0 {
        return result, ErrNoNodesAvailable
    }

    // 2.執行預選過濾算法
    startPredicateEvalTime := time.Now()
    filteredNodes, failedPredicateMap, filteredNodesStatuses, err := g.findNodesThatFit(pluginContext, pod)
    if err != nil {
        return result, err
    }
    
    postfilterStatus := g.framework.RunPostFilterPlugins(pluginContext, pod, filteredNodes, filteredNodesStatuses)
    if !postfilterStatus.IsSuccess() {
        return result, postfilterStatus.AsError()
    }
//沒有可用節點,直接報錯
    
    if len(filteredNodes) == 0 {
        ......
    }

    startPriorityEvalTime := time.Now()
    // 3.若只有一個 node 則直接返回該 node
    if len(filteredNodes) == 1 {
        return ScheduleResult{
            SuggestedHost:  filteredNodes[0].Name,
            EvaluatedNodes: 1 + len(failedPredicateMap),
            FeasibleNodes:  1,
        }, nil
    }

    // 4.獲取 pod meta 信息,執行優選算法
    metaPrioritiesInterface := g.priorityMetaProducer(pod, g.nodeInfoSnapshot.NodeInfoMap)
    priorityList, err := PrioritizeNodes(pod, g.nodeInfoSnapshot.NodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders, g.framework,      pluginContext)
    if err != nil {
        return result, err
    }

    // 5.根據打分選擇最佳的 node,selectHost先對每個pod調度到每個節點上的分值排序,選擇出分數最高的
    host, err := g.selectHost(priorityList)
    trace.Step("Selecting host done")
    return ScheduleResult{
        SuggestedHost:  host,
        EvaluatedNodes: len(filteredNodes) + len(failedPredicateMap),
        FeasibleNodes:  len(filteredNodes),
    }, err
}

4.predicates 調度算法源碼分析

predicates 算法主要是對集群中的 node 進行過濾,選出符合當前 pod 運行的 nodes。

默認的 predicates 過濾算法主要分為五種類型:

1、最重要的一種類型叫作 GeneralPredicates,主要考慮 kubernetes 資源是否能夠滿足,比如 CPU 和 Memory 是否足夠,端口是否沖突、selector 是否匹配。包含 PodFitsResources、PodFitsHost、PodFitsHostPorts、PodMatchNodeSelector 四種策略,其具體含義如下所示:

  • PodFitsHost:檢查宿主機的名字是否跟 Pod 的 spec.nodeName 一致,只有匹配的節點才能運行 pod
  • PodFitsHostPorts:檢查 Pod 申請的宿主機端口(spec.nodePort)是不是跟已經被使用的端口有沖突,如果是,則不能調度
  • PodMatchNodeSelector:檢查 Pod 的 nodeSelector 或者 nodeAffinity 指定的節點是否與節點匹配等
  • PodFitsResources:檢查主機的資源是否滿足 Pod 的需求,資源的計算是根據主機上運行 pod 請求的資源作為參考的,而不是以實際運行的資源數量。

findNodesThatFit() 是 predicates 策略的實際調用方法,調度器的輸入是一個 pod(多個 pod 調度可以通過遍歷來實現) 和多個節點,輸出是一個節點,表示 pod 將被調度到這個節點上。其基本流程如下:

  • 通過 cache 中的 NodeTree() 不斷獲取下一個 node
  • 將當前 node 和 pod 傳入podFitsOnNode() 方法中來判斷當前 node 是否符合要求
  • 如果當前 node 符合要求就將當前 node 加入預選節點的數組中filtered
  • 如果當前 node 不滿足要求,則加入到失敗的數組中,並記錄原因
  • 通過workqueue.ParallelizeUntil()並發執行checkNode()函數,一旦找到足夠的可行節點數后就停止篩選更多節點
  • 若配置了 extender 則再次進行過濾已篩選出的 node
  • 最后返回滿足調度條件的 node 列表,供下一步的優選操作
//k8s.io/kubernetes/pkg/scheduler/core/generic_scheduler.go:464
func (g *genericScheduler) findNodesThatFit(pluginContext *framework.PluginContext, pod *v1.Pod) ([]*v1.Node, FailedPredicateMap, framework.NodeToStatusMap, error) {
    //filtered 保存通過過濾的節點
    var filtered []*v1.Node
    //failedPredicateMap 保存過濾失敗的節點,即不適合 pod 運行的節點,沒有通過過濾的節點信息保存在 failedPredicateMap 字典中,key 是節點名,value 是失敗原因的列表;
    failedPredicateMap := FailedPredicateMap{}
    filteredNodesStatuses := framework.NodeToStatusMap{}

    if len(g.predicates) == 0 {
        filtered = g.cache.ListNodes()
    } else {
        allNodes := int32(g.cache.NodeTree().NumNodes())
        // 1.設定最多需要檢查的節點數
        numNodesToFind := g.numFeasibleNodesToFind(allNodes)

        filtered = make([]*v1.Node, numNodesToFind)
        ......

        // 2.獲取該 pod 的 meta 值 ,利用metadataProducer函數來獲取pods和node的信息
        meta := g.predicateMetaProducer(pod, g.nodeInfoSnapshot.NodeInfoMap)

        // 3.通過 cache 中的 `NodeTree()` 不斷獲取下一個 node
        checkNode := func(i int) {
            nodeName := g.cache.NodeTree().Next()

            // 4.將當前 node 和 pod 傳入`podFitsOnNode()` 方法中來判斷當前 node 是否符合要求
            fits, failedPredicates, status, err := g.podFitsOnNode(
                ......
            )
            if err != nil {
                ......
            }
            if fits {
                length := atomic.AddInt32(&filteredLen, 1)
                if length > numNodesToFind {
                    cancel()
                    atomic.AddInt32(&filteredLen, -1)
                } else {
                    filtered[length-1] = g.nodeInfoSnapshot.NodeInfoMap[nodeName].Node()
                }
            } else {
                ......
            }
        }

        // 5.啟動 16 個 goroutine 並發執行 checkNode 函數
        workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)

        filtered = filtered[:filteredLen]
        if len(errs) > 0 {
            ......
        }
    }

    // 6.若配置了 extender 則再次進行過濾
    if len(filtered) > 0 && len(g.extenders) != 0 {
        ......
    }
    return filtered, failedPredicateMap, filteredNodesStatuses, nil
}

對於每個 pod,都要檢查能否調度到集群中的所有節點上(只包括可調度的節點),而且多個判斷邏輯之間是獨立的,也就是說 pod 是否能否調度到某個 node 上和其他 node 無關(至少目前是這樣的,如果這個假設不再成立,並發要考慮協調的問題),所以可以使用並發來提高性能。並發是通過 workQueue 來實現的,最大並發數量是 16,這個數字是 hard code。

pod 和 node 是否匹配是調用是 podFitsOnNode 函數來判斷的:

podFitsOnNode()基本流程如下:

  • 遍歷已經注冊好的預選策略predicates.Ordering(),按順序執行對應的策略函數
  • 遍歷執行每個策略函數,並返回是否合適,預選失敗的原因和錯誤
  • 如果預選函數執行失敗,則加入預選失敗的數組中,直接返回,后面的預選函數不會再執行
  • 如果該 node 上存在 nominated pod 則執行兩次預選函數

因為引入了搶占機制,此處主要說明一下執行兩次預選函數的原因:

第一次循環,若該 pod 為搶占者(nominatedPods),調度器會假設該 pod 已經運行在這個節點上,然后更新meta和nodeInfo,nominatedPods是指執行了搶占機制且已經分配到了 node(pod.Status.NominatedNodeName 已被設定) 但是還沒有真正運行起來的 pod,然后再執行所有的預選函數。

第二次循環,不將nominatedPods加入到 node 內。

而只有這兩遍 predicates 算法都能通過時,這個 pod 和 node 才會被認為是可以綁定(bind)的。這樣做是因為考慮到 pod affinity 等策略的執行,如果當前的 pod 與nominatedPods有依賴關系就會有問題,因為nominatedPods不能保證一定可以調度且在已指定的 node 運行成功,也可能出現被其他高優先級的 pod 搶占等問題,關於搶占問題下篇會詳細介紹。

//k8s.io/kubernetes/pkg/scheduler/core/generic_scheduler.go:610

func (g *genericScheduler) podFitsOnNode(......) (bool, []predicates.PredicateFailureReason, *framework.Status, error) {
    var failedPredicates []predicates.PredicateFailureReason
    var status *framework.Status

    podsAdded := false

    for i := 0; i < 2; i++ {
        metaToUse := meta
        nodeInfoToUse := info
        if i == 0 {
            // 1.第一次循環加入 NominatedPods,計算 meta, nodeInfo
            podsAdded, metaToUse, nodeInfoToUse = addNominatedPods(pod, meta, info, queue)
        } else if !podsAdded || len(failedPredicates) != 0 {
            break
        }
        // 2.按順序執行所有預選函數
        for _, predicateKey := range predicates.Ordering() {
            var (
                fit     bool
                reasons []predicates.PredicateFailureReason
                err     error
            )
            if predicate, exist := predicateFuncs[predicateKey]; exist {
                fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
                if err != nil {
                    return false, []predicates.PredicateFailureReason{}, nil, err
                }

                // 3.任何一個預選函數執行失敗則直接返回
                if !fit {
                    failedPredicates = append(failedPredicates, reasons...)                   
                    if !alwaysCheckAllPredicates {
                        klog.V(5).Infoln("since alwaysCheckAllPredicates has not been set, the predicate " +
                            "evaluation is short circuited and there are chances " +
                            "of other predicates failing as well.")
                        break
                    }
                }
            }
        }
        // 4.執行 Filter Plugin
        status = g.framework.RunFilterPlugins(pluginContext, pod, info.Node().Name)
        if !status.IsSuccess() && !status.IsUnschedulable() {
            return false, failedPredicates, status, status.AsError()
        }
    }

    return len(failedPredicates) == 0 && status.IsSuccess(), failedPredicates, status, nil
}

下面是資源判斷的主要方法PodFitsResources():

func PodFitsResources(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
    node := nodeInfo.Node()
    if node == nil {
        return false, nil, fmt.Errorf("node not found")
    }
var predicateFails []algorithm.PredicateFailureReason
  
//判斷node節點上的pods個數是否已經超出了允許分配的個數
allowedPodNumber := nodeInfo.AllowedPodNumber()
if len(nodeInfo.Pods())+1 > allowedPodNumber {
    predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourcePods, 1, int64(len(nodeInfo.Pods())), int64(allowedPodNumber)))
}
 
//獲取pods的需求的資源
var podRequest *schedulercache.Resource
if predicateMeta, ok := meta.(*predicateMetadata); ok {
    podRequest = predicateMeta.podRequest
} else {
    // We couldn't parse metadata - fallback to computing it.
    podRequest = GetResourceRequest(pod)
}
if podRequest.MilliCPU == 0 &&
    podRequest.Memory == 0 &&
    podRequest.NvidiaGPU == 0 &&
    podRequest.EphemeralStorage == 0 &&
    len(podRequest.ExtendedResources) == 0 &&
    len(podRequest.HugePages) == 0 {
    return len(predicateFails) == 0, predicateFails, nil
}
 
//對四個方面的進行判斷,內存、cpu、Gpu、磁盤空間,這里就是簡單的比較大小。
allocatable := nodeInfo.AllocatableResource()
if allocatable.MilliCPU < podRequest.MilliCPU+nodeInfo.RequestedResource().MilliCPU {
    predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceCPU, podRequest.MilliCPU, nodeInfo.RequestedResource().MilliCPU, allocatable.MilliCPU))
}
if allocatable.Memory < podRequest.Memory+nodeInfo.RequestedResource().Memory {
    predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceMemory, podRequest.Memory, nodeInfo.RequestedResource().Memory, allocatable.Memory))
}
if allocatable.NvidiaGPU < podRequest.NvidiaGPU+nodeInfo.RequestedResource().NvidiaGPU {
    predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceNvidiaGPU, podRequest.NvidiaGPU, nodeInfo.RequestedResource().NvidiaGPU, allocatable.NvidiaGPU))
}
 
if allocatable.EphemeralStorage < podRequest.EphemeralStorage+nodeInfo.RequestedResource().EphemeralStorage {
    predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceEphemeralStorage, podRequest.EphemeralStorage, nodeInfo.RequestedResource().EphemeralStorage, allocatable.EphemeralStorage))
}
 
..........
 
return len(predicateFails) == 0, predicateFails, nil
}

5.priorities 調度算法源碼分析

對每個節點,priority 函數都會計算出來一個 0-10 之間的數字,表示 pod 放到該節點的合適程度,其中 10 表示非常合適,0 表示非常不合適。每個不同的優先級函數都有一個權重值,這個值為正數,最終的值為權重和優先級函數結果的乘積,而一個節點的權重就是所有優先級函數結果的加和。比如有兩種優先級函數 priorityFunc1 和 priorityFunc2,對應的權重分別為 weight1 和 weight2,那么節點 A 的最終得分是:

finalScoreNodeA = (weight1 * priorityFunc1) + (weight2 * priorityFunc2)

執行 priorities 調度算法的邏輯是在 PrioritizeNodes()函數中,其目的是執行每個 priority 函數為 node 打分,分數為 0-10,其功能主要有:

  • PrioritizeNodes() 通過並行運行各個優先級函數來對節點進行打分
  • 每個優先級函數會給節點打分,打分范圍為 0-10 分,0 表示優先級最低的節點,10表示優先級最高的節點
  • 每個優先級函數有各自的權重
  • 優先級函數返回的節點分數乘以權重以獲得加權分數
  • 最后計算所有節點的總加權分數
//k8s.io/kubernetes/pkg/scheduler/core/generic_scheduler.go:691
func PrioritizeNodes(......) (schedulerapi.HostPriorityList, error) {
    // 1.檢查是否有自定義配置,如果沒有選擇優先級判斷這一項,那么所有的節點的優先級是一樣的,那就隨機選擇一個節點
    if len(priorityConfigs) == 0 && len(extenders) == 0 {
        result := make(schedulerapi.HostPriorityList, 0, len(nodes))
        for i := range nodes {
            hostPriority, err := EqualPriorityMap(pod, meta, nodeNameToInfo[nodes[i].Name])
            if err != nil {
                return nil, err
            }
            result = append(result, hostPriority)
        }
        return result, nil
    }
    ......
 //建立一個二維數組來記錄每個優先級算法對節點的打分情況,分值為0-10,node節點的最終分值是所有優先級算法分值的相加
    results := make([]schedulerapi.HostPriorityList, len(priorityConfigs), len(priorityConfigs))

    ......
    // 2.使用 workqueue 啟動 16 個 goroutine ,從priorityConfigs中遍歷其中的優先級算法,並打分
    workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
        nodeInfo := nodeNameToInfo[nodes[index].Name]
        for i := range priorityConfigs {
            if priorityConfigs[i].Function != nil {
                continue
            }

            var err error
            results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)
            if err != nil {
                appendError(err)
                results[i][index].Host = nodes[index].Name
            }
        }
    })

 
//等待所有計算結束
    wg.Wait()
    if len(errs) != 0 {
        return schedulerapi.HostPriorityList{}, errors.NewAggregate(errs)
    }

 
    scoresMap, scoreStatus := framework.RunScorePlugins(pluginContext, pod, nodes)
    if !scoreStatus.IsSuccess() {
        return schedulerapi.HostPriorityList{}, scoreStatus.AsError()
    }
//打分完之后,進行分數的匯總操作,為每個 node 匯總分數
    result := make(schedulerapi.HostPriorityList, 0, len(nodes))
 
    for i := range nodes {
        result = append(result, schedulerapi.HostPriority{Host: nodes[i].Name, Score: 0})
        for j := range priorityConfigs {
            result[i].Score += results[j][i].Score * priorityConfigs[j].Weight
        }

        for j := range scoresMap {
            result[i].Score += scoresMap[j][i].Score
        }
    }

   
    return result, nil
}

對於優先級函數,我們只講解 LeastRequestedPriorityBalancedResourceAllocation 的實現,因為它們兩個和資源密切相關。

  • LeastRequestedPriority:最低請求優先級。根據 CPU 和內存的使用率來決定優先級,使用率越低優先級越高,也就是說優先調度到資源利用率低的節點,這個優先級函數能起到把負載盡量平均分到集群的節點上。默認權重為 1
  • BalancedResourceAllocation:資源平衡分配。這個優先級函數會把 pod 分配到 CPU 和 memory 利用率差不多的節點(計算的時候會考慮當前 pod 一旦分配到節點的情況)。默認權重為 1

最小資源請求優先級函數會計算每個節點的資源利用率,它目前只考慮 CPU 和內存兩種資源,而且兩者權重相同,具體的資源公式為:

score = (CPU Usage rate * 10 + Memory Usage Rate * 10 )/2

利用率的計算一樣,都是 (capacity - requested)/capacity,capacity 指節點上資源的容量,比如 CPU 的核數,內存的大小;requested 表示節點當前所有 pod 請求對應資源的總和。

平衡資源優先級函數會計算 CPU 和內存的平衡度,並盡量選擇更均衡的節點。它會分別計算 CPU 和內存的,計算公式為:

10 - abs(cpuFraction - memoryFraction)*101

對應的 cpuFraction 和 memoryFraction 就是資源利用率

6.代碼整體思路

參考鏈接:
kubernetes之Scheduler分析_縱橫四海的博客-CSDN博客

https://blog.tianfeiyu.com/source-code-reading-notes/kubernetes/kube_scheduler_process.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM