上一篇說到kublet如何啟動一個pod,本篇講述如何關閉一個Pod,引用一段來自官方文檔介紹pod的生命周期的話
- 你使用 kubectl 工具手動刪除某個特定的 Pod,而該 Pod 的體面終止限期是默認值(30 秒)。
- API 服務器中的 Pod 對象被更新,記錄涵蓋體面終止限期在內 Pod 的最終死期,超出所計算時間點則認為 Pod 已死(dead)。 如果你使用 kubectl describe 來查驗你正在刪除的 Pod,該 Pod 會顯示為 "Terminating" (正在終止)。 在 Pod 運行所在的節點上:kubelet 一旦看到 Pod 被標記為正在終止(已經設置了體面終止限期),kubelet 即開始本地的 Pod 關閉過程。
- 如果 Pod 中的容器之一定義了 preStop 回調, kubelet 開始在容器內運行該回調邏輯。如果超出體面終止限期時,preStop 回調邏輯 仍在運行,kubelet 會請求給予該 Pod 的寬限期一次性增加 2 秒鍾。
說明: 如果 preStop 回調所需要的時間長於默認的體面終止限期,你必須修改 terminationGracePeriodSeconds 屬性值來使其正常工作。
- kubelet 接下來觸發容器運行時發送 TERM 信號給每個容器中的進程 1。
說明: Pod 中的容器會在不同時刻收到 TERM 信號,接收順序也是不確定的。 如果關閉的順序很重要,可以考慮使用 preStop 回調邏輯來協調。
- 與此同時,kubelet 啟動體面關閉邏輯,控制面會將 Pod 從對應的端點列表(以及端點切片列表, 如果啟用了的話)中移除,過濾條件是 Pod 被對應的 服務以某 選擇算符選定。 ReplicaSets和其他工作負載資源 不再將關閉進程中的 Pod 視為合法的、能夠提供服務的副本。關閉動作很慢的 Pod 也無法繼續處理請求數據,因為負載均衡器(例如服務代理)已經在終止寬限期開始的時候 將其從端點列表中移除。
- 超出終止寬限期限時,kubelet 會觸發強制關閉過程。容器運行時會向 Pod 中所有容器內 仍在運行的進程發送 SIGKILL 信號。 kubelet 也會清理隱藏的 pause 容器,如果容器運行時使用了這種容器的話。
- kubelet 觸發強制從 API 服務器上刪除 Pod 對象的邏輯,並將體面終止限期設置為 0 (這意味着馬上刪除)。
- API 服務器刪除 Pod 的 API 對象,從任何客戶端都無法再看到該對象。
簡單概括為
- 刪除一個Pod后系統默認給30s的寬限期,並將它的狀態設置成Terminating
- kublectl發現Pod狀態為Terminating則嘗試執行preStop生命周期勾子,並可多給2s的寬限期
- 同時控制面將Pod中svc的endpoint中去除
- 寬限期到則發送TERM信號
- Pod還不關閉再發送SIGKILL強制關閉,並清理sandbox
- kubelet刪除Pod資源對象。
下面則從kublet源碼中查看這個過程
kubelet.syncLoop
前文講到kubelet.syncLoop這個循環包含了kublet主要的核心的操作,Pod的啟動從這里開始,Pod的關閉也從這里開始,與之前Pod啟動的極為相似,最終還是到達了kublet的sync方法
kubelet.syncLoop /pkg/kubelet/kubelet.go
|--kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh)
|--u, open := <-configCh
|--handler.HandlePodUpdates(u.Pods)即Kubelet.HandlePodUpdates
|--kl.handleMirrorPod(pod, start)
|--kl.dispatchWork
|--kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
|--kl.podWorkers.UpdatePod即podWorkers.UpdatePod /pkg/kubelet/pod_worker.go
|--p.managePodLoop
|--p.syncPodFn
Kubelet.dispatchWork
但是需要穿插提前說一下這個方法,當pod的container是Termial(status.State.Terminated不為空)且DeletionTimestamp不為空(資源被調用刪除后這個字段會填值),就會調用statusManager.TerminatePod,這個方法的作用后續會說,按着順序走調用podWorkers.UpdatePod方法,傳入的UpdateType是SyncPodUpdate。
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
containersTerminal, podWorkerTerminal := kl.podAndContainersAreTerminal(pod)
if pod.DeletionTimestamp != nil && containersTerminal {
kl.statusManager.TerminatePod(pod)
return
}
// Run the sync in an async worker.
kl.podWorkers.UpdatePod(&UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: syncType,
OnCompleteFunc: func(err error) {
if err != nil {
metrics.PodWorkerDuration.WithLabelValues(syncType.String()).Observe(metrics.SinceInSeconds(start))
}
},
})
}
Pod sync(Kubelet.syncPod)
還是走到kubelet.syncPod方法,在這個方法里面一開始也有一個killPod方法的的調用,但是本次進入傳參updateType是SyncPodUpdate,因此會往下走,走到runnable.Admit的判斷才是進入調用killPod方法
func (kl *Kubelet) syncPod(o syncPodOptions) error {
// if we want to kill a pod, do it now!
if updateType == kubetypes.SyncPodKill {
kl.statusManager.SetPodStatus(pod, apiPodStatus)
// we kill the pod with the specified grace period since this is a termination
if err := kl.killPod(pod, nil, podStatus, killPodOptions.PodTerminationGracePeriodSecondsOverride); err != nil {
}
return nil
}
// Kill pod if it should not be running
if !runnable.Admit || pod.DeletionTimestamp != nil || apiPodStatus.Phase == v1.PodFailed {
var syncErr error
if err := kl.killPod(pod, nil, podStatus, nil); err != nil {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
syncErr = fmt.Errorf("error killing pod: %v", err)
utilruntime.HandleError(syncErr)
} else {
if !runnable.Admit {
// There was no error killing the pod, but the pod cannot be run.
// Return an error to signal that the sync loop should back off.
syncErr = fmt.Errorf("pod cannot be run: %s", runnable.Message)
}
}
return syncErr
}
}
killPod
Kubelet.syncPod /pkg/kubelet/kubelet.go
|--Kubelet.killPod /pkg/kubelet/kubelet_pods.go
|--kl.containerRuntime.KillPod
|==kubeGenericRuntimeManager.KillPod /pkg/kubelet/kuberuntime/kuberuntime_manager.go
| |- m.killPodWithSyncResult
|--kl.containerManager.UpdateQOSCgroups()
經過多層的調用,來到kubeGenericRuntimeManager.killPodWithSyncResult方法,代碼中關鍵操作有兩個
1 先停止屬於該pod的所有containers
2 然后再停止pod sandbox容器
func (m *kubeGenericRuntimeManager) killPodWithSyncResult(pod *v1.Pod, runningPod kubecontainer.Pod, gracePeriodOverride *int64) (result kubecontainer.PodSyncResult) {
killContainerResults := m.killContainersWithSyncResult(pod, runningPod, gracePeriodOverride)
// Stop all sandboxes belongs to same pod
for _, podSandbox := range runningPod.Sandboxes {
if err := m.runtimeService.StopPodSandbox(podSandbox.ID.ID); err != nil {
}
}
return
}
killContainer
kubeGenericRuntimeManager.killPodWithSyncResult /pkg/kubelet/kuberuntime/kuberuntime_manager.go
|--m.killContainersWithSyncResult /pkg/kubelet/kuberuntime/kuberuntime_container.go
|--m.killContainer
killContainersWithSyncResult經過兩層調用來到kubeGenericRuntimeManager.killContainer,從代碼看到
1 關閉pod的寬限時間設置
2 執行pod的preStop生命周期鈎子
3 寬限時間不夠可以再多給2s
4 停止容器
func (m *kubeGenericRuntimeManager) killContainer(pod *v1.Pod, containerID kubecontainer.ContainerID, containerName string, message string, gracePeriodOverride *int64) error {
//1 關閉pod的寬限時間設置
gracePeriod := int64(minimumGracePeriodInSeconds)
switch {
case pod.DeletionGracePeriodSeconds != nil:
gracePeriod = *pod.DeletionGracePeriodSeconds
case pod.Spec.TerminationGracePeriodSeconds != nil:
gracePeriod = *pod.Spec.TerminationGracePeriodSeconds
}
//2 執行pod的preStop生命周期鈎子
if containerSpec.Lifecycle != nil && containerSpec.Lifecycle.PreStop != nil && gracePeriod > 0 {
//這里執行完會返回剩余的寬限時間
gracePeriod = gracePeriod - m.executePreStopHook(pod, containerID, containerSpec, gracePeriod)
}
//3 寬限時間不夠可以再多給2s
// always give containers a minimal shutdown window to avoid unnecessary SIGKILLs
if gracePeriod < minimumGracePeriodInSeconds {
gracePeriod = minimumGracePeriodInSeconds
}
//4 停止容器
err := m.runtimeService.StopContainer(containerID.ID, gracePeriod)
}
若要往下追源碼,可在下面這方法看到往dockerDeamon發送stop容器的請求
func (cli *Client) ContainerStop(ctx context.Context, containerID string, timeout *time.Duration) error {
query := url.Values{}
if timeout != nil {
query.Set("t", timetypes.DurationToSecondsString(*timeout))
}
resp, err := cli.post(ctx, "/containers/"+containerID+"/stop", query, nil, nil)
ensureReaderClosed(resp)
return err
}
調用鏈如下
m.runtimeService.StopContainer /pkg/kubelet/kuberuntime/kuberuntime_container.go
|==remoteRuntimeService.StopContainer /pkg/kubelet/cri/remote/remote_runtime.go
|--r.runtimeClient.StopContainer
|==dockerService.StopContainer /pkg/kubelet/dockershim/docker_container.go
|--ds.client.StopContainer
|==kubeDockerClient.StopContainer /pkg/kubelet/dockershim/libdocker/kube_docker_client.go
|--d.client.ContainerStop //就是上面的Client.ContainerStop
注:當使用GOALND看代碼時追到r.runtimeClient.StopContainer時會發現調到cri-api包里面的RuntimeServiceClient,這個包處於vendor中,又找不到實現,實際上這里已經是kubelet開始調CRI了,目前的例子是使用docker作為CRI,那相關代碼在/pkg/kubelet/dockershim里面找,這里是涉及到container的則看docker_container.go,像上一篇跟sandbox相關的在docker_sandbox.go里面找
StopPodSandbox
killPodWithSyncResult的另外一個關鍵調用就是調用StopPodSandbox方法,為了停止SandBox,主要步驟有
1 調用ds.network.TearDownPod:刪除pod網絡;
2 調用ds.client.StopContainer:停止pod sandbox容器。
代碼位於/pkg/kubelet/dockershim/docker_sandbox.go
func (ds *dockerService) StopPodSandbox(ctx context.Context, r *runtimeapi.StopPodSandboxRequest) (*runtimeapi.StopPodSandboxResponse, error) {
ready, ok := ds.getNetworkReady(podSandboxID)
if !hostNetwork && (ready || !ok) {
err := ds.network.TearDownPod(namespace, name, cID)
}
if err := ds.client.StopContainer(podSandboxID, defaultSandboxGracePeriod); err != nil {
}
}
TearDownPod是CRI的方法,用於清除容器網絡,StopContainer則與上面停止業務容器時調用ds.client.StopContainer一樣,實際上調用kubeDockerClient.StopContainer最終往dockerDaemon發stop容器的post請求。
后續清理Pod資源
至此Pod就停下來了,從狀態Terminating轉成Terminated,Pod這個資源將要etcd中刪除,通過api-server查也查不到,這個調用api-server刪pod資源的動作由kublet的statusManager執行
在執行kubelet的Run方法跑起kubelet的核心循環syncLoop之前,啟動了各種manager,其中有一個便是statusManager,statusManager的Run方法是開了一個協程不斷去循環同步Pod狀態,觸發方式有兩種,其一是從通道里傳入,單個執行同步;另一是通過定時器觸發批量執行同步
代碼位於/pkg/kubelet/status/status_manager.go
func (m *manager) Start() {
go wait.Forever(func() {
for {
select {
case syncRequest := <-m.podStatusChannel:
m.syncPod(syncRequest.podUID, syncRequest.status)
case <-syncTicker:
for i := len(m.podStatusChannel); i > 0; i-- {
<-m.podStatusChannel
}
m.syncBatch()
}
}
}, 0)
}
syncPod的簡略如下
func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
if m.canBeDeleted(pod, status.status) {
deleteOptions := metav1.DeleteOptions{
GracePeriodSeconds: new(int64),
// Use the pod UID as the precondition for deletion to prevent deleting a
// newly created pod with the same name and namespace.
Preconditions: metav1.NewUIDPreconditions(string(pod.UID)),
}
err = m.kubeClient.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, deleteOptions)
}
}
執行canBeDeleted方法作用如函數名一致用於判定當前pod的狀況能否去執行pod資源的刪除,最終會調用到Kubelet.PodResourcesAreReclaimed方法,大致是判斷pod的業務容器和sandbox是否有清理干凈,volume有否卸載完畢,cgroup是否有清理完畢,代碼位於/pkg/kubelet/kubelet_pods.go
func (kl *Kubelet) PodResourcesAreReclaimed(pod *v1.Pod, status v1.PodStatus) bool {
if !notRunning(status.ContainerStatuses) {
// We shouldn't delete pods that still have running containers
klog.V(3).Infof("Pod %q is terminated, but some containers are still running", format.Pod(pod))
return false
}
// pod's containers should be deleted
runtimeStatus, err := kl.podCache.Get(pod.UID)
if err != nil {
klog.V(3).Infof("Pod %q is terminated, Error getting runtimeStatus from the podCache: %s", format.Pod(pod), err)
return false
}
if len(runtimeStatus.ContainerStatuses) > 0 {
var statusStr string
for _, status := range runtimeStatus.ContainerStatuses {
statusStr += fmt.Sprintf("%+v ", *status)
}
klog.V(3).Infof("Pod %q is terminated, but some containers have not been cleaned up: %s", format.Pod(pod), statusStr)
return false
}
// pod's sandboxes should be deleted
if len(runtimeStatus.SandboxStatuses) > 0 {
var sandboxStr string
for _, sandbox := range runtimeStatus.SandboxStatuses {
sandboxStr += fmt.Sprintf("%+v ", *sandbox)
}
klog.V(3).Infof("Pod %q is terminated, but some pod sandboxes have not been cleaned up: %s", format.Pod(pod), sandboxStr)
return false
}
if kl.podVolumesExist(pod.UID) && !kl.keepTerminatedPodVolumes {
// We shouldn't delete pods whose volumes have not been cleaned up if we are not keeping terminated pod volumes
klog.V(3).Infof("Pod %q is terminated, but some volumes have not been cleaned up", format.Pod(pod))
return false
}
if kl.kubeletConfiguration.CgroupsPerQOS {
pcm := kl.containerManager.NewPodContainerManager()
if pcm.Exists(pod) {
klog.V(3).Infof("Pod %q is terminated, but pod cgroup sandbox has not been cleaned up", format.Pod(pod))
return false
}
}
return true
}
同步Pod的觸發源頭
然后要找到這串邏輯觸發的源頭,就在先前kubelet.dispatchWork方法開頭的那個判斷,經過之前清理了各種容器后Pod的狀態已轉換成Terminated,再次走到dispatchWork方法時就會進入statusManager.TerminatePod
Kubelet.dispatchWork /pkg/kubelet/kubelet.go
|--kl.statusManager.TerminatePod(pod)
|==manager.TerminatePod /pkg/kubelet/status/status_manager.go
|--m.updateStatusInternal
|--m.podStatusChannel <- podStatusSyncRequest{pod.UID, newStatus}
尋找statusManager,調api刪除pod資源,前者在startKubelet時開了個協程去同步,在kubelet.dispatchWork處調kl.statusManager.TerminatePod往通道里塞pod觸發邏輯
小結
本篇從kubelet的主循環開始,講述了kubelet啟動pod的過程,包括狀態更新,分配cgroup,創建容器目錄,等待volume掛載,注入imagepull secret,創建sandbox,調用cni編織網絡,啟動臨時容器,init容器,業務容器,執行postStart生命周期鈎子。
如有興趣,可閱讀鄙人“k8s源碼之旅”系列的其他文章
kubelet源碼分析——kubelet簡介與啟動
kubelet源碼分析——啟動Pod
kubelet源碼分析——關閉Pod
kubelet源碼分析——監控Pod變更
scheduler源碼分析——調度流程
apiserver源碼分析——啟動流程
apiserver源碼分析——處理請求