In the previous article we saw that, at runtime, the kubelet receives pod state changes through five channels and passes them down layer by layer until they reach the syncPod method in kubelet.go.
1. The syncPod method in kubelet.go
The syncPod method is central here: it contains the basic logic the kubelet follows to create a pod. Let's look at it closely:
pkg/kubelet/kubelet.go
func (kl *Kubelet) syncPod(o syncPodOptions) error {
	// pull out the required options
	pod := o.pod
	mirrorPod := o.mirrorPod
	podStatus := o.podStatus
	updateType := o.updateType

	// if we want to kill a pod, do it now!
	if updateType == kubetypes.SyncPodKill {
		killPodOptions := o.killPodOptions
		if killPodOptions == nil || killPodOptions.PodStatusFunc == nil {
			return fmt.Errorf("kill pod options are required if update type is kill")
		}
		apiPodStatus := killPodOptions.PodStatusFunc(pod, podStatus)
		kl.statusManager.SetPodStatus(pod, apiPodStatus)
		// we kill the pod with the specified grace period since this is a termination
		if err := kl.killPod(pod, nil, podStatus, killPodOptions.PodTerminationGracePeriodSecondsOverride); err != nil {
			kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
			// there was an error killing the pod, so we return that error directly
			utilruntime.HandleError(err)
			return err
		}
		return nil
	}

	// Latency measurements for the main workflow are relative to the
	// first time the pod was seen by the API server.
	var firstSeenTime time.Time
	if firstSeenTimeStr, ok := pod.Annotations[kubetypes.ConfigFirstSeenAnnotationKey]; ok {
		firstSeenTime = kubetypes.ConvertToTimestamp(firstSeenTimeStr).Get()
	}

	// Record pod worker start latency if being created
	// TODO: make pod workers record their own latencies
	if updateType == kubetypes.SyncPodCreate {
		if !firstSeenTime.IsZero() {
			// This is the first time we are syncing the pod. Record the latency
			// since kubelet first saw the pod if firstSeenTime is set.
			metrics.PodWorkerStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
		} else {
			klog.V(3).Infof("First seen time not recorded for pod %q", pod.UID)
		}
	}

	// Generate final API pod status with pod and status manager status
	apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
	// The pod IP may be changed in generateAPIPodStatus if the pod is using host network. (See #24576)
	// TODO(random-liu): After writing pod spec into container labels, check whether pod is using host network, and
	// set pod IP to hostIP directly in runtime.GetPodStatus
	podStatus.IP = apiPodStatus.PodIP

	// Record the time it takes for the pod to become running.
	existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
	if !ok || existingStatus.Phase == v1.PodPending && apiPodStatus.Phase == v1.PodRunning &&
		!firstSeenTime.IsZero() {
		metrics.PodStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
	}

	runnable := kl.canRunPod(pod)
	if !runnable.Admit {
		// Pod is not runnable; update the Pod and Container statuses to why.
		apiPodStatus.Reason = runnable.Reason
		apiPodStatus.Message = runnable.Message
		// Waiting containers are not creating.
		const waitingReason = "Blocked"
		for _, cs := range apiPodStatus.InitContainerStatuses {
			if cs.State.Waiting != nil {
				cs.State.Waiting.Reason = waitingReason
			}
		}
		for _, cs := range apiPodStatus.ContainerStatuses {
			if cs.State.Waiting != nil {
				cs.State.Waiting.Reason = waitingReason
			}
		}
	}

	// Update status in the status manager
	kl.statusManager.SetPodStatus(pod, apiPodStatus)

	// Kill pod if it should not be running
	if !runnable.Admit || pod.DeletionTimestamp != nil || apiPodStatus.Phase == v1.PodFailed {
		var syncErr error
		if err := kl.killPod(pod, nil, podStatus, nil); err != nil {
			kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
			syncErr = fmt.Errorf("error killing pod: %v", err)
			utilruntime.HandleError(syncErr)
		} else {
			if !runnable.Admit {
				// There was no error killing the pod, but the pod cannot be run.
				// Return an error to signal that the sync loop should back off.
				syncErr = fmt.Errorf("pod cannot be run: %s", runnable.Message)
			}
		}
		return syncErr
	}

	// If the network plugin is not ready, only start the pod if it uses the host network
	if rs := kl.runtimeState.networkErrors(); len(rs) != 0 && !kubecontainer.IsHostNetworkPod(pod) {
		kl.recorder.Eventf(pod, v1.EventTypeWarning, events.NetworkNotReady, "%s: %v", NetworkNotReadyErrorMsg, rs)
		return fmt.Errorf("%s: %v", NetworkNotReadyErrorMsg, rs)
	}

	// Create Cgroups for the pod and apply resource parameters
	// to them if cgroups-per-qos flag is enabled.
	pcm := kl.containerManager.NewPodContainerManager()
	// If pod has already been terminated then we need not create
	// or update the pod's cgroup
	if !kl.podIsTerminated(pod) {
		// When the kubelet is restarted with the cgroups-per-qos
		// flag enabled, all the pod's running containers
		// should be killed intermittently and brought back up
		// under the qos cgroup hierarchy.
		// Check if this is the pod's first sync
		firstSync := true
		for _, containerStatus := range apiPodStatus.ContainerStatuses {
			if containerStatus.State.Running != nil {
				firstSync = false
				break
			}
		}
		// Don't kill containers in pod if pod's cgroups already
		// exists or the pod is running for the first time
		podKilled := false
		if !pcm.Exists(pod) && !firstSync {
			if err := kl.killPod(pod, nil, podStatus, nil); err == nil {
				podKilled = true
			}
		}
		// Create and Update pod's Cgroups
		// Don't create cgroups for run once pod if it was killed above
		// The current policy is not to restart the run once pods when
		// the kubelet is restarted with the new flag as run once pods are
		// expected to run only once and if the kubelet is restarted then
		// they are not expected to run again.
		// We don't create and apply updates to cgroup if its a run once pod and was killed above
		if !(podKilled && pod.Spec.RestartPolicy == v1.RestartPolicyNever) {
			if !pcm.Exists(pod) {
				if err := kl.containerManager.UpdateQOSCgroups(); err != nil {
					klog.V(2).Infof("Failed to update QoS cgroups while syncing pod: %v", err)
				}
				if err := pcm.EnsureExists(pod); err != nil {
					kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToCreatePodContainer, "unable to ensure pod container exists: %v", err)
					return fmt.Errorf("failed to ensure that the pod: %v cgroups exist and are correctly applied: %v", pod.UID, err)
				}
			}
		}
	}

	// Create Mirror Pod for Static Pod if it doesn't already exist
	if kubepod.IsStaticPod(pod) {
		...
	}

	// Make data directories for the pod
	if err := kl.makePodDataDirs(pod); err != nil {
		...
	}

	// Volume manager will not mount volumes for terminated pods
	if !kl.podIsTerminated(pod) {
		// Wait for volumes to attach/mount
		if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
			kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to mount volumes for pod %q: %v", format.Pod(pod), err)
			klog.Errorf("Unable to mount volumes for pod %q: %v; skipping pod", format.Pod(pod), err)
			return err
		}
	}

	// Fetch the pull secrets for the pod
	pullSecrets := kl.getPullSecretsForPod(pod)

	// Call the container runtime's SyncPod callback
	result := kl.containerRuntime.SyncPod(pod, apiPodStatus, podStatus, pullSecrets, kl.backOff)
	kl.reasonCache.Update(pod.UID, result)
	if err := result.Error(); err != nil {
		// Do not return error if the only failures were pods in backoff
		for _, r := range result.SyncResults {
			if r.Error != kubecontainer.ErrCrashLoopBackOff && r.Error != images.ErrImagePullBackOff {
				// Do not record an event here, as we keep all event logging for sync pod failures
				// local to container runtime so we get better errors
				return err
			}
		}
		return nil
	}

	return nil
}
As the code shows, the method updates a pod through the following steps (a condensed sketch of this control flow follows the list):
(1) If this is a kill operation, handle it first.
(2) Record how long the pod took to go from creation to running.
(3) Check whether the pod can actually run; if not, record the reason in its status.
(4) Kill the pod if it should not be running.
(5) Check whether the network is ready.
(6) Create cgroups for the pod.
(7) Handle static pods (mirror pods) and create the pod's data directories.
(8) Mount volumes through the volume manager.
(9) Fetch the pull secrets needed to pull images.
(10) Call the container runtime's SyncPod method, which implements the main container-creation logic.
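To make this ordering easier to see at a glance, here is a condensed, self-contained model of the control flow. The stub types and functions (pod, killPod, ensureCgroups, mountVolumes, runtimeSyncPod) are stand-ins invented for illustration, not the kubelet's actual API; only the ordering mirrors the real method.

package main

import (
	"errors"
	"fmt"
)

// pod is a stand-in for the data syncPod actually works with.
type pod struct {
	name       string
	deleted    bool // corresponds to DeletionTimestamp being set
	admissible bool // corresponds to the canRunPod admission result
}

func killPod(p pod) error       { fmt.Println("killing", p.name); return nil }
func ensureCgroups(p pod) error { fmt.Println("ensuring cgroups for", p.name); return nil }
func mountVolumes(p pod) error  { fmt.Println("mounting volumes for", p.name); return nil }
func runtimeSyncPod(p pod) error {
	fmt.Println("delegating to the container runtime's SyncPod for", p.name)
	return nil
}

// syncPod mirrors only the high-level ordering of kubelet.syncPod:
// deletions first, then admission, cgroups, volumes, and finally the runtime.
func syncPod(p pod) error {
	if p.deleted {
		return killPod(p) // step (1): kill operations take priority
	}
	if !p.admissible {
		_ = killPod(p) // steps (3)(4): record the reason and stop the pod
		return errors.New("pod cannot be run")
	}
	if err := ensureCgroups(p); err != nil { // step (6)
		return err
	}
	if err := mountVolumes(p); err != nil { // step (8)
		return err
	}
	return runtimeSyncPod(p) // step (10): the container runtime does the real work
}

func main() {
	_ = syncPod(pod{name: "demo", admissible: true})
}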
Those are the steps the kubelet takes when it updates a pod. Next, let's look at the container runtime's SyncPod method.
2. The SyncPod method in kuberuntime_manager.go
pkg/kubelet/kuberuntime/kuberuntime_manager.go
// SyncPod syncs the running pod into the desired pod by executing following steps:
//
//  1. Compute sandbox and container changes.
//  2. Kill pod sandbox if necessary.
//  3. Kill any containers that should not be running.
//  4. Create sandbox if necessary.
//  5. Create init containers.
//  6. Create normal containers.
func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
	// Step 1: Compute sandbox and container changes.
	podContainerChanges := m.computePodActions(pod, podStatus)
	klog.V(3).Infof("computePodActions got %+v for pod %q", podContainerChanges, format.Pod(pod))
	if podContainerChanges.CreateSandbox {
		...
	}

	// Step 2: Kill the pod if the sandbox has changed.
	if podContainerChanges.KillPod {
		...
	} else {
		// Step 3: kill any running containers in this pod which are not to keep.
		for containerID, containerInfo := range podContainerChanges.ContainersToKill {
			klog.V(3).Infof("Killing unwanted container %q(id=%q) for pod %q", containerInfo.name, containerID, format.Pod(pod))
			killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, containerInfo.name)
			result.AddSyncResult(killContainerResult)
			if err := m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, nil); err != nil {
				killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
				klog.Errorf("killContainer %q(id=%q) for pod %q failed: %v", containerInfo.name, containerID, format.Pod(pod), err)
				return
			}
		}
	}

	...

	// Step 4: Create a sandbox for the pod if necessary.
	podSandboxID := podContainerChanges.SandboxID
	if podContainerChanges.CreateSandbox {
		var msg string
		var err error

		klog.V(4).Infof("Creating sandbox for pod %q", format.Pod(pod))
		createSandboxResult := kubecontainer.NewSyncResult(kubecontainer.CreatePodSandbox, format.Pod(pod))
		result.AddSyncResult(createSandboxResult)
		podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
		...
	}

	...

	// Step 5: start the init container.
	if container := podContainerChanges.NextInitContainerToStart; container != nil {
		// Start the next init container.
		startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, container.Name)
		result.AddSyncResult(startContainerResult)
		isInBackOff, msg, err := m.doBackOff(pod, container, podStatus, backOff)
		if isInBackOff {
			startContainerResult.Fail(err, msg)
			klog.V(4).Infof("Backing Off restarting init container %+v in pod %v", container, format.Pod(pod))
			return
		}

		klog.V(4).Infof("Creating init container %+v in pod %v", container, format.Pod(pod))
		if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP, kubecontainer.ContainerTypeInit); err != nil {
			startContainerResult.Fail(err, msg)
			utilruntime.HandleError(fmt.Errorf("init container start failed: %v: %s", err, msg))
			return
		}

		// Successfully started the container; clear the entry in the failure
		klog.V(4).Infof("Completed init container %q for pod %q", container.Name, format.Pod(pod))
	}

	// Step 6: start containers in podContainerChanges.ContainersToStart.
	for _, idx := range podContainerChanges.ContainersToStart {
		container := &pod.Spec.Containers[idx]
		startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, container.Name)
		result.AddSyncResult(startContainerResult)

		isInBackOff, msg, err := m.doBackOff(pod, container, podStatus, backOff)
		if isInBackOff {
			startContainerResult.Fail(err, msg)
			klog.V(4).Infof("Backing Off restarting container %+v in pod %v", container, format.Pod(pod))
			continue
		}

		klog.V(4).Infof("Creating container %+v in pod %v", container, format.Pod(pod))
		if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP, kubecontainer.ContainerTypeRegular); err != nil {
			startContainerResult.Fail(err, msg)
			// known errors that are logged in other places are logged at higher levels here to avoid
			// repetitive log spam
			switch {
			case err == images.ErrImagePullBackOff:
				klog.V(3).Infof("container start failed: %v: %s", err, msg)
			default:
				utilruntime.HandleError(fmt.Errorf("container start failed: %v: %s", err, msg))
			}
			continue
		}
	}

	return
}
The comments make the flow clear: the method first computes what has changed in the sandbox and containers and handles those changes accordingly, killing the sandbox or individual containers if needed. Then, for containers that need to be (re)created, it creates the sandbox, starts the init containers, and finally starts the regular containers. The elided parts aside, what matters for us is the container-start path, which calls kubeGenericRuntimeManager's startContainer method.
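Before moving on, it helps to see what Step 1 actually produces. The sketch below reconstructs a simplified version of the action set returned by computePodActions, limited to the fields referenced in the code above; the field types are stand-ins for the real kubecontainer/v1 types, so treat it as illustrative rather than an exact copy of the struct in kuberuntime_manager.go.

package main

import "fmt"

// containerToKillInfo is a stand-in for the per-container kill metadata.
type containerToKillInfo struct {
	name    string // container name, used in events and logs
	message string // human-readable reason the container is being killed
}

// podActions is a simplified sketch of the result of computePodActions,
// limited to the fields the SyncPod code above actually reads.
type podActions struct {
	KillPod                  bool                           // kill the whole pod (e.g. the sandbox changed)
	CreateSandbox            bool                           // a new sandbox needs to be created
	SandboxID                string                         // sandbox to reuse when CreateSandbox is false
	Attempt                  uint32                         // sandbox creation attempt counter
	NextInitContainerToStart *string                        // next init container, nil once init is done (stand-in type)
	ContainersToStart        []int                          // indexes into pod.Spec.Containers to (re)start
	ContainersToKill         map[string]containerToKillInfo // running containers that must not be kept
}

func main() {
	// Example: the sandbox died, so everything is recreated from scratch.
	actions := podActions{
		KillPod:           true,
		CreateSandbox:     true,
		Attempt:           1,
		ContainersToStart: []int{0, 1},
		ContainersToKill:  map[string]containerToKillInfo{},
	}
	fmt.Printf("%+v\n", actions)
}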
3. startContainer
The startContainer method defines the concrete flow the kubelet uses to start a container:
pkg/kubelet/kuberuntime/kuberuntime_container.go
// startContainer starts a container and returns a message indicates why it is failed on error.
// It starts the container through the following steps:
// * pull the image
// * create the container
// * start the container
// * run the post start lifecycle hooks (if applicable)
func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, container *v1.Container, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string, containerType kubecontainer.ContainerType) (string, error) {
	// Step 1: pull the image.
	imageRef, msg, err := m.imagePuller.EnsureImageExists(pod, container, pullSecrets, podSandboxConfig)
	if err != nil {
		m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", grpc.ErrorDesc(err))
		return msg, err
	}

	// Step 2: create the container.
	ref, err := kubecontainer.GenerateContainerRef(pod, container)
	if err != nil {
		klog.Errorf("Can't make a ref to pod %q, container %v: %v", format.Pod(pod), container.Name, err)
	}
	klog.V(4).Infof("Generating ref for container %s: %#v", container.Name, ref)

	// For a new container, the RestartCount should be 0
	restartCount := 0
	containerStatus := podStatus.FindContainerStatusByName(container.Name)
	if containerStatus != nil {
		restartCount = containerStatus.RestartCount + 1
	}

	containerConfig, cleanupAction, err := m.generateContainerConfig(container, pod, restartCount, podIP, imageRef, containerType)
	if cleanupAction != nil {
		defer cleanupAction()
	}
	if err != nil {
		m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", grpc.ErrorDesc(err))
		return grpc.ErrorDesc(err), ErrCreateContainerConfig
	}

	containerID, err := m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)
	if err != nil {
		m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", grpc.ErrorDesc(err))
		return grpc.ErrorDesc(err), ErrCreateContainer
	}
	err = m.internalLifecycle.PreStartContainer(pod, container, containerID)
	if err != nil {
		m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToStartContainer, "Internal PreStartContainer hook failed: %v", grpc.ErrorDesc(err))
		return grpc.ErrorDesc(err), ErrPreStartHook
	}
	m.recordContainerEvent(pod, container, containerID, v1.EventTypeNormal, events.CreatedContainer, "Created container")

	if ref != nil {
		m.containerRefManager.SetRef(kubecontainer.ContainerID{
			Type: m.runtimeName,
			ID:   containerID,
		}, ref)
	}

	// Step 3: start the container.
	err = m.runtimeService.StartContainer(containerID)
	if err != nil {
		m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToStartContainer, "Error: %v", grpc.ErrorDesc(err))
		return grpc.ErrorDesc(err), kubecontainer.ErrRunContainer
	}
	m.recordContainerEvent(pod, container, containerID, v1.EventTypeNormal, events.StartedContainer, "Started container")

	// Symlink container logs to the legacy container log location for cluster logging
	// support.
	// TODO(random-liu): Remove this after cluster logging supports CRI container log path.
	containerMeta := containerConfig.GetMetadata()
	sandboxMeta := podSandboxConfig.GetMetadata()
	legacySymlink := legacyLogSymlink(containerID, containerMeta.Name, sandboxMeta.Name, sandboxMeta.Namespace)
	containerLog := filepath.Join(podSandboxConfig.LogDirectory, containerConfig.LogPath)
	// only create legacy symlink if containerLog path exists (or the error is not IsNotExist).
	// Because if containerLog path does not exist, only dandling legacySymlink is created.
	// This dangling legacySymlink is later removed by container gc, so it does not make sense
	// to create it in the first place. it happens when journald logging driver is used with docker.
	if _, err := m.osInterface.Stat(containerLog); !os.IsNotExist(err) {
		if err := m.osInterface.Symlink(containerLog, legacySymlink); err != nil {
			klog.Errorf("Failed to create legacy symbolic link %q to container %q log %q: %v",
				legacySymlink, containerID, containerLog, err)
		}
	}

	// Step 4: execute the post start hook.
	if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
		kubeContainerID := kubecontainer.ContainerID{
			Type: m.runtimeName,
			ID:   containerID,
		}
		msg, handlerErr := m.runner.Run(kubeContainerID, pod, container, container.Lifecycle.PostStart)
		if handlerErr != nil {
			m.recordContainerEvent(pod, container, kubeContainerID.ID, v1.EventTypeWarning, events.FailedPostStartHook, msg)
			if err := m.killContainer(pod, kubeContainerID, container.Name, "FailedPostStartHook", nil); err != nil {
				klog.Errorf("Failed to kill container %q(id=%q) in pod %q: %v, %v",
					container.Name, kubeContainerID.String(), format.Pod(pod), ErrPostStartHook, err)
			}
			return msg, fmt.Errorf("%s: %v", ErrPostStartHook, handlerErr)
		}
	}

	return "", nil
}
As the comments state, there are roughly four steps: pull the image, create the container, start the container, and run the post-start hook. In between there are a few housekeeping details, such as creating the legacy log symlink.
What deserves attention is that, when creating and starting the container, the method goes through m.runtimeService, calling its CreateContainer and StartContainer methods.
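For reference, those two runtimeService calls have roughly the following shape. This is a sketch of the relevant subset of the CRI RuntimeService interface with stand-in config types (the real interface lives under pkg/kubelet/apis/cri and uses runtimeapi types), plus a trivial fake implementation just to exercise it:

package main

import "fmt"

// Stand-ins for the runtimeapi ContainerConfig and PodSandboxConfig types.
type ContainerConfig struct{ Image string }
type PodSandboxConfig struct{ Name string }

// runtimeService sketches the subset of the CRI RuntimeService interface
// that startContainer relies on.
type runtimeService interface {
	// CreateContainer creates a container in the given sandbox and returns its ID.
	CreateContainer(podSandboxID string, config *ContainerConfig, sandboxConfig *PodSandboxConfig) (string, error)
	// StartContainer starts a previously created container.
	StartContainer(containerID string) error
}

// fakeRuntime is a trivial in-memory implementation, just to exercise the interface.
type fakeRuntime struct{ nextID int }

func (f *fakeRuntime) CreateContainer(sandboxID string, c *ContainerConfig, s *PodSandboxConfig) (string, error) {
	f.nextID++
	return fmt.Sprintf("%s-ctr-%d", sandboxID, f.nextID), nil
}

func (f *fakeRuntime) StartContainer(id string) error {
	fmt.Println("started", id)
	return nil
}

func main() {
	var rs runtimeService = &fakeRuntime{}
	id, _ := rs.CreateContainer("sandbox-1", &ContainerConfig{Image: "nginx"}, &PodSandboxConfig{Name: "demo"})
	_ = rs.StartContainer(id)
}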
Take CreateContainer as an example. Following the call, we find that it invokes the CreateContainer method in pkg/kubelet/remote/remote_runtime.go, which in turn essentially calls the CreateContainer method in yet another file:
pkg/kubelet/apis/cri/runtime/v1alpha2/api.pb.go
func (c *runtimeServiceClient) CreateContainer(ctx context.Context, in *CreateContainerRequest, opts ...grpc.CallOption) (*CreateContainerResponse, error) {
	out := new(CreateContainerResponse)
	err := grpc.Invoke(ctx, "/runtime.v1alpha2.RuntimeService/CreateContainer", in, out, c.cc, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}
Here we can see that this CreateContainer method issues a gRPC call to the RuntimeService's CreateContainer method, which is exactly the kubelet gRPC server we covered in the second article (https://www.cnblogs.com/00986014w/p/10895532.html). With that, the definitions and calls inside the kubelet close into a loop.
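For completeness, the client end of this gRPC hop is an ordinary connection to the runtime's unix socket. The sketch below shows only the dialing pattern under assumed values (the socket path and timeout are placeholders for illustration; kubelet's real setup lives in pkg/kubelet/remote and is configured via --container-runtime-endpoint):

package main

import (
	"context"
	"log"
	"net"
	"time"

	"google.golang.org/grpc"
)

func main() {
	// Assumed CRI socket path; the real value comes from --container-runtime-endpoint.
	const endpoint = "/var/run/dockershim.sock"

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	conn, err := grpc.DialContext(ctx, endpoint,
		grpc.WithInsecure(), // CRI traffic stays on a local unix socket
		grpc.WithBlock(),    // fail fast if the runtime is not listening
		grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
			// Dial the unix socket directly instead of treating addr as a TCP address.
			return (&net.Dialer{}).DialContext(ctx, "unix", addr)
		}),
	)
	if err != nil {
		log.Fatalf("failed to connect to CRI endpoint %s: %v", endpoint, err)
	}
	defer conn.Close()

	// The generated runtimeServiceClient shown above is built on top of a
	// connection like this one, and every call (CreateContainer included)
	// becomes a gRPC invocation on it.
	log.Printf("connected to %s", endpoint)
}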
4. Summary
The kubelet code base is large, but the logic is fairly clear. In essence it creates a kubelet instance, starts the gRPC server, and then runs the kubelet, which receives pod state changes through the five channels, performs the pod update, and finally calls methods on that gRPC server over gRPC.