圖解kubernetes調度器SchedulingQueue核心源碼實現


SchedulingQueue是kubernetes scheduler中負責進行等待調度pod存儲的對,Scheduler通過SchedulingQueue來獲取當前系統中等待調度的Pod,本文主要討論SchedulingQueue的設計與實現的各種實現, 了解探究其內部實現與底層源碼,本系列代碼基於kubernets1.1.6分析而來,圖解主要位於第二部分

SchedulingQueue設計

隊列與優先級

隊列與場景

類型 描述 通常實現
隊列 普通隊列是一個FIFO的數據結構,根據元素入隊的次序依次出隊 數組或者鏈表
優先級隊列 優先級隊列通常是指根據某些優先級策略,高優先級會優先被獲取 數組或者樹

其實在大多數的調度場景中,大多都是采用優先級隊列來實現,優先滿足優先級比較高的任務或者需求,從而減少后續高優先級對低優先級的搶占,scheduler中也是如此

優先級的選擇

k8s中調度的單元是Pod,scheduler中根據pod的優先級的高低來進行優先級隊列的構建, 這個其實是在kubernets的adminission准入插件中,會為用戶創建的pod根據用戶的設置,進行優先級字段的計算

三級隊列

活動隊列

活動隊列存儲當前系統中所有正在等待調度的隊列

不可調度隊列

當pod的資源在當前集群中不能被滿足時,則會被加入到一個不可調度隊列中,然后等待稍后再進行嘗試

backoff隊列

backoff機制是並發編程中常見的一種機制,即如果任務反復執行依舊失敗,則會按次增長等待調度時間,降低重試效率,從而避免反復失敗浪費調度資源

針對調度失敗的pod會優先存儲在backoff隊列中,等待后續重試

阻塞與搶占

阻塞設計

當隊列中不存在等待調度的pod的時候,會阻塞scheduler等待有需要調度的pod的時候再喚醒調度器,獲取pod進行調度

搶占相關

nominatedPods存儲pod被提議運行的node,主要用於搶占調度流程中使用,本節先不分析

源碼分析

數據結構

kubernetes中默認的schedulingQueue實現是PriorityQueue,本章就以該數據結構來分析

type PriorityQueue struct {
	stop  <-chan struct{}
	clock util.Clock
    // 存儲backoff的pod計時器
	podBackoff *PodBackoffMap

	lock sync.RWMutex
    // 用於協調通知因為獲取不到調度pod而阻塞的cond
	cond sync.Cond

    // 活動隊列
	activeQ *util.Heap
	
    // backoff隊列
	podBackoffQ *util.Heap
	
    // 不可調度隊列
	unschedulableQ *UnschedulablePodsMap
	// 存儲pod和被提名的node, 實際上就是存儲pod和建議的node節點
	nominatedPods *nominatedPodMap
	// schedulingCycle是一個調度周期的遞增序號,當pod pop的時候會遞增
	schedulingCycle int64
	// moveRequestCycle緩存schedulingCycle, 當未調度的pod重新被添加到activeQueue中
	// 會保存schedulingCycle到moveRequestCycle中
	moveRequestCycle int64
	closed bool
}

PriorityQueue作為實現SchedulingQueue的實現,其核心數據結構主要包含三個隊列:activeQ、podBackoffQ、unscheduleQ內部通過cond來實現Pop操作的阻塞與通知,接下來先分析核心的調度流程,最后再分析util.Heap里面的具體實現

activeQ

存儲所有等待調度的Pod的隊列,默認是基於堆來實現,其中元素的優先級則通過對比pod的創建時間和pod的優先級來進行排序

	// activeQ is heap structure that scheduler actively looks at to find pods to
	// schedule. Head of heap is the highest priority pod.
	activeQ *util.Heap

優先級比較函數

// activeQComp is the function used by the activeQ heap algorithm to sort pods.
// It sorts pods based on their priority. When priorities are equal, it uses
// PodInfo.timestamp.
func activeQComp(podInfo1, podInfo2 interface{}) bool {
	pInfo1 := podInfo1.(*framework.PodInfo)
	pInfo2 := podInfo2.(*framework.PodInfo)
	prio1 := util.GetPodPriority(pInfo1.Pod)
	prio2 := util.GetPodPriority(pInfo2.Pod)
    // 首先根據優先級的高低進行比較,然后根據pod的創建時間,越高優先級的Pod越被優先調度
    // 越早創建的pod越優先
	return (prio1 > prio2) || (prio1 == prio2 && pInfo1.Timestamp.Before(pInfo2.Timestamp))
}

podbackOffQ

podBackOffQ主要存儲那些在多個schedulingCycle中依舊調度失敗的情況下,則會通過之前說的backOff機制,延遲等待調度的時間

	// podBackoffQ is a heap ordered by backoff expiry. Pods which have completed backoff
	// are popped from this heap before the scheduler looks at activeQ
	podBackoffQ *util.Heap

podBackOff

上面提到podBackOffQ隊列中並沒有存儲pod的backOff的具體信息,比如backoff的計數器,最后一次更新的時間等,podBackOff則類似一個記分板,記錄這些信息,供podBackOffQ使用

	// podBackoff tracks backoff for pods attempting to be rescheduled
	podBackoff *PodBackoffMap

// PodBackoffMap is a structure that stores backoff related information for pods
type PodBackoffMap struct {
	// lock for performing actions on this PodBackoffMap
	lock sync.RWMutex
	// initial backoff duration
	initialDuration time.Duration // 當前值是1秒
	// maximal backoff duration
	maxDuration time.Duration // 當前值是1分鍾
	// map for pod -> number of attempts for this pod
	podAttempts map[ktypes.NamespacedName]int
	// map for pod -> lastUpdateTime pod of this pod
	podLastUpdateTime map[ktypes.NamespacedName]time.Time
}

unschedulableQ

存儲已經嘗試調度但是當前集群資源不滿足的pod的隊列

moveRequestCycle

當因為集群資源發生變化會嘗試進行unschedulableQ中的pod轉移到activeQ,moveRequestCycle就是存儲資源變更時的schedulingCycle

func (p *PriorityQueue) MoveAllToActiveQueue() {
    // 省略其他代碼
    p.moveRequestCycle = p.schedulingCycle
}

schedulingCycle

schedulingCycle是一個遞增的序列每次從activeQ中pop出一個pod都會遞增

func (p *PriorityQueue) Pop() (*v1.Pod, error) {
    //省略其他
    	p.schedulingCycle++
}

並發活動隊列

並發從活動隊列中獲取pod

image.png
SchedulingQueue提供了一個Pop接口用於從獲取當前集群中等待調度的pod,其內部實現主要通過上面cond與activeQ來實現

當前隊列中沒有可調度的pod的時候,則通過cond.Wait來進行阻塞,然后在忘activeQ中添加pod的時候通過cond.Broadcast來實現通知

func (p *PriorityQueue) Pop() (*v1.Pod, error) {
	p.lock.Lock()
	defer p.lock.Unlock()
	for p.activeQ.Len() == 0 {
		if p.closed {
			return nil, fmt.Errorf(queueClosed)
		}
        // 
		p.cond.Wait()
	}
	obj, err := p.activeQ.Pop()
	if err != nil {
		return nil, err
	}
	pInfo := obj.(*framework.PodInfo)
	p.schedulingCycle++
	return pInfo.Pod, err
}

加入調度pod到活動隊列

image.png
當pod加入活動隊列中,除了加入activeQ的優先級隊列中,還需要從podBackoffQ和unschedulableQ中移除當前的pod,最后進行廣播通知阻塞在Pop操作的scheudler進行最新pod的獲取

func (p *PriorityQueue) Add(pod *v1.Pod) error {
	p.lock.Lock()
	defer p.lock.Unlock()
	pInfo := p.newPodInfo(pod)
	// 加入activeQ
	if err := p.activeQ.Add(pInfo); err != nil {
		klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err)
		return err
	}
	// 從unschedulableQ刪除
	if p.unschedulableQ.get(pod) != nil {
		klog.Errorf("Error: pod %v/%v is already in the unschedulable queue.", pod.Namespace, pod.Name)
		p.unschedulableQ.delete(pod)
	}
	// Delete pod from backoffQ if it is backing off
	// 從podBackoffQ刪除
	if err := p.podBackoffQ.Delete(pInfo); err == nil {
		klog.Errorf("Error: pod %v/%v is already in the podBackoff queue.", pod.Namespace, pod.Name)
	}
	// 存儲pod和被提名的node
	p.nominatedPods.add(pod, "")
	p.cond.Broadcast()

	return nil
}

schedulingCycle與moveRequestCycle

未調度的隊列的及時重試

image.png
導致調度周期schedulingCyclye變更主要因素如下:
1.當集群資源發生變化的時候:比如新添加pv、node等資源,那之前在unschedulableQ中因為資源不滿足需求的pod就可以進行放入activeQ中或者podBackoffQ中,及時進行調度
2.pod被成功調度: 之前由於親和性不滿足被放入到unschedulableQ中的pod,此時也可以進行嘗試,而不必等到超時之后,再加入

這兩種情況下會分別觸發MoveAllToActiveQueue和movePodsToActiveQueue變更moveRequestCycle使其等於schedulingCycle

對重試機制的影響

當前一個pod失敗的時候,有兩種選擇一是加入podBackoffQ中,二是加入unschedulableQ中,那么針對一個失敗的pod如何選擇該進入那個隊列中呢
image.png
結合上面的moveRequestCycle變更時機,什么時候moveRequestCycle會大於等於podSchedulingCycle呢?答案就是當前集群中進行過集群資源的變更或者pod被成功分配,那這個時候我們如果重試一個失敗的調度則可能會成功,因為集群資源變更了可能有新的資源加入

	if p.moveRequestCycle >= podSchedulingCycle {
		if err := p.podBackoffQ.Add(pInfo); err != nil {
			return fmt.Errorf("error adding pod %v to the backoff queue: %v", pod.Name, err)
		}
	} else {
		p.unschedulableQ.addOrUpdate(pInfo)
	}

失敗處理邏輯的注入

注入調度失敗邏輯處理

在創建scheduler Config的時候會通過MakeDefaultErrorFunc注入一個失敗處理函數, 在scheduler調度的時候會進行調用
kubernetes/pkg/scheduler/factory/factory.go: MakeDefaultErrorFunc會將沒有調度到任何一個node的pod重新放回到優先級隊列中

	podSchedulingCycle := podQueue.SchedulingCycle()
	// 省略非核心代碼
	if len(pod.Spec.NodeName) == 0 {
        //重新放回隊列
        if err := podQueue.AddUnschedulableIfNotPresent(pod, podSchedulingCycle); err != nil {
            klog.Error(err)
        }
    }

失敗處理的回調

當調度pod的失敗的時候, scheduler會同時調用sched.Error就是上面注入的失敗處理邏輯,來將調度失敗未分配node的pod節點重新加入到隊里鍾
kubernetes/pkg/scheduler/scheduler.go

func (sched *Scheduler) recordSchedulingFailure(pod *v1.Pod, err error, reason string, message string) {
	// 錯誤回調
    sched.Error(pod, err)
	sched.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", message)
	if err := sched.PodConditionUpdater.Update(pod, &v1.PodCondition{
		Type:    v1.PodScheduled,
		Status:  v1.ConditionFalse,
		Reason:  reason,
		Message: err.Error(),
	}); err != nil {
		klog.Errorf("Error updating the condition of the pod %s/%s: %v", pod.Namespace, pod.Name, err)
	}
}

PodBackoffMap

PodBackoffMap主要用於存儲pod的最后一次失敗的更新時間與實現次數,從而根據這些數據來進行pod的backoffTime的計算

數據結構設計

type PodBackoffMap struct {
	// lock for performing actions on this PodBackoffMap
	lock sync.RWMutex
	// 初始化 backoff duration
	initialDuration time.Duration // 當前值是1秒
	// 最大 backoff duration
	maxDuration time.Duration // 當前值是1分鍾
	// 記錄pod重試的次數
	podAttempts map[ktypes.NamespacedName]int
	// 記錄pod的最后一次的更新時間
	podLastUpdateTime map[ktypes.NamespacedName]time.Time
}

backoffTime計算算法

初始化的時候回設定initialDuration和maxDuration,在當前版本中分別是1s和10s,也就是backoffQ中的pod最長10s就會重新加入activeQ中(需要等待定時任務進行輔助)

在每次失敗回調的時候,都會進行BackoffPod方法來進行計數更新,在后續獲取pod的backoffTime的時候,只需要獲取次數然后結合initialDuration進行算法計算,結合pod最后一次的更新時間,就會獲取pod的backoffTime的終止時間
image.png

backoffDuration計算

其實最終的計算很簡單就是2的N次冪

func (pbm *PodBackoffMap) calculateBackoffDuration(nsPod ktypes.NamespacedName) time.Duration {
	// initialDuration是1s
	backoffDuration := pbm.initialDuration
	if _, found := pbm.podAttempts[nsPod]; found {
		// podAttempts里面包含pod的嘗試失敗的次數
		for i := 1; i < pbm.podAttempts[nsPod]; i++ {
			backoffDuration = backoffDuration * 2
			// 最大10s
			if backoffDuration > pbm.maxDuration {
				return pbm.maxDuration
			}
		}
	}
	return backoffDuration
}

podBackoffQ

image.png

優先級函數

podBackoffQ實際上會根據pod的backoffTime來進行優先級排序,所以podBackoffQ的隊列頭部,就是最近一個要過期的pod

func (p *PriorityQueue) podsCompareBackoffCompleted(podInfo1, podInfo2 interface{}) bool {
	pInfo1 := podInfo1.(*framework.PodInfo)
	pInfo2 := podInfo2.(*framework.PodInfo)
	bo1, _ := p.podBackoff.GetBackoffTime(nsNameForPod(pInfo1.Pod))
	bo2, _ := p.podBackoff.GetBackoffTime(nsNameForPod(pInfo2.Pod))
	return bo1.Before(bo2)
}

調度失敗加入到podBackoffQ

如果調度失敗,並且moveRequestCycle=podSchedulingCycle的時候就加入podBackfoffQ中

func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingCycle int64) error {
	// 省略檢查性代碼
	// 更新pod的backoff 信息
	p.backoffPod(pod)

	// moveRequestCycle將pod從unscheduledQ大於pod的調度周期添加到 如果pod的調度周期小於當前的調度周期
	if p.moveRequestCycle >= podSchedulingCycle {
		if err := p.podBackoffQ.Add(pInfo); err != nil {
			return fmt.Errorf("error adding pod %v to the backoff queue: %v", pod.Name, err)
		}
	} else {
		p.unschedulableQ.addOrUpdate(pInfo)
	}

	p.nominatedPods.add(pod, "")
	return nil

}

從unschedulableQ遷移

在前面介紹的當集群資源發生變更的時候,會觸發嘗試unschedulabelQ中的pod進行轉移,如果發現當前pod還未到達backoffTime,就加入到podBackoffQ中

		if p.isPodBackingOff(pod) {
			if err := p.podBackoffQ.Add(pInfo); err != nil {
				klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err)
				addErrorPods = append(addErrorPods, pInfo)
			}
		} else {
			if err := p.activeQ.Add(pInfo); err != nil {
				klog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err)
				addErrorPods = append(addErrorPods, pInfo)
			}
		}

podBackoffQ定時轉移

在創建PriorityQueue的時候,會創建兩個定時任務其中一個就是講backoffQ中的pod到期后的轉移,每秒鍾嘗試一次

func (p *PriorityQueue) run() {
	go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
	go wait.Until(p.flushUnschedulableQLeftover, 30*time.Second, p.stop)
}

因為是一個堆結果,所以只需要獲取堆頂的元素,然后確定是否到期,如果到期后則進行pop處來,加入到activeQ中

func (p *PriorityQueue) flushBackoffQCompleted() {
	p.lock.Lock()
	defer p.lock.Unlock()

	for {
        // 獲取堆頂元素
		rawPodInfo := p.podBackoffQ.Peek()
		if rawPodInfo == nil {
			return
		}
		pod := rawPodInfo.(*framework.PodInfo).Pod
        // 獲取到期時間
		boTime, found := p.podBackoff.GetBackoffTime(nsNameForPod(pod))
		if !found {
			// 如果當前已經不在podBackoff中,則就pop出來然后放入到activeQ
			klog.Errorf("Unable to find backoff value for pod %v in backoffQ", nsNameForPod(pod))
			p.podBackoffQ.Pop()
			p.activeQ.Add(rawPodInfo)
			defer p.cond.Broadcast()
			continue
		}

		// 未超時
		if boTime.After(p.clock.Now()) {
			return
		}
		// 超時就pop出來
		_, err := p.podBackoffQ.Pop()
		if err != nil {
			klog.Errorf("Unable to pop pod %v from backoffQ despite backoff completion.", nsNameForPod(pod))
			return
		}
        // 加入到activeQ中
		p.activeQ.Add(rawPodInfo)
		defer p.cond.Broadcast()
	}
}

unschedulableQ

image.png

調度失敗

調度失敗后,如果當前集群資源沒有發生變更,就加入到unschedulable,原因上面說過

func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingCycle int64) error {
	// 省略檢查性代碼
	// 更新pod的backoff 信息
	p.backoffPod(pod)

	// moveRequestCycle將pod從unscheduledQ大於pod的調度周期添加到 如果pod的調度周期小於當前的調度周期
	if p.moveRequestCycle >= podSchedulingCycle {
		if err := p.podBackoffQ.Add(pInfo); err != nil {
			return fmt.Errorf("error adding pod %v to the backoff queue: %v", pod.Name, err)
		}
	} else {
		p.unschedulableQ.addOrUpdate(pInfo)
	}

	p.nominatedPods.add(pod, "")
	return nil

}

定時轉移任務

定時任務每30秒執行一次

func (p *PriorityQueue) run() {
	go wait.Until(p.flushUnschedulableQLeftover, 30*time.Second, p.stop)
}

邏輯其實就非常簡單如果當前時間-pod的最后調度時間大於60s,就重新調度,轉移到podBackoffQ或者activeQ中

func (p *PriorityQueue) flushUnschedulableQLeftover() {
	p.lock.Lock()
	defer p.lock.Unlock()

	var podsToMove []*framework.PodInfo
	currentTime := p.clock.Now()
	for _, pInfo := range p.unschedulableQ.podInfoMap {
		lastScheduleTime := pInfo.Timestamp
		// 如果該pod1分鍾內沒有被調度就加入到podsToMove
		if currentTime.Sub(lastScheduleTime) > unschedulableQTimeInterval {
			podsToMove = append(podsToMove, pInfo)
		}
	}

	if len(podsToMove) > 0 {
		// podsToMove將這些pod移動到activeQ
		p.movePodsToActiveQueue(podsToMove)
	}
}

調度隊列總結

數據流設計總結

image.png

3.1.1 三隊列與后台定時任務

從設計上三隊列分別存儲:活動隊列、bakcoff隊列、不可調度隊列,其中backoff中會根據任務的失敗來逐步遞增重試時間(最長10s)、unschedulableQ隊列則延遲60s

通過后台定時任務分別將backoffQ隊列、unschedulableQ隊列來進行重試,加入到activeQ中,從而加快完成pod的失敗重試調度

cycle與優先調度

schedulingCycle、moveRequestCycle兩個cycle其實本質上也是為了加快失敗任務的重試調度,當集群資源發生變化的時候,進行立即重試,那些失敗的優先級比較高、親和性問題的pod都可能會被優先調度

鎖與cond實現線程安全pop

內部通過lock保證線程安全,並通過cond來實現阻塞等待,從而實現阻塞scheduler worker的通知

今天就分析到這里,其實參考這個實現,我們也可以從中抽象出一些設計思想,實現自己的一個具有優先級、快速重試、高可用的任務隊列,先分析到這,下一個分析的組件是SchedulerCache, 感興趣可以加我微信一起交流學習,畢竟三個臭皮匠算計不過諸葛亮

微信號:baxiaoshi2020
關注公告號閱讀更多源碼分析文章 21天大棚
更多文章關注 www.sreguide.com
本文由博客一文多發平台 OpenWrite 發布


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM