[源碼解析] 深度學習分布式訓練框架 horovod (20) --- Elastic Training Operator


[源碼解析] 深度學習分布式訓練框架 horovod (20) --- Elastic Training Operator

0x00 摘要

Horovod 是一款基於 AllReduce 的分布式訓練框架。憑借其對 TensorFlow、PyTorch 等主流深度學習框架的支持,以及通信優化等特點,Horovod 被廣泛應用於數據並行的訓練中。

本文是 horovod on k8s 的最后一篇,看看 MPI-Operator 可能被如何改進,主要就是根據 Elastic Training Operator 作者 團隊的博客內容來學習源碼。所以本文以大量源碼為主。

本系列其他文章鏈接如下:

[源碼解析] 深度學習分布式訓練框架 Horovod (1) --- 基礎知識

[源碼解析] 深度學習分布式訓練框架 horovod (2) --- 從使用者角度切入

[源碼解析] 深度學習分布式訓練框架 horovod (3) --- Horovodrun背后做了什么

[源碼解析] 深度學習分布式訓練框架 horovod (4) --- 網絡基礎 & Driver

[源碼解析] 深度學習分布式訓練框架 horovod (5) --- 融合框架

[源碼解析] 深度學習分布式訓練框架 horovod (6) --- 后台線程架構

[源碼解析] 深度學習分布式訓練框架 horovod (7) --- DistributedOptimizer

[源碼解析] 深度學習分布式訓練框架 horovod (8) --- on spark

[源碼解析] 深度學習分布式訓練框架 horovod (9) --- 啟動 on spark

[源碼解析] 深度學習分布式訓練框架 horovod (10) --- run on spark

[源碼解析] 深度學習分布式訓練框架 horovod (11) --- on spark --- GLOO 方案

[源碼解析] 深度學習分布式訓練框架 horovod (12) --- 彈性訓練總體架構

[源碼解析] 深度學習分布式訓練框架 horovod (13) --- 彈性訓練之 Driver

[源碼解析] 深度學習分布式訓練框架 horovod (14) --- 彈性訓練發現節點 & State

[源碼解析] 深度學習分布式訓練框架 horovod (15) --- 廣播 & 通知

[源碼解析] 深度學習分布式訓練框架 horovod (16) --- 彈性訓練之Worker生命周期

[源碼解析] 深度學習分布式訓練框架 horovod (17) --- 彈性訓練之容錯

[源碼解析] 深度學習分布式訓練框架 horovod (18) --- kubeflow tf-operator

[源碼解析] 深度學習分布式訓練框架 horovod (17) --- 彈性訓練之容錯

[源碼解析] 深度學習分布式訓練框架 horovod (18) --- kubeflow tf-operator

[源碼解析] 深度學習分布式訓練框架 horovod (19) --- kubeflow MPI-operator

0x01 背景知識

0x01, 0x02 兩節均來自於 Elastic Training Operator 團隊博客內容,這個博客真得很給力。

1.1 已有彈性能力

Kubernetes 和雲計算提供敏捷性和伸縮性,我們可以通過 cluster-AutoScaler 等組件為訓練任務設置彈性策略,利用 Kubernetes 的彈性能力,按需創建,減少 GPU 設備空轉。

但這種伸縮模式面對訓練這種離線任務還是略有不足:

  • 不支持容錯,當部分 Worker 由於設備原因失敗,整個任務需要停止重來。
  • 訓練任務一般時間較長,占用算力大,任務缺少彈性能力。當資源不足時,除非任務終止,無法按需為其他業務騰出資源。
  • 訓練任務時間較長,不支持 worker 動態配置, 無法安全地使用搶占實例,發揮雲上最大性價比

如何給訓練任務賦予彈性能力,是提高性價比的關鍵路徑。近期 horovod 等分布式框架逐漸支持了 Elastic Training,即彈性訓練能力。也就是允許一個訓練任務在執行的過程中動態的擴容或者縮容訓練 worker, 從不會引起訓練任務的中斷。需要在代碼中做少量修改適配,可參考:https://horovod.readthedocs.io/en/stable/elastic_include.html。

1.2 mpi-operator 的缺點

在 mpi-operator 中,參與訓練的 Worker 都是作為靜態資源設計和維護,支持彈性訓練模式后,給任務增加了靈活性,同時也給運維層帶來了挑戰,例如:

  • 必須通過 horovod 提供的 horovordrun 作為入口,horovod 中 launcher 通過 ssh 登陸 worker,需要打通 launcher 和 worker 之間的登陸隧道。
  • 負責計算彈性的 Elastic Driver 模塊通過指定 discover_host 腳本獲取最新 worker 拓撲信息,從而拉起或停止 worker 實例。當 worker 變化時,首先要更新 discover_host 腳本的返回值。
  • 在搶占或價格計算等場景中,有時需要指定 worker 縮容,K8s 原生的編排元語 deployment,statefulset 無法滿足指定縮容的場景。

針對以上問題,我們設計開發了 et-operator,提供 TrainingJob CRD 描述訓練任務, ScaleOut 和 ScaleIn CRD 描述擴容和縮容操作, 通過它們的組合,使我們的訓練任務更具有彈性。將這個方案開源,歡迎大家提需求、交流、吐槽。

開源方案地址:https://github.com/AliyunContainerService/et-operator

0x02 總體架構

TrainingJob Controller 主要有以下功能:

  • 維護 TrainingJob 的創建/刪除生命周期,以及子資源管理。
  • 執行擴縮容操作。
  • 容錯,當 worker 被驅逐,創建新的 worker 加入到訓練中。

2.1 資源創建

TrainingJob 子資源創建順序如下:

  • 創建打通 ssh 所需的密鑰對, 創建 secret。
  • 創建 workers,包含 service 和 pod,掛載 secret 公鑰。
  • 創建 configmap, 包含 discover_host 腳本 , hostfile 文件。
  • 創建 launcher,掛載 configmap。由於 hostfile 后續會隨着拓撲關系修改,所以 hostfile 單獨通過 initcontainer 從 configmap 拷貝到單獨目錄。

TrainingJob 相關資源:

2.png

2.2 角色

TrainingJob CR 的配置分為 Lanucher 和 Worker。在 Launcher 中指定任務的鏡像和啟動執行, 默認 et-operator 會根據 worker 分配情況,生成一個 hostfile 文件和 discover_host 腳本,discover_host 腳本掛載到 Launcher 的 /etc/edl/discover_hosts.sh 文件, 在入口腳本的 horovodrun 執行中通過 --host-discovery-script 參數指定。在 Worker 設置中指定 worker 的鏡像和 GPU 占用 ,並可以通過 maxReplicas / minReplicas 指定 workers 的副本數允許范圍。

3.png

2.3 程序主流程

程序主流程圖如下:

0x03 入口

其實,學習 ETO 主要就是學習如何擴容和縮容。但是為了學習這個,我們還是需要梳理一下程序邏輯

不熟悉 K8S 的同學順便也一起看看其 CRD 如何使用。

3.1 創建

入口代碼是 main.go/main 函數,從入口可以看出,

  • 生成了 Controller.Manager。
  • 利用這個 Manager,構建了三個 Reconciler :TrainingJobReconciler,ScaleInReconciler,ScaleOutReconciler。
  • 然后啟動 Manager;
