Pod 啟動流程詳解


Pod 啟動流程詳解

1. 概述

在 Kubernetes 集群中,每個 Node 節點上都會啟動一個 Kubelet 服務進程,該進程用於處理 Master 下發到本節點的 Pod 並管理其生命周期。換句話說,Pod 的創建、刪除、更新等操作,都是由 kubelet 進行管理的,它將處理 Pod 與 Container Runtime 之間所有的轉換邏輯,包括掛載卷、容器日志、垃圾回收等。

kubelet 可以通過以下幾種方式獲取本節點上需要管理的 Pod 的信息:

  • file:kubelet啟動參數“--config”指定的配置文件目錄下的文件(默認目錄為“/etc/kubernetes/manifests/”)。
  • http:通過“--manifest-url”參數設置。
  • api server:kubelet通過API Server監聽etcd目錄,同步Pod列表。

其中,以配置文件方式http方式創建的Pod叫做靜態podstatic pod)。簡單介紹如下:

靜態Pod是由kubelet進行管理的僅存在於特定Node上的Pod。它們不能通過API Server進行管理,無法與ReplicationController、Deployment 或者DaemonSet進行關聯,並且kubelet無法對它們進行健康檢查。靜態 Pod總是由kubelet創建的,並且總在kubelet所在的Node上運行。

本文所講的是通過 api server 方式創建的 pod。

一個 pod 的創建要從客戶端鍵入kubectl create -f xxx.yaml開始,這里假定:

  1. 客戶端發送的HTTP請求已經通過校驗(API Server的身份認證、鑒權和准入控制);
  2. 已經把資源對象信息持久化到 etcd 中;
  3. master節點的調度器已經把pod調度到合適的節點上;

接下的步驟就涉及到在工作節點上真正創建 pod,這也是本文的重點,這部分由 kubelet 組件負責。這部分內容結合這篇文章一同消化。

2. 源碼分析

1. kubelet 循環控制(syncLoop)

syncLoop()函數是同步Pod狀態的主函數,是一個永遠都不會退出的無限循環函數。

syncLoop 中首先定義了一個 syncTicker 和 housekeepingTicker,即使沒有需要更新的 pod 配置,kubelet 也會定時去做同步 pod 和清理 pod 的工作。然后在 for 循環中一直調用 syncLoopIteration,如果在每次循環過程中出現比較嚴重的錯誤,kubelet 會記錄到 runtimeState 中,遇到錯誤就等待 5 秒,然后繼續循環。

// syncLoop is the main loop for processing changes. It watches for changes from
// three channels (file, apiserver, and http) and creates a union of them. For
// any new change seen, will run a sync against desired state and running state. If
// no changes are seen to the configuration, will synchronize the last known desired
// state every sync-frequency seconds. Never returns.
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
	klog.Info("Starting kubelet main sync loop.")
	// The syncTicker wakes up kubelet to checks if there are any pod workers
	// that need to be sync'd. A one-second period is sufficient because the
	// sync interval is defaulted to 10s.
  // syncTicker 每1秒檢測一次是否有需要同步的 pod workers
	syncTicker := time.NewTicker(time.Second)
	defer syncTicker.Stop()
  // 每2秒檢測一次是否有需要清理的 pod
	housekeepingTicker := time.NewTicker(housekeepingPeriod)
	defer housekeepingTicker.Stop()
  // pod 的生命周期變化
	plegCh := kl.pleg.Watch()
	const (
		base   = 100 * time.Millisecond
		max    = 5 * time.Second
		factor = 2
	)
	duration := base
	// Responsible for checking limits in resolv.conf
	// The limits do not have anything to do with individual pods
	// Since this is called in syncLoop, we don't need to call it anywhere else
	if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {
		kl.dnsConfigurer.CheckLimitsForResolvConf()
	}

	for {
		if err := kl.runtimeState.runtimeErrors(); err != nil {
			klog.Errorf("skipping pod synchronization - %v", err)
			// exponential backoff
			time.Sleep(duration)
			duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
			continue
		}
		// reset backoff if we have a success
		duration = base

		kl.syncLoopMonitor.Store(kl.clock.Now())
		if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
			break
		}
		kl.syncLoopMonitor.Store(kl.clock.Now())
	}
}

2. 監聽 Pod 變化(syncLoopIteration)

syncLoopIteration() 會對多個 channel 進行遍歷,一旦發現某個 channel 有消息就交給對應的 handler 去處理。它會從以下 channel 中獲取消息:

  • configCh:該信息源由 kubeDeps 對象中的 PodConfig 子模塊提供,該模塊將同時 watch 3 個不同來源的 pod 信息的變化(file,http,apiserver),一旦某個來源的 pod 信息發生了更新(創建/更新/刪除),這個 channel 中就會出現被更新的 pod 信息和更新的具體操作。
  • syncCh:定時器管道,每隔一秒去同步最新保存的 pod 狀態
  • houseKeepingCh:負責 pod 清理工作
  • plegCh:該信息源由 kubelet 對象中的 pleg 子模塊提供,該模塊主要用於周期性地向 container runtime 查詢當前所有容器的狀態,如果狀態發生變化,則這個 channel 產生事件。
  • livenessManager.Updates():健康檢查發現某個 pod 不可用,kubelet 將根據 Pod 的 restartPolicy 自動執行正確的操作
// syncLoopIteration reads from various channels and dispatches pods to the
// given handler.
//
// Arguments:
// 1.  configCh:       a channel to read config events from
// 2.  handler:        the SyncHandler to dispatch pods to
// 3.  syncCh:         a channel to read periodic sync events from
// 4.  houseKeepingCh: a channel to read housekeeping events from
// 5.  plegCh:         a channel to read PLEG updates from
//
// Events are also read from the kubelet liveness manager's update channel.
//
// The workflow is to read from one of the channels, handle that event, and
// update the timestamp in the sync loop monitor.
//
// With that in mind, in truly no particular order, the different channels
// are handled as follows:
//
// * configCh: dispatch the pods for the config change to the appropriate
//             handler callback for the event type
// * plegCh: update the runtime cache; sync pod
// * syncCh: sync all pods waiting for sync
// * houseKeepingCh: trigger cleanup of pods
// * liveness manager: sync pods that have failed or in which one or more
//                     containers have failed liveness checks
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
  // 1.configCh: Update from a config source; dispatch it to the right handler callback. 
	case u, open := <-configCh:
		...
		switch u.Op {
		case kubetypes.ADD:
			...
		case kubetypes.UPDATE:
			...
		case kubetypes.REMOVE:
			...
		case kubetypes.RECONCILE:
			...
		case kubetypes.DELETE:
			...
		case kubetypes.RESTORE:
			...
		case kubetypes.SET:
			...
		}
		...
  // 2.plegCh  
	case e := <-plegCh:
		...
  // 3.syncCh: Sync pods waiting for sync
	case <-syncCh:
		....
  // 4.livenessManager  
	case update := <-kl.livenessManager.Updates():
		...
  // 5.housekeepingCh
	case <-housekeepingCh:
		...
	}
	return true
}

3. 處理新增 Pod(HandlePodAddtions)

對於事件中的每個Pod,HandlePodAddtions()會執行以下操作:

  1. 把所有的Pod根據其創建時間進行排序,保證最早創建的Pod會最先被處理;
  2. 遍歷Pod列表,把Pod逐個加入到podManager中。podManager 子模塊負責管理這台機器上的 pod 的信息、pod 和 mirrorPod 之間的對應關系等等。所有被管理的 pod 都要出現在里面,如果 podManager 中找不到某個 pod,就認為這個 pod 被刪除了。
  3. 判斷是否是 mirror pod(即 static pod),如果是 mirror pod 則調用其單獨的方法;
  4. 驗證 pod 是否能在該節點運行,如果不可以直接拒絕;
  5. 通過 dispatchWork 把創建 pod 的工作下發給 podWorkers 子模塊做異步處理;
  6. 在 probeManager 中添加 pod,如果 pod 中定義了 readiness 和 liveness 健康檢查,啟動 goroutine 定期進行檢測。
// HandlePodAdditions is the callback in SyncHandler for pods being added from
// a config source.
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
	start := kl.clock.Now()
  // 1.把所有的 Pod 根據其創建時間進行排序,保證最早創建的 Pod 會最先被處理;
	sort.Sort(sliceutils.PodsByCreationTime(pods))
	for _, pod := range pods {
		// Responsible for checking limits in resolv.conf
		if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {
			kl.dnsConfigurer.CheckLimitsForResolvConf()
		}
		existingPods := kl.podManager.GetPods()
    // 2. 把 pod 加入到 podManager 中
		// Always add the pod to the pod manager. Kubelet relies on the pod
		// manager as the source of truth for the desired state. If a pod does
		// not exist in the pod manager, it means that it has been deleted in
		// the apiserver and no action (other than cleanup) is required.
		kl.podManager.AddPod(pod)

    // 3. 判斷是否是 mirror pod(即 static pod)
		if kubepod.IsMirrorPod(pod) {
			kl.handleMirrorPod(pod, start)
			continue
		}

		if !kl.podIsTerminated(pod) {
			// Only go through the admission process if the pod is not
			// terminated.

			// We failed pods that we rejected, so activePods include all admitted
			// pods that are alive.
			activePods := kl.filterOutTerminatedPods(existingPods)

      // 通過 canAdmitPod 方法校驗Pod能否在該計算節點創建(如:磁盤空間)
			// Check if we can admit the pod; if not, reject it.
			if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
				kl.rejectPod(pod, reason, message)
				continue
			}
		}
		mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
    // 通過 dispatchWork 把創建 pod 的工作下發給 podWorkers 子模塊做異步處理
		kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
    // 在 probeManager 中添加 pod,如果 pod 中定義了 readiness 和 liveness 健康檢查,啟動 goroutine 定期進行檢測。
		kl.probeManager.AddPod(pod)
	}
}

static pod 是由 kubelet 直接管理的,k8s apiserver 並不會感知到 static pod 的存在,當然也不會和任何一個 rs 關聯上,完全是由 kubelet 進程來監管,並在它異常時負責重啟。Kubelet 會通過 apiserver 為每一個 static pod 創建一個對應的 mirror pod,如此一來就可以通過 kubectl 命令查看對應的 pod,並且可以通過 kubectl logs 命令直接查看到static pod 的日志信息。

4. 下發任務(dispatchWork)

dispatchWorker() 的主要作用是把某個對 Pod 的操作(創建/更新/刪除)下發給 podWorkers。

// dispatchWork starts the asynchronous sync of the pod in a pod worker.
// If the pod is terminated, dispatchWork
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
	if kl.podIsTerminated(pod) {
		if pod.DeletionTimestamp != nil {
			// If the pod is in a terminated state, there is no pod worker to
			// handle the work item. Check if the DeletionTimestamp has been
			// set, and force a status update to trigger a pod deletion request
			// to the apiserver.
			kl.statusManager.TerminatePod(pod)
		}
		return
	}
	// Run the sync in an async worker.
	kl.podWorkers.UpdatePod(&UpdatePodOptions{
		Pod:        pod,
		MirrorPod:  mirrorPod,
		UpdateType: syncType,
		OnCompleteFunc: func(err error) {
			if err != nil {
				metrics.PodWorkerDuration.WithLabelValues(syncType.String()).Observe(metrics.SinceInSeconds(start))
				metrics.DeprecatedPodWorkerLatency.WithLabelValues(syncType.String()).Observe(metrics.SinceInMicroseconds(start))
			}
		},
	})
	// Note the number of containers for new pods.
	if syncType == kubetypes.SyncPodCreate {
		metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
	}
}

5. 更新事件的channel(UpdatePod)

podWorkers 子模塊主要的作用就是處理針對每一個的 Pod 的更新事件,比如 Pod 的創建,刪除,更新。而 podWorkers 采取的基本思路是:為每一個 Pod 都單獨創建一個 goroutine 和更新事件的 channel,goroutine 會阻塞式的等待 channel 中的事件,並且對獲取的事件進行處理。而 podWorkers 對象自身則主要負責對更新事件進行下發。

podWorkers.UpdatePod() 方法如下:

// Apply the new setting to the specified pod.
// If the options provide an OnCompleteFunc, the function is invoked if the update is accepted.
// Update requests are ignored if a kill pod request is pending.
func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
	pod := options.Pod
	uid := pod.UID
	var podUpdates chan UpdatePodOptions
	var exists bool

	p.podLock.Lock()
	defer p.podLock.Unlock()
  
	if podUpdates, exists = p.podUpdates[uid]; !exists {
		// We need to have a buffer here, because checkForUpdates() method that
		// puts an update into channel is called from the same goroutine where
		// the channel is consumed. However, it is guaranteed that in such case
		// the channel is empty, so buffer of size 1 is enough.
		// 創建channel
    podUpdates = make(chan UpdatePodOptions, 1)
		p.podUpdates[uid] = podUpdates

		// Creating a new pod worker either means this is a new pod, or that the
		// kubelet just restarted. In either case the kubelet is willing to believe
		// the status of the pod for the first pod worker sync. See corresponding
		// comment in syncPod.
    // 啟動一個goroutine
		go func() {
			defer runtime.HandleCrash()
			p.managePodLoop(podUpdates)
		}()
	}
  
  // 下發更新事件
	if !p.isWorking[pod.UID] {
		p.isWorking[pod.UID] = true
		podUpdates <- *options
	} else {
		// if a request to kill a pod is pending, we do not let anything overwrite that request.
		update, found := p.lastUndeliveredWorkUpdate[pod.UID]
		if !found || update.UpdateType != kubetypes.SyncPodKill {
			p.lastUndeliveredWorkUpdate[pod.UID] = *options
		}
	}
}

6. 調用 syncPodFn 方法同步 pod(managePodLoop)

managePodLoop() 調用 syncPodFn 方法去同步 pod,syncPodFn 實際上就是 kubelet.SyncPod(接下來馬上講到)。在完成這次 sync 動作之后,會調用 wrapUp 函數。

這個函數將會做幾件事情:

  • 將這個 pod 信息插入 kubelet 的 workQueue 隊列中,等待下一次周期性的對這個 pod 的狀態進行 sync
  • 將在這次 sync 期間堆積的沒有能夠來得及處理的最近一次 update 操作加入 goroutine 的事件 channel 中,立即處理。
func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
	var lastSyncTime time.Time
	for update := range podUpdates {
		err := func() error {
			podUID := update.Pod.UID
			// This is a blocking call that would return only if the cache
			// has an entry for the pod that is newer than minRuntimeCache
			// Time. This ensures the worker doesn't start syncing until
			// after the cache is at least newer than the finished time of
			// the previous sync.
			status, err := p.podCache.GetNewerThan(podUID, lastSyncTime)
			if err != nil {
				...
			}
			err = p.syncPodFn(syncPodOptions{
				mirrorPod:      update.MirrorPod,
				pod:            update.Pod,
				podStatus:      status,
				killPodOptions: update.KillPodOptions,
				updateType:     update.UpdateType,
			})
			lastSyncTime = time.Now()
			return err
		}()
		
    // notify the call-back function if the operation succeeded or not
		if update.OnCompleteFunc != nil {
			update.OnCompleteFunc(err)
		}
		if err != nil {
			...
		}
		p.wrapUp(update.Pod.UID, err)
	}
}

7. 完成創建容器前的准備工作【核心】(syncPod)

syncPod()方法中,主要完成以下幾件事情:

  1. 如果狀態是刪除pod,立即執行並返回;如果pod的狀態是正在被創建,則記錄pod worker的啟動延時;

  2. 調用generateAPIPodStatus()創建PodStatus對象,並同步 PodStatus 到 kubelet.statusManager(狀態管理器)

    • Pod 的 status 定義在PodStatus對象中,其中有一個 phase 字段,它是 Pod 在其生命周期中的簡單宏觀概述。
  3. 檢查 pod 是否能運行在本節點;

  4. 在 statusManager 中更新 pod 的狀態;

  5. kill 掉不應該被運行的pod;(Kill the pod if it should not be running)

    • 哪些pod屬於不應該運行的呢?根據判斷條件,是這3類:!runnable.Admit || pod.DeletionTimestamp != nil || apiPodStatus.Phase == v1.PodFailed。比如,在第3步中會檢查該pod能否運行在該節點上,如果不能,則kill掉這個pod
  6. 如果 Kubelet 啟動時指定了 --cgroups-per-qos 參數,Kubelet 就會為該 Pod 創建 cgroup 並設置對應的資源限制。

  7. 如果是 static Pod,就創建或者更新對應的 mirrorPod

  8. 為 pod 創建數據目錄,通常包括:

    • Pod 目錄 (通常是 /var/run/kubelet/pods/<podID>);
    • Pod 的掛載卷目錄 (<podDir>/volumes);
    • Pod 的插件目錄 (<podDir>/plugins)。
  9. 卷管理器(Volume manager)會掛載 Spec.Volumes 中定義的相關數據卷,然后等待掛載成功;

  10. 為 pod 拉取 secrets;

  11. 通過調用 container runtime 的 SyncPod 方法,真正實現容器的啟動。也就是說,前面所做的工作只是容器啟動的准備工作。

可以看到該方法是創建 pod 實體(即容器)之前需要完成的准備工作。

// The workflow is:
// * If the pod is being created, record pod worker start latency
// * Call generateAPIPodStatus to prepare an v1.PodStatus for the pod
// * If the pod is being seen as running for the first time, record pod
//   start latency
// * Update the status of the pod in the status manager
// * Kill the pod if it should not be running
// * Create a mirror pod if the pod is a static pod, and does not
//   already have a mirror pod
// * Create the data directories for the pod if they do not exist
// * Wait for volumes to attach/mount
// * Fetch the pull secrets for the pod
// * Call the container runtime's SyncPod callback
// * Update the traffic shaping for the pod's ingress and egress limits
//
// If any step of this workflow errors, the error is returned, and is repeated
// on the next syncPod call.
//
// This operation writes all events that are dispatched in order to provide
// the most accurate information possible about an error situation to aid debugging.
// Callers should not throw an event if this operation returns an error.
func (kl *Kubelet) syncPod(o syncPodOptions) error {
	// pull out the required options
	pod := o.pod
	mirrorPod := o.mirrorPod
	podStatus := o.podStatus
	updateType := o.updateType

	// if we want to kill a pod, do it now!
  // 判斷是否刪除 pod
	if updateType == kubetypes.SyncPodKill {
		...
	}

	...

	// Record pod worker start latency if being created
	// TODO: make pod workers record their own latencies
  // 判斷是否創建 pod
  // 如果 Pod 正在創建, 就會暴露一些指標(metrics),可以用於在 Prometheus 中追蹤 Pod 啟動延時;
	if updateType == kubetypes.SyncPodCreate {
		if !firstSeenTime.IsZero() {
			// This is the first time we are syncing the pod. Record the latency
			// since kubelet first saw the pod if firstSeenTime is set.
			metrics.PodWorkerStartDuration.Observe(metrics.SinceInSeconds(firstSeenTime))
			metrics.DeprecatedPodWorkerStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
		} else {
			klog.V(3).Infof("First seen time not recorded for pod %q", pod.UID)
		}
	}

	// Generate final API pod status with pod and status manager status
  // 生成一個 APIPodStatus 對象,表示 Pod 當前階段的狀態。
	apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
	podStatus.IP = apiPodStatus.PodIP

	// Record the time it takes for the pod to become running.
	existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
	if !ok || existingStatus.Phase == v1.PodPending && apiPodStatus.Phase == v1.PodRunning &&
		!firstSeenTime.IsZero() {
		metrics.PodStartDuration.Observe(metrics.SinceInSeconds(firstSeenTime))
		metrics.DeprecatedPodStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
	}

  // 檢查 pod 是否能運行在本節點
	runnable := kl.canRunPod(pod)
	if !runnable.Admit {
		...
	}

	// Update status in the status manager
  // 更新pod狀態,statusManage的任務是通過 kube-apiserver 異步更新 etcd 中的記錄;
	kl.statusManager.SetPodStatus(pod, apiPodStatus)

	// Kill pod if it should not be running
  // 如果pod處於非running狀態,則直接kill掉
	if !runnable.Admit || pod.DeletionTimestamp != nil || apiPodStatus.Phase == v1.PodFailed {
		...
	}

	// If the network plugin is not ready, only start the pod if it uses the host network
  // 加載網絡插件
	if err := kl.runtimeState.networkErrors(); err != nil && !kubecontainer.IsHostNetworkPod(pod) {
		kl.recorder.Eventf(pod, v1.EventTypeWarning, events.NetworkNotReady, "%s: %v", NetworkNotReadyErrorMsg, err)
		return fmt.Errorf("%s: %v", NetworkNotReadyErrorMsg, err)
	}

	// Create Cgroups for the pod and apply resource parameters
	// to them if cgroups-per-qos flag is enabled.
  // 如果 Kubelet 啟動時指定了 --cgroups-per-qos 參數,
  // Kubelet 就會為該 Pod 創建 cgroup 並設置對應的資源限制。
  // 這是為了更好的 Pod 服務質量(QoS)
	pcm := kl.containerManager.NewPodContainerManager()
	// If pod has already been terminated then we need not create
	// or update the pod's cgroup
	if !kl.podIsTerminated(pod) {
		...
		// Create and Update pod's Cgroups
		// 創建並更新pod的cgroups
		if !(podKilled && pod.Spec.RestartPolicy == v1.RestartPolicyNever) {
			...
		}
	}

	// Create Mirror Pod for Static Pod if it doesn't already exist
	if kubepod.IsStaticPod(pod) {
		...
	}

	// Make data directories for the pod
  // 創建數據目錄
	if err := kl.makePodDataDirs(pod); err != nil {
		...
	}

	// Volume manager will not mount volumes for terminated pods
  // 掛載volume
	if !kl.podIsTerminated(pod) {
		// Wait for volumes to attach/mount
		...
		}
	}

	// Fetch the pull secrets for the pod
  // 獲取secret信息
	pullSecrets := kl.getPullSecretsForPod(pod)

	// Call the container runtime's SyncPod callback
  // 調用containerRuntime的SyncPod方法開始創建容器
 	result := kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff)
	kl.reasonCache.Update(pod.UID, result)
	if err := result.Error(); err != nil {
		...
	}

	return nil
}

8. 創建容器

kuberuntime 子模塊的 SyncPod() 函數真正負責pod實體的創建,SyncPod() 通過執行以下幾件事情來同步正在運行的pod的狀態至desired狀態:

  1. Compute sandbox and container changes.
  2. Kill pod sandbox if necessary.
  3. Kill any containers that should not be running.
  4. Create sandbox if necessary.
  5. Create init containers.
  6. Create normal containers.(即啟動工作負載容器)
func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
	// Step 1: Compute sandbox and container changes.
	podContainerChanges := m.computePodActions(pod, podStatus)
	klog.V(3).Infof("computePodActions got %+v for pod %q", podContainerChanges, format.Pod(pod))
	if podContainerChanges.CreateSandbox {
		ref, err := ref.GetReference(legacyscheme.Scheme, pod)
		if err != nil {
			klog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), err)
		}
		if podContainerChanges.SandboxID != "" {
			m.recorder.Eventf(ref, v1.EventTypeNormal, events.SandboxChanged, "Pod sandbox changed, it will be killed and re-created.")
		} else {
			klog.V(4).Infof("SyncPod received new pod %q, will create a sandbox for it", format.Pod(pod))
		}
	}

	// Step 2: Kill the pod if the sandbox has changed.
	if podContainerChanges.KillPod {
		if podContainerChanges.CreateSandbox {
			klog.V(4).Infof("Stopping PodSandbox for %q, will start new one", format.Pod(pod))
		} else {
			klog.V(4).Infof("Stopping PodSandbox for %q because all other containers are dead.", format.Pod(pod))
		}

		killResult := m.killPodWithSyncResult(pod, kubecontainer.ConvertPodStatusToRunningPod(m.runtimeName, podStatus), nil)
		result.AddPodSyncResult(killResult)
		if killResult.Error() != nil {
			klog.Errorf("killPodWithSyncResult failed: %v", killResult.Error())
			return
		}

		if podContainerChanges.CreateSandbox {
			m.purgeInitContainers(pod, podStatus)
		}
	} else {
		// Step 3: kill any running containers in this pod which are not to keep.
		for containerID, containerInfo := range podContainerChanges.ContainersToKill {
			klog.V(3).Infof("Killing unwanted container %q(id=%q) for pod %q", containerInfo.name, containerID, format.Pod(pod))
			killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, containerInfo.name)
			result.AddSyncResult(killContainerResult)
			if err := m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, nil); err != nil {
				killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
				klog.Errorf("killContainer %q(id=%q) for pod %q failed: %v", containerInfo.name, containerID, format.Pod(pod), err)
				return
			}
		}
	}

	// Keep terminated init containers fairly aggressively controlled
	// This is an optimization because container removals are typically handled
	// by container garbage collector.
	m.pruneInitContainersBeforeStart(pod, podStatus)

	// We pass the value of the podIP down to generatePodSandboxConfig and
	// generateContainerConfig, which in turn passes it to various other
	// functions, in order to facilitate functionality that requires this
	// value (hosts file and downward API) and avoid races determining
	// the pod IP in cases where a container requires restart but the
	// podIP isn't in the status manager yet.
	//
	// We default to the IP in the passed-in pod status, and overwrite it if the
	// sandbox needs to be (re)started.
	podIP := ""
	if podStatus != nil {
		podIP = podStatus.IP
	}

	// Step 4: Create a sandbox for the pod if necessary.
	podSandboxID := podContainerChanges.SandboxID
	if podContainerChanges.CreateSandbox {
		...
		podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
		if err != nil {
			...
		}
		podSandboxStatus, err := m.runtimeService.PodSandboxStatus(podSandboxID)
		if err != nil {
			...
		}

		// If we ever allow updating a pod from non-host-network to
		// host-network, we may use a stale IP.
    // 如果 pod 網絡是 host 模式,容器也相同;其他情況下,容器會使用 None 網絡模式,讓 kubelet 的網絡插件自己進行網絡配置
		if !kubecontainer.IsHostNetworkPod(pod) {
			// Overwrite the podIP passed in the pod status, since we just started the pod sandbox.
			podIP = m.determinePodSandboxIP(pod.Namespace, pod.Name, podSandboxStatus)
			klog.V(4).Infof("Determined the ip %q for pod %q after sandbox changed", podIP, format.Pod(pod))
		}
	}

	// Get podSandboxConfig for containers to start.
	configPodSandboxResult := kubecontainer.NewSyncResult(kubecontainer.ConfigPodSandbox, podSandboxID)
	result.AddSyncResult(configPodSandboxResult)
  // 獲取PodSandbox的配置,如metadata,clusterDNS,容器的端口映射等
	podSandboxConfig, err := m.generatePodSandboxConfig(pod, podContainerChanges.Attempt)
	if err != nil {
		...
	}

	// Step 5: start the init container.
	if container := podContainerChanges.NextInitContainerToStart; container != nil {
		...
		if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP); err != nil {
			...
		}
    ...
	}

	// Step 6: start containers in podContainerChanges.ContainersToStart.
  // 即啟動業務容器
	for _, idx := range podContainerChanges.ContainersToStart {
		container := &pod.Spec.Containers[idx]
		...
		if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP); err != nil {
			...
		}
	}

	return
}

9. 啟動容器

最后由 startContainer() 來啟動容器,主要有以下幾個步驟:

  1. 拉取容器的鏡像。如果是私有倉庫的鏡像,就會使用 PodSpec 中指定的 imagePullSecrets 來拉取該鏡像;
  2. 通過 CRI 創建容器。 Kubelet 使用 PodSpec 中的信息填充了一個 ContainerConfig 數據結構(在其中定義了 command, image, labels, mounts, devices, environment variables 等),然后通過 protobufs 發送給 CRI。 對於 Docker 來說,它會將這些信息反序列化並填充到自己的配置信息中,然后再發送給 Dockerd 守護進程。在這個過程中,它會將一些元數據(例如容器類型,日志路徑,sandbox ID 等)添加到容器中;
  3. 然后 Kubelet 將容器注冊到 CPU 管理器,它通過使用 UpdateContainerResources CRI 方法給容器分配給本地節點上的 CPU 資源;
  4. 最后容器真正地啟動
  5. 如果 Pod 中包含 Container Lifecycle Hooks,容器啟動之后就會運行這些 Hooks。 Hook 的類型包括兩種:Exec(執行一段命令) 和 HTTP(發送HTTP請求)。如果 PostStart Hook 啟動的時間過長、掛起或者失敗,容器將永遠不會變成 Running 狀態。
// startContainer starts a container and returns a message indicates why it is failed on error.
// It starts the container through the following steps:
// * pull the image
// * create the container
// * start the container
// * run the post start lifecycle hooks (if applicable)
func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, container *v1.Container, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string) (string, error) {
	// Step 1: pull the image.
  // 檢查鏡像是否存在,不存在則到 Docker Registry 或是 Private Registry 拉取鏡像。
	imageRef, msg, err := m.imagePuller.EnsureImageExists(pod, container, pullSecrets, podSandboxConfig)
	if err != nil {
		...
	}

	// Step 2: create the container.
	ref, err := kubecontainer.GenerateContainerRef(pod, container)
	if err != nil {
		...
	}
  
  // For a new container, the RestartCount should be 0
  // 設置 RestartCount
	restartCount := 0
	containerStatus := podStatus.FindContainerStatusByName(container.Name)
	if containerStatus != nil {
		restartCount = containerStatus.RestartCount + 1
	}

  // 生成容器的配置信息
	containerConfig, cleanupAction, err := m.generateContainerConfig(container, pod, restartCount, podIP, imageRef)
	...

  // 調用 CRI 調用容器運行時創建容器
	containerID, err := m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)
	if err != nil {
		...
	}
	err = m.internalLifecycle.PreStartContainer(pod, container, containerID)
	if err != nil {
		...
	}
	...

	// Step 3: start the container.
  // 真正的啟動容器
	err = m.runtimeService.StartContainer(containerID)
	if err != nil {
		...
	}
	m.recordContainerEvent(pod, container, containerID, v1.EventTypeNormal, events.StartedContainer, fmt.Sprintf("Started container %s", container.Name))

	// Symlink container logs to the legacy container log location for cluster logging
	// support.
	// TODO(random-liu): Remove this after cluster logging supports CRI container log path.
	containerMeta := containerConfig.GetMetadata()
	sandboxMeta := podSandboxConfig.GetMetadata()
	legacySymlink := legacyLogSymlink(containerID, containerMeta.Name, sandboxMeta.Name,
		sandboxMeta.Namespace)
	containerLog := filepath.Join(podSandboxConfig.LogDirectory, containerConfig.LogPath)
	// only create legacy symlink if containerLog path exists (or the error is not IsNotExist).
	// Because if containerLog path does not exist, only dandling legacySymlink is created.
	// This dangling legacySymlink is later removed by container gc, so it does not make sense
	// to create it in the first place. it happens when journald logging driver is used with docker.
	if _, err := m.osInterface.Stat(containerLog); !os.IsNotExist(err) {
		if err := m.osInterface.Symlink(containerLog, legacySymlink); err != nil {
			klog.Errorf("Failed to create legacy symbolic link %q to container %q log %q: %v",
				legacySymlink, containerID, containerLog, err)
		}
	}

	// Step 4: execute the post start hook.
	if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
		kubeContainerID := kubecontainer.ContainerID{
			Type: m.runtimeName,
			ID:   containerID,
		}
    // runner.Run 這個方法的主要作用就是在業務容器起來的時候,
    // 首先會執行一個 container hook(PostStart 和 PreStop),做一些預處理工作。
    // 只有 container hook 執行成功才會運行具體的業務服務,否則容器異常。
		msg, handlerErr := m.runner.Run(kubeContainerID, pod, container, container.Lifecycle.PostStart)
		if handlerErr != nil {
		...
    }
	}

	return "", nil
}

Pod 創建流程:

kubelet_pod_create2

3. 總結

本文分析了新建 pod 的流程,當一個 pod 完成調度,與一個 node 綁定起來之后,這個 pod 就會觸發 kubelet 在循環控制里注冊的 handler,上圖中的 HandlePods 部分。此時,通過檢查 pod 在 kubelet 內存中的狀態,kubelet 就能判斷出這是一個新調度過來的 pod,從而觸發 Handler 里的 ADD 事件對應的邏輯處理。然后 kubelet 會為這個 pod 生成對應的 podStatus,接着檢查 pod 所聲明的 volume 是不是准備好了,然后調用下層的容器運行時。如果是 UPDATE 事件的話,kubelet 就會根據 pod 對象具體的變更情況,調用下層的容器運行時進行容器的重建。


參考:

  1. http://likakuli.com/post/2019/08/05/pod_create/
  2. https://www.cnblogs.com/kkbill/p/12694537.html#pod-sync


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM