kubelet源碼分析——關閉Pod


上一篇說到kublet如何啟動一個pod,本篇講述如何關閉一個Pod,引用一段來自官方文檔介紹pod的生命周期的話

  1. 你使用 kubectl 工具手動刪除某個特定的 Pod,而該 Pod 的體面終止限期是默認值(30 秒)。
  2. API 服務器中的 Pod 對象被更新,記錄涵蓋體面終止限期在內 Pod 的最終死期,超出所計算時間點則認為 Pod 已死(dead)。 如果你使用 kubectl describe 來查驗你正在刪除的 Pod,該 Pod 會顯示為 "Terminating" (正在終止)。 在 Pod 運行所在的節點上:kubelet 一旦看到 Pod 被標記為正在終止(已經設置了體面終止限期),kubelet 即開始本地的 Pod 關閉過程。
    1. 如果 Pod 中的容器之一定義了 preStop 回調, kubelet 開始在容器內運行該回調邏輯。如果超出體面終止限期時,preStop 回調邏輯 仍在運行,kubelet 會請求給予該 Pod 的寬限期一次性增加 2 秒鍾。

    說明: 如果 preStop 回調所需要的時間長於默認的體面終止限期,你必須修改 terminationGracePeriodSeconds 屬性值來使其正常工作。

    1. kubelet 接下來觸發容器運行時發送 TERM 信號給每個容器中的進程 1。

    說明: Pod 中的容器會在不同時刻收到 TERM 信號,接收順序也是不確定的。 如果關閉的順序很重要,可以考慮使用 preStop 回調邏輯來協調。

  3. 與此同時,kubelet 啟動體面關閉邏輯,控制面會將 Pod 從對應的端點列表(以及端點切片列表, 如果啟用了的話)中移除,過濾條件是 Pod 被對應的 服務以某 選擇算符選定。 ReplicaSets和其他工作負載資源 不再將關閉進程中的 Pod 視為合法的、能夠提供服務的副本。關閉動作很慢的 Pod 也無法繼續處理請求數據,因為負載均衡器(例如服務代理)已經在終止寬限期開始的時候 將其從端點列表中移除。
  4. 超出終止寬限期限時,kubelet 會觸發強制關閉過程。容器運行時會向 Pod 中所有容器內 仍在運行的進程發送 SIGKILL 信號。 kubelet 也會清理隱藏的 pause 容器,如果容器運行時使用了這種容器的話。
  5. kubelet 觸發強制從 API 服務器上刪除 Pod 對象的邏輯,並將體面終止限期設置為 0 (這意味着馬上刪除)。
  6. API 服務器刪除 Pod 的 API 對象,從任何客戶端都無法再看到該對象。

簡單概括為

  1. 刪除一個Pod后系統默認給30s的寬限期,並將它的狀態設置成Terminating
  2. kublectl發現Pod狀態為Terminating則嘗試執行preStop生命周期勾子,並可多給2s的寬限期
  3. 同時控制面將Pod中svc的endpoint中去除
  4. 寬限期到則發送TERM信號
  5. Pod還不關閉再發送SIGKILL強制關閉,並清理sandbox
  6. kubelet刪除Pod資源對象。

下面則從kublet源碼中查看這個過程

kubelet.syncLoop

前文講到kubelet.syncLoop這個循環包含了kublet主要的核心的操作,Pod的啟動從這里開始,Pod的關閉也從這里開始,與之前Pod啟動的極為相似,最終還是到達了kublet的sync方法

kubelet.syncLoop	/pkg/kubelet/kubelet.go
|--kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh)
	|--u, open := <-configCh
	|--handler.HandlePodUpdates(u.Pods)即Kubelet.HandlePodUpdates
		|--kl.handleMirrorPod(pod, start)
			|--kl.dispatchWork
		|--kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
			|--kl.podWorkers.UpdatePod即podWorkers.UpdatePod	/pkg/kubelet/pod_worker.go
				|--p.managePodLoop
					|--p.syncPodFn

Kubelet.dispatchWork

但是需要穿插提前說一下這個方法,當pod的container是Termial(status.State.Terminated不為空)且DeletionTimestamp不為空(資源被調用刪除后這個字段會填值),就會調用statusManager.TerminatePod,這個方法的作用后續會說,按着順序走調用podWorkers.UpdatePod方法,傳入的UpdateType是SyncPodUpdate。

func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
	containersTerminal, podWorkerTerminal := kl.podAndContainersAreTerminal(pod)
	if pod.DeletionTimestamp != nil && containersTerminal {
		kl.statusManager.TerminatePod(pod)
		return
	}

	// Run the sync in an async worker.
	kl.podWorkers.UpdatePod(&UpdatePodOptions{
		Pod:        pod,
		MirrorPod:  mirrorPod,
		UpdateType: syncType,
		OnCompleteFunc: func(err error) {
			if err != nil {
				metrics.PodWorkerDuration.WithLabelValues(syncType.String()).Observe(metrics.SinceInSeconds(start))
			}
		},
	})
}

Pod sync(Kubelet.syncPod)

還是走到kubelet.syncPod方法,在這個方法里面一開始也有一個killPod方法的的調用,但是本次進入傳參updateType是SyncPodUpdate,因此會往下走,走到runnable.Admit的判斷才是進入調用killPod方法

func (kl *Kubelet) syncPod(o syncPodOptions) error {

	// if we want to kill a pod, do it now!
	if updateType == kubetypes.SyncPodKill {
		kl.statusManager.SetPodStatus(pod, apiPodStatus)
		// we kill the pod with the specified grace period since this is a termination
		if err := kl.killPod(pod, nil, podStatus, killPodOptions.PodTerminationGracePeriodSecondsOverride); err != nil {
	
		}
		return nil
	}

	// Kill pod if it should not be running
	if !runnable.Admit || pod.DeletionTimestamp != nil || apiPodStatus.Phase == v1.PodFailed {
		var syncErr error
		if err := kl.killPod(pod, nil, podStatus, nil); err != nil {
			kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
			syncErr = fmt.Errorf("error killing pod: %v", err)
			utilruntime.HandleError(syncErr)
		} else {
			if !runnable.Admit {
				// There was no error killing the pod, but the pod cannot be run.
				// Return an error to signal that the sync loop should back off.
				syncErr = fmt.Errorf("pod cannot be run: %s", runnable.Message)
			}
		}
		return syncErr
	}
}

killPod

Kubelet.syncPod		/pkg/kubelet/kubelet.go
|--Kubelet.killPod	/pkg/kubelet/kubelet_pods.go
	|--kl.containerRuntime.KillPod	
	|==kubeGenericRuntimeManager.KillPod	/pkg/kubelet/kuberuntime/kuberuntime_manager.go
	|	|- m.killPodWithSyncResult
	|--kl.containerManager.UpdateQOSCgroups()

經過多層的調用,來到kubeGenericRuntimeManager.killPodWithSyncResult方法,代碼中關鍵操作有兩個
1 先停止屬於該pod的所有containers
2 然后再停止pod sandbox容器

func (m *kubeGenericRuntimeManager) killPodWithSyncResult(pod *v1.Pod, runningPod kubecontainer.Pod, gracePeriodOverride *int64) (result kubecontainer.PodSyncResult) {
	killContainerResults := m.killContainersWithSyncResult(pod, runningPod, gracePeriodOverride)
	
	// Stop all sandboxes belongs to same pod
	for _, podSandbox := range runningPod.Sandboxes {
		if err := m.runtimeService.StopPodSandbox(podSandbox.ID.ID); err != nil {
		}
	}

	return
}

killContainer

kubeGenericRuntimeManager.killPodWithSyncResult		/pkg/kubelet/kuberuntime/kuberuntime_manager.go
|--m.killContainersWithSyncResult			/pkg/kubelet/kuberuntime/kuberuntime_container.go
	|--m.killContainer

killContainersWithSyncResult經過兩層調用來到kubeGenericRuntimeManager.killContainer,從代碼看到
1 關閉pod的寬限時間設置
2 執行pod的preStop生命周期鈎子
3 寬限時間不夠可以再多給2s
4 停止容器

func (m *kubeGenericRuntimeManager) killContainer(pod *v1.Pod, containerID kubecontainer.ContainerID, containerName string, message string, gracePeriodOverride *int64) error {

	//1 關閉pod的寬限時間設置
	gracePeriod := int64(minimumGracePeriodInSeconds)
	switch {
	case pod.DeletionGracePeriodSeconds != nil:
		gracePeriod = *pod.DeletionGracePeriodSeconds
	case pod.Spec.TerminationGracePeriodSeconds != nil:
		gracePeriod = *pod.Spec.TerminationGracePeriodSeconds
	}

	//2 執行pod的preStop生命周期鈎子
	if containerSpec.Lifecycle != nil && containerSpec.Lifecycle.PreStop != nil && gracePeriod > 0 {
		//這里執行完會返回剩余的寬限時間
		gracePeriod = gracePeriod - m.executePreStopHook(pod, containerID, containerSpec, gracePeriod)
	}

	//3 寬限時間不夠可以再多給2s
	// always give containers a minimal shutdown window to avoid unnecessary SIGKILLs
	if gracePeriod < minimumGracePeriodInSeconds {
		gracePeriod = minimumGracePeriodInSeconds
	}

	//4 停止容器
	err := m.runtimeService.StopContainer(containerID.ID, gracePeriod)
}

若要往下追源碼,可在下面這方法看到往dockerDeamon發送stop容器的請求

func (cli *Client) ContainerStop(ctx context.Context, containerID string, timeout *time.Duration) error {
	query := url.Values{}
	if timeout != nil {
		query.Set("t", timetypes.DurationToSecondsString(*timeout))
	}
	resp, err := cli.post(ctx, "/containers/"+containerID+"/stop", query, nil, nil)
	ensureReaderClosed(resp)
	return err
}

調用鏈如下

m.runtimeService.StopContainer		/pkg/kubelet/kuberuntime/kuberuntime_container.go
|==remoteRuntimeService.StopContainer	/pkg/kubelet/cri/remote/remote_runtime.go
	|--r.runtimeClient.StopContainer	
	|==dockerService.StopContainer		/pkg/kubelet/dockershim/docker_container.go
		|--ds.client.StopContainer	
		|==kubeDockerClient.StopContainer	/pkg/kubelet/dockershim/libdocker/kube_docker_client.go
			|--d.client.ContainerStop	//就是上面的Client.ContainerStop

注:當使用GOALND看代碼時追到r.runtimeClient.StopContainer時會發現調到cri-api包里面的RuntimeServiceClient,這個包處於vendor中,又找不到實現,實際上這里已經是kubelet開始調CRI了,目前的例子是使用docker作為CRI,那相關代碼在/pkg/kubelet/dockershim里面找,這里是涉及到container的則看docker_container.go,像上一篇跟sandbox相關的在docker_sandbox.go里面找

StopPodSandbox

killPodWithSyncResult的另外一個關鍵調用就是調用StopPodSandbox方法,為了停止SandBox,主要步驟有
1 調用ds.network.TearDownPod:刪除pod網絡;
2 調用ds.client.StopContainer:停止pod sandbox容器。
代碼位於/pkg/kubelet/dockershim/docker_sandbox.go

func (ds *dockerService) StopPodSandbox(ctx context.Context, r *runtimeapi.StopPodSandboxRequest) (*runtimeapi.StopPodSandboxResponse, error) {
	ready, ok := ds.getNetworkReady(podSandboxID)
	if !hostNetwork && (ready || !ok) {
		err := ds.network.TearDownPod(namespace, name, cID)
	}
	if err := ds.client.StopContainer(podSandboxID, defaultSandboxGracePeriod); err != nil {
	}
}

TearDownPod是CRI的方法,用於清除容器網絡,StopContainer則與上面停止業務容器時調用ds.client.StopContainer一樣,實際上調用kubeDockerClient.StopContainer最終往dockerDaemon發stop容器的post請求。

后續清理Pod資源

至此Pod就停下來了,從狀態Terminating轉成Terminated,Pod這個資源將要etcd中刪除,通過api-server查也查不到,這個調用api-server刪pod資源的動作由kublet的statusManager執行

在執行kubelet的Run方法跑起kubelet的核心循環syncLoop之前,啟動了各種manager,其中有一個便是statusManager,statusManager的Run方法是開了一個協程不斷去循環同步Pod狀態,觸發方式有兩種,其一是從通道里傳入,單個執行同步;另一是通過定時器觸發批量執行同步

代碼位於/pkg/kubelet/status/status_manager.go

func (m *manager) Start() {
	go wait.Forever(func() {
		for {
			select {
			case syncRequest := <-m.podStatusChannel:
				m.syncPod(syncRequest.podUID, syncRequest.status)
			case <-syncTicker:
				for i := len(m.podStatusChannel); i > 0; i-- {
					<-m.podStatusChannel
				}
				m.syncBatch()
			}
		}
	}, 0)
}

syncPod的簡略如下

func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
	if m.canBeDeleted(pod, status.status) {
		deleteOptions := metav1.DeleteOptions{
			GracePeriodSeconds: new(int64),
			// Use the pod UID as the precondition for deletion to prevent deleting a
			// newly created pod with the same name and namespace.
			Preconditions: metav1.NewUIDPreconditions(string(pod.UID)),
		}
		err = m.kubeClient.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, deleteOptions)
	}
}

執行canBeDeleted方法作用如函數名一致用於判定當前pod的狀況能否去執行pod資源的刪除,最終會調用到Kubelet.PodResourcesAreReclaimed方法,大致是判斷pod的業務容器和sandbox是否有清理干凈,volume有否卸載完畢,cgroup是否有清理完畢,代碼位於/pkg/kubelet/kubelet_pods.go

func (kl *Kubelet) PodResourcesAreReclaimed(pod *v1.Pod, status v1.PodStatus) bool {
	if !notRunning(status.ContainerStatuses) {
		// We shouldn't delete pods that still have running containers
		klog.V(3).Infof("Pod %q is terminated, but some containers are still running", format.Pod(pod))
		return false
	}
	// pod's containers should be deleted
	runtimeStatus, err := kl.podCache.Get(pod.UID)
	if err != nil {
		klog.V(3).Infof("Pod %q is terminated, Error getting runtimeStatus from the podCache: %s", format.Pod(pod), err)
		return false
	}
	if len(runtimeStatus.ContainerStatuses) > 0 {
		var statusStr string
		for _, status := range runtimeStatus.ContainerStatuses {
			statusStr += fmt.Sprintf("%+v ", *status)
		}
		klog.V(3).Infof("Pod %q is terminated, but some containers have not been cleaned up: %s", format.Pod(pod), statusStr)
		return false
	}
	// pod's sandboxes should be deleted
	if len(runtimeStatus.SandboxStatuses) > 0 {
		var sandboxStr string
		for _, sandbox := range runtimeStatus.SandboxStatuses {
			sandboxStr += fmt.Sprintf("%+v ", *sandbox)
		}
		klog.V(3).Infof("Pod %q is terminated, but some pod sandboxes have not been cleaned up: %s", format.Pod(pod), sandboxStr)
		return false
	}

	if kl.podVolumesExist(pod.UID) && !kl.keepTerminatedPodVolumes {
		// We shouldn't delete pods whose volumes have not been cleaned up if we are not keeping terminated pod volumes
		klog.V(3).Infof("Pod %q is terminated, but some volumes have not been cleaned up", format.Pod(pod))
		return false
	}
	if kl.kubeletConfiguration.CgroupsPerQOS {
		pcm := kl.containerManager.NewPodContainerManager()
		if pcm.Exists(pod) {
			klog.V(3).Infof("Pod %q is terminated, but pod cgroup sandbox has not been cleaned up", format.Pod(pod))
			return false
		}
	}
	return true
}

同步Pod的觸發源頭

然后要找到這串邏輯觸發的源頭,就在先前kubelet.dispatchWork方法開頭的那個判斷,經過之前清理了各種容器后Pod的狀態已轉換成Terminated,再次走到dispatchWork方法時就會進入statusManager.TerminatePod

Kubelet.dispatchWork			/pkg/kubelet/kubelet.go	
|--kl.statusManager.TerminatePod(pod)
|==manager.TerminatePod			/pkg/kubelet/status/status_manager.go
	|--m.updateStatusInternal
		|--m.podStatusChannel <- podStatusSyncRequest{pod.UID, newStatus}

尋找statusManager,調api刪除pod資源,前者在startKubelet時開了個協程去同步,在kubelet.dispatchWork處調kl.statusManager.TerminatePod往通道里塞pod觸發邏輯

小結

本篇從kubelet的主循環開始,講述了kubelet啟動pod的過程,包括狀態更新,分配cgroup,創建容器目錄,等待volume掛載,注入imagepull secret,創建sandbox,調用cni編織網絡,啟動臨時容器,init容器,業務容器,執行postStart生命周期鈎子。

如有興趣,可閱讀鄙人“k8s源碼之旅”系列的其他文章
kubelet源碼分析——kubelet簡介與啟動
kubelet源碼分析——啟動Pod
kubelet源碼分析——關閉Pod
kubelet源碼分析——監控Pod變更
scheduler源碼分析——調度流程
apiserver源碼分析——啟動流程
apiserver源碼分析——處理請求

參考文章

kubernetes/k8s CRI 分析-kubelet刪除pod分析
pod刪除主要流程源碼解析
Pod 的終止


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM