k8s調度器kube-scheduler的核心實現在pkg/scheduler下
algorithmprovider:調度算法的注冊與獲取功能,核心數據結構是一個字典類的結構
apis:k8s集群中的資源版本相關的接口,和apiversion、type相關的一些內容
core:調度器實例的核心數據結構與接口以及外部擴展機制的實現
framework:定義了一套調度器內部擴展機制
internal:調度器核心實例依賴的內部數據結構
metrics:指標度量
profile:基於framework的一套調度器的配置,用於管控整個調度器的運行框架
testing:一些測試代碼
util:一些通用的工具
在pkg/scheduler/scheduler.go,定義了Scheduler:
type Scheduler struct { SchedulerCache internalcache.Cache Algorithm core.ScheduleAlgorithm NextPod func() *framework.QueuedPodInfo Error func(*framework.QueuedPodInfo, error) //默認的調度失敗處理方法 StopEverything <-chan struct{} SchedulingQueue internalqueue.SchedulingQueue //Pod的調度隊列 Profiles profile.Map //調度器配置 client clientset.Interface }
pkg/scheduler/internal/queue/scheduling_queue.go中定義了調度隊列的接口SchedulingQueue:
type SchedulingQueue interface { framework.PodNominator Add(pod *v1.Pod) error AddUnschedulableIfNotPresent(pod *framework.QueuedPodInfo, podSchedulingCycle int64) error SchedulingCycle() int64 Pop() (*framework.QueuedPodInfo, error) Update(oldPod, newPod *v1.Pod) error Delete(pod *v1.Pod) error MoveAllToActiveOrBackoffQueue(event string) AssignedPodAdded(pod *v1.Pod) AssignedPodUpdated(pod *v1.Pod) PendingPods() []*v1.Pod Close() NumUnschedulablePods() int //不可調度的Pod數量 Run() }
AssignedPodAdded、AssignedPodUpdated、MoveAllToActiveOrBackoffQueue底層都會調用 movePodsToActiveOrBackoffQueue方法,主要用來設置資源(Pod、Node等)更新時的回調方法。即資源更新時,之前無法被調度的Pod,會有重試的機會。
PriorityQueue是接口的具體實現:
type PriorityQueue struct { framework.PodNominator //調度的結果(Pod和Node的對應關系) stop chan struct{} //外部控制隊列的channel clock util.Clock podInitialBackoffDuration time.Duration //backoff pod 初始的等待重新調度時間 podMaxBackoffDuration time.Duration //backoff pod 最大的等待重新調度時間 lock sync.RWMutex cond sync.Cond //並發場景下實現控制pop的阻塞 activeQ *heap.Heap podBackoffQ *heap.Heap unschedulableQ *UnschedulablePodsMap schedulingCycle int64 //計數器,每pop一共pod,增加一次 moveRequestCycle int64 closed bool }
其核心數據結構主要包含三個隊列,高優先度的Pod排在前面。
(1)activeQ:存儲所有等待調度的Pod的隊列
默認是基於堆來實現,其中元素的優先級則通過對比Pod的創建時間和Pod的優先級來進行排序。
kube-scheduler發現某個Pod的nodeName是空后,就認為這個Pod處於未調度狀態,將其放到調度隊列里:
(2)podBackoffQ:存儲運行失敗的Pod的隊列
func (p *PriorityQueue) podsCompareBackoffCompleted(podInfo1, podInfo2 interface{}) bool { pInfo1 := podInfo1.(*framework.QueuedPodInfo) pInfo2 := podInfo2.(*framework.QueuedPodInfo) bo1 := p.getBackoffTime(pInfo1) bo2 := p.getBackoffTime(pInfo2) return bo1.Before(bo2) } // getBackoffTime returns the time that podInfo completes backoff func (p *PriorityQueue) getBackoffTime(podInfo *framework.QueuedPodInfo) time.Time { duration := p.calculateBackoffDuration(podInfo) backoffTime := podInfo.Timestamp.Add(duration) return backoffTime } // 計算backoff時間 func (p *PriorityQueue) calculateBackoffDuration(podInfo *framework.QueuedPodInfo) time.Duration { duration := p.podInitialBackoffDuration for i := 1; i < podInfo.Attempts; i++ { duration = duration * 2 if duration > p.podMaxBackoffDuration { return p.podMaxBackoffDuration } } return duration }
(3)unschedulableQ:其實是一個Map結構,存儲暫時無法調度的Pod
type UnschedulablePodsMap struct { podInfoMap map[string]*framework.QueuedPodInfo keyFunc func(*v1.Pod) string metricRecorder metrics.MetricRecorder //有Pod從Map中新增、刪除時就會增加1 } // 構造函數 func newUnschedulablePodsMap(metricRecorder metrics.MetricRecorder) *UnschedulablePodsMap { return &UnschedulablePodsMap{ podInfoMap: make(map[string]*framework.QueuedPodInfo), keyFunc: util.GetPodFullName, metricRecorder: metricRecorder, } }
新建Scheduler的方法:
func New(client clientset.Interface, informerFactory informers.SharedInformerFactory, recorderFactory profile.RecorderFactory, stopCh <-chan struct{}, opts ...Option) (*Scheduler, error) { stopEverything := stopCh if stopEverything == nil { stopEverything = wait.NeverStop } options := defaultSchedulerOptions //獲取默認的調度器選項,里面會給定默認的algorithmSourceProvider for _, opt := range opts { opt(&options) } schedulerCache := internalcache.New(30*time.Second, stopEverything) //初始化調度緩存 registry := frameworkplugins.NewInTreeRegistry() //registry是一個字典,里面存放了插件名與插件的工廠方法 if err := registry.Merge(options.frameworkOutOfTreeRegistry); err != nil { return nil, err } snapshot := internalcache.NewEmptySnapshot() configurator := &Configurator{ //基於配置創建configurator實例 client: client, recorderFactory: recorderFactory, informerFactory: informerFactory, schedulerCache: schedulerCache, StopEverything: stopEverything, percentageOfNodesToScore: options.percentageOfNodesToScore, podInitialBackoffSeconds: options.podInitialBackoffSeconds, podMaxBackoffSeconds: options.podMaxBackoffSeconds, profiles: append([]schedulerapi.KubeSchedulerProfile(nil), options.profiles...), registry: registry, nodeInfoSnapshot: snapshot, extenders: options.extenders, frameworkCapturer: options.frameworkCapturer, } metrics.Register() var sched *Scheduler source := options.schedulerAlgorithmSource switch { case source.Provider != nil: // Create the config from a named algorithm provider. sc, err := configurator.createFromProvider(*source.Provider) if err != nil { return nil, fmt.Errorf("couldn't create scheduler using provider %q: %v", *source.Provider, err) } sched = sc case source.Policy != nil: // Create the config from a user specified policy source. policy := &schedulerapi.Policy{} switch { case source.Policy.File != nil: if err := initPolicyFromFile(source.Policy.File.Path, policy); err != nil { return nil, err } case source.Policy.ConfigMap != nil: if err := initPolicyFromConfigMap(client, source.Policy.ConfigMap, policy); err != nil { return nil, err } } // Set extenders on the configurator now that we've decoded the policy // In this case, c.extenders should be nil since we're using a policy (and therefore not componentconfig, // which would have set extenders in the above instantiation of Configurator from CC options) configurator.extenders = policy.Extenders sc, err := configurator.createFromConfig(*policy) if err != nil { return nil, fmt.Errorf("couldn't create scheduler from policy: %v", err) } sched = sc default: return nil, fmt.Errorf("unsupported algorithm source: %v", source) } // Additional tweaks to the config produced by the configurator. sched.StopEverything = stopEverything sched.client = client addAllEventHandlers(sched, informerFactory) return sched, nil }
addAllEventHandlers方法會啟動所有資源對象的事件監聽,例如,新生成的Pod,spec.nodeName為空且狀態是pending。kube-Scheduler會watch到這個Pod的生成事件。
kube-scheduler的調度流程為:
(1)Cobra命令行參數解析
通過options.NewOptions函數初始化各個模塊的默認配置,例如HTTP或HTTPS服務等。
通過options.Validate函數驗證配置參數的合法性和可用性
kube-scheduler啟動時通過--config <filename>指定配置文件
對默認配置啟動的調度器,可以用 --write-config-to把默認配置寫到一個指定文件里面。
apiVersion: kubescheduler.config.k8s.io/v1alpha1
kind: KubeSchedulerConfiguration
algorithmSource:
provider: DefaultProvider
percentageOfNodesToScore: 0
schedulerName: default-scheduler
bindTimeoutSeconds: 600
clientConnection:
acceptContentTypes: ""
burst: 100
contentType: application/vnd.kubernetes.protobuf
kubeconfig: ""
qps: 50
disablePreemption: false
enableContentionProfiling: false
enableProfiling: false
hardPodAffinitySymmetricWeight: 1
healthzBindAddress: 0.0.0.0:10251
leaderElection:
leaderElect: true
leaseDuration: 15s
lockObjectName: kube-scheduler
lockObjectNamespace: kube-system
renewDeadline: 10s
resourceLock: endpoints
retryPeriod: 2s
metricsBindAddress: 0.0.0.0:10251
profiles:
- schedulerName: default-scheduler
- schedulerName: no-scoring-scheduler
plugins:
preScore:
disabled:
- name: '*'
score:
disabled:
- name: '*'
algorithmSource:算法提供者,即調度器配置(過濾器、打分器等一些配置文件的格式),目前提供三種方式:
Provider(DefaultProvider優先打散、ClusterAutoscalerProvider優先堆疊)、file、configMap
percentageOfNodesToscore:控制Node的取樣規模;
SchedulerName:調度器名稱,默認名稱是default-scheduler;
bindTimeoutSeconds:Bind階段的超時時間
ClientConnection:配置跟kube-apiserver交互的一些參數配置。比如contentType是用來跟kube-apiserver交互的序列化協議,這里指定為protobuf;
disablePreemption:關閉搶占協議;
hardPodAffinitySymnetricweight:配置PodAffinity和NodeAffinity的權重是多少。
profiles:可以定義多個。Pod通過spec.schedulerName指定使用的調度器(默認調度器是default-scheduler)
將cc對象(kube-scheduler組件的運行配置)傳入cmd/kube-scheduler/app/server.go中的Run函數,Run函數定義了kube-scheduler組件啟動的邏輯,它是一個運行不退出的常駐進程
(1)Configz registration
if cz, err := configz.New("componentconfig"); err == nil { cz.Set(cc.ComponentConfig) } else { return fmt.Errorf("unable to register configz: %s", err) }
(2)運行EventBroadcaster事件管理器。
cc.EventBroadcaster.StartRecordingToSink(ctx.Done())
(3)運行HTTP服務
/healthz:用於健康檢查
var checks []healthz.HealthChecker // 設置健康檢查 if cc.ComponentConfig.LeaderElection.LeaderElect { checks = append(checks, cc.LeaderElection.WatchDog) } if cc.InsecureServing != nil { separateMetrics := cc.InsecureMetricsServing != nil handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, separateMetrics, checks...), nil, nil) if err := cc.InsecureServing.Serve(handler, 0, ctx.Done()); err != nil { return fmt.Errorf("failed to start healthz server: %v", err) } } var checks []healthz.HealthChecker if cc.ComponentConfig.LeaderElection.LeaderElect { checks = append(checks, cc.LeaderElection.WatchDog) }
/metrics:用於監控指標,一般用於Prometheus指標采集
if cc.InsecureMetricsServing != nil { handler := buildHandlerChain(newMetricsHandler(&cc.ComponentConfig), nil, nil) if err := cc.InsecureMetricsServing.Serve(handler, 0, ctx.Done()); err != nil { return fmt.Errorf("failed to start metrics server: %v", err) } }
(4)運行HTTPS服務
if cc.SecureServing != nil { handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, false, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer) // TODO: handle stoppedCh returned by c.SecureServing.Serve if _, err := cc.SecureServing.Serve(handler, 0, ctx.Done()); err != nil { // fail early for secure handlers, removing the old error loop from above return fmt.Errorf("failed to start secure server: %v", err) } }
(5)實例化所有的Informer,運行所有已經實例化的Informer對象
包括Pod、Node、PV、PVC、SC、CSINode、PDB、RC、RS、Service、STS、Deployment
cc.InformerFactory.Start(ctx.Done()) cc.InformerFactory.WaitForCacheSync(ctx.Done()) // 等待所有運行中的Informer的數據同步到本地
(6)參與選主:
if cc.LeaderElection != nil { //需要參與選主 cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{ OnStartedLeading: func(ctx context.Context) { close(waitingForLeader) sched.Run(ctx) }, OnStoppedLeading: func() { klog.Fatalf("leaderelection lost") }, } leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection) //實例化LeaderElector對象 if err != nil { return fmt.Errorf("couldn't create leader elector: %v", err) } leaderElector.Run(ctx) //調用client-go中tools/leaderelection/leaderelection.go中的Run()參與領導選舉 return fmt.Errorf("lost lease") }
LeaderCallbacks中定義了兩個回調函數:
OnStartedLeading函數是當前節點領導者選舉成功后回調的函數,定義了kube-scheduler組件的主邏輯;
OnStoppedLeading函數是當前節點領導者被搶占后回調的函數,會退出當前的kube-scheduler協程。
(7)運行sched.Run調度器。
sched.Run(ctx)
其運行邏輯為:
func (sched *Scheduler) Run(ctx context.Context) { sched.SchedulingQueue.Run() wait.UntilWithContext(ctx, sched.scheduleOne, 0) sched.SchedulingQueue.Close() }
首先調用了pkg/scheduler/internal/queue/scheduling_queue.go中PriorityQueue的Run方法:
func (p *PriorityQueue) Run() { go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop) go wait.Until(p.flushUnschedulableQLeftover, 30*time.Second, p.stop)
其邏輯為:
每隔1秒,檢測backoffQ里是否有pod可以被放進activeQ里
每隔30秒,檢測unschedulepodQ里是否有pod可以被放進activeQ里(默認條件是等待時間超過60 秒)
然后調用了sched.scheduleOne,它是kube-scheduler組件的調度主邏輯,通過wait.Until定時器執行,內部會定時調用sched.scheduleOne函數,當sched.config.StopEverythingChan關閉時,該定時器才會停止並退出。

kube-scheduler首先從activeQ里pop一個等待調度的Pod出來,並從NodeCache里拿到相關的Node數據
NodeCache橫軸為zoneIndex(即Node按照zone進行分堆,從而保證拿到的Node按zone打散),縱軸為nodeIndex。
在filter階段,每pop一個node進行過濾,zoneIndex往后自增一個位置,然后從該zone的node列表中取一個Node出來(如果當前zone的無Node,就會從下一個zone拿),取出后nodeIndex也要往后自增一個位置。
根據取樣比例判斷Filter到的Node是否足夠。如果取樣的規模已經達到了設置的取樣比例,Filter就會結束。
取樣比例通過percentageOfNodesToScore(0~100)設置
當集群中的可調度節點少於50個時,調度器仍然會去檢查所有的Node
若不設置取樣比例,默認的比例會隨着節點數量的增多不斷降低(最低到5%)
Scheduling Framework是一種可插入的架構,在原有的調度流程中定義了豐富的擴展點(extention point)接口
開發者可以通過實現擴展點所定義的接口來實現插件,從而將自身的調度邏輯集成到Scheduling Framework中。
pkg/scheduler/framework/plugins/names/names.go中記載了所有自帶插件的插件名
需要啟用的在pkg/scheduler/algorithmprovider/registry.go中進行注冊
Scheduling Framework在執行調度流程運行到相應的擴展點時,會調用用戶注冊的插件,影響調度決策的結果。

核心調度流程在pkg/scheduler/core/generic_scheduler.go:
func (g *genericScheduler) Schedule(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) { trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name}) defer trace.LogIfLong(100 * time.Millisecond) if err := g.snapshot(); err != nil { return result, err } trace.Step("Snapshotting scheduler cache and node infos done") if g.nodeInfoSnapshot.NumNodes() == 0 { return result, ErrNoNodesAvailable } feasibleNodes, filteredNodesStatuses, err := g.findNodesThatFitPod(ctx, fwk, state, pod) if err != nil { return result, err } trace.Step("Computing predicates done") if len(feasibleNodes) == 0 { return result, &FitError{ Pod: pod, NumAllNodes: g.nodeInfoSnapshot.NumNodes(), FilteredNodesStatuses: filteredNodesStatuses, } } // When only one node after predicate, just use it. if len(feasibleNodes) == 1 { return ScheduleResult{ SuggestedHost: feasibleNodes[0].Name, EvaluatedNodes: 1 + len(filteredNodesStatuses), FeasibleNodes: 1, }, nil } priorityList, err := g.prioritizeNodes(ctx, fwk, state, pod, feasibleNodes) if err != nil { return result, err } host, err := g.selectHost(priorityList) trace.Step("Prioritizing done") return ScheduleResult{ SuggestedHost: host, EvaluatedNodes: len(feasibleNodes) + len(filteredNodesStatuses), FeasibleNodes: len(feasibleNodes), }, err }
下面為Scheduling Framework全流程,灰色插件默認不啟用:
1、scheduling cycle
scheduling cycle是調度的核心流程,主要進行調度決策,挑選出唯一的節點。
scheduling cycle是同步執行的,同一個時間只有一個scheduling cycle,是線程安全的
擴展點1:Sort
用於排序調度隊列中的Pod,接口只定義了一個函數Less,用於堆排序待調度Pod時進行比較
type QueueSortPlugin interface { Plugin Less(*PodInfo, *PodInfo) bool }
比較函數在同一時刻只有一個,所以Sort插件只能Enable一個,如果用戶Enable了2個則調度器啟動時會報錯退出
-
PrioritySort:首先比較優先級,然后再比較timestamp:
type PrioritySort struct{} func (pl *PrioritySort) Less(pInfo1, pInfo2 *framework.QueuedPodInfo) bool { p1 := corev1helpers.PodPriority(pInfo1.Pod) p2 := corev1helpers.PodPriority(pInfo2.Pod) return (p1 > p2) || (p1 == p2 && pInfo1.Timestamp.Before(pInfo2.Timestamp)) } func PodPriority(pod *v1.Pod) int32 { if pod.Spec.Priority != nil { return *pod.Spec.Priority } return 0 }
預選階段先並發
運行PreFilter,只有當所有的PreFilter插件都返回success 時,才能進入Filter階段,否則Pod將會被拒絕掉,標識此次調度流程失敗;
再並發
運行Filter的所有插件,每個Node只要被任一Filter插件認為不滿足調度要求就會被濾除。
為了提升效率,執行順序可以被配置,這樣用戶就可以將過濾掉大量節點的策略(例如NodeSelector的Filter)放到前邊執行,從而減少后邊Filter策略執行的次數
擴展點2:PreFilter
PreFilter是調度流程啟動之前的預處理,可以進行Pod信息的加工、集群或Pod必須滿足的預置條件的檢查等。
-
NodeResourcesFit
-
NodePorts
-
PodTopologySpread
-
InterPodAffinity
-
VolumeBinding:檢查Pod掛載的PVC,如果其對應SC的VolumeBindingMode是Immediate模式,該PVC必須已經是bound,否則需要返回UnschedulableAndUnresolvable
-
NodeAffinity
-
ServiceAffinity
擴展點3:Filter
-
NodeUnschedulable:Node是否不允許調度
-
NodeResourcesFit:檢查節點是否有Pod運行所需的資源
-
Nodename:Node是否符合Pod在spec.nodeSelector中的要求
-
NodePorts:若Pod定義了Ports.hostPort屬性,則檢查其值指定的端口是否已經被節點上其他容器或服務占用
-
NodeAffinity:Pod和Node的親和和反親和調度
-
VolumeRestrictions:檢查掛載該Node上的卷是否滿足存儲提供者的要求
-
TainttToleration:檢查Pod Tolerates和Node Taints是否匹配
-
NodeVolumeLimits、 EBSLimits、 GCEPDLimits、 AzureDiskLimits、 CinderVolume:校驗PVC指定的Provision在CSI plugin或非CSI Plugin(后三個)上報的單機最大掛盤數(存儲插件提供方一般對每個節點的單機最大掛載磁盤數是有限制的)
-
VolumeBinding:檢查Pod掛載的PVC,如果其是bound狀態,檢查節點是否滿足PV的拓撲要求;如果還沒有bound,檢查節點是否能有滿足拓撲、存儲空間要求的PV
-
VolumeZone:檢查檢查PV的Label,如果定義了zone的信息,則必須和Node的zone匹配
-
PodTopologySpread:檢查Pod的拓撲邏輯
-
InterPodaffinity:檢查Pod間的親和、反親和邏輯
-
NodeLabel
-
ServiceAffinity
擴展點4:PostFilter
主要用於處理Pod在Filter階段失敗后的操作,如搶占、Autoscale觸發等。
-
DefaultPreemption:當高優先級的Pod沒有找到合適的Node時,會執行Preempt搶占算法,搶占的流程:
①一個Pod進入搶占的時候,首先會判斷Pod是否擁有搶占的資格,有可能上次已經搶占過一次。
②如果符合搶占資格,會先對所有的節點進行一次過濾,過濾出符合這次搶占要求的節點。然后
③模擬一次調度,把優先級低的Pod先移除出去,再嘗試能否把待搶占的Pod放置到此節點上。然后通過這個過程從過濾剩下的節點中選出一批節點進行搶占。
④ProcessPreemptionWithExtenders是一個擴展的鈎子,用戶可以在這里加一些自己搶占節點的策略。如果沒有擴展的鈎子,這里面不做任何動作。
⑤PickOneNodeForPreemption,從上面選出的節點里挑選出最合適的一個節點,策略包括:
優先選擇打破PDB最少的節點;
其次選擇待搶占Pods中最大優先級最小的節點;
再次選擇待搶占Pods優先級加和最小的節點;
接下來選擇待搶占Pods數目最小的節點;
最后選擇擁有最晚啟動Pod的節點;
通過過濾之后,會選出一個最合適的節點。對這個節點上待搶占的Pod進行delete,完成搶占過程。
擴展點5:PreScore
獲取到通過Filter階段的節點列表后,進行一些信息預處理、生成日志或者監控信息。
-
SelectorSpread
-
InterPodaffinity
-
PodTopologySpread
-
TaintToleration
擴展點6:Score
對Filter過濾后的剩余節點進行打分。
-
SelectorSpread
-
NodeResourcesBalancedAllocation :碎片率(CPU 的使用比例和內存使用比例的差值 )={ 1 - Abs[CPU(Request / Allocatable) - Mem(Request / Allocatable)] } * Score。如果這個差值越大,就表示碎片越大,優先不分配到這個節點上。
-
NodeResourcesLeastAllocated:優先打散,公式是 (Allocatable - Request) / Allocatable * Score
-
ImageLocality:如果節點里面存在鏡像的話,優先把Pod調度到這個節點上。這里還會去考慮鏡像的大小,會按照節點上已經存在的鏡像大小優先級親和
-
InterPodaffinity
-
NodeAffinity
-
NodePreferAvoidpods
-
PodTopologySpread:權重為2(因為是用戶指定的)
-
TaintToleration
-
NodeResourcesMostAllocated:優先堆疊,公式是Request / Allocatable * Score
-
RequestedToCapacityRatio:指定比率。用戶指定配置參數可以指定不同資源使用比率的分數,從而達到控制集群上每個節點上pod的分布。
-
nodeLabel
-
ServiceAffinity:替換了曾經的SelectorSpreadPriority(因為Service代表一組服務,只要能做到服務的打散分配就足夠了)。
擴展點7:NormalizeScore
標准化完成后,Scheduler會綜合PreScore+Score所有插件的打分。
擴展點8:Reserve
分配Pod到Node的時候,需要進行賬本預占(Reserve),將調度結果放到調度緩存(Schedule Cache)
。預占的過程會把Pod的狀態標記為Assumed(處於內存態)、在Node的狀態中添加該Pod的數據賬本。
-
volumebinding:調用AssumePodVolumes方法,更改調度緩存中已經Match的PV的annotation[pv.kubernetes.io/bound-by-controller]="yes"和未匹配到PV的PVC的 annotation[volume.kubernetes.io/selected-node]=所選節點。最后更改調度緩存中Pod的.spec.nodeName。
PS:未來可能會將UnReserve與Reserve統一到一起,即要求開發者在實現Reserve的同時定義UnReserve,保證數據能夠有效的清理,避免留下臟數據
擴展點9:Permit
Pod在Reserve階段完成資源預留后、Bind操作前,開發者可以定義自己的策略在Permit階段進行攔截,根據條件對Pod進行 allow(允許Pod通過Permit階段)、reject(Pod調度失敗)和wait(可設置超時時間)這3種操作。
Schedule Theread周而復始的從activeQ拿出Pod,進入scheduling cycle的調度流水線。
scheduling cycle結束后,這個Pod會異步交給Wait Thread,Wait Thread如果等待成功了,就會交給binding cycle
2、Binding cycle
擴展點10:prebind
-
VolumeBinding:將之前Reserve階段的volumebinding實際更新到apiserver中,等待PV Controller完成binding。最終所有PV都處於bound狀態且NodeAffinity得到滿足。
擴展點11:bind
進行最后實際的綁定,更新Pod和Node的數據
-
DefaultBinder
選中的節點在和待調度Pod進行Bind的時候,有可能會Bind失敗,此時需要做回退,把Pod的Assumed狀態退回Initial,從Node里面把Pod數據賬本擦除掉,會把Pod重新丟回到unschedulableQ隊列里面。在unschedulableQ里,如果一個Pod一分鍾沒調度過,就會重新回到activeQ。它的輪詢周期是30s。
調度失敗的Pod會放到backoffQ,在backoffQ里等待的時間會比在unschedulableQ里更短,backoffQ里的降級策略是2的指數次冪降級。假設重試第一次為1s,那第二次就是2s,第三次就是4s,但最大到10s。
最終,某個Node上的kubelet會watch到這個Pod屬於自己所在的節點。kubelet會在節點上創建Pod,包括創建容器storage、network。等所有的資源都准備完成,kubelet會把Pod狀態更新為Running
參考資料:
[1] https://kubernetes.io/docs/home/
[2] https://edu.aliyun.com/roadmap/cloudnative
[3] 鄭東旭《Kubernetes源碼剖析》
下一部分:
k8s調度器擴展機制