本文大綱
本系列文章已經開源到github:https://github.com/farmer-hutao/k8s-source-code-analysis
1. 預選流程
predicate過程從pkg/scheduler/core/generic_scheduler.go:389 findNodesThatFit()
方法就算正式開始了,這個方法根據給定的predicate functions過濾所有的nodes來尋找一堆可以跑pod的node集。老規矩,我們來看主干代碼:
pkg/scheduler/core/generic_scheduler.go:389
func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, error) {
checkNode := func(i int) {
fits, failedPredicates, err := podFitsOnNode(
//……
)
if fits {
length := atomic.AddInt32(&filteredLen, 1)
filtered[length-1] = g.cachedNodeInfoMap[nodeName].Node()
}
}
workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)
if len(filtered) > 0 && len(g.extenders) != 0 {
for _, extender := range g.extenders {
// Logic of extenders
}
}
return filtered, failedPredicateMap, nil
}
如上,刪的有點多,大家也可以看一下原函數然后對比一下,看看我為什么只保留這一點。從上面代碼中我們可以發現,最重要的是一個子函數調用過程fits, failedPredicates, err := podFitsOnNode()
,這個函數的參數我沒有貼出來,下面會詳細講;下半部分是一個extender過程,extender不影響對predicate過程的理解,我們后面專門當作一個主題講。所以這里的關注點是podFitsOnNode()
函數。
2. predicate的並發
進入podFitsOnNode()
函數邏輯之前,我們先看一下調用到podFitsOnNode()
函數的匿名函數變量checkNode是怎么被調用的:
pkg/scheduler/core/generic_scheduler.go:458
workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)
ParallelizeUntil()
函數是用於並行執行N個獨立的工作過程的,這個邏輯寫的挺有意思,我們看一下完整的代碼(這段的分析思路寫到注釋里哦):
vendor/k8s.io/client-go/util/workqueue/parallelizer.go:38
func ParallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece DoWorkPieceFunc) {
// 從形參列表看,需要關注的有workers和pieces兩個數字類型的參數,doworkPiece這個函數類型的參數
// DoWorkPieceFunc類型也就是func(piece int)類型
// 注意到上面調用的時候workers的實參是16,pieces是allNodes,也就是node數量
var stop <-chan struct{}
if ctx != nil {
stop = ctx.Done()
}
// 這里定義toProcess的容量和pieces相等,也就是和node數量相等
toProcess := make(chan int, pieces)
for i := 0; i < pieces; i++ {
// 假設有100個node,那么這里就寫了100個數到toProcess里
toProcess <- i
}
// 關閉了一個有緩存的channel
close(toProcess)
// 如果pieces數量比較少,也就是說假設node只有10個,那么workers就賦值為10個
// 到這里差不多可以猜到worker是並發工作數,當node大於16時並發是16,當node小於16時並法數就是node數
if pieces < workers {
workers = pieces
}
wg := sync.WaitGroup{}
wg.Add(workers)
// 要批量開goroutine了
for i := 0; i < workers; i++ {
// 如果100個node,這里時16;如果是10個node,這里是10
go func() {
defer utilruntime.HandleCrash()
defer wg.Done()
for piece := range toProcess {
// 從toProcess中拿一個數,舉個例子,假如現在並發是10,那么toProcess里面存的數據其實
// 也是10個,也就是1個goroutine拿到1個數,開始了一個下面的default邏輯;
// 假設並發數是16,node數是100,這時候toProcess里面也就是100個數,
// 這時候就是16個“消費者”在消耗100個數。當然每拿到一個數需要執行到一次下面的default
select {
case <-stop:
return
default:
// 第piece個節點被doWorkPiece了;
// 對應調用過程也就是checkNode函數傳入了一個整型參數piece
doWorkPiece(piece)
}
}
}()
}
wg.Wait()
}
回想一下前面的checkNode := func(i int){……}
,上面的doWorkPiece(piece)
也就是調用到了這里的這個匿名函數func(i int){……}
;到這里就清楚如何實現並發執行多個node的predicate過程了。
3. 一個node的predicate
checkNode的主要邏輯就是上面介紹的並發加上下面這個podFitsOnNode()
函數邏輯:
pkg/scheduler/core/generic_scheduler.go:425
fits, failedPredicates, err := podFitsOnNode(
pod,
meta,
g.cachedNodeInfoMap[nodeName],
g.predicates,
nodeCache,
g.schedulingQueue,
g.alwaysCheckAllPredicates,
equivClass,
)
我們從podFitsOnNode()
的函數定義入手:
pkg/scheduler/core/generic_scheduler.go:537
func podFitsOnNode(
pod *v1.Pod,
meta algorithm.PredicateMetadata,
info *schedulercache.NodeInfo,
predicateFuncs map[string]algorithm.FitPredicate,
nodeCache *equivalence.NodeCache,
queue internalqueue.SchedulingQueue,
alwaysCheckAllPredicates bool,
equivClass *equivalence.Class,
) (bool, []algorithm.PredicateFailureReason, error)
關於這個函數的邏輯,注釋里的描述翻譯過來大概是這個意思:
podFitsOnNode()函數檢查一個通過NodeInfo形式給定的node是否滿足指定的predicate functions. 對於給定的一個Pod,podFitsOnNode()函數會檢查是否有某個“等價的pod”存在,然后重用那個等價pod緩存的predicate結果。 這個函數的調用入口有2處: Schedule and Preempt.
- 當從Schedule進入時:這個函數想要測試node上所有已經存在的pod外加被指定將要調度到這個node上的其他所有高優先級(優先級不比自己低,也就是>=)的pod后,當前pod是否可以被調度到這個node上。
- 當從Preempt進入時:后面講preempt時再詳細分析。
podFitsOnNode()函數的參數有點多,每個跟進去就是一堆知識點。這里建議大家從字面先過一邊,然后跟進去看一下類型定義,類型的注釋等,了解一下功能,先不深究。整體看完一邊調度器代碼后回過頭深入細節。
我們一起看一下其中這個參數:predicateFuncs map[string]algorithm.FitPredicate
;這里的predicateFuncs是一個map,表示所有的predicate函數。這個map的key是個字符串,也就是某種形式的name了;value類型跟進去看一下:
pkg/scheduler/algorithm/types.go:36
// FitPredicate is a function that indicates if a pod fits into an existing node.
// The failure information is given by the error.
type FitPredicate func(pod *v1.Pod, meta PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []PredicateFailureReason, error)
FitPredicate是一個函數類型,3個參數,pod和node都很好理解,meta跟進去簡單看一下可以發現定義的是一些和predicate相關的一些元數據,這些數據是根據pod和node信息獲取到的,類似pod的端口有哪些,pod親和的pod列表等。返回值是一個表示是否fit的bool值,predicate失敗的原因列表,一個錯誤類型。
也就是說,FitPredicate這個函數類型也就是前面一直說的predicate functions的真面目了。下面看podFitsOnNode()函數的具體邏輯吧:
pkg/scheduler/core/generic_scheduler.go:537
func podFitsOnNode(
pod *v1.Pod,
meta algorithm.PredicateMetadata,
info *schedulercache.NodeInfo,
predicateFuncs map[string]algorithm.FitPredicate,
nodeCache *equivalence.NodeCache,
queue internalqueue.SchedulingQueue,
alwaysCheckAllPredicates bool,
equivClass *equivalence.Class,
) (bool, []algorithm.PredicateFailureReason, error) {
podsAdded := false
for i := 0; i < 2; i++ {
metaToUse := meta
nodeInfoToUse := info
if i == 0 {
podsAdded, metaToUse, nodeInfoToUse = addNominatedPods(pod, meta, info, queue)
} else if !podsAdded || len(failedPredicates) != 0 {
break
}
eCacheAvailable = equivClass != nil && nodeCache != nil && !podsAdded
// 這里省略一個for循環,下面會單獨講
}
return len(failedPredicates) == 0, failedPredicates, nil
}
這里的邏輯是從一個for循環開始的,關於這個2次循環的含義代碼里有很長的一段注釋,我們先看一下注釋里怎么說的(這里可以多看幾遍體會一下):
- 出於某些原因考慮我們需要運行兩次predicate. 如果node上有更高或者相同優先級的“指定pods”(這里的“指定pods”指的是通過schedule計算后指定要跑在一個node上但是還未真正運行到那個node上的pods),我們將這些pods加入到meta和nodeInfo后執行一次計算過程。
- 如果這個過程所有的predicates都成功了,我們再假設這些“指定pods”不會跑到node上再運行一次。第二次計算是必須的,因為有一些predicates比如pod親和性,也許在“指定pods”沒有成功跑到node的情況下會不滿足。
- 如果沒有“指定pods”或者第一次計算過程失敗了,那么第二次計算不會進行。
- 我們在第一次調度的時候只考慮相等或者更高優先級的pods,因為這些pod是當前pod必須“臣服”的,也就是說不能夠從這些pod中搶到資源,這些pod不會被當前pod“搶占”;這樣當前pod也就能夠安心從低優先級的pod手里搶資源了。
- 新pod在上述2種情況下都可調度基於一個保守的假設:資源和pod反親和性等的predicate在“指定pods”被處理為Running時更容易失敗;pod親和性在“指定pods”被處理為Not Running時更加容易失敗。
- 我們不能假設“指定pods”是Running的因為它們當前還沒有運行,而且事實上,它們確實有可能最終又被調度到其他node上了。
看了這個注釋后,上面代碼里的前幾行就很好理解了,在第一次進入循環體和第二次進入時做了不同的處理,具體怎么做的處理我們暫時不關注。下面看省略的這個for循環做了啥:
pkg/scheduler/core/generic_scheduler.go:583
// predicates.Ordering()得到的是一個[]string,predicate名字集合
for predicateID, predicateKey := range predicates.Ordering() {
var (
fit bool
reasons []algorithm.PredicateFailureReason
err error
)
// 如果predicateFuncs有這個key,則調用這個predicate;也就是說predicateFuncs如果定義了一堆亂七八遭的名字,會被忽略調,因為predicateKey是內置的。
if predicate, exist := predicateFuncs[predicateKey]; exist {
// 降低難度,先不看緩存情況。
if eCacheAvailable {
fit, reasons, err = nodeCache.RunPredicate(predicate, predicateKey, predicateID, pod, metaToUse, nodeInfoToUse, equivClass)
} else {
// 真正調用predicate函數了!!!!!!!!!
fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
}
if err != nil {
return false, []algorithm.PredicateFailureReason{}, err
}
if !fit {
// ……
}
}
}
如上,我們看一下2個地方:
- predicates.Ordering()
- fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
分兩個小節吧~
3.1. predicates的順序
pkg/scheduler/algorithm/predicates/predicates.go:130
var (
predicatesOrdering = []string{
CheckNodeConditionPred,
CheckNodeUnschedulablePred,
GeneralPred,
HostNamePred,
PodFitsHostPortsPred,
MatchNodeSelectorPred,
PodFitsResourcesPred,
NoDiskConflictPred,
PodToleratesNodeTaintsPred,
PodToleratesNodeNoExecuteTaintsPred,
CheckNodeLabelPresencePred,
CheckServiceAffinityPred,
MaxEBSVolumeCountPred,
MaxGCEPDVolumeCountPred,
MaxCSIVolumeCountPred,
MaxAzureDiskVolumeCountPred,
CheckVolumeBindingPred,
NoVolumeZoneConflictPred,
CheckNodeMemoryPressurePred,
CheckNodePIDPressurePred,
CheckNodeDiskPressurePred,
MatchInterPodAffinityPred}
)
如上,這里定義了一個次序,前面的for循環遍歷的是這個[]string,這樣也就實現了不管predicateFuncs
里定義了怎樣的順序,影響不了predicate的實際調用順序。官網對於這個順序有這樣一個表格解釋:
Position | Predicate | comments (note, justification...) |
---|---|---|
1 | CheckNodeConditionPredicate |
we really don’t want to check predicates against unschedulable nodes. |
2 | PodFitsHost |
we check the pod.spec.nodeName. |
3 | PodFitsHostPorts |
we check ports asked on the spec. |
4 | PodMatchNodeSelector |
check node label after narrowing search. |
5 | PodFitsResources |
this one comes here since it’s not restrictive enough as we do not try to match values but ranges. |
6 | NoDiskConflict |
Following the resource predicate, we check disk |
7 | PodToleratesNodeTaints |
check toleration here, as node might have toleration |
8 | PodToleratesNodeNoExecuteTaints |
check toleration here, as node might have toleration |
9 | CheckNodeLabelPresence |
labels are easy to check, so this one goes before |
10 | checkServiceAffinity |
- |
11 | MaxPDVolumeCountPredicate |
- |
12 | VolumeNodePredicate |
- |
13 | VolumeZonePredicate |
- |
14 | CheckNodeMemoryPressurePredicate |
doesn’t happen often |
15 | CheckNodeDiskPressurePredicate |
doesn’t happen often |
16 | InterPodAffinityMatches |
Most expensive predicate to compute |
這個表格大家對着字面意思體會一下吧,基本還是可以聯想到意義的。
當然這個順序是可以被配置文件覆蓋的,用戶可以使用類似這樣的配置:
{
"kind" : "Policy",
"apiVersion" : "v1",
"predicates" : [
{"name" : "PodFitsHostPorts", "order": 2},
{"name" : "PodFitsResources", "order": 3},
{"name" : "NoDiskConflict", "order": 5},
{"name" : "PodToleratesNodeTaints", "order": 4},
{"name" : "MatchNodeSelector", "order": 6},
{"name" : "PodFitsHost", "order": 1}
],
"priorities" : [
{"name" : "LeastRequestedPriority", "weight" : 1},
{"name" : "BalancedResourceAllocation", "weight" : 1},
{"name" : "ServiceSpreadingPriority", "weight" : 1},
{"name" : "EqualPriority", "weight" : 1}
],
"hardPodAffinitySymmetricWeight" : 10
}
整體過完源碼后我們再實際嘗試一下這些特性,這一邊先知道有這回事吧,ok,繼續~
3.2. 單個predicate執行過程
fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
這行代碼其實沒有啥復雜邏輯,不過我們還是重復講一下,清晰理解這一行很有必要。這里的predicate()
來自前幾行的if語句predicate, exist := predicateFuncs[predicateKey]
,往前跟也就是FitPredicate類型,我們前面提過,類型定義在pkg/scheduler/algorithm/types.go:36
,這個߇#x7C7B;型表示的是一個具體的predicate函數,這里使用predicate()
也就是一個函數調用的語法,很和諧了。
3.3. 具體的predicate函數
一直在講predicate,那么predicate函數到底長什么樣子呢,我們從具體的實現函數找一個看一下。開始講design的時候提到過predicate的實現在pkg/scheduler/algorithm/predicates/predicates.go
文件中,先看一眼Structure吧:
這個文件中predicate函數有點多,這樣看眼花,我們具體點開一個觀察一下:
pkg/scheduler/algorithm/predicates/predicates.go:277
func NoDiskConflict(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
for _, v := range pod.Spec.Volumes {
for _, ev := range nodeInfo.Pods() {
if isVolumeConflict(v, ev) {
return false, []algorithm.PredicateFailureReason{ErrDiskConflict}, nil
}
}
}
return true, nil, nil
}
我們知道predicate函數的特點,這樣就很好在這個一千六百多行go文件中尋找predicate函數了。像上面這個NoDiskConflict()
函數,參數是pod、meta和nodeinfo,很明顯是FitPredicate類型的,標准的predicate函數。
這個函數的實現也特別簡單,遍歷pod的Volumes,然后對於pod的每一個Volume,遍歷node上的每個pod,看是否和當前podVolume沖突。如果不fit就返回false加原因;如果fit就返回true,很清晰。