func main() {
	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
		Scheme:             scheme,
		MetricsBindAddress: metricsAddr,
		LeaderElection:     enableLeaderElection,
		Port:               9443,
	})

	const jobPollInterval = "5s"
  
	if err = controllers.NewReconciler(mgr, parseDurationOrPanic(jobPollInterval)).SetupWithManager(mgr); err != nil {
		os.Exit(1)
	}
	if err = controllers.NewScaleOutReconciler(mgr, parseDurationOrPanic(jobPollInterval)).SetupWithManager(mgr); err != nil {
		os.Exit(1)
	}
	if err = controllers.NewScaleInReconciler(mgr, parseDurationOrPanic(jobPollInterval)).SetupWithManager(mgr); err != nil {
		os.Exit(1)
	}

	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
		os.Exit(1)
	}
}

3.2 設置

這里的配置就是建立了消息的響應函數,具體就是響應哪些 CR。

  • 除了 TrainingJob 外,et-operator 同時支持 ScaleOut 和 ScaleIn 兩種 CRD,下發訓練任務擴容和縮容操作。

  • 當下發一個 ScaleOut CR,ScaleOutController 觸發 Reconcile, 這里工作很簡單,根據 ScaleOut CR 中的 Selector 字段,找到 Scaler 對應的 TrainingJob,設置到 CR 的 OwnerReferences 上。

  • TrainingJobController 中監聽到屬於 TrainingJob 的 ScaleOut CR 有更新, 觸發 TrainingJob 的 Reconcile,遍歷過濾 TrainingJob 下 OwnerReference 指向的 ScaleIn 和 ScaleOut, 根據創建時間和狀態時間決定執行的擴容或者縮容。

  • 執行縮容時,可以通過 ScaleIn CR 中的 spec.toDelete.count 或 spec.toDelete.podNames 字段指定縮容的 worker。通過 count 配置縮容的數量,則通過 index 計算由高到低縮容 Worker。

func (r *ScaleInReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&kaiv1alpha1.ScaleIn{}).
		Complete(r)
}

func (r *ScaleOutReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&kaiv1alpha1.ScaleOut{}).
		Complete(r)
}

func (r *TrainingJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&kaiv1alpha1.TrainingJob{}).
		Owns(&kaiv1alpha1.ScaleIn{}).
		Owns(&kaiv1alpha1.ScaleOut{}).
		Owns(&corev1.Pod{}).
		Owns(&corev1.Service{}).
		Owns(&corev1.ConfigMap{}).
		Owns(&corev1.Secret{}).
		// Ignore status-only and metadata-only updates
		//WithEventFilter(predicate.GenerationChangedPredicate{}).
		Complete(r)
}

0x04 TrainingJobReconciler

順着代碼梳理一下,尋找其設計思想精微之處。

4.1 Reconcile

k8s operator 中reconcile方法 的作用就是不斷的watch,當資源變化時 就會觸發reconcile方法,理論上有多少次的變化就會執行多少次的reconcile方法。

當有消息來的時候,Reconcile 方法會得到調用。

func (r *TrainingJobReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
	// Fetch latest training job instance.
	sharedTrainingJob := &kaiv1alpha1.TrainingJob{}
	err := r.Get(context.Background(), req.NamespacedName, sharedTrainingJob)
	trainingJob := sharedTrainingJob.DeepCopy()
	// Check reconcile is required.
	// No need to do reconcile or job has been deleted.
	r.Scheme.Default(trainingJob)
	return r.ReconcileJobs(trainingJob)
}

4.2 ReconcileJobs

因為消息中狀態是 "",所以運行了 initializeJob,並且進行 reconcileResource。

func (r *TrainingJobReconciler) ReconcileJobs(job *kaiv1alpha1.TrainingJob) (result reconcile.Result, err error) {
	oldJobStatus := job.Status.DeepCopy()

	defer func() {
		latestJob := &kaiv1alpha1.TrainingJob{}
		err := r.Get(context.Background(), types.NamespacedName{
			Name:      job.Name,
			Namespace: job.Namespace,
		}, latestJob)
		if err == nil {
			if latestJob.ObjectMeta.ResourceVersion != job.ObjectMeta.ResourceVersion {
				latestJob.Status = job.Status
				job = latestJob
			}
		}
		r.updateObjectStatus(job, oldJobStatus)
	}()

	switch job.Status.Phase {
    case commonv1.JobSucceeded, commonv1.JobFailed:
      err = r.cleanup(job)
    case "", commonv1.JobCreated: // 如果狀態為空 或者 JobCreated,則初始化
      r.initializeJob(job)
      err = r.reconcileResource(job)
    case commonv1.JobRunning:
      err = r.reconcileJobRunning(job)
    case commonv1.Scaling:
      err = r.executeScaling(job)
	}

	if err != nil {
		if IsRequeueError(err) {
			return RequeueAfterInterval(r.PollInterval, nil)
		}
		return RequeueAfterInterval(r.PollInterval, err)
	}
	return NoRequeue()
}

4.3 reconcileResource

reconcileResource 其實就是調用 doSteps,調用一個狀態機繼續初始化。

func (r *TrainingJobReconciler) reconcileResource(job *kaiv1alpha1.TrainingJob) error {
	steps := r.newSteps()
	err := r.doSteps(job, steps)
	return err
}

4.4 doSteps

newSteps 構建了一個簡單的狀態機,是一個初始化步驟,按照序列執行,doSteps 會根據狀態進行不同的分支處理。

有幾點需要說明:

  • Created 之后的幾個狀態,應該是: WorkersCreated ---> WorkersReady ----> LauncherCreated ---> JobRunning
  • 這個是事后狀態,即對應 action 完成之后應該達到的狀態。
  • 在 for 循環之中,如果當前 Job 已經達到了某個狀態,就跳過繼續,直到某一個未完狀態,就去執行對應的action。所以理論上說,會從 WorkersCreated 逐步執行到 JobRunning。
  • 在某個狀態對應的 Action 中,執行完成之后,會設置 Job 為這個 完成狀態。

代碼如下:

func (r *TrainingJobReconciler) newSteps() []Step {
	return []Step{
		Step{
			JobCondition: commonv1.WorkersCreated,
			Action:       r.createTrainingJobWorkers,
		},
		Step{
			JobCondition: commonv1.WorkersReady,
			Action:       r.waitWorkersRunning,
		},
		Step{
			JobCondition: commonv1.LauncherCreated,
			Action:       r.createLauncher,
		},
		Step{
			JobCondition: commonv1.JobRunning,
			Action:       r.syncLauncherState,
		},
	}
}

func (r *TrainingJobReconciler) doSteps(job *kaiv1alpha1.TrainingJob, steps []Step) error {
	for _, step := range steps {
		if hasCondition(*job.GetJobStatus(), step.JobCondition) {
			continue
		}
		err := step.Action(job)
		break
	}
	return nil
}

所以具體如下:

           Request("")
K8S  +-------------------->  Reconcile
                                 +
                                 |
                                 |
                                 v
          +----------------------+---------------------+
          |                 ReconcileJobs              |
          |                      +                     |
          |                      |                     |
          |        +------------------------------+    |
          |        |             |                |    |
          |        v             v                v    |
          |  "", JobCreated   JobRunning      Scaling  |
          +--------+-----------------------------------+
                   |
                   |
                   v
           reconcileResource
                   +
                   |
                   |
                   v
         +---------+---------------+
         | doSteps                 |
         |                         |
         |                         |
         |     WorkersCreated +---------> createTrainingJobWorkers
         |                         |
         |                         |
         |     WorkersReady  +----------> waitWorkersRunning
         |                         |
         |                         |
         |     LauncherCreated +--------> createLauncher
         |                         |
         |                         |
         |     JobRunning  +------------> syncLauncherState
         |                         |
         +-------------------------+

4.5 createTrainingJobWorkers

在 doSteps 步驟中,先來到 createTrainingJobWorkers 這個Action。這里會設置 Job 狀態為 WorkersCreated。

func (r *TrainingJobReconciler) createTrainingJobWorkers(job *kaiv1alpha1.TrainingJob) error {
	if job.GetAttachMode() == kaiv1alpha1.AttachModeSSH {
		if cm, err := r.GetOrCreateSecret(job); cm == nil || err != nil {
			updateStatus(job.GetJobStatus(), common.JobFailed, trainingJobFailedReason, msg)
			return nil
		}
	}

	workers := getJobReplicasWorkers(job)
	job.Status.TargetWorkers = workers
    
    // 創建worker
	if err := r.CreateWorkers(job, workers); err != nil {
		updateStatus(job.GetJobStatus(), common.JobFailed, trainingJobFailedReason, msg)
		return nil
	}
    // 設置新狀態
	updateJobConditions(job.GetJobStatus(), common.WorkersCreated, "", msg)
	return nil
}

4.5.1 CreateWorkers

CreateWorkers 會進行創建worker,如本文前面介紹,worker 包含 service 和 pod,所以創建過程具體為:

  • 調用 另一個同名函數CreateWorkers 來間接創建 workerService。

  • 調用 newWorker 去創建 Pod。

func (r *TrainingJobReconciler) CreateWorkers(job *kaiv1alpha1.TrainingJob, workers []string) error {
	return r.createWorkers(job, workers, func(name string, index string) *corev1.Pod {
		worker := newWorker(job, name, index)
		return worker
	})
}

4.5.1.1 createWorkers

這里會循環調用 createWorker 依據配置生成一系列 workers

func (r *TrainingJobReconciler) createWorkers(job *kaiv1alpha1.TrainingJob, workers []string, newPod PodTplGenerator) error {
    // 遍歷,創建
	for _, podName := range workers {
		index, err := getWorkerIndex(job.Name, podName)
		if err != nil {
			return err
		}
		_, err = r.createWorker(job, int32(index), newPod)
		if err != nil {
			return err
		}
	}
	return nil
}

4.5.1.2 createWorker

這里會依據參數對 worker Pod 進行判斷,如果不存在,則創建 某一個 worker

func (r *TrainingJobReconciler) createWorker(job *kaiv1alpha1.TrainingJob, index int32, workerPodTempl PodTplGenerator) (*corev1.Pod, error) {
	name := getWorkerName(job.Name, int(index))
	indexStr := strconv.Itoa(int(index))
	pod := &corev1.Pod{}
	nsn := types.NamespacedName{
		Name:      name,
		Namespace: job.Namespace,
	}
	err := r.Get(context.Background(), nsn, pod)

	if err != nil {
		// If the worker Pod doesn't exist, we'll create it.
		if errors.IsNotFound(err) {
            // 如果沒有pod,這里也可以創建pod
			worker := workerPodTempl(name, indexStr)
			if job.GetAttachMode() == kaiv1alpha1.AttachModeSSH {
				util.MountRsaKey(worker, job.Name)
			}
			if err = r.Create(context.Background(), worker); err != nil {
				return nil, err
			}
		} 
	}

	service := &corev1.Service{}
	err = r.Get(context.Background(), nsn, service)
	if errors.IsNotFound(err) {
        // 調用newService 進行具體創建
		err = r.Create(context.Background(), newService(job, name, indexStr))
	}
	return nil, nil
}

4.5.1.3 newService

這里才來到具體創建service,真是百轉千回。

func newService(obj interface{}, name string, index string) *corev1.Service {
	job, _ := obj.(*kaiv1alpha1.TrainingJob)
	labels := GenLabels(job.Name)
	labels[labelTrainingRoleType] = worker
	labels[replicaIndexLabel] = index
	return &corev1.Service{ // 具體創建
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: job.Namespace,
			Labels:    labels,
			OwnerReferences: []metav1.OwnerReference{
				*metav1.NewControllerRef(job, kaiv1alpha1.SchemeGroupVersionKind),
			},
		},
		Spec: corev1.ServiceSpec{
			ClusterIP: "None",
			Selector:  labels,
			Ports: []corev1.ServicePort{
				{
					Name: "ssh-port",
					Port: 22,
				},
			},
		},
	}
}


4.5.2 newWorker

newWorker 則構建了 Pod,就是比較常見的套路。

func newWorker(obj interface{}, name string, index string) *corev1.Pod {
	job, _ := obj.(*kaiv1alpha1.TrainingJob)
	labels := GenLabels(job.Name)
	labels[labelTrainingRoleType] = worker
	labels[replicaIndexLabel] = index
	podSpec := job.Spec.ETReplicaSpecs.Worker.Template.DeepCopy()

	// keep the labels which are set in PodTemplate
	if len(podSpec.Labels) == 0 {
		podSpec.Labels = make(map[string]string)
	}
	for key, value := range labels {
		podSpec.Labels[key] = value
	}

	// RestartPolicy=Never
	setRestartPolicy(podSpec)

	container := podSpec.Spec.Containers[0]

	// if we want to use ssh, will start sshd service firstly.
	if len(container.Command) == 0 {
		if job.GetAttachMode() == kaiv1alpha1.AttachModeSSH {
			container.Command = []string{"sh", "-c", "/usr/sbin/sshd  && sleep 365d"}
		} else {
			container.Command = []string{"sh", "-c", "sleep 365d"}
		}
	}
	podSpec.Spec.Containers[0] = container

    // 創建了pod
	return &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:        name,
			Namespace:   job.Namespace,
			Labels:      podSpec.Labels,
			Annotations: podSpec.Annotations,
			OwnerReferences: []metav1.OwnerReference{
				*metav1.NewControllerRef(job, kaiv1alpha1.SchemeGroupVersionKind),
			},
		},
		Spec: podSpec.Spec,
	}
}

邏輯如下:

           Request("")
K8S  +-------------------->  Reconcile
                                 +
                                 |
                                 |
                                 v
          +----------------------+---------------------+
          |                 ReconcileJobs              |
          |                      +                     |
          |                      |                     |
          |        +------------------------------+    |
          |        |             |                |    |
          |        v             v                v    |
          |  "", JobCreated   JobRunning      Scaling  |
          +--------+-----------------------------------+
                   |
                   |
                   v
           reconcileResource
                   +
                   |
                   |
                   v
         +---------+---------------+
         | doSteps                 |                                           +----> createWorkers +----> createWorker +----> newService
         |                         |                                           |
         |                         |                                           +
         |     WorkersCreated +---------> createTrainingJobWorkers +-----> CreateWorkers  +------->  newWorker +------> WorkersCreated
         |                         |
         |                         |
         |     WorkersReady  +----------> waitWorkersRunning
         |                         |
         |                         |
         |     LauncherCreated +--------> createLauncher
         |                         |
         |                         |
         |     JobRunning  +------------> syncLauncherState
         |                         |
         +-------------------------+

手機如下:

4.8 createLauncher

建立完 worker 之后,就開始建立 Launcher。所以繼續執行 createLauncher。

func (r *TrainingJobReconciler) createLauncher(job *kaiv1alpha1.TrainingJob) error {
	if _, err := r.GetOrCreateLauncherServiceAccount(job); err != nil {
		updateStatus(job.GetJobStatus(), commonv1.JobFailed, trainingJobFailedReason, msg)
		return nil
	}
	if _, err := r.GetOrCreateLauncherRole(job, 0); err != nil {
		updateStatus(job.GetJobStatus(), commonv1.JobFailed, trainingJobFailedReason, msg)
		return nil
	}
	if _, err := r.GetLauncherRoleBinding(job); err != nil {
		updateStatus(job.GetJobStatus(), commonv1.JobFailed, trainingJobFailedReason, msg)
		return nil
	}

	if cm, err := r.CreateHostConfigMap(job); cm == nil || err != nil {
		updateStatus(job.GetJobStatus(), commonv1.JobFailed, trainingJobFailedReason, msg)
		return nil
	}

	launcher, err := r.GetLauncherJob(job)

	if launcher == nil {
		if _, err := r.CreateLauncher(job); err != nil {
			updateStatus(job.GetJobStatus(), commonv1.JobFailed, trainingJobFailedReason, msg)
			return nil
		}
	}

	updateJobConditions(job.GetJobStatus(), commonv1.LauncherCreated, "", msg)
	return nil
}

我們取兩個重點步驟。

4.8.1 CreateHostConfigMap

這里獲取關於host的配置。

func (r *TrainingJobReconciler) CreateHostConfigMap(job *kaiv1alpha1.TrainingJob) (*corev1.ConfigMap, error) {
	return r.createConfigMap(job, newHostfileConfigMap)
}

func (r *TrainingJobReconciler) createConfigMap(job *kaiv1alpha1.TrainingJob, newCm func(job *kaiv1alpha1.TrainingJob) *corev1.ConfigMap) (*corev1.ConfigMap, error) {
	cm := &corev1.ConfigMap{}
	name := ctrl.Request{}
	name.NamespacedName.Namespace = job.GetNamespace()
	name.NamespacedName.Name = job.GetName() + configSuffix
	err := r.Get(context.Background(), name.NamespacedName, cm)
	if errors.IsNotFound(err) {
		if err = r.Create(context.Background(), newCm(job)); err != nil {
			return cm, err
		}
	}
	return cm, nil
}

4.8.2 創建pod

4.8.2.1 CreateLauncher

這里進行pod的創建

func (r *TrainingJobReconciler) CreateLauncher(obj interface{}) (*corev1.Pod, error) {
	job, ok := obj.(*kaiv1alpha1.TrainingJob)
	launcher := newLauncher(job) // 創建pod
	if job.GetAttachMode() == kaiv1alpha1.AttachModeSSH {
		util.MountRsaKey(launcher, job.Name)
	}
	err := r.Create(context.Background(), launcher)
	return launcher, nil
}

4.8.2.2 newLauncher

這里就是具體構建 Pod。

func newLauncher(obj interface{}) *corev1.Pod {
	job, _ := obj.(*kaiv1alpha1.TrainingJob)
	launcherName := job.Name + launcherSuffix
	labels := GenLabels(job.Name)
	labels[labelTrainingRoleType] = launcher
	podSpec := job.Spec.ETReplicaSpecs.Launcher.Template.DeepCopy()
	// copy the labels and annotations to pod from PodTemplate
	if len(podSpec.Labels) == 0 {
		podSpec.Labels = make(map[string]string)
	}
	for key, value := range labels {
		podSpec.Labels[key] = value
	}
	podSpec.Spec.InitContainers = append(podSpec.Spec.InitContainers, initContainer(job))
	//podSpec.Spec.InitContainers = append(podSpec.Spec.InitContainers, kubedeliveryContainer())

	container := podSpec.Spec.Containers[0]
	container.VolumeMounts = append(container.VolumeMounts,
		corev1.VolumeMount{
			Name:      hostfileVolumeName,
			MountPath: hostfileMountPath,
		},
		corev1.VolumeMount{
			Name:      configVolumeName,
			MountPath: configMountPath,
		},
		corev1.VolumeMount{
			Name:      kubectlVolumeName,
			MountPath: kubectlMountPath,
		})

	if job.GetAttachMode() == kaiv1alpha1.AttachModeKubexec {
		container.Env = append(container.Env, corev1.EnvVar{
			Name:  "OMPI_MCA_plm_rsh_agent",
			Value: getKubexecPath(),
		})
	}
	podSpec.Spec.Containers[0] = container
	podSpec.Spec.ServiceAccountName = launcherName

	setRestartPolicy(podSpec)
	hostfileMode := int32(0444)
	scriptMode := int32(0555)

	podSpec.Spec.Volumes = append(podSpec.Spec.Volumes,
		corev1.Volume{
			Name: hostfileVolumeName,
			VolumeSource: corev1.VolumeSource{
				EmptyDir: &corev1.EmptyDirVolumeSource{},
			},
		},
		corev1.Volume{
			Name: kubectlVolumeName,
			VolumeSource: corev1.VolumeSource{
				EmptyDir: &corev1.EmptyDirVolumeSource{},
			},
		},
		corev1.Volume{
			Name: configVolumeName,
			VolumeSource: corev1.VolumeSource{
				ConfigMap: &corev1.ConfigMapVolumeSource{
					LocalObjectReference: corev1.LocalObjectReference{
						Name: job.Name + configSuffix,
					},
					Items: []corev1.KeyToPath{
						{
							Key:  hostfileName,
							Path: hostfileName,
							Mode: &hostfileMode,
						},
						{
							Key:  discoverHostName,
							Path: discoverHostName,
							Mode: &hostfileMode,
						},
						{
							Key:  kubexeclFileName,
							Path: kubexeclFileName,
							Mode: &scriptMode,
						},
					},
				},
			},
		})
	return &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:        launcherName,
			Namespace:   job.Namespace,
			Labels:      podSpec.Labels,
			Annotations: podSpec.Annotations,
			OwnerReferences: []metav1.OwnerReference{
				*metav1.NewControllerRef(job, kaiv1alpha1.SchemeGroupVersionKind),
			},
		},
		Spec: podSpec.Spec,
	}
}

至此,一個新的訓練job被運行起來,其邏輯拓展如下:

           Request("")
K8S  --------------------->  Reconcile
                                 +
                                 |
                                 |
                                 v
          +----------------------+---------------------+
          |                 ReconcileJobs              |
          |                      +                     |
          |                      |                     |
          |        +------------------------------+    |
          |        |             |                |    |
          |        v             v                v    |
          |  "", JobCreated   JobRunning      Scaling  |
          +--------+-----------------------------------+
                   |
                   |
                   v
           reconcileResource
                   +
                   |
                   |
                   v
         +---------+---------------+
         | doSteps                 |                                           +----> createWorkers +----> createWorker +----> newService
         |                         |                                           |
         |                         |                                           |
         |     WorkersCreated +---------> createTrainingJobWorkers +-----> CreateWorkers  +------->  newWorker +------> WorkersCreated
         |                         |
         |                         |
         |     WorkersReady  +----------> waitWorkersRunning
         |                         |
         |                         |
         |     LauncherCreated +--------> createLauncher+----> CreateHostConfigMap +-----> CreateLauncher  +------>  newLauncher
         |                         |
         |                         |
         |     JobRunning  +------------> syncLauncherState
         |                         |
         +-------------------------+

手機如下:

完成了新job的創建,我們看看本文的關鍵技術點,scaleOut 和 scaleIn。

0x05 ScaleOut

5.1 思路

ScaleOut 任務 CR如下:

4.png

當下發一個 ScaleOut CR,ScaleOutController 觸發 Reconcile, 這里工作很簡單,根據 ScaleOut CR 中的 Selector 字段,找到 Scaler 對應的 TrainingJob,設置到 CR 的 OwnerReferences 上。

以一個 ScaleOut 操作舉例:

- apiVersion: kai.alibabacloud.com/v1alpha1
  kind: ScaleOut
  metadata:
    creationTimestamp: "2020-11-04T13:54:26Z
    name: scaleout-ptfnk
    namespace: default
    ownerReferences:
    - apiVersion: kai.alibabacloud.com/v1alpha1
      blockOwnerDeletion: true
      controller: true
      kind: TrainingJob
      name: elastic-training // 指向擴容對象TrainingJob
      uid: 075b9c4a-22f9-40ce-83c7-656b329a2b9e
  spec:
  selector:
    name: elastic-training
  toAdd:
    count: 2

5.2 Reconcile

當下發一個 ScaleOut CR,ScaleOutController 觸發 Reconcile。主要就是調用 setScalingOwner。

func (r *ScaleOutReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
	scaleOut, err := getScaleOut(req.NamespacedName, r.Client)
	if err != nil {
		// Error reading the object - requeue the request.
		return RequeueImmediately()
	}
	if scaleOut == nil || scaleOut.DeletionTimestamp != nil {
		return NoRequeue()
	}

	if isScaleFinished(*scaleOut.GetJobStatus()) {
		return NoRequeue()
	}

  return setScalingOwner(r, scaleOut, r.PollInterval)
}

5.3 setScalingOwner

setScalingOwner 是關鍵之一。

這里主要是處理當 ScaleOut CR 沒有設置 OwnerReferences 的情況,就設置一個。

邏輯是 根據 ScaleOut CR 中的 Selector 字段,找到 Scaler 對應的 TrainingJob,設置到 CR 的 OwnerReferences 上。

func setScalingOwner(r client.Client, scaler Scaler, pollInterval time.Duration) (ctrl.Result, error) {
	ownerRefs := scaler.GetOwnerReferences()
	if len(ownerRefs) == 0 {
		trainingJob := &kaiv1alpha1.TrainingJob{}
		nsn := types.NamespacedName{}
		nsn.Namespace = scaler.GetNamespace()
		nsn.Name = scaler.GetSelector().Name
		err := r.Get(context.Background(), nsn, trainingJob)
		gvk := kaiv1alpha1.SchemeGroupVersionKind
		ownerRefs = append(ownerRefs, *metav1.NewControllerRef(trainingJob, schema.GroupVersionKind{Group: gvk.Group, Version: gvk.Version, Kind: gvk.Kind}))
		scaler.SetOwnerReferences(ownerRefs)

		initializeJobStatus(scaler.GetJobStatus())
		updateJobConditions(scaler.GetJobStatus(), v1.JobCreated, "", msg)
		err = r.Status().Update(context.Background(), scaler)
		err = r.Update(context.Background(), scaler)
	}
	return NoRequeue()
}

// RequeueAfterInterval requeues after a duration when duration > 0 is specified.
func RequeueAfterInterval(interval time.Duration, err error) (ctrl.Result, error) {
	return ctrl.Result{RequeueAfter: interval}, err
}

5.4 TrainingJobController

TrainingJobController 中監聽到屬於 TrainingJob 的 ScaleOut CR 有更新, 觸發 TrainingJob 的 Reconcile,遍歷過濾 TrainingJob 下 OwnerReference 指向的 ScaleIn 和 ScaleOut, 根據創建時間和狀態時間決定執行的擴容或者縮容

5.4.1 Reconcile

func (r *TrainingJobReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {

	rlog := r.Log.WithValues("trainingjob", req.NamespacedName)
	// Fetch latest training job instance.
	sharedTrainingJob := &kaiv1alpha1.TrainingJob{}
	err := r.Get(context.Background(), req.NamespacedName, sharedTrainingJob)

	trainingJob := sharedTrainingJob.DeepCopy()
	// Check reconcile is required.
	// No need to do reconcile or job has been deleted.

	r.Scheme.Default(trainingJob)

	return r.ReconcileJobs(trainingJob)
}

5.4.2 ReconcileJobs

func (r *TrainingJobReconciler) ReconcileJobs(job *kaiv1alpha1.TrainingJob) (result reconcile.Result, err error) {
	oldJobStatus := job.Status.DeepCopy()

	logger.Infof("jobName: %v, phase %s", job.Name, job.Status.Phase)

	defer func() {
		latestJob := &kaiv1alpha1.TrainingJob{}
		err := r.Get(context.Background(), types.NamespacedName{
			Name:      job.Name,
			Namespace: job.Namespace,
		}, latestJob)
		if err == nil {
			if latestJob.ObjectMeta.ResourceVersion != job.ObjectMeta.ResourceVersion {
				latestJob.Status = job.Status
				job = latestJob
			}
		}
		r.updateObjectStatus(job, oldJobStatus)
	}()

	switch job.Status.Phase {
	case commonv1.JobSucceeded, commonv1.JobFailed:
		err = r.cleanup(job)
	case "", commonv1.JobCreated:
		r.initializeJob(job)
		err = r.reconcileResource(job)
	case commonv1.JobRunning:
		err = r.reconcileJobRunning(job)
	case commonv1.Scaling:
		err = r.executeScaling(job)
	default:
		logger.Warnf("job %s unknown status %s", job.Name, job.Status.Phase)
	}

	if err != nil {
		if IsRequeueError(err) {
			return RequeueAfterInterval(r.PollInterval, nil)
		}
		return RequeueAfterInterval(r.PollInterval, err)
	}
	return NoRequeue()
}

以下根據當前 job 狀態不同,就有兩條線,先是 JobRunning ,然后是 Scaling,最后恢復成 JobRunning。

我們一一分析。

5.5 JobRunning

首先是來到 JobRunning 狀態,我們依次看看如何處理。

5.5.1 reconcileJobRunning

func (r *TrainingJobReconciler) reconcileJobRunning(job *kaiv1alpha1.TrainingJob) error {
	if err := r.syncLauncherState(job); err != nil {
		return err
	}
	if err := r.syncWorkersState(job); err != nil {
		return err
	}

	if job.Status.Phase == commonv1.JobRunning {
		return r.setTrainingJobScaler(job) // 既然是JobRunning狀態,就可以開始進行設置scaler
	}

	return nil
}

5.5.2 setTrainingJobScaler

首先,通過 availableScaleOutList 或者 availableScaleInList ,然后進行update。

func (r *TrainingJobReconciler) setTrainingJobScaler(job *kaiv1alpha1.TrainingJob) error {
	scaleOut, err := r.availableScaleOutList(job) // 找到scaleout list

	scaleIn, err := r.availableScaleInList(job) // 找到scaleIn list

	scalerList := append(scaleOut, scaleIn...) // 合並

	// Select the latest scaling job
	r.updateLatestScaler(job, scalerList) // 開始設置
	return nil
}

5.5.3 updateLatestScaler

依據創建時間和狀態時間,找到最后一個Scaler。

func (r *TrainingJobReconciler) updateLatestScaler(job *kaiv1alpha1.TrainingJob, scalers []Scaler) error {
	var latestScaler Scaler
	if len(scalers) == 0 {
		return nil
	}
	for i, _ := range scalers {
		scalerItem := scalers[i]
        // 依據創建時間和狀態時間,找到最后一個Scaler
		if latestScaler == nil || latestScaler.GetCreationTimestamp().Time.Before(scalerItem.GetCreationTimestamp().Time) {
			latestScaler = scalerItem
		}
	}
	return r.updateCurrentScaler(job, latestScaler)
}

5.5.4 updateCurrentScaler

對找到的scaler進行設置。

func (r *TrainingJobReconciler) updateCurrentScaler(job *kaiv1alpha1.TrainingJob, scaleItem Scaler) error {
	job.Status.CurrentScaler = scaleItem.GetFullName()
	msg := fmt.Sprintf("trainingJobob(%s/%s) execute %s", job.Namespace, job.Name, scaleItem.GetFullName())
    
    // 設置狀態
	r.updateScalerState(scaleItem, job, newCondition(common.Scaling, scalingStartReason, msg))

	if err := r.updateObjectStatus(scaleItem, nil); err != nil {
		return err
	}
	return nil
}

5.5.5 updateScalerState

這時候會設置 common.Scaling。所以下次運行,會到 Scaling 分支。

func (r *TrainingJobReconciler) updateScalerState(scaleObj Scaler, trainingJob *kaiv1alpha1.TrainingJob, condition common.JobCondition) error {
    
	jobPhase := common.Scaling // 設置 common.Scaling。所以下次運行,會到 Scaling 分支
	currentJob := scaleObj.GetFullName()
	if condition.Type == common.ScaleSucceeded || condition.Type == common.ScaleFailed {
		jobPhase = common.JobRunning
		currentJob = ""
	}

	setCondition(trainingJob.GetJobStatus(), condition)
	updateStatusPhase(trainingJob.GetJobStatus(), jobPhase)
	updateTrainingJobCurrentScaler(trainingJob.GetJobStatus(), currentJob)

	setCondition(scaleObj.GetJobStatus(), condition)
	updateStatusPhase(scaleObj.GetJobStatus(), condition.Type)

	return nil
}

邏輯如下:

           1 Request("")
  K8S  +-------------------->  Reconcile  <------------------+
           2 ScaleOut CR           +                         |
  K8S  +-------------------->      |                         |
                                   |                         |
                                   v                         |
            +----------------------+---------------------+   |
            |                 ReconcileJobs              |   |
            |                      +                     |   |
            |                      |                     |   |
            |        +------------------------------+    |   |
            |     1  |             | 2            3 |    |   |
            |        v             v                v    |   |
            |  "", JobCreated   JobRunning      Scaling  |   |
            +--------+-------------+---------------------+   |
                     |             |                         |
                  1  |             | 2                       |
                     v             v                         |
             reconcileResource   reconcileJobRunning         |
                     +             +                         |
                  1  |             | 2                       |
                     |             |                         |
                     v             v                         |
+--------------------+----+      setTrainingJobScaler        |
| doSteps                 |        +                         |
|                         |        | 2                       |
|                         |        |                         |
|     WorkersCreated      |        v                         |
|                         |      updateScalerState           |
|                         |        +                         |
|     WorkersReady        |        |                         |
|                         |        | 2                       |
|                         |        v                         |
|     LauncherCreated     |      common.Scaling              |
|                         |        +                         |
|                         |        |                         |
|     JobRunning          |        | 2                       |
|                         |        |                         |
+-------------------------+        +-------------------------+


5.6 Scaling

5.6.1 executeScaling

依據 scale 的類型不同,進行不同擴展。

func (r *TrainingJobReconciler) executeScaling(job *kaiv1alpha1.TrainingJob) error {
	if err := r.syncLauncherState(job); err != nil {
		return err
	}

	if job.Status.CurrentScaler == "" {
		updateStatusPhase(job.GetJobStatus(), common.JobRunning)
		return nil
	}

	if isFinished(*job.GetJobStatus()) {
		return nil
	}

	scalerType, scalerName := getScalerName(job.Status.CurrentScaler)
    // 根據 in 還是 out 進行不同的處理
	if scalerType == "ScaleIn" {
		scaleIn, err := getScaleIn(scalerName, r)

		if scaleIn == nil || isScaleFinished(*scaleIn.GetJobStatus()) {
			finishTrainingScaler(job.GetJobStatus())
			return nil
		}

		oldStatus := scaleIn.Status.DeepCopy()
		defer r.updateObjectStatus(scaleIn, oldStatus)

        // 執行具體縮容操作
		if err = r.executeScaleIn(job, scaleIn); err != nil {
			return err
		}
	} else if scalerType == "ScaleOut" {
		scaleOut, err := getScaleOut(scalerName, r)

		if scaleOut == nil || isScaleFinished(*scaleOut.GetJobStatus()) {
			finishTrainingScaler(job.GetJobStatus())
			return nil
		}

		oldStatus := scaleOut.Status.DeepCopy()
		defer r.updateObjectStatus(scaleOut, oldStatus)

        // 執行具體擴容操作
		if err = r.executeScaleOut(job, scaleOut); err != nil {
		}
	}
	return nil
}

5.6.2 executeScaleOut

進行擴展。

  • 使用 setScaleOutWorkers 對 scaleOut.Status.AddPods 進行添加新 pods。
  • 使用 workersAfterScaler 得到 最終的 worker。
  • 使用 executeScaleScript 進行scale 操作。
func (r *TrainingJobReconciler) executeScaleOut(job *kaiv1alpha1.TrainingJob, scaleOut *kaiv1alpha1.ScaleOut) error {

  initializeJobStatus(scaleOut.GetJobStatus())

	if err := r.validateScaleOut(scaleOut); err != nil {
		r.updateScalerFailed(scaleOut, job, err.Error())
		return err
	}

	if err := r.setScaleOutWorkers(job, scaleOut); err != nil {
		return err
	}

	err := r.ScaleOutWorkers(job, scaleOut)
	if err != nil {
		msg := fmt.Sprintf("%s create scaleout workers failed, error: %v", scaleOut.GetFullName(), err)
		r.ScaleOutFailed(job, scaleOut, msg)
		return err
	}

	scaleOutWorkers, err := r.getScalerOutWorkers(job, scaleOut)

	workerStatuses, _ := r.workerReplicasStatus(scaleOut.GetJobStatus(), scaleOutWorkers)

	if workerStatuses.Active < *scaleOut.Spec.ToAdd.Count {
		if IsScaleOutTimeout(scaleOut) {
			msg := fmt.Sprintf("scaleout job %s execution timeout", scaleOut.GetFullName())
			r.ScaleOutFailed(job, scaleOut, msg)
		}
		return NewRequeueError(fmt.Errorf("wait for workers running"))
	}

	hostWorkers := r.workersAfterScaler(job.Status.CurrentWorkers, scaleOut)

	// execute scalein script
    // 執行scale腳本
	if err := r.executeScaleScript(job, scaleOut, hostWorkers); err != nil {
		msg := fmt.Sprintf("%s execute script failed, error: %v", scaleOut.GetFullName(), err)
		r.ScaleOutFailed(job, scaleOut, msg)
		return err
	} else {
		job.Status.TargetWorkers = r.workersAfterScaler(job.Status.TargetWorkers, scaleOut)
		r.updateScalerSuccessd(scaleOut, job)
	}

	return nil
}

5.6.3 executeScaleScript

這時候調用 hostfileUpdateScript,更新 host file;

最終調用 executeOnLauncher執行腳本。

func (r *TrainingJobReconciler) executeScaleScript(trainingJob *kaiv1alpha1.TrainingJob, scaler Scaler, workers []string) error {
	if isScriptExecuted(*scaler.GetJobStatus()) {
		return nil
	}
	msg := fmt.Sprintf("trainingjob(%s/%s): execute script on launcher for %s", trainingJob.Namespace, trainingJob.Name, scaler.GetFullName())

	slots := getSlots(trainingJob)
	scriptSpec := scaler.GetScriptSpec()

	var script string
    // 得到腳本
	if scriptSpec.Script != "" {
		script = scalerScript(scriptSpec.GetTimeout(), scriptSpec.Env, scriptSpec.Script, scaler.GetPodNames(), slots)
	} else {
		hostfilePath := getHostfilePath(trainingJob)
		script = hostfileUpdateScript(hostfilePath, workers, slots)
	}

    // 執行腳本
	_, _, err := r.executeOnLauncher(trainingJob, script)

	updateJobConditions(scaler.GetJobStatus(), common.ScriptExecuted, "", msg)
	return nil
}

5.6.3.1 hostfileUpdateScript

得到最終的腳本string。

func hostfileUpdateScript(hostfile string, workers []string, slot int) string {
	return fmt.Sprintf(
		`echo '%s' > %s`, getHostfileContent(workers, slot), hostfile)
}

5.6.3.2 getHostfileContent

獲取host file內容

func getHostfileContent(workers []string, slot int) string {
	var buffer bytes.Buffer
	for _, worker := range workers {
		buffer.WriteString(fmt.Sprintf("%s:%d\n", worker, slot))
	}
	return buffer.String()
}

5.6.3.3 executeOnLauncher

在pod上執行

func (r *TrainingJobReconciler) executeOnLauncher(trainingJob *kaiv1alpha1.TrainingJob, script string) (string, string, error) {
	var err error
	var launcherPod *corev1.Pod
	if launcherPod, err = r.GetLauncherJob(trainingJob); err != nil {
	}

	if launcherPod != nil {
		stdOut, stdErr, err := kubectlOnPod(launcherPod, script)
		return stdOut, stdErr, nil
	}
	return "", "", nil
}


5.6.3.4 kubectlOnPod

拉動 worker。

func kubectlOnPod(pod *corev1.Pod, cmd string) (string, string, error) {
	cmds := []string{
		"/bin/sh",
		"-c",
		cmd,
	}
	stdout, stderr, err := util.ExecCommandInContainerWithFullOutput(pod.Name, pod.Spec.Containers[0].Name, pod.Namespace, cmds)
	if err != nil {
		return stdout, stderr, err
	}
	return stdout, stderr, nil
}

邏輯如下:

           1 Request("")
  K8S  +-------------------->  Reconcile  <------------------+
           2 ScaleOut CR           +                         |
  K8S  +-------------------->      |                         |
                                   |                         |
                                   v                         |
            +----------------------+---------------------+   |
            |                 ReconcileJobs              |   |
            |                      +                     |   |
            |                      |                     |   |
            |        +------------------------------+    |   |
            |     1  |             | 2            3 |    |   |
            |        v             v                v    |   |   3
            |  "", JobCreated   JobRunning      Scaling +----------->  executeScaling
            +--------+-------------+---------------------+   |              +
                     |             |                         |              |
                  1  |             | 2                       |              | 3
                     v             v                         |              v
             reconcileResource   reconcileJobRunning         |        executeScaleOut
                     +             +                         |              +
                  1  |             | 2                       |              |
                     |             |                         |              | 3
                     v             v                         |              v
+--------------------+----+      setTrainingJobScaler        |      executeScaleScript
| doSteps                 |        +                         |              +
|                         |        | 2                       |              |
|                         |        |                         |              | 3
|     WorkersCreated      |        v                         |              v
|                         |      updateScalerState           |     hostfileUpdateScript
|                         |        +                         |              +
|     WorkersReady        |        |                         |              | 3
|                         |        | 2                       |              |
|                         |        v                         |              v
|     LauncherCreated     |      common.Scaling              |       executeOnLauncher
|                         |        +                         |              +
|                         |        |                         |              |
|     JobRunning          |        | 2                       |              | 3
|                         |        |                         |              v
+-------------------------+        +-------------------------+         kubectlOnPod


0x06 ScaleIn

6.1 思路

ScaleIn 任務 CR如下:

5.png

執行縮容時,可以通過 ScaleIn CR 中的 spec.toDelete.count 或 spec.toDelete.podNames 字段指定縮容的 worker。

通過 count 配置縮容的數量,則通過 index 計算由高到低縮容 Worker。

apiVersion: kai.alibabacloud.com/v1alpha1
kind: ScaleIn
metadata:
  name: scalein-workers
spec:
  selector:
    name: elastic-training
  toDelete:
    count: 1

如果想要縮容特定的 Worker,可以配置 podNames:

apiVersion: kai.alibabacloud.com/v1alpha1
kind: ScaleIn
metadata:
  name: scalein-workers
spec:
  selector:
    name: elastic-training
  toDelete:
    podNames:
    - elastic-training-worker-1

運行一個縮容示例,指定數量縮容 1 個 worker:

kubectl create -f examples/scale_in_count.yaml

6.2 Reconcile

當下發一個 scaleInCR,Controller 觸發 Reconcile。主要就是調用 setScalingOwner。

func (r *ScaleInReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
	//silog := r.Log.WithValues("scalein", req.NamespacedName)
	scaleIn, err := getScaleIn(req.NamespacedName, r.Client)

	if isScaleFinished(*scaleIn.GetJobStatus()) {
		return NoRequeue()
	}

    // 以上基本都是各種校驗
	return setScalingOwner(r, scaleIn, r.PollInterval)
}

6.3 setScalingOwner

setScalingOwner 是關鍵之一。

這里主要是處理當 ScaleIn CR 沒有設置 OwnerReferences 的情況,就設置一個。

邏輯是 根據 ScaleIn CR 中的 Selector 字段,找到 Scaler 對應的 TrainingJob,設置到 CR 的 OwnerReferences 上。

下面移除各種錯誤檢查代碼。

func setScalingOwner(r client.Client, scaler Scaler, pollInterval time.Duration) (ctrl.Result, error) {
	ownerRefs := scaler.GetOwnerReferences()
	if len(ownerRefs) == 0 {
		trainingJob := &kaiv1alpha1.TrainingJob{}
		nsn := types.NamespacedName{}
		nsn.Namespace = scaler.GetNamespace()
		nsn.Name = scaler.GetSelector().Name
		err := r.Get(context.Background(), nsn, trainingJob)

		gvk := kaiv1alpha1.SchemeGroupVersionKind
		ownerRefs = append(ownerRefs, *metav1.NewControllerRef(trainingJob, schema.GroupVersionKind{Group: gvk.Group, Version: gvk.Version, Kind: gvk.Kind}))
		scaler.SetOwnerReferences(ownerRefs)

		initializeJobStatus(scaler.GetJobStatus())
		updateJobConditions(scaler.GetJobStatus(), v1.JobCreated, "", msg)
		err = r.Status().Update(context.Background(), scaler)
		err = r.Update(context.Background(), scaler)
	}
	return NoRequeue()
}

6.4 executeScaleIn

JobRunning 狀態處理與 ScaleOut類似,所以略過,直接看處理executeScaleIn。

執行縮容時,可以通過 ScaleIn CR 中的 spec.toDelete.count 或 spec.toDelete.podNames 字段指定縮容的 worker。

通過 count 配置縮容的數量,則通過 index 計算由高到低縮容 Worker。

具體結合代碼就是:

setsSaleInToDelete 指定哪些需要刪除;

executeScaleScript 執行腳本;

DeleteWorkers 刪除 worker;

func (r *TrainingJobReconciler) executeScaleIn(job *kaiv1alpha1.TrainingJob, scaleIn *kaiv1alpha1.ScaleIn) error {
	if scaleIn.DeletionTimestamp != nil || isScaleFinished(*scaleIn.GetJobStatus()) {
		logger.Info("reconcile cancelled, scalein does not need to do reconcile or has been deleted")
		return nil
	}

	initializeJobStatus(scaleIn.GetJobStatus())

	//TODO: Validate the scalein count for minSize
	err := r.setsSaleInToDelete(job, scaleIn)

	currentWorkers := r.workersAfterScaler(job.Status.CurrentWorkers, scaleIn)

	// execute scalein script
	if err := r.executeScaleScript(job, scaleIn, currentWorkers); err != nil {
		msg := fmt.Sprintf("%s execute script failed, error: %v", scaleIn.GetFullName(), err)
		r.updateScalerFailed(scaleIn, job, msg)
		return nil
	}

	toDeleteWorkers := scaleIn.GetPodNames()
	remainWorkers := false
	if scaleIn.Spec.Script == "" {
		if shutdownWorkers, err := r.checkWorkerShutdown(job, toDeleteWorkers); err != nil {
			return err
		} else {
			if len(toDeleteWorkers) != len(shutdownWorkers) {
				remainWorkers = true
				toDeleteWorkers = shutdownWorkers
			}
		}
	}
	if err := r.DeleteWorkers(job, toDeleteWorkers); err != nil {
		msg := fmt.Sprintf("%s delete resource failed, error: %v", scaleIn.GetFullName(), err)
		r.updateScalerFailed(scaleIn, job, msg)
		return nil
	}

	// wait pods deleted
	deleted, _ := r.isWorkersDeleted(job.Namespace, scaleIn.GetPodNames())
	if deleted {
		job.Status.TargetWorkers = r.workersAfterScaler(job.Status.TargetWorkers, scaleIn)
		job.Status.CurrentWorkers = currentWorkers
		r.updateScalerSuccessd(scaleIn, job)
		return nil
	}

	if remainWorkers {
		msg := "wait for workers process shutdown"
		logger.Info(msg)
		return NewRequeueError(fmt.Errorf(msg))
	}

	return nil
}

6.5 setsSaleInToDelete

通過 ScaleIn CR 中的 spec.toDelete.count 或 spec.toDelete.podNames 字段指定縮容的 worker。

func (r *TrainingJobReconciler) setsSaleInToDelete(job *kaiv1alpha1.TrainingJob, scaleIn *kaiv1alpha1.ScaleIn) error {
	podNames := scaleIn.Status.ToDeletePods
	if len(podNames) != 0 {
		return /*filterPodNames(workers, podNames, false), */ nil
	}
	workers, err := r.GetWorkerPods(job)

	toDelete := scaleIn.Spec.ToDelete

	if toDelete.PodNames != nil {
		workers = filterPodNames(workers, toDelete.PodNames, false)
	} else if toDelete.Count > 0 {
		if toDelete.Count < len(workers) {
			allPodNames := getSortPodNames(job.Name, workers)
			deletePodNames := allPodNames[len(workers)-toDelete.Count:]
			workers = filterPodNames(workers, deletePodNames, false)
		} 
	} 
  
	for _, worker := range workers {
		scaleIn.Status.ToDeletePods = append(scaleIn.Status.ToDeletePods, worker.Name)
	}

	return nil
}


6.6 DeleteWorkers

具體刪除worker service 和 pods。

func (r *TrainingJobReconciler) DeleteWorkers(trainingJob *kaiv1alpha1.TrainingJob, workers []string) error {
	if err := r.DeleteWorkerServices(trainingJob, workers); err != nil {
		return fmt.Errorf("delete services failed: %++v", err)
	}

	if err := r.DeleteWorkerPods(trainingJob, workers); err != nil {
		return fmt.Errorf("delete pods failed: %++v", err)
	}
	return nil
}

6.7 DeleteWorkerPods

刪除pods。

func (r *TrainingJobReconciler) DeleteWorkerPods(job *kaiv1alpha1.TrainingJob, pods []string) error {
	workerPods, err := r.GetWorkerPods(job)

	if pods != nil {
		workerPods = filterPodNames(workerPods, pods, false)
	}
	for _, pod := range workerPods {
		deleteOptions := &client.DeleteOptions{GracePeriodSeconds: utilpointer.Int64Ptr(0)}
		if err := r.Delete(context.Background(), &pod, deleteOptions); err != nil && !errors.IsNotFound(err) {
			r.recorder.Eventf(job, corev1.EventTypeWarning, trainingJobFailedReason, "Error deleting worker %s: %v", pod.Name, err)
			//return err
		}
		r.recorder.Eventf(job, corev1.EventTypeNormal, trainingJobSucceededReason, "Deleted pod %s", pod.Name)
	}
	return nil
}

具體邏輯如下:

      1 Request("")
 K8S-----------------> Reconcile  <------------------+
      2 ScaleOut CR        +                         |
 K8S----------------->     |                         |
                           |                         |
                           v                         |
    +----------------------+---------------------+   |
    |                 ReconcileJobs              |   |
    |                      +                     |   |
    |                      |                     |   |
    |        +------------------------------+    |   |
    |     1  |             | 2            3 |    |   |
    |        v             v                v    |   | 3
    |  "", JobCreated   JobRunning      Scaling +---------> executeScaling -----+
    +--------+-------------+---------------------+   |          +               |
             |             |                         |          |               |
          1  |             | 2                       |          | 3             | 4
             v             v                         |          v               v
     reconcileResource   reconcileJobRunning         |    executeScaleOut  executeScaleIn
             +             +                         |          +               +
          1  |             | 2                       |          |               |
             |             |                         |          | 3             | 4
             v             v                         |          v               v
+------------+--------+  setTrainingJobScaler        | executeScaleScript executeScaleScript
| doSteps             |    +                         |          +               +
|                     |    | 2                       |          |               |
|                     |    |                         |          | 3             | 4
|    WorkersCreated   |    v                         |          v               v
|                     |  updateScalerState           | hostfileUpdateScript  DeleteWorkers
|                     |    +                         |          +               +
|    WorkersReady     |    |                         |          | 3             | 4
|                     |    | 2                       |          |               |
|                     |    v                         |          v               v
|    LauncherCreated  |  common.Scaling              |   executeOnLauncher  DeleteWorkerPods
|                     |    +                         |          +               +
|                     |    |                         |          |               |
|    JobRunning       |    | 2                       |          | 3             | 4
|                     |    |                         |          v               v
+---------------------+    +-------------------------+     kubectlOnPod      Delete


至此,Horovod系列分析完畢,下一篇開始分析參數服務器,敬請期待。

0xEE 個人信息

★★★★★★關於生活和技術的思考★★★★★★

微信公眾賬號:羅西的思考

如果您想及時得到個人撰寫文章的消息推送,或者想看看個人推薦的技術資料,敬請關注。

在這里插入圖片描述

0xFF 參考

ElasticDL 分析

在 Kubernetes 上彈性深度學習訓練利器 -- Elastic Training Operator


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM