基於K8s調度器實現自定義調度


背景:為了實現基於K8s的數據庫服務的調度功能
 
難點:  1,原生K8s的資源只有cpu和mem,但是MySQL調度需要考慮磁盤資源,
            2,原生調度策略不符合線上環境,比如線上容器和物理機存在混跑,服務存在定制策略等
 
方案: 1,基於K8s 調度器的源碼進行修改,定制化調度器,所有服務器調度時指定新調度器實現自定義策略
            2,將需要的元數據,比如MySQL端口容量,服務器磁盤容量等信息通過腳本同步到K8s中的annotations中
 
流程:
            1,下載調度器源碼至本地服務器
 可從官方下載,此處原為內部git地址,省去
            2,本地修改完成后,編譯,
CGO_ENABLED=0 GOOS=linux GOARCH=amd64  make all WHAT=cmd/kube-scheduler/
            3,編譯完成&&沒有報錯,打包images 並 push到倉庫
docker build -f Dockerfile -t registry.xxxx.xxxx.com.cn/xxxx/scheduler:1.0 .
docker login registry.xxx.xxx.com.cn -u xxxx
docker push registry.xxxx.xxxx.com.cn/xxxx/scheduler
            4,在南北K8s集群上找到運行着的 scheduler,並delete,scheduler以deployment方式部署,delete完成后確認下新scheduler是否running即可
kubectl get pod -n kube-system | grep kube-scheduler-db
kubectl delete pod kube-scheduler-db-xxxxxx-xxxxxxx -n kube-system
            5,至此 scheduler 更新流程流程,之后可以觀察修改生效情況,重大更新建議先測試再正式上線
 
修改的細節(舉例)
            scheduler 的目錄結構如下
     
            我們需要修改的部分集中在\kubernetes-develop\pkg\scheduler\algorithm目錄下,
 
該目錄下有兩個文件夾,這兩個目錄下每一個文件代表着一種策略
predicates 此目錄下為初選策略,初選即為過濾,通過此策略將不滿足硬性標准的服務器淘汰
priorities 此目錄下為優選策略,優選即為打分,通過此策略將所有通過初選的服務器打分,選取分數最高服務器進行調度
 
predicates 中以xxxx_disk_predicate.go 為例,此策略過濾不滿足MySQL磁盤空間要求的服務器,實際代碼以線上為准
package predicates
 
import (
       "fmt"
       "k8s.io/api/core/v1"
       "k8s.io/kubernetes/pkg/scheduler/algorithm"
       schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
)
 
func NevisDiskPredicate(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
 
       var remainDiskPercentage int64 = 10 //disk最少剩余10%
       var requestedDisk int64 //node總的已request的值
       var HoldDiskMin int64 = 250 //剩余容量保底值
       node := nodeInfo.Node() //這里獲得的是正在進行判斷的node實例
 
       if node == nil {  // node無效就直接退出
              return false, nil, fmt.Errorf("node not found")
       }
       
       //獲取服務類型 關於磁盤調度目前只對MySQL和pika生效,所以需要判斷服務器的資源池
       scheduleService := pod.Annotations[AnnotationSchedulerService]
 
       requestDiskCurrentPod := NevisPodDISKRequest(pod) // 當前POD的disk request
       for _, p := range nodeInfo.Pods() { // 獲取node上每個pod的diskrequest之和 requestedDISK
              requestDiskPerPod := xxxxPodDISKRequest(p)
              requestedDisk += requestDiskPerPod
       }
 
 
       nodeFreeDisk := xxxxNodeDISKFree(node) //獲取node的free disk  
       nodeTotalDisk := xxxxNodeDISKTotal(node) //獲取node的total disk
        //NevisNodeDISKFree 和 NevisNodeDISKTotal 函數在utils.go里,具體的是獲取node的annotations數據,
        //這里需要注意線上的annotations 值的各種狀態都需要考慮,缺省值需要設定,否則會影響之后的判斷
 
       // 過濾策略
       // 1,已經request 加 即將request的 要小於 totaldisk的90%,
       // 2,freedisk 減 即將request的 要大於 totaldisk的10%
       // 3,考慮到有些服務器磁盤容量很小(<4T),增加最低保留空間 為 250G
        // 策略需要注意線上是容器和物理機混跑狀態,需要考慮的全面一些,
 
       HoldDisk := int64(nodeTotalDisk * remainDiskPercentage/100)
       if HoldDisk < HoldDiskMin {
              HoldDisk = HoldDiskMin
       }
 
       if ((scheduleService == SchedulerServiceMySQL) || (scheduleService == SchedulerServicePika)){
        //進行資源池判斷,不滿足條件的服務器直接過濾
        // 關於服務類型的判斷應該放在函數的入口,以避免無效的運算,這里是舉例就不修改了
              if  (int64(requestedDisk) + int64(requestDiskCurrentPod) > int64(nodeTotalDisk - HoldDisk)|| ((nodeFreeDisk - int64(requestDiskCurrentPod)) < HoldDisk)) {
                     return false, []algorithm.PredicateFailureReason{
                            &PredicateFailureError{
                                   PredicateName: "xxxxDiskPredicate",
                                   PredicateDesc: fmt.Sprintf("node doesn't has enough DISK request %d, requested %d", requestDiskCurrentPod, requestedDISK),
                            },
                     }, nil
              }
              return true, nil, nil
       }
       return true, nil, nil
}
 
func init() {
       predicates := Ordering()
       predicates = append(predicates, NevisDiskPred)
       SetPredicatesOrdering(predicates)
}

 

priorities 中以more_mem.go 為例,此策略將給有更多空閑內存的服務器打高分,實際代碼以線上為准
package priorities
 
import (
       "fmt"
       "strconv"
       "k8s.io/klog"
       "k8s.io/api/core/v1"
       "k8s.io/apimachinery/pkg/labels"
       v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
       schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
       schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
)
 
func CalculateNodeMoreMemMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
 
 
       node := nodeInfo.Node() 
       if node == nil {
              return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
       }
 
       var allMemory int64 //服務器的總內存
       var usedMemory int64 //服務器已使用內存
       allMemory = NevisNodeMemAll(node)
 
        //此處是通過node的10255端口獲取node的實例mem使用量,講道理邏輯里應該盡量不適用這種調用,因為可能拖慢調度的速度,因為這是所有過初選的node都需要執行的
                // 但是由於數據庫本身不會存在非常頻繁的調度,而且實現非常簡單,所以如此實現了
       resp, err := (&http.Client{Timeout: 5 * time.Second}).Get("http://" + node.Status.Addresses[0].Address + ":10255/stats")
              if err != nil {
                     klog.Warningf("get node status failed: %s", err)
              } else {
                     var ci cv1.ContainerInfo
                     if err := json.NewDecoder(resp.Body).Decode(&ci); err != nil {
                            resp.Body.Close()
                            klog.Warningf("decode node status failed: %s", err)
                     } else {
                            usedMemory = int64(ci.Stats[0].Memory.RSS)
                     }
              }
 
        // 打分策略,如果內存沒有使用,即為10分,全部用完為0分,按比例分配
       count := 10 - (usedMemory / allMemory * int64(10))
       // 這里在打log。相關的log可通過 kubectl logs kube-scheduler-db-xxxx-xxxx -n kube-system 查看
       klog.Warningf("message i want", count) 
        //返回打分集合,注意下格式即可
       return schedulerapi.HostPriority{
              Host:  node.Name,
              Score: int(count),
       }, nil
}
 
// 實際去計算打分結果的時候 是采用map - reduce的方式,了解hadoop原理的應該知道,這里不多說,感興趣可自行了解
var CalculateNodeMoreMemReduce = NormalizeReduce(schedulerapi.MaxPriority, false)

程序中還包含一些工具類,放在utils.go下,此處省略

在寫完所有的調度策略后,別急還沒完,我們還需要去工廠函數里注冊下我們新加的策略
位置在\pkg\scheduler\algorithmprovider\defaults\defaults.go
 
func defaultPredicates() 里增加關於初選策略的函數,如
factory.RegisterFitPredicate(predicates.xxxDiskPred, predicates.xxxDiskPredicate),
 
在 func defaultPriorities() 里增加關於優選策略的函數,如
factory.RegisterPriorityFunction2("MoreCpu", priorities.CalculateNodeMoreMemMap, priorities.CalculateNodeMoreMemReduce, 1000000),
在優選策略中需要注意的是傳入的最后一個值為權重,此處為1000000,用於設置不同優選策略的權重,一個node的實際分數為所有 優選策略分數 * 權重 后之和
這里設置為如此大的數是為了讓此策略成為 最重要的決定值,其余的策略目前我們並不關心,在之后的開發中可以是當修改,
其他自帶的優選策略共有6種,除了NodePreferAvoidPodsPriority權重為10000外,其余均為1,這幾種優選策略也比較好理解,自行看下代碼即可,不說了
 
 
最后總結下已經做出的修改
初選   
        函數名:xxxxDiskPredicate  
        作用服務: MySQL && pika 
        目標:過濾所有不滿足磁盤空間要求的服務器,
        策略:all_request_disk + request_disk <  total_disk * 90%
                  free_disk - request_disk > total_disk * 10%
 
 
        函數名:xxxxMemoryMysqlPredicate
        作用服務:MySQL
        目標:過濾滿足內存要求的服務器
        策略: free_mem - request_mem > total_mem * 10% 
         
 
        函數名:xxxxCPUPredicate
        作用服務:所有
        目標:過濾不滿足 cpu 要求的服務器
        策略:free_cpu > request_cpu
        
        函數名:xxxxMemoryPredicate
        作用服務:redis
        目標:過濾 不滿足 mem 要求的服務器
        策略:需要滿足redis的降級和非降級兩種模式,具體看代碼吧,寫出來廢紙
 
 
優選 
        函數名:CalculateNodeMoreMemMap
        作用服務:MySQL
        目標:提高 空閑mem多服務器的分數
        策略(free-mem / total-mem)* 10 * 權重
 
 
 
 
 
 
 
 
 
 
 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM