pod刪除主要流程源碼解析


本文以v1.12版本進行分析

當一個pod刪除時,client端向apiserver發送請求,apiserver將pod的deletionTimestamp打上時間。kubelet watch到該事件,開始處理。

syncLoop

kubelet對pod的處理主要都是在syncLoop中處理的。

func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
for {
...
		if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
			break
		}
...

與pod刪除主要在syncLoopIteration中需要關注的是以下這兩個。

func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
	case u, open := <-configCh:
...
		switch u.Op {
...
		case kubetypes.UPDATE:
			handler.HandlePodUpdates(u.Pods)
...
	case <-housekeepingCh:
		if !kl.sourcesReady.AllReady() {
		} else {
			if err := handler.HandlePodCleanups(); err != nil {
				glog.Errorf("Failed cleaning pods: %v", err)
			}
		}
	}

第一個是由於發送給apiserver的DELETE請求觸發的,增加了deletionTimestamp的事件。這里對應於kubetypes.UPDATE。也就是會走到HandlePodUpdates函數。

另外一個與delete相關的是每2s執行一次的來自於housekeepingCh的定時事件,用於清理pod,執行的是handler.HandlePodCleanups函數。這兩個作用不同,下面分別進行介紹。

HandlePodUpdates

先看HandlePodUpdates這個流程。只要打上了deletionTimestamp,就必然走到這個流程里去。

func (kl *Kubelet) HandlePodUpdates(pods []*v1.Pod) {
	for _, pod := range pods {
...
		kl.dispatchWork(pod, kubetypes.SyncPodUpdate, mirrorPod, start)
	}
}

在HandlePodUpdates中,進而將pod的信息傳遞到dispatchWork中處理。

func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
	if kl.podIsTerminated(pod) {
		if pod.DeletionTimestamp != nil {
			kl.statusManager.TerminatePod(pod)
		}
		return
	}
	// Run the sync in an async worker.
	kl.podWorkers.UpdatePod(&UpdatePodOptions{
		Pod:        pod,
		MirrorPod:  mirrorPod,
		UpdateType: syncType,
		OnCompleteFunc: func(err error) {
...

這里首先通過判斷了kl.podIsTerminated(pod)判斷pod是不是已經處於了Terminated狀態。如果是的話,則不進行下面的kl.podWorkers.UpdatePod。

func (kl *Kubelet) podIsTerminated(pod *v1.Pod) bool {
	status, ok := kl.statusManager.GetPodStatus(pod.UID)
	if !ok {
		status = pod.Status
	}
	return status.Phase == v1.PodFailed || status.Phase == v1.PodSucceeded || (pod.DeletionTimestamp != nil && notRunning(status.ContainerStatuses))
}

這個地方特別值得注意的是,並不是由了DeletionTimestamp就會認為是Terminated狀態,而是有DeletionTimestamp且所有的容器不在運行了。也就是說如果是一個正在正常運行的pod,是也會走到kl.podWorkers.UpdatePod中的。UpdatePod通過一系列函數調用,最終會通過異步的方式執行syncPod函數中進入到syncPod函數中。

func (kl *Kubelet) syncPod(o syncPodOptions) error {
...
	if !runnable.Admit || pod.DeletionTimestamp != nil || apiPodStatus.Phase == v1.PodFailed {
		var syncErr error
		if err := kl.killPod(pod, nil, podStatus, nil); err != nil {
...

在syncPod中,調用killPod從而對pod進行停止操作。

killPod

killPod是停止pod的主體。在很多地方都會使用。這里主要介紹下起主要的工作流程。停止pod的過程主要發生在killPodWithSyncResult函數中。

func (m *kubeGenericRuntimeManager) killPodWithSyncResult(pod *v1.Pod, runningPod kubecontainer.Pod, gracePeriodOverride *int64) (result kubecontainer.PodSyncResult) {
	killContainerResults := m.killContainersWithSyncResult(pod, runningPod, gracePeriodOverride)
...
	for _, podSandbox := range runningPod.Sandboxes {
			if err := m.runtimeService.StopPodSandbox(podSandbox.ID.ID); err != nil {
...

killPodWithSyncResult的主要工作分為兩個部分。killContainersWithSyncResult負責將pod中的container停止掉,在停止后再執行StopPodSandbox。

func (m *kubeGenericRuntimeManager) killContainer(pod *v1.Pod, containerID kubecontainer.ContainerID, containerName string, reason string, gracePeriodOverride *int64) error {
	if err := m.internalLifecycle.PreStopContainer(containerID.ID); err != nil {
		return err
	}
...
	err := m.runtimeService.StopContainer(containerID.ID, gracePeriod)

killContainersWithSyncResult的主要工作是在killContainer中完成的,這里可以看到,其中的主要兩個步驟是在容器中進行prestop的操作。待其成功后,進行container的stop工作。至此所有的應用容器都已經停止了。下一步是停止pause容器。而StopPodSandbox就是執行這一過程的。將sandbox,也就是pause容器停止掉。StopPodSandbox是在dockershim中執行的。

func (ds *dockerService) StopPodSandbox(ctx context.Context, r *runtimeapi.StopPodSandboxRequest) (*runtimeapi.StopPodSandboxResponse, error) {
...
if !hostNetwork && (ready || !ok) {
...
		err := ds.network.TearDownPod(namespace, name, cID, annotations)
...
	}
	if err := ds.client.StopContainer(podSandboxID, defaultSandboxGracePeriod); err != nil {

StopPodSandbox中主要的部分是先進行網絡卸載,再停止相應的容器。在完成StopPodSandbox后,至此pod的所有容器都已經停止完成。至於volume的卸載,是在volumeManager中進行的。本文不做單獨介紹了。停止后的容器在pod徹底清理后,會被gc回收。這里也不展開講了。

HandlePodCleanups

上面這個流程並不是刪除流程的全部。一個典型的情況就是,如果container都不是running,那么在UpdatePod的時候都return了,那么又由誰來處理呢?這里我們回到最開始,就是那個每2s執行一次的HandlePodCleanups的流程。也就是說比如container處於crash,container正好不是running等情況,其實是在這個流程里進行處理的。當然HandlePodCleanups的作用不僅僅是清理not running的pod,再比如數據已經在apiserver中強制清理掉了,或者由於其他原因這個節點上還有一些沒有完成清理的pod,都是在這個流程中進行處理。

func (kl *Kubelet) HandlePodCleanups() error {
...	
	for _, pod := range runningPods {
		if _, found := desiredPods[pod.ID]; !found {
			kl.podKillingCh <- &kubecontainer.PodPair{APIPod: nil, RunningPod: pod}
		}
	}

runningPods是從cache中獲取節點現有的pod,而desiredPods則是節點上應該存在未被停止的pod。如果存在runningPods中有而desiredPods中沒有的pod,那么它應該被停止,所以發送到podKillingCh中。

func (kl *Kubelet) podKiller() {
...
	for podPair := range kl.podKillingCh {
...

		if !exists {
			go func(apiPod *v1.Pod, runningPod *kubecontainer.Pod) {
				glog.V(2).Infof("Killing unwanted pod %q", runningPod.Name)
				err := kl.killPod(apiPod, runningPod, nil, nil)
...
			}(apiPod, runningPod)
		}
	}
}

在podKiller流程中,會去接收來自podKillingCh的消息,從而執行killPod,上文已經做了該函數的介紹了。

statusManager

在最后,statusManager中的syncPod流程,將會進行檢測,通過canBeDeleted確認是否所有的容器關閉了,volume卸載了,cgroup清理了等等。如果這些全部完成了,則從apiserver中將pod信息徹底刪除。

func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
...
	if m.canBeDeleted(pod, status.status) {
		deleteOptions := metav1.NewDeleteOptions(0)
		deleteOptions.Preconditions = metav1.NewUIDPreconditions(string(pod.UID))
		err = m.kubeClient.CoreV1().Pods(pod.Namespace).Delete(pod.Name, deleteOptions)
...


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM