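The Kubernetes scheduler reserves two extension mechanisms for users: SchedulerExtender and Framework. This article takes a brief look at how SchedulerExtender is implemented. Framework is covered in a separate article, so the Kubernetes code referenced here is from version 1.18.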
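1. Design
1.1 Mechanism
SchedulerExtender is Kubernetes' out-of-process extension mechanism. Users build their own scheduling service independently and implement the corresponding remote-call interface (currently HTTP). At the relevant stages of scheduling, the scheduler makes remote calls according to the resources and endpoints the user has configured. The extender service then makes its decision based on its own resource data and the intermediate scheduling results the scheduler passes in.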
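1.2 Pluggable services
An extender only needs to implement the endpoints for the extension points it cares about and be registered in the scheduler's YAML configuration file. This extends the scheduler without modifying any scheduler code, so scheduling plugins can be plugged in and out freely.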
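1.3 Resource storage
Because the extender is an independent service, it can store and retrieve custom resources however it likes, and can even use a third-party store instead of etcd. This is mainly useful for scheduling on resources that Kubernetes itself does not support.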
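2. SchedulerExtender
2.1 Interface and implementation
2.1.1 Interface declaration
The SchedulerExtender interface declares the extension points an extender can plug into: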
```go
type SchedulerExtender interface {
	// Name returns a unique name that identifies the extender.
	Name() string
	// Filter phase (predicates): filter the candidate nodes
	Filter(pod *v1.Pod, nodes []*v1.Node) (filteredNodes []*v1.Node, failedNodesMap extenderv1.FailedNodesMap, err error)
	// Prioritize phase: take part in node scoring
	Prioritize(pod *v1.Pod, nodes []*v1.Node) (hostPriorities *extenderv1.HostPriorityList, weight int64, err error)
	// Bind the pod to a node through the extender
	Bind(binding *v1.Binding) error
	// Whether the extender supports binding
	IsBinder() bool
	// Whether the extender is interested in the resources requested by the pod
	IsInterested(pod *v1.Pod) bool
	// Preemption phase
	ProcessPreemption(
		pod *v1.Pod,
		nodeToVictims map[*v1.Node]*extenderv1.Victims,
		nodeInfos listers.NodeInfoLister) (map[*v1.Node]*extenderv1.Victims, error)
	// Whether the extender supports preemption
	SupportsPreemption() bool
	// IsIgnorable returns true indicates scheduling should not fail when this extender
	// is unavailable. This gives scheduler ability to fail fast and tolerate non-critical extenders as well.
	IsIgnorable() bool
}
```
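Because extenders are consumed through this interface, code that drives them can be unit-tested with an in-memory fake instead of an HTTP server. The sketch below assumes a trimmed, hypothetical copy of the filter-related methods with stand-in pod and node types; it is not the real interface, which needs the k8s.io packages.

```go
package main

import "fmt"

// pod and node are hypothetical stand-ins for *v1.Pod and *v1.Node.
type pod struct{ Name string }
type node struct{ Name string }

// extender is a hypothetical, trimmed copy of the filter-related methods
// of SchedulerExtender, for illustration only.
type extender interface {
	Name() string
	Filter(p *pod, nodes []*node) ([]*node, map[string]string, error)
	IsInterested(p *pod) bool
	IsIgnorable() bool
}

// fakeExtender rejects a fixed set of node names and needs no HTTP server.
type fakeExtender struct {
	name   string
	reject map[string]bool
}

// Compile-time check that *fakeExtender satisfies extender.
var _ extender = (*fakeExtender)(nil)

func (f *fakeExtender) Name() string           { return f.name }
func (f *fakeExtender) IsInterested(*pod) bool { return true }
func (f *fakeExtender) IsIgnorable() bool      { return false }

// Filter keeps every node not in reject and records a reason for the others.
func (f *fakeExtender) Filter(p *pod, nodes []*node) ([]*node, map[string]string, error) {
	var kept []*node
	failed := map[string]string{}
	for _, n := range nodes {
		if f.reject[n.Name] {
			failed[n.Name] = f.name + " rejected " + n.Name
			continue
		}
		kept = append(kept, n)
	}
	return kept, failed, nil
}

func main() {
	var e extender = &fakeExtender{name: "fake", reject: map[string]bool{"n2": true}}
	kept, failed, err := e.Filter(&pod{Name: "p"}, []*node{{Name: "n1"}, {Name: "n2"}})
	fmt.Println(len(kept), kept[0].Name, failed["n2"], err) // 1 n1 fake rejected n2 <nil>
}
```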
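2.1.2 Default implementation
The default extender implementation is HTTPExtender, which talks to the extender service over HTTP and passes data as JSON. Its core data structure is:
```go
// HTTPExtender implements the algorithm.SchedulerExtender interface.
type HTTPExtender struct {
	extenderURL      string
	preemptVerb      string
	filterVerb       string
	prioritizeVerb   string
	bindVerb         string
	weight           int64        // weight applied to this extender's scores
	client           *http.Client // HTTP client used to call the extender
	nodeCacheCapable bool         // if true, the extender caches nodes and only node names are sent
	managedResources sets.String  // resources managed by this extender
	ignorable        bool
}
```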
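2.2 Key mechanisms
2.2.1 Remote communication
Communication is straightforward: the arguments are serialized to JSON, POSTed to the extender over HTTP, and the response body is deserialized into the result.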
```go
// Helper function to send messages to the extender
func (h *HTTPExtender) send(action string, args interface{}, result interface{}) error {
	// Serialize the arguments
	out, err := json.Marshal(args)
	if err != nil {
		return err
	}
	// Build the URL: <extenderURL>/<verb>
	url := strings.TrimRight(h.extenderURL, "/") + "/" + action
	req, err := http.NewRequest("POST", url, bytes.NewReader(out))
	if err != nil {
		return err
	}
	// Set the HTTP header
	req.Header.Set("Content-Type", "application/json")
	// Send the request and receive the response
	resp, err := h.client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("Failed %v with extender at URL %v, code %v", action, url, resp.StatusCode)
	}
	// Deserialize the response
	return json.NewDecoder(resp.Body).Decode(result)
}
```
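2.2.2 node cache
nodeCacheCapable is an extender setting that declares whether the extender caches node data itself. If it does, the scheduler sends only the node names instead of the full node objects, which reduces the size of each request.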
```go
// Excerpt from HTTPExtender.Filter (Prioritize builds its arguments the same way)
if h.nodeCacheCapable {
	nodeNameSlice := make([]string, 0, len(nodes))
	for _, node := range nodes {
		// Only the node name is sent
		nodeNameSlice = append(nodeNameSlice, node.Name)
	}
	nodeNames = &nodeNameSlice
} else {
	nodeList = &v1.NodeList{}
	for _, node := range nodes {
		// The full node object is sent
		nodeList.Items = append(nodeList.Items, *node)
	}
}
// Build the request arguments
args = &extenderv1.ExtenderArgs{
	Pod:       pod,
	Nodes:     nodeList,
	NodeNames: nodeNames,
}
```
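To see what nodeCacheCapable saves, the sketch below builds both forms of the arguments and compares their JSON sizes. The node type is a hypothetical, much smaller stand-in for v1.Node, and Nodes is a plain slice pointer rather than *v1.NodeList, so the real difference is larger than what this prints.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// node is a hypothetical, much smaller stand-in for v1.Node.
type node struct {
	Name   string
	Labels map[string]string
	Images []string
}

// args mirrors the two alternative node fields of extenderv1.ExtenderArgs
// (Pod omitted); exactly one of them is set.
type args struct {
	Nodes     *[]node
	NodeNames *[]string
}

// buildArgs follows the nodeCacheCapable branch of the excerpt.
func buildArgs(nodes []node, nodeCacheCapable bool) args {
	if nodeCacheCapable {
		names := make([]string, 0, len(nodes))
		for _, n := range nodes {
			names = append(names, n.Name)
		}
		return args{NodeNames: &names}
	}
	list := append([]node(nil), nodes...)
	return args{Nodes: &list}
}

// payloadSize returns the size of the JSON request body in bytes.
func payloadSize(a args) int {
	b, err := json.Marshal(a)
	if err != nil {
		panic(err)
	}
	return len(b)
}

func main() {
	var nodes []node
	for i := 0; i < 100; i++ {
		nodes = append(nodes, node{
			Name:   fmt.Sprintf("node-%d", i),
			Labels: map[string]string{"zone": "zone-a", "arch": "amd64"},
			Images: []string{"nginx:1.17", "busybox:1.31"},
		})
	}
	fmt.Printf("full objects: %d bytes, names only: %d bytes\n",
		payloadSize(buildArgs(nodes, false)), payloadSize(buildArgs(nodes, true)))
}
```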
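2.2.3 managedResources
Before calling an extender, the scheduler checks whether the extender is interested in the resources requested by the pod's containers. If it is, the extender is called; otherwise it is skipped. An extender with no managedResources configured is treated as interested in every pod.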
```go
func (h *HTTPExtender) IsInterested(pod *v1.Pod) bool {
	// No managed resources configured: interested in every pod
	if h.managedResources.Len() == 0 {
		return true
	}
	// The pod's regular containers
	if h.hasManagedResources(pod.Spec.Containers) {
		return true
	}
	// The pod's init containers
	if h.hasManagedResources(pod.Spec.InitContainers) {
		return true
	}
	return false
}

func (h *HTTPExtender) hasManagedResources(containers []v1.Container) bool {
	for i := range containers {
		container := &containers[i]
		// Check whether any of the container's requests is a managed resource
		for resourceName := range container.Resources.Requests {
			if h.managedResources.Has(string(resourceName)) {
				return true
			}
		}
		// Check whether any of the container's limits is a managed resource
		for resourceName := range container.Resources.Limits {
			if h.managedResources.Has(string(resourceName)) {
				return true
			}
		}
	}
	return false
}
```
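The matching rules can be reproduced without Kubernetes types. In this sketch, container and podSpec are hypothetical stand-ins for v1.Container and v1.PodSpec, resource quantities are plain strings, and example.com/gpu is an invented extended-resource name.

```go
package main

import "fmt"

// container and podSpec are hypothetical stand-ins for v1.Container and
// v1.PodSpec; resource quantities are plain strings here.
type container struct {
	Requests map[string]string
	Limits   map[string]string
}

type podSpec struct {
	Containers     []container
	InitContainers []container
}

// hasManaged reports whether any container requests or limits a managed resource.
func hasManaged(managed map[string]bool, containers []container) bool {
	for _, c := range containers {
		for name := range c.Requests {
			if managed[name] {
				return true
			}
		}
		for name := range c.Limits {
			if managed[name] {
				return true
			}
		}
	}
	return false
}

// isInterested follows the same rules as HTTPExtender.IsInterested: an empty
// managed set means the extender is interested in every pod.
func isInterested(managed map[string]bool, spec podSpec) bool {
	if len(managed) == 0 {
		return true
	}
	return hasManaged(managed, spec.Containers) || hasManaged(managed, spec.InitContainers)
}

func main() {
	managed := map[string]bool{"example.com/gpu": true} // hypothetical extended resource
	gpuPod := podSpec{Containers: []container{{Limits: map[string]string{"example.com/gpu": "1"}}}}
	cpuPod := podSpec{Containers: []container{{Requests: map[string]string{"cpu": "500m"}}}}
	fmt.Println(isInterested(managed, gpuPod)) // true
	fmt.Println(isInterested(managed, cpuPod)) // false
	fmt.Println(isInterested(nil, cpuPod))     // true: nothing managed, so every pod matches
}
```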
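2.3 The Filter interface
Filter calls the extenders for a second round of filtering after the in-tree predicate (filter) phase has finished.
2.3.1 Sequential calls in a loop
findNodesThatPassExtenders iterates over all extenders and checks whether each one is interested in the pod's resources. If it is, its Filter interface is called remotely and the filtered result is handed to the next extender, progressively narrowing the candidate set. Note that extenders are called serially in this phase, because each one filters the output of the previous one.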
```go
func (g *genericScheduler) findNodesThatPassExtenders(pod *v1.Pod, filtered []*v1.Node, statuses framework.NodeToStatusMap) ([]*v1.Node, error) {
	for _, extender := range g.extenders {
		if len(filtered) == 0 {
			break
		}
		// Skip extenders that do not care about the resources of the pod's containers
		if !extender.IsInterested(pod) {
			continue
		}
		// Make the remote call
		filteredList, failedMap, err := extender.Filter(pod, filtered)
		if err != nil {
			if extender.IsIgnorable() {
				klog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set",
					extender, err)
				continue
			}
			return nil, err
		}
		// Record the failure reason for each rejected node
		for failedNodeName, failedMsg := range failedMap {
			if _, found := statuses[failedNodeName]; !found {
				statuses[failedNodeName] = framework.NewStatus(framework.Unschedulable, failedMsg)
			} else {
				statuses[failedNodeName].AppendReason(failedMsg)
			}
		}
		// Pass this extender's filter result on to the next extender
		filtered = filteredList
	}
	return filtered, nil
}
```
2.3.2 Remote filter call
```go
func (h *HTTPExtender) Filter(
	pod *v1.Pod,
	nodes []*v1.Node,
) ([]*v1.Node, extenderv1.FailedNodesMap, error) {
	var (
		result     extenderv1.ExtenderFilterResult
		nodeList   *v1.NodeList
		nodeNames  *[]string
		nodeResult []*v1.Node
		args       *extenderv1.ExtenderArgs
	)
	fromNodeName := make(map[string]*v1.Node)
	for _, n := range nodes {
		fromNodeName[n.Name] = n
	}

	if h.filterVerb == "" {
		return nodes, extenderv1.FailedNodesMap{}, nil
	}

	// Build the arguments according to nodeCacheCapable
	if h.nodeCacheCapable {
		nodeNameSlice := make([]string, 0, len(nodes))
		for _, node := range nodes {
			nodeNameSlice = append(nodeNameSlice, node.Name)
		}
		nodeNames = &nodeNameSlice
	} else {
		nodeList = &v1.NodeList{}
		for _, node := range nodes {
			nodeList.Items = append(nodeList.Items, *node)
		}
	}

	args = &extenderv1.ExtenderArgs{
		Pod:       pod,
		Nodes:     nodeList,
		NodeNames: nodeNames,
	}

	// Call the extender service's filter endpoint
	if err := h.send(h.filterVerb, args, &result); err != nil {
		return nil, nil, err
	}
	if result.Error != "" {
		return nil, nil, fmt.Errorf(result.Error)
	}

	// Assemble the result according to nodeCacheCapable and what the extender returned
	if h.nodeCacheCapable && result.NodeNames != nil {
		nodeResult = make([]*v1.Node, len(*result.NodeNames))
		for i, nodeName := range *result.NodeNames {
			if n, ok := fromNodeName[nodeName]; ok {
				nodeResult[i] = n
			} else {
				return nil, nil, fmt.Errorf(
					"extender %q claims a filtered node %q which is not found in the input node list",
					h.extenderURL, nodeName)
			}
		}
	} else if result.Nodes != nil {
		nodeResult = make([]*v1.Node, len(result.Nodes.Items))
		for i := range result.Nodes.Items {
			nodeResult[i] = &result.Nodes.Items[i]
		}
	}

	return nodeResult, result.FailedNodes, nil
}
```
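When nodeCacheCapable is set, the extender answers with names only, and Filter maps them back to the node objects it sent. A minimal sketch of that lookup, with a hypothetical node type standing in for v1.Node:

```go
package main

import "fmt"

// node is a hypothetical stand-in for v1.Node.
type node struct{ Name string }

// resolveNames maps names returned by a nodeCacheCapable extender back to
// the node objects that were sent, failing on any name that was not sent.
func resolveNames(input []*node, returned []string) ([]*node, error) {
	byName := make(map[string]*node, len(input))
	for _, n := range input {
		byName[n.Name] = n
	}
	out := make([]*node, len(returned))
	for i, name := range returned {
		n, ok := byName[name]
		if !ok {
			return nil, fmt.Errorf("extender claims a filtered node %q which is not found in the input node list", name)
		}
		out[i] = n
	}
	return out, nil
}

func main() {
	input := []*node{{Name: "n1"}, {Name: "n2"}}
	got, err := resolveNames(input, []string{"n2"})
	fmt.Println(got[0] == input[1], err) // true <nil>
	_, err = resolveNames(input, []string{"n9"})
	fmt.Println(err)
}
```

Returning pointers to the scheduler's own objects avoids copying nodes, and rejecting unknown names guards against an extender returning candidates the scheduler never offered.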
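2.4 The Prioritize interface
2.4.1 Parallel scoring
In the scoring phase, extenders are called in parallel: each extender is queried in its own goroutine, and the returned host scores are then merged one goroutine at a time under a mutex. For each host, the combined extender score accumulates score × weight, where weight is the current extender's configured weight.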
```go
var mu sync.Mutex
var wg sync.WaitGroup
combinedScores := make(map[string]int64, len(nodes))
for i := range g.extenders {
	if !g.extenders[i].IsInterested(pod) {
		continue
	}
	wg.Add(1)
	// Call each extender in parallel
	go func(extIndex int) {
		metrics.SchedulerGoroutines.WithLabelValues("prioritizing_extender").Inc()
		defer func() {
			metrics.SchedulerGoroutines.WithLabelValues("prioritizing_extender").Dec()
			wg.Done()
		}()
		prioritizedList, weight, err := g.extenders[extIndex].Prioritize(pod, nodes)
		if err != nil {
			// Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities
			return
		}
		mu.Lock()
		// Merge results serially while holding the lock
		for i := range *prioritizedList {
			host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
			if klog.V(10) {
				klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), host, g.extenders[extIndex].Name(), score)
			}
			// Host score += score × this extender's weight
			combinedScores[host] += score * weight
		}
		mu.Unlock()
	}(i)
}
// wait for all go routines to finish
wg.Wait()
```
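2.4.2 Merging scores
The combined extender scores are then added to each node's in-tree score. Extender scores are on the scale [0, MaxExtenderPriority] while the scheduler uses [0, MaxNodeScore], so in this version each node's combined extender score is multiplied by MaxNodeScore / MaxExtenderPriority = 100 / 10 = 10 before being added: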
```go
for i := range result {
	// MaxExtenderPriority may diverge from the max priority used in the scheduler and defined by MaxNodeScore, therefore we need to scale the score returned by extenders to the score range used by the scheduler.
	result[i].Score += combinedScores[result[i].Name] * (framework.MaxNodeScore / extenderv1.MaxExtenderPriority)
}
```
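A worked example of the rescaling, assuming the 1.18 values MaxNodeScore = 100 and MaxExtenderPriority = 10 (so the factor is 10) and invented input scores:

```go
package main

import "fmt"

// Values of framework.MaxNodeScore and extenderv1.MaxExtenderPriority in 1.18.
const (
	maxNodeScore        int64 = 100
	maxExtenderPriority int64 = 10
)

// finalScore multiplies the combined extender score by
// MaxNodeScore/MaxExtenderPriority to put it on the scheduler's scale,
// then adds it to the in-tree score.
func finalScore(frameworkScore, combinedExtenderScore int64) int64 {
	return frameworkScore + combinedExtenderScore*(maxNodeScore/maxExtenderPriority)
}

func main() {
	// Invented inputs: in-tree plugins gave 150; extenders summed to 17 (score*weight).
	fmt.Println(finalScore(150, 17)) // 320
}
```

Because the factor is computed with integer division, it is exact only because 100 is a multiple of 10.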
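2.4.3 Remote prioritize call
The prioritize call follows the same flow as Filter: build the arguments, send them, and return the result. The difference is that Prioritize also returns the extender's configured weight, which is used when combining scores. If no prioritizeVerb is configured, every node gets a score of 0 and the returned weight is 0.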
```go
func (h *HTTPExtender) Prioritize(pod *v1.Pod, nodes []*v1.Node) (*extenderv1.HostPriorityList, int64, error) {
	var (
		result    extenderv1.HostPriorityList
		nodeList  *v1.NodeList
		nodeNames *[]string
		args      *extenderv1.ExtenderArgs
	)

	if h.prioritizeVerb == "" {
		result := extenderv1.HostPriorityList{}
		for _, node := range nodes {
			result = append(result, extenderv1.HostPriority{Host: node.Name, Score: 0})
		}
		return &result, 0, nil
	}

	// Build the arguments according to nodeCacheCapable
	if h.nodeCacheCapable {
		nodeNameSlice := make([]string, 0, len(nodes))
		for _, node := range nodes {
			nodeNameSlice = append(nodeNameSlice, node.Name)
		}
		nodeNames = &nodeNameSlice
	} else {
		nodeList = &v1.NodeList{}
		for _, node := range nodes {
			nodeList.Items = append(nodeList.Items, *node)
		}
	}

	args = &extenderv1.ExtenderArgs{
		Pod:       pod,
		Nodes:     nodeList,
		NodeNames: nodeNames,
	}

	if err := h.send(h.prioritizeVerb, args, &result); err != nil {
		return nil, 0, err
	}
	// Return the scores together with this extender's weight
	return &result, h.weight, nil
}
```
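2.5 Binding
The bind phase simply passes the scheduler's decision (the pod and its target node) to the extender, which performs the binding.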
```go
func (h *HTTPExtender) Bind(binding *v1.Binding) error {
	var result extenderv1.ExtenderBindingResult
	if !h.IsBinder() {
		// This shouldn't happen as this extender wouldn't have become a Binder.
		return fmt.Errorf("Unexpected empty bindVerb in extender")
	}
	req := &extenderv1.ExtenderBindingArgs{
		PodName:      binding.Name,
		PodNamespace: binding.Namespace,
		PodUID:       binding.UID,
		Node:         binding.Target.Name,
	}
	if err := h.send(h.bindVerb, &req, &result); err != nil {
		return err
	}
	if result.Error != "" {
		return fmt.Errorf(result.Error)
	}
	return nil
}
```
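This is my first update after the New Year, and the content is relatively simple. That's all for today. Thanks for reading, and I hope it is useful to you. A wrap-up of the extension mechanisms will follow once I have finished analyzing Framework. If you found this helpful, please share it. Thank you all!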
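WeChat: baxiaoshi2020. You are welcome to get in touch to learn and share together; there is also a small group you are welcome to join.
Personal blog: www.sreguide.com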