k8s調度器介紹(調度框架版本)


從一個pod的創建開始

  1. 由kubectl解析創建pod的yaml,發送創建pod請求到APIServer。
  2. APIServer首先做權限認證,然后檢查信息並把數據存儲到ETCD里,創建deployment資源初始化。
  3. kube-controller通過list-watch機制,檢查發現新的deployment,將資源加入到內部工作隊列,檢查到資源沒有關聯pod和replicaset,然后創建rs資源,rs controller監聽到rs創建事件后再創建pod資源。
  4. scheduler 監聽到pod創建事件,執行調度算法,將pod綁定到合適節點,然后告知APIServer更新pod的spec.nodeName
  5. kubelet 每隔一段時間通過其所在節點的NodeName向APIServer拉取綁定到它的pod清單,並更新本地緩存。
  6. kubelet發現新的pod屬於自己,調用容器API來創建容器,並向APIService上報pod狀態。
  7. Kub-proxy為新創建的pod注冊動態DNS到CoreOS。為Service添加iptables/ipvs規則,用於服務發現和負載均衡。
  8. deploy controller對比pod的當前狀態和期望來修正狀態。

調度器介紹

從上述流程中,我們能大概清楚kube-scheduler的主要工作,負責整個k8s中pod選擇和綁定node的工作,這個選擇的過程就是應用調度策略,包括NodeAffinity、PodAffinity、節點資源篩選、調度優先級、公平調度等等,而綁定便就是將pod資源定義里的nodeName進行更新。

設計

kube-scheduler的設計有兩個歷史階段版本:

  1. 基於謂詞(predicate)和優先級(priority)的篩選。
  2. 基於調度框架的調度器,新版本已經把所有的舊的設計都改造成擴展點插件形式(1.19+)。

所謂的謂詞和優先級都是對調度算法的分類,在scheduler里,謂詞調度算法是來選擇出一組能夠綁定pod的node,而優先級算法則是在這群node中進行打分,得出一個最高分的node。

而調度框架的設計相比之前則更復雜一點,但確更加靈活和便於擴展,關於調度框架的設計細節可以查看官方文檔——624-scheduling-framework,當然我也有一遍文章對其做了翻譯還加了一些便於理解的補充——KEP: 624-scheduling-framework。總結來說調度框架的出現是為了解決以前webhooks擴展器的局限性,一個是擴展點只有:篩選、打分、搶占、綁定,而調度框架則在這之上又細分了11個擴展點;另一個則是通過http調用擴展進程的方式其實效率不高,調度框架的設計用的是靜態編譯的方式將擴展的程序代碼和scheduler源碼一起編譯成新的scheduler,然后通過scheduler配置文件啟用需要的插件,在進程內就能通過函數調用的方式執行插件。

調度流程

現在網上大部分的kube-scheduler調度流程文章都不是基於新的調度框架所寫的,還是謂詞和優先級的流程。基於調度框架實現的調度流程總的來說就是執行一個個插件的過程,如下圖:

整個過程可以分為兩個周期:調度周期(scheduling cycle)、綁定周期(Binding Cycle),這兩個周期的區別不僅僅是包含插件,還有每個周期的上下文(Cycle Context),這個上下文將貫穿各自的周期使周期內的每個插件之間能夠進行數據的交流。Sort插件是不屬於兩個周期任何一個,它的職責就是對調度隊列中的Pod進行排序。

一個pod的調度過程在調度插件里是線性執行下去的,但是綁定周期的執行是異步的,也就是說scheduler在執行A Pod的綁定周期時,其實也同時開始了B Pod的調度周期。這也是比較合理的,畢竟Bind插件是需要和APIServer進行通信來更新調度pod的nodeName,這個網絡IO過程存在着不可確定性。

調度周期:

Filter插件的功能類似之前的謂詞調度,這個過程就是根據調度策略函數(在調度框架里就是多個Filter插件函數)進行node篩選,篩選的原理就是將被篩選的node和待調度的pod以及周期上下文等作為參數一並傳入這些函數,最后收集通過了所有篩選函數的node進入下一階段,在這個階段將會以node為單位進行並發處理。

PostFilter插件雖說是發生在Filter之后,但是確只能在Filter插件沒有返回合適的node才執行。在scheduler里默認的PostFilter插件只有一個功能,進行搶占調度。搶占調度的原理:首先會將node上低於待調度pod的優先級的Pod全部剔除,當然這個只是模擬過程並不是真正將Pod從干掉,然后再次執行Filter插件,如果失敗了那就是搶占調度失敗,成功了則將前面剔除的pod一個一個加回來,每一次都執行Filter插件從而找出調度該Pod所需要剔除的最少的低優先級Pod。

Score插件的功能類比以前的優先級調度,這個過程是對前一階段得出的node列表進行再篩選,得出最終要調度的node。NormalizeScore再調度框架里也不能算是一個單獨擴展點,它往往是配合着score插件一起出現,為了將統一插件打分的分數。在調度框架里是作為Score插件可選的實現接口,同樣的Score插件的也是會並發的在每個node上執行。

Reserve 插件有兩種函數,reserve函數在綁定前為Pod做准備動作,Unreserve函數則在綁定周期間發生錯誤的時候做恢復。默認的Reserve插件使用情況是處理pod關聯里pvc與pv的綁定和解綁。

綁定周期:

整個綁定周期都是在一個異步的協程中,在執行進入綁定周期前會執行Pod的assume(假定)過程,這個過程做的主要是假設Pod已經綁定到目標node上,所以會更新scheduler的node緩存信息,這樣當調度下一個pod到前一個pod真正在node上創建的過程中,能夠用真正的node信息進行調度。

Scheduler的啟動流程

現在我們了解了scheduler是如何執行調度算法、pod綁定過程的,但是對於什么時候執行調度和調度的pod怎么獲得其實還並不清楚,所以我們需要深入到scheduler的代碼來了解這一切。

上面是一個簡略版的調度器處理pod流程:

首先scheduler會啟動一個client-go的Informer來監聽Pod事件(不只Pod其實還有Node等資源變更事件),這時候注冊的Informer回調事件會區分Pod是否已經被調度(spec.nodeName),已經調度過的Pod則只是更新調度器緩存,而未被調度的Pod會加入到調度隊列,然后經過調度框架執行注冊的插件,在綁定周期前會進行Pod的假定動作,從而更新調度器緩存中該Pod狀態,最后在綁定周期執行完向ApiServer發起BindAPI,從而完成了一次調度過程。

先找到在/cmd/kube-scheduler/scheduler.go的入口函數

func main() {
	command := app.NewSchedulerCommand()
	code := cli.Run(command)
	os.Exit(code)
}

k8中組件通用的啟動模版,我們需要找到這個command定義的

func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
	...
  cmd := &cobra.Command{ // 定義了一個cobra的Comand結構體, cmd.Execute(),會執行定義的Run函數。
		Run: func(cmd *cobra.Command, args []string) {
			if err := runCommand(cmd, opts, registryOptions...); err != nil { 
				fmt.Fprintf(os.Stderr, "%v\n", err)
				os.Exit(1)
			}
		}
		...
	}
}

查看runCommand定義

func runCommand(cmd *cobra.Command, opts *options.Options, registryOptions ...Option) error {
	...
	cc, sched, err := Setup(ctx, opts, registryOptions...) // 初始化配置、Scheduler
	...
	return Run(ctx, cc, sched)
}

查看Run定義

func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error {
	// To help debugging, immediately log version
	klog.V(1).Infof("Starting Kubernetes Scheduler version %+v", version.Get())

	// 全局配置
	if cz, err := configz.New("componentconfig"); err == nil {
		cz.Set(cc.ComponentConfig)
	} else {
		return fmt.Errorf("unable to register configz: %s", err)
	}

	// 事件管理器
	cc.EventBroadcaster.StartRecordingToSink(ctx.Done())

	// 選舉檢查
	var checks []healthz.HealthChecker
	if cc.ComponentConfig.LeaderElection.LeaderElect {
		checks = append(checks, cc.LeaderElection.WatchDog)
	}

	// http和metric服務
	if cc.InsecureServing != nil {
		...
	}
	if cc.InsecureMetricsServing != nil {
		... 
	}
	// https服務
	if cc.SecureServing != nil {
		...
	}

	// 啟動所有Informer
	cc.InformerFactory.Start(ctx.Done())

	// 等待informer緩存完畢
	cc.InformerFactory.WaitForCacheSync(ctx.Done())

	// 選舉機制啟動
	if cc.LeaderElection != nil {
		...
	}

	// 非選舉機制啟動過, 無論是選舉和非選舉啟動都會調用最后處理邏輯都會到sched.Run()
	sched.Run(ctx)
	return fmt.Errorf("finished without leader elect")
}

sched.Run在/pkg/scheduler/scheduler.go

func (sched *Scheduler) Run(ctx context.Context) {
	...
	sched.SchedulingQueue.Run()
	wait.UntilWithContext(ctx, sched.scheduleOne, 0) 
	sched.SchedulingQueue.Close()
}

其中wait.UntilWithContext將會不間斷的調用sched.scheduleOne函數,這么看schedulerOne就是處理Pod調度的工作函數了,到這里我們得回到上面New出sched的地方cc, sched, err := Setup(...)

func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions ...Option) (*schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error) {
	c, err := opts.Config() // 從Options(命令行收集)初始化schedler的配置
	
	cc := c.Complete() // 補充配置

	// Create the scheduler.
	sched, err := scheduler.New(...), // 初始化Scheduler
	)
	return &cc, sched, nil
}

查看New方法

func New(...) (*Scheduler, error) {
  options := defaultSchedulerOptions // 設置默認配置項
  ...
	configurator := &Configurator{  // 創建配置器
    ...
	}
 
	sched, err := configurator.create()  // 通過配置起器創建scheduler
	if err != nil {
		return nil, fmt.Errorf("couldn't create scheduler: %v", err)
	}
  // 為informer設置監聽事件,包括pod(已調度(字段NodeName)-添加到SchedulerCache, 為調度則添加到SchedulingQueue隊列中。
  // Node、PV、PVC、SC、CSINode、Service
  addAllEventHandlers(sched, informerFactory, podInformer)
  return sched, nil
}

查看配置起Configuratorcreate

func (c *Configurator) create() (*Scheduler, error) {
  // 創建提名隊列,用於存儲發生搶占的Pod
	nominator := internalqueue.NewPodNominator(c.informerFactory.Core().V1().Pods().Lister())
	profiles, err := profile.NewMap(...) // 調度框架配置

	podQueue := internalqueue.NewSchedulingQueue()  // 創建調度框架
  
	algo := NewGenericScheduler() // 創建調度算法,這里面主要是執行篩選和打分插件

	return &Scheduler{
		SchedulerCache:  c.schedulerCache,  // 調度緩存
		Algorithm:       algo, // 調度算法
		Extenders:       extenders,  // webhook擴展
		Profiles:        profiles,  // 調度框架配置
		NextPod:         internalqueue.MakeNextPodFunc(podQueue), // 獲取調度Pod
		Error:           MakeDefaultErrorFunc(),  // 調度失敗處理
		StopEverything:  c.StopEverything,  // 停止器
		SchedulingQueue: podQueue,  // 調度隊列
	}, nil
}

這里我們發現了SchedulingQueue是 由NewSchedulingQueue聲明的一個對象。

/pkg/scheduler/internal/queue/scheduling_queue.go

func NewPriorityQueue(
	lessFn framework.LessFunc,
	opts ...Option,
) *PriorityQueue {
	...
	pq := &PriorityQueue{  // 定義了3種隊列,activeQ、unschedulableQ、podBackoffQ
		PodNominator:              options.podNominator,
		clock:                     options.clock,
		stop:                      make(chan struct{}),
		podInitialBackoffDuration: options.podInitialBackoffDuration,
		podMaxBackoffDuration:     options.podMaxBackoffDuration,
		activeQ:                   heap.NewWithRecorder(), 
		unschedulableQ:            newUnschedulablePodsMap(),
		moveRequestCycle:          -1,
	}
  pq.podBackoffQ = heap.NewWithRecorder()
	return pq
}

SchedulingQueue的結構

type SchedulingQueue interface {
	...
	Pop() (*framework.QueuedPodInfo, error)
	Update(oldPod, newPod *v1.Pod) error
	Delete(pod *v1.Pod) error
	MoveAllToActiveOrBackoffQueue(event string)
}

找到了sched的屬性SchedulingQueue實際上是一個PriorityQueue對象,我們找到它的Run方法。

func (p *PriorityQueue) Run() {
	// 每一秒從podBackoffQ拿出最近的pod檢查是否可以加入到activeQ
	go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop) 
	// 沒30秒從無法調度pod的隊列拿出pod檢查是否可以加入到activeQ
	go wait.Until(p.flushUnschedulableQLeftover, 30*time.Second, p.stop)
}

現在我們找到了整個sched的啟動和調度隊列管理的功能,接下來查看具體調度一個pod的詳細經過。

sched.Run中我們找打了scheduleOne方法:/pkg/scheduler/scheduler.go

func (sched *Scheduler) scheduleOne(ctx context.Context) {
	podInfo := sched.NextPod() // 獲取activeQ的下一個pod
  fwk, err := sched.frameworkForPod(pod) // 從Pod里獲取設置調度框架,默認`default-schdeler`
	...
	scheduleResult, err := sched.Algorithm.Schedule()  // 執行調度算法:Filter和Score等插件
	...
	err = sched.assume()  // 假定pod
	...
	go func() { // 異步執行bind
		...
		err := sched.bind()
		...
	}
}

這個函數正是處理pod調度的主函數,而獲取需要調度的pod是執行sched.NextPod(),然后就是執行調度框架里的各個注冊插件,至此這就是所有的scheduler的工作代碼了,如果要看詳細的流程,可以查看我寫的思維導圖。
github思維導圖地址:https://github.com/goofy-z/k8s-learning/blob/master/K8s源碼學習/kube-scheduler/scheduler.xmind
在線思維導圖:https://www.processon.com/view/link/6167925d5653bb1336dca0ca


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM