基於tag v1.17.4
https://github.com/kubernetes/kubernetes/releases/tag/v1.17.4
概述
volume manager存在於kubelet中,主要是管理存儲卷的attach/detach(與AD controller作用相同,通過kubelet啟動參數控制哪個組件來做該操作,后續會詳細介紹)、mount/umount等操作。
簡介
容器的存儲掛載分為兩大步:
(1)attach;
(2)mount。
解除容器存儲掛載分為兩大步:
(1)umount;
(2)detach。
attach/detach操作可以由kube-controller-manager或者kubelet中的volume manager來完成,根據啟動參數enable-controller-attach-detach
來決定;而mount/umount操作只由kubelet中的volume manager來完成。
VolumeManager接口
(1)運行在kubelet 里讓存儲Ready的部件,主要是mount/unmount(attach/detach可選);
(2)pod調度到這個node上后才會有卷的相應操作,所以它的觸發端是kubelet(嚴格講是kubelet里的pod manager),根據Pod Manager里pod spec里申明的存儲來觸發卷的掛載操作;
(3)Kubelet會監聽到調度到該節點上的pod聲明,會把pod緩存到Pod Manager中,VolumeManager通過Pod Manager獲取PV/PVC的狀態,並進行分析出具體的attach/detach、mount/umount, 操作然后調用plugin進行相應的業務處理。
// pkg/kubelet/volumemanager/volume_manager.go
// VolumeManager runs a set of asynchronous loops that figure out which volumes
// need to be attached/mounted/unmounted/detached based on the pods scheduled on
// this node and makes it so.
type VolumeManager interface {
// Starts the volume manager and all the asynchronous loops that it controls
Run(sourcesReady config.SourcesReady, stopCh <-chan struct{})
// WaitForAttachAndMount processes the volumes referenced in the specified
// pod and blocks until they are all attached and mounted (reflected in
// actual state of the world).
// An error is returned if all volumes are not attached and mounted within
// the duration defined in podAttachAndMountTimeout.
WaitForAttachAndMount(pod *v1.Pod) error
// GetMountedVolumesForPod returns a VolumeMap containing the volumes
// referenced by the specified pod that are successfully attached and
// mounted. The key in the map is the OuterVolumeSpecName (i.e.
// pod.Spec.Volumes[x].Name). It returns an empty VolumeMap if pod has no
// volumes.
GetMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap
// GetExtraSupplementalGroupsForPod returns a list of the extra
// supplemental groups for the Pod. These extra supplemental groups come
// from annotations on persistent volumes that the pod depends on.
GetExtraSupplementalGroupsForPod(pod *v1.Pod) []int64
// GetVolumesInUse returns a list of all volumes that implement the volume.Attacher
// interface and are currently in use according to the actual and desired
// state of the world caches. A volume is considered "in use" as soon as it
// is added to the desired state of world, indicating it *should* be
// attached to this node and remains "in use" until it is removed from both
// the desired state of the world and the actual state of the world, or it
// has been unmounted (as indicated in actual state of world).
GetVolumesInUse() []v1.UniqueVolumeName
// ReconcilerStatesHasBeenSynced returns true only after the actual states in reconciler
// has been synced at least once after kubelet starts so that it is safe to update mounted
// volume list retrieved from actual state.
ReconcilerStatesHasBeenSynced() bool
// VolumeIsAttached returns true if the given volume is attached to this
// node.
VolumeIsAttached(volumeName v1.UniqueVolumeName) bool
// Marks the specified volume as having successfully been reported as "in
// use" in the nodes's volume status.
MarkVolumesAsReportedInUse(volumesReportedAsInUse []v1.UniqueVolumeName)
}
兩個關鍵結構體
(1)desiredStateOfWorld: 集群中期望要達到的數據卷掛載狀態,簡稱DSW。假設集群內新調度了一個Pod,此時要用到volume,Pod被分配到某節點NodeA上。 此時,對於AD controller來說,DSW中節點NodeA應該有被分配的volume在准備被這個Pod掛載。
(2)actualStateOfWorld: 集群中實際存在的數據卷掛載狀態,簡稱ASW。實際狀態未必是和期望狀態一樣,比如實際狀態Node上有剛調度過來的Pod,但是還沒有相應已經attached狀態的volume。
actualStateOfWorld相關結構體
actualStateOfWorld
實際存儲掛載狀態結構體。
actualStateOfWorld: 實際存儲掛載狀態,簡稱ASW。包括了已經成功掛載到node節點的存儲,以及已經成功掛載該存儲的pod列表。
主要屬性attachedVolumes,數據結構map,key為已經成功掛載到node的存儲名稱,value為已經成功掛載到node節點的存儲信息。
// pkg/kubelet/volumemanager/cache/actual_state_of_world.go
type actualStateOfWorld struct {
// nodeName is the name of this node. This value is passed to Attach/Detach
nodeName types.NodeName
// attachedVolumes is a map containing the set of volumes the kubelet volume
// manager believes to be successfully attached to this node. Volume types
// that do not implement an attacher interface are assumed to be in this
// state by default.
// The key in this map is the name of the volume and the value is an object
// containing more information about the attached volume.
attachedVolumes map[v1.UniqueVolumeName]attachedVolume
// volumePluginMgr is the volume plugin manager used to create volume
// plugin objects.
volumePluginMgr *volume.VolumePluginMgr
sync.RWMutex
}
attachedVolume
主要屬性mountedPods,數據結構map,key為pod名稱,value為已經成功掛載了該存儲的pod列表。
// pkg/kubelet/volumemanager/cache/actual_state_of_world.go
// attachedVolume represents a volume the kubelet volume manager believes to be
// successfully attached to a node it is managing. Volume types that do not
// implement an attacher are assumed to be in this state.
type attachedVolume struct {
// volumeName contains the unique identifier for this volume.
volumeName v1.UniqueVolumeName
// mountedPods is a map containing the set of pods that this volume has been
// successfully mounted to. The key in this map is the name of the pod and
// the value is a mountedPod object containing more information about the
// pod.
mountedPods map[volumetypes.UniquePodName]mountedPod
// spec is the volume spec containing the specification for this volume.
// Used to generate the volume plugin object, and passed to plugin methods.
// In particular, the Unmount method uses spec.Name() as the volumeSpecName
// in the mount path:
// /var/lib/kubelet/pods/{podUID}/volumes/{escapeQualifiedPluginName}/{volumeSpecName}/
spec *volume.Spec
// pluginName is the Unescaped Qualified name of the volume plugin used to
// attach and mount this volume. It is stored separately in case the full
// volume spec (everything except the name) can not be reconstructed for a
// volume that should be unmounted (which would be the case for a mount path
// read from disk without a full volume spec).
pluginName string
// pluginIsAttachable indicates the volume plugin used to attach and mount
// this volume implements the volume.Attacher interface
pluginIsAttachable bool
// globallyMounted indicates that the volume is mounted to the underlying
// device at a global mount point. This global mount point must be unmounted
// prior to detach.
globallyMounted bool
// devicePath contains the path on the node where the volume is attached for
// attachable volumes
devicePath string
// deviceMountPath contains the path on the node where the device should
// be mounted after it is attached.
deviceMountPath string
}
mountedPod
pod相關信息。
// pkg/kubelet/volumemanager/cache/actual_state_of_world.go
// The mountedPod object represents a pod for which the kubelet volume manager
// believes the underlying volume has been successfully been mounted.
type mountedPod struct {
// the name of the pod
podName volumetypes.UniquePodName
// the UID of the pod
podUID types.UID
// mounter used to mount
mounter volume.Mounter
// mapper used to block volumes support
blockVolumeMapper volume.BlockVolumeMapper
// spec is the volume spec containing the specification for this volume.
// Used to generate the volume plugin object, and passed to plugin methods.
// In particular, the Unmount method uses spec.Name() as the volumeSpecName
// in the mount path:
// /var/lib/kubelet/pods/{podUID}/volumes/{escapeQualifiedPluginName}/{volumeSpecName}/
volumeSpec *volume.Spec
// outerVolumeSpecName is the volume.Spec.Name() of the volume as referenced
// directly in the pod. If the volume was referenced through a persistent
// volume claim, this contains the volume.Spec.Name() of the persistent
// volume claim
outerVolumeSpecName string
// remountRequired indicates the underlying volume has been successfully
// mounted to this pod but it should be remounted to reflect changes in the
// referencing pod.
// Atomically updating volumes depend on this to update the contents of the
// volume. All volume mounting calls should be idempotent so a second mount
// call for volumes that do not need to update contents should not fail.
remountRequired bool
// volumeGidValue contains the value of the GID annotation, if present.
volumeGidValue string
// fsResizeRequired indicates the underlying volume has been successfully
// mounted to this pod but its size has been expanded after that.
fsResizeRequired bool
}
desiredStateOfWorld相關結構體
desiredStateOfWorld
期望存儲掛載狀態結構體。
desiredStateOfWorld: 期望的存儲掛載狀態,簡稱DSW。包括了期望掛載到node節點的存儲,以及期望掛載該存儲的pod列表。
主要屬性volumesToMount,數據結構map,key為期望掛載到該node節點的存儲,value為該存儲相關信息。
// pkg/kubelet/volumemanager/cache/actual_state_of_world.go
type desiredStateOfWorld struct {
// volumesToMount is a map containing the set of volumes that should be
// attached to this node and mounted to the pods referencing it. The key in
// the map is the name of the volume and the value is a volume object
// containing more information about the volume.
volumesToMount map[v1.UniqueVolumeName]volumeToMount
// volumePluginMgr is the volume plugin manager used to create volume
// plugin objects.
volumePluginMgr *volume.VolumePluginMgr
// podErrors are errors caught by desiredStateOfWorldPopulator about volumes for a given pod.
podErrors map[types.UniquePodName]sets.String
sync.RWMutex
}
volumeToMount
主要屬性podsToMount,數據結構map,key為pod名稱,value為期望掛載該存儲的所有pod的相關信息。
// pkg/kubelet/volumemanager/cache/actual_state_of_world.go
// The volume object represents a volume that should be attached to this node,
// and mounted to podsToMount.
type volumeToMount struct {
// volumeName contains the unique identifier for this volume.
volumeName v1.UniqueVolumeName
// podsToMount is a map containing the set of pods that reference this
// volume and should mount it once it is attached. The key in the map is
// the name of the pod and the value is a pod object containing more
// information about the pod.
podsToMount map[types.UniquePodName]podToMount
// pluginIsAttachable indicates that the plugin for this volume implements
// the volume.Attacher interface
pluginIsAttachable bool
// pluginIsDeviceMountable indicates that the plugin for this volume implements
// the volume.DeviceMounter interface
pluginIsDeviceMountable bool
// volumeGidValue contains the value of the GID annotation, if present.
volumeGidValue string
// reportedInUse indicates that the volume was successfully added to the
// VolumesInUse field in the node's status.
reportedInUse bool
// desiredSizeLimit indicates the desired upper bound on the size of the volume
// (if so implemented)
desiredSizeLimit *resource.Quantity
}
podToMount
podToMount結構體主要記錄了pod信息。
// pkg/kubelet/volumemanager/cache/actual_state_of_world.go
// The pod object represents a pod that references the underlying volume and
// should mount it once it is attached.
type podToMount struct {
// podName contains the name of this pod.
podName types.UniquePodName
// Pod to mount the volume to. Used to create NewMounter.
pod *v1.Pod
// volume spec containing the specification for this volume. Used to
// generate the volume plugin object, and passed to plugin methods.
// For non-PVC volumes this is the same as defined in the pod object. For
// PVC volumes it is from the dereferenced PV object.
volumeSpec *volume.Spec
// outerVolumeSpecName is the volume.Spec.Name() of the volume as referenced
// directly in the pod. If the volume was referenced through a persistent
// volume claim, this contains the volume.Spec.Name() of the persistent
// volume claim
outerVolumeSpecName string
}
方法入口分析
kubelet管理volume的方式基於兩個不同的狀態:
(1)DesiredStateOfWorld:預期中,volume的掛載情況,簡稱預期狀態。從pod對象中獲取預期狀態;
(2)ActualStateOfWorld:實際中,voluem的掛載情況,簡稱實際狀態。從node.Status.VolumesAttached獲取實際狀態,並根據調諧更新實際狀態。
Run方法中主要包含了2個方法:
(1)vm.desiredStateOfWorldPopulator.Run
(2)vm.reconciler.Run
下面將一一分析。
// pkg/kubelet/volumemanager/volume_manager.go
func (vm *volumeManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
defer runtime.HandleCrash()
// 從apiserver同步pod信息,來更新DesiredStateOfWorld
go vm.desiredStateOfWorldPopulator.Run(sourcesReady, stopCh)
klog.V(2).Infof("The desired_state_of_world populator starts")
klog.Infof("Starting Kubelet Volume Manager")
// 預期狀態和實際狀態的協調者,負責調整實際狀態至預期狀態
go vm.reconciler.Run(stopCh)
metrics.Register(vm.actualStateOfWorld, vm.desiredStateOfWorld, vm.volumePluginMgr)
if vm.kubeClient != nil {
// start informer for CSIDriver
vm.volumePluginMgr.Run(stopCh)
}
<-stopCh
klog.Infof("Shutting down Kubelet Volume Manager")
}
1 vm.desiredStateOfWorldPopulator.Run
主要邏輯為調用dswp.populatorLoop(),根據pod的volume信息,來更新DesiredStateOfWorld;並且不斷循環調用該方法,來不斷更新DesiredStateOfWorld。
dswp.loopSleepDuration的值為100ms。
// pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go
func (dswp *desiredStateOfWorldPopulator) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
// Wait for the completion of a loop that started after sources are all ready, then set hasAddedPods accordingly
klog.Infof("Desired state populator starts to run")
wait.PollUntil(dswp.loopSleepDuration, func() (bool, error) {
done := sourcesReady.AllReady()
dswp.populatorLoop()
return done, nil
}, stopCh)
dswp.hasAddedPodsLock.Lock()
dswp.hasAddedPods = true
dswp.hasAddedPodsLock.Unlock()
wait.Until(dswp.populatorLoop, dswp.loopSleepDuration, stopCh)
}
1.1 dswp.populatorLoop()
兩個關鍵方法:
dswp.findAndAddNewPods():添加volume進DesiredStateOfWorld;
dswp.findAndRemoveDeletedPods():從DesiredStateOfWorld刪除volume。
// pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go
func (dswp *desiredStateOfWorldPopulator) populatorLoop() {
dswp.findAndAddNewPods()
// findAndRemoveDeletedPods() calls out to the container runtime to
// determine if the containers for a given pod are terminated. This is
// an expensive operation, therefore we limit the rate that
// findAndRemoveDeletedPods() is called independently of the main
// populator loop.
if time.Since(dswp.timeOfLastGetPodStatus) < dswp.getPodStatusRetryDuration {
klog.V(5).Infof(
"Skipping findAndRemoveDeletedPods(). Not permitted until %v (getPodStatusRetryDuration %v).",
dswp.timeOfLastGetPodStatus.Add(dswp.getPodStatusRetryDuration),
dswp.getPodStatusRetryDuration)
return
}
dswp.findAndRemoveDeletedPods()
}
1.1.1 dswp.findAndAddNewPods()
主要邏輯:
(1)如果kubelet開啟了features.ExpandInUsePersistentVolumes,處理一下map mountedVolumesForPod,用於后續處理標記存儲擴容邏輯;
(2)遍歷pod列表,調用dswp.processPodVolumes將pod中的volume添加到DesiredStateOfWorld(pvc已經處於bound狀態的volume才會添加到DesiredStateOfWorld);同時processPodVolumes方法里有標記某存儲是否需要擴容的邏輯,用於后續觸發存儲擴容操作。
// pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go
// Iterate through all pods and add to desired state of world if they don't
// exist but should
func (dswp *desiredStateOfWorldPopulator) findAndAddNewPods() {
// Map unique pod name to outer volume name to MountedVolume.
mountedVolumesForPod := make(map[volumetypes.UniquePodName]map[string]cache.MountedVolume)
if utilfeature.DefaultFeatureGate.Enabled(features.ExpandInUsePersistentVolumes) {
for _, mountedVolume := range dswp.actualStateOfWorld.GetMountedVolumes() {
mountedVolumes, exist := mountedVolumesForPod[mountedVolume.PodName]
if !exist {
mountedVolumes = make(map[string]cache.MountedVolume)
mountedVolumesForPod[mountedVolume.PodName] = mountedVolumes
}
mountedVolumes[mountedVolume.OuterVolumeSpecName] = mountedVolume
}
}
processedVolumesForFSResize := sets.NewString()
for _, pod := range dswp.podManager.GetPods() {
// pod如果terminated了,則跳過該pod的volume
if dswp.isPodTerminated(pod) {
// Do not (re)add volumes for terminated pods
continue
}
// 將pod中的volume添加到 the desired state of the world
dswp.processPodVolumes(pod, mountedVolumesForPod, processedVolumesForFSResize)
}
}
func (dswp *desiredStateOfWorldPopulator) isPodTerminated(pod *v1.Pod) bool {
podStatus, found := dswp.podStatusProvider.GetPodStatus(pod.UID)
if !found {
podStatus = pod.Status
}
return util.IsPodTerminated(pod, podStatus)
}
pod屬於terminated的判斷看下列代碼:
// pkg/volume/util/util.go
// IsPodTerminated checks if pod is terminated
func IsPodTerminated(pod *v1.Pod, podStatus v1.PodStatus) bool {
return podStatus.Phase == v1.PodFailed || podStatus.Phase == v1.PodSucceeded || (pod.DeletionTimestamp != nil && notRunning(podStatus.ContainerStatuses))
}
// notRunning returns true if every status is terminated or waiting, or the status list
// is empty.
func notRunning(statuses []v1.ContainerStatus) bool {
for _, status := range statuses {
if status.State.Terminated == nil && status.State.Waiting == nil {
return false
}
}
return true
}
dswp.processPodVolumes()主要邏輯:
(1)調用dswp.podPreviouslyProcessed判斷指定pod的volume是否已經被處理過了,處理過則直接返回;
(2)如果開啟了features.ExpandInUsePersistentVolumes,則調用dswp.checkVolumeFSResize來標記需要擴容的volume信息;
(3)循環遍歷pod.Spec.Volumes,做(4)(5)處理;
(4),調用dswp.createVolumeSpec,根據pvc與pv對象等信息,構造並返回volume.spec屬性(方法中會獲取pvc以及pv對象,並判斷pvc對象是否與pv對象bound,沒有bound則返回錯誤,另外,該方法還判斷pvc的volumeMode等屬性是否與pod內volume配置一致);
(5)調用dswp.desiredStateOfWorld.AddPodToVolume將pod的volume信息加入desiredStateOfWorld中;
(6)調用dswp.markPodProcessed標記該pod的volume信息已被處理。
// pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go
// processPodVolumes processes the volumes in the given pod and adds them to the
// desired state of the world.
func (dswp *desiredStateOfWorldPopulator) processPodVolumes(
pod *v1.Pod,
mountedVolumesForPod map[volumetypes.UniquePodName]map[string]cache.MountedVolume,
processedVolumesForFSResize sets.String) {
if pod == nil {
return
}
uniquePodName := util.GetUniquePodName(pod)
if dswp.podPreviouslyProcessed(uniquePodName) {
return
}
allVolumesAdded := true
mounts, devices := util.GetPodVolumeNames(pod)
expandInUsePV := utilfeature.DefaultFeatureGate.Enabled(features.ExpandInUsePersistentVolumes)
// Process volume spec for each volume defined in pod
for _, podVolume := range pod.Spec.Volumes {
if !mounts.Has(podVolume.Name) && !devices.Has(podVolume.Name) {
// Volume is not used in the pod, ignore it.
klog.V(4).Infof("Skipping unused volume %q for pod %q", podVolume.Name, format.Pod(pod))
continue
}
pvc, volumeSpec, volumeGidValue, err :=
dswp.createVolumeSpec(podVolume, pod.Name, pod.Namespace, mounts, devices)
if err != nil {
klog.Errorf(
"Error processing volume %q for pod %q: %v",
podVolume.Name,
format.Pod(pod),
err)
dswp.desiredStateOfWorld.AddErrorToPod(uniquePodName, err.Error())
allVolumesAdded = false
continue
}
// for local volume
err = dswp.checkLocalVolumePV(pod, volumeSpec)
if err != nil {
klog.Errorf(
"Error processing volume %q for pod %q: %v",
podVolume.Name,
format.Pod(pod),
err)
allVolumesAdded = false
continue
}
// Add volume to desired state of world
_, err = dswp.desiredStateOfWorld.AddPodToVolume(
uniquePodName, pod, volumeSpec, podVolume.Name, volumeGidValue)
if err != nil {
klog.Errorf(
"Failed to add volume %s (specName: %s) for pod %q to desiredStateOfWorld: %v",
podVolume.Name,
volumeSpec.Name(),
uniquePodName,
err)
dswp.desiredStateOfWorld.AddErrorToPod(uniquePodName, err.Error())
allVolumesAdded = false
} else {
klog.V(4).Infof(
"Added volume %q (volSpec=%q) for pod %q to desired state.",
podVolume.Name,
volumeSpec.Name(),
uniquePodName)
}
if expandInUsePV {
dswp.checkVolumeFSResize(pod, podVolume, pvc, volumeSpec,
uniquePodName, mountedVolumesForPod, processedVolumesForFSResize)
}
}
// some of the volume additions may have failed, should not mark this pod as fully processed
if allVolumesAdded {
dswp.markPodProcessed(uniquePodName)
// New pod has been synced. Re-mount all volumes that need it
// (e.g. DownwardAPI)
dswp.actualStateOfWorld.MarkRemountRequired(uniquePodName)
// Remove any stored errors for the pod, everything went well in this processPodVolumes
dswp.desiredStateOfWorld.PopPodErrors(uniquePodName)
}
}
再來看一個關鍵方法checkVolumeFSResize():
與存儲擴容相關,當pv.Spec.Capacity大小大於pvc.Status.Capacity時,將該存儲標記為需要擴容
// pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go
// checkVolumeFSResize checks whether a PVC mounted by the pod requires file
// system resize or not. If so, marks this volume as fsResizeRequired in ASW.
// - mountedVolumesForPod stores all mounted volumes in ASW, because online
// volume resize only considers mounted volumes.
// - processedVolumesForFSResize stores all volumes we have checked in current loop,
// because file system resize operation is a global operation for volume, so
// we only need to check it once if more than one pod use it.
func (dswp *desiredStateOfWorldPopulator) checkVolumeFSResize(
pod *v1.Pod,
podVolume v1.Volume,
pvc *v1.PersistentVolumeClaim,
volumeSpec *volume.Spec,
uniquePodName volumetypes.UniquePodName,
mountedVolumesForPod map[volumetypes.UniquePodName]map[string]cache.MountedVolume,
processedVolumesForFSResize sets.String) {
if podVolume.PersistentVolumeClaim == nil {
// Only PVC supports resize operation.
return
}
uniqueVolumeName, exist := getUniqueVolumeName(uniquePodName, podVolume.Name, mountedVolumesForPod)
if !exist {
// Volume not exist in ASW, we assume it hasn't been mounted yet. If it needs resize,
// it will be handled as offline resize(if it indeed hasn't been mounted yet),
// or online resize in subsequent loop(after we confirm it has been mounted).
return
}
if processedVolumesForFSResize.Has(string(uniqueVolumeName)) {
// File system resize operation is a global operation for volume,
// so we only need to check it once if more than one pod use it.
return
}
// volumeSpec.ReadOnly is the value that determines if volume could be formatted when being mounted.
// This is the same flag that determines filesystem resizing behaviour for offline resizing and hence
// we should use it here. This value comes from Pod.spec.volumes.persistentVolumeClaim.readOnly.
if volumeSpec.ReadOnly {
// This volume is used as read only by this pod, we don't perform resize for read only volumes.
klog.V(5).Infof("Skip file system resize check for volume %s in pod %s/%s "+
"as the volume is mounted as readonly", podVolume.Name, pod.Namespace, pod.Name)
return
}
if volumeRequiresFSResize(pvc, volumeSpec.PersistentVolume) {
dswp.actualStateOfWorld.MarkFSResizeRequired(uniqueVolumeName, uniquePodName)
}
processedVolumesForFSResize.Insert(string(uniqueVolumeName))
}
// pv.Spec.Capacity大小大於pvc.Status.Capacity時返回true
func volumeRequiresFSResize(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) bool {
capacity := pvc.Status.Capacity[v1.ResourceStorage]
requested := pv.Spec.Capacity[v1.ResourceStorage]
return requested.Cmp(capacity) > 0
}
1.1.2 dswp.findAndRemoveDeletedPods
主要邏輯:
(1)從dswp.desiredStateOfWorld中獲取已掛載volume的pod,從podManager獲取指定pod是否存在,不存在則再從containerRuntime中查詢pod中的容器是否都已terminated,如以上兩個條件都符合,則繼續往下執行,否則直接返回;
(2)調用dswp.desiredStateOfWorld.DeletePodFromVolume,將pod從dsw.volumesToMount[volumeName].podsToMount[podName]中去除,即將volume掛載到指定pod的信息從desiredStateOfWorld中去除。
// pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go
// Iterate through all pods in desired state of world, and remove if they no
// longer exist
func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() {
var runningPods []*kubecontainer.Pod
runningPodsFetched := false
for _, volumeToMount := range dswp.desiredStateOfWorld.GetVolumesToMount() {
pod, podExists := dswp.podManager.GetPodByUID(volumeToMount.Pod.UID)
if podExists {
// Skip running pods
if !dswp.isPodTerminated(pod) {
continue
}
if dswp.keepTerminatedPodVolumes {
continue
}
}
// Once a pod has been deleted from kubelet pod manager, do not delete
// it immediately from volume manager. Instead, check the kubelet
// containerRuntime to verify that all containers in the pod have been
// terminated.
if !runningPodsFetched {
var getPodsErr error
runningPods, getPodsErr = dswp.kubeContainerRuntime.GetPods(false)
if getPodsErr != nil {
klog.Errorf(
"kubeContainerRuntime.findAndRemoveDeletedPods returned error %v.",
getPodsErr)
continue
}
runningPodsFetched = true
dswp.timeOfLastGetPodStatus = time.Now()
}
runningContainers := false
for _, runningPod := range runningPods {
if runningPod.ID == volumeToMount.Pod.UID {
if len(runningPod.Containers) > 0 {
runningContainers = true
}
break
}
}
if runningContainers {
klog.V(4).Infof(
"Pod %q still has one or more containers in the non-exited state. Therefore, it will not be removed from desired state.",
format.Pod(volumeToMount.Pod))
continue
}
exists, _, _ := dswp.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName)
if !exists && podExists {
klog.V(4).Infof(
volumeToMount.GenerateMsgDetailed(fmt.Sprintf("Actual state has not yet has this volume mounted information and pod (%q) still exists in pod manager, skip removing volume from desired state",
format.Pod(volumeToMount.Pod)), ""))
continue
}
klog.V(4).Infof(volumeToMount.GenerateMsgDetailed("Removing volume from desired state", ""))
dswp.desiredStateOfWorld.DeletePodFromVolume(
volumeToMount.PodName, volumeToMount.VolumeName)
dswp.deleteProcessedPod(volumeToMount.PodName)
}
podsWithError := dswp.desiredStateOfWorld.GetPodsWithErrors()
for _, podName := range podsWithError {
if _, podExists := dswp.podManager.GetPodByUID(types.UID(podName)); !podExists {
dswp.desiredStateOfWorld.PopPodErrors(podName)
}
}
}
2 vm.reconciler.Run
主要是調用rc.reconcile(),做存儲的預期狀態和實際狀態的協調,負責調整實際狀態至預期狀態。
dswp.loopSleepDuration的值為100ms。
// pkg/kubelet/volumemanager/reconciler/reconciler.go
func (rc *reconciler) Run(stopCh <-chan struct{}) {
wait.Until(rc.reconciliationLoopFunc(), rc.loopSleepDuration, stopCh)
}
func (rc *reconciler) reconciliationLoopFunc() func() {
return func() {
rc.reconcile()
// Sync the state with the reality once after all existing pods are added to the desired state from all sources.
// Otherwise, the reconstruct process may clean up pods' volumes that are still in use because
// desired state of world does not contain a complete list of pods.
if rc.populatorHasAddedPods() && !rc.StatesHasBeenSynced() {
klog.Infof("Reconciler: start to sync state")
rc.sync()
}
}
}
2.1 rc.reconcile
rc.reconcile()主要邏輯:
(1)對於實際已經掛載了的存儲,如果期望掛載信息中無該存儲,或期望掛載存儲的pod列表中沒有該pod,則指定pod的指定存儲需要unmount(最終調用csi.NodeUnpublishVolume);
(2)從desiredStateOfWorld中獲取需要mount到pod的volome信息列表:
a.當存儲未attach到node時:調用方法將存儲先attach到node上(此處會判斷是不是由kubelet來做attach操作,是則創建VolumeAttachment對象並等待該對象的.status.attached的值為 true,不是則等待AD controller來做attach操作,kubelet將會根據node對象的.Status.VolumesAttached屬性來判斷該存儲是否已attach到node上);
b.當存儲attach后,但未mount給pod或者需要remount時:調用方法進行volume mount(最終調用csi.NodeStageVolume與csi.NodePublishVolume);
c.當存儲需要擴容時,調用方法進行存儲擴容(最終調用csi.NodeExpandVolume);
(3)對比actualStateOfWorld,從desiredStateOfWorld中獲取需要detached的volomes(detached意思為把存儲從node上解除掛載):
a.當actualStateOfWorld中表明,某volume沒有被任何pod掛載,且desiredStateOfWorld中也不期望該volume被任何pod掛載,且attachedVolume.GloballyMounted屬性為true時(device與global mount path的掛載關系還在),會調用到UnmountDevice,主要是調用csi.NodeUnstageVolume解除node上global mount path的存儲掛載;
b.當actualStateOfWorld中表明,某volume沒有被任何pod掛載,且desiredStateOfWorld中也不存在該volume,且attachedVolume.GloballyMounted屬性為false時(已經調用過UnmountDevice,device與global mount path的掛載關系已解除),會調用到UnmountDevice,主要是從etcd中刪除VolumeAttachment對象,並等待刪除成功。
reconcile()涉及主要方法:
(1)rc.operationExecutor.UnmountVolume:當actualStateOfWorld中表明,pod已經掛載了某volume,但desiredStateOfWorld中期望掛載某volume的pod列表中不存在該pod時(即表明存儲已經掛載給pod,但該pod已經不存在了,需要解除該掛載),會調用到UnmountVolume,主要是調用csi.NodeUnpublishVolume將pod mount path解除掛載;
(2)rc.operationExecutor.AttachVolume:當actualStateOfWorld中已經掛載到node節點的volume信息中不存在某volume,但desiredStateOfWorld中期望某volume掛載到node節點上時(即表明需要掛載到node節點的存儲未掛載),會調用到AttachVolume,主要是創建VolumeAttachment對象,並等待其.status.attached屬性值更新為true;
(3)rc.operationExecutor.MountVolume:當desiredStateOfWorld中期望某volume掛載給某pod,但actualStateOfWorld中表明該volume並沒有掛載給該pod,且該volume已經掛載到了node節點上,(或者該pod的volume需要remount),會調用到MountVolume,主要是調用csi.NodeStageVolume將存儲掛載到node上的global mount path,調用csi.NodePublishVolume將存儲從global mount path掛載到pod mount path;
(4)rc.operationExecutor.ExpandInUseVolume:主要負責在controller端的存儲擴容操作完成后,做node端的存儲擴容操作(后續會單獨分析存儲擴容操作)。
(5)rc.operationExecutor.UnmountDevice:當actualStateOfWorld中表明,某volume沒有被任何pod掛載,且desiredStateOfWorld中也不期望該volume被任何pod掛載,且attachedVolume.GloballyMounted屬性為true時(device與global mount path的掛載關系還在),會調用到UnmountDevice,主要是調用csi.NodeUnstageVolume解除node上global mount path的存儲掛載;
(6)rc.operationExecutor.DetachVolume:當actualStateOfWorld中表明,某volume沒有被任何pod掛載,且desiredStateOfWorld中也不存在該volume,且attachedVolume.GloballyMounted屬性為false時(已經調用過UnmountDevice,device與global mount path的掛載關系已解除),會調用到UnmountDevice,主要是從etcd中刪除VolumeAttachment對象,並等待刪除成功。
pod掛載存儲的調用流程:AttachVolume(csi-attacher.Attach) --> MountVolume(csi-attacher.MountDevice --> csi-mounter.SetUp)
解除pod存儲掛載的調用流程:UnmountVolume(csi-mounter.TearDown) --> UnmountDevice(csi-attacher.UnmountDevice) --> DetachVolume(csi-attcher.Detach)
另外說明:controllerAttachDetachEnabled
該參數的值由kubelet啟動參數--enable-controller-attach-detach決定,該啟動參數設置為 true 表示啟用 Attach/Detach controller進行Attach/Detach 操作,同時禁用 kubelet 執行 Attach/Detach 操作(默認值為 true)。對於csi plugin來說,實際上Attach/Detach 操作只是創建/刪除VolumeAttachment對象。
// pkg/kubelet/volumemanager/reconciler/reconciler.go
func (rc *reconciler) reconcile() {
// Unmounts are triggered before mounts so that a volume that was
// referenced by a pod that was deleted and is now referenced by another
// pod is unmounted from the first pod before being mounted to the new
// pod.
// 對於實際已經掛載了的存儲,如果期望掛載信息中無該存儲,或期望掛載存儲的pod列表中沒有該pod,則指定pod的指定存儲需要unmount
// Ensure volumes that should be unmounted are unmounted.
// 從實際掛載信息結構體中獲取存儲掛載信息
for _, mountedVolume := range rc.actualStateOfWorld.GetMountedVolumes() {
// 判斷期望掛載信息結構體中是否存在指定存儲掛載到指定pod的信息,如果不存在,則調用rc.operationExecutor.UnmountVolume 從指定pod中unmount掉指定存儲
if !rc.desiredStateOfWorld.PodExistsInVolume(mountedVolume.PodName, mountedVolume.VolumeName) {
// Volume is mounted, unmount it
klog.V(5).Infof(mountedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountVolume", ""))
err := rc.operationExecutor.UnmountVolume(
mountedVolume.MountedVolume, rc.actualStateOfWorld, rc.kubeletPodsDir)
if err != nil &&
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
klog.Errorf(mountedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.UnmountVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
}
if err == nil {
klog.Infof(mountedVolume.GenerateMsgDetailed("operationExecutor.UnmountVolume started", ""))
}
}
}
// 從desiredStateOfWorld中獲取需要mount到pod的volome信息列表
// Ensure volumes that should be attached/mounted are attached/mounted.
for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {
// 調用rc.actualStateOfWorld.PodExistsInVolume查詢指定volume是否已mount到指定pod
volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName)
volumeToMount.DevicePath = devicePath
// 當存儲未attach到node時,調用方法將存儲先attach到node上
if cache.IsVolumeNotAttachedError(err) {
// 判斷是否是controller來進行Attach/Detach操作
if rc.controllerAttachDetachEnabled || !volumeToMount.PluginIsAttachable {
// Volume is not attached (or doesn't implement attacher), kubelet attach is disabled, wait
// for controller to finish attaching volume.
klog.V(5).Infof(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.VerifyControllerAttachedVolume", ""))
// 如果是controller來進行Attach/Detach操作,則調用VerifyControllerAttachedVolume方法來判斷controller是否已經執行完Attach/Detach操作
err := rc.operationExecutor.VerifyControllerAttachedVolume(
volumeToMount.VolumeToMount,
rc.nodeName,
rc.actualStateOfWorld)
if err != nil &&
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
klog.Errorf(volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.VerifyControllerAttachedVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
}
if err == nil {
klog.Infof(volumeToMount.GenerateMsgDetailed("operationExecutor.VerifyControllerAttachedVolume started", ""))
}
} else {
// Volume is not attached to node, kubelet attach is enabled, volume implements an attacher,
// so attach it
volumeToAttach := operationexecutor.VolumeToAttach{
VolumeName: volumeToMount.VolumeName,
VolumeSpec: volumeToMount.VolumeSpec,
NodeName: rc.nodeName,
}
klog.V(5).Infof(volumeToAttach.GenerateMsgDetailed("Starting operationExecutor.AttachVolume", ""))
err := rc.operationExecutor.AttachVolume(volumeToAttach, rc.actualStateOfWorld)
if err != nil &&
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
klog.Errorf(volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.AttachVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
}
if err == nil {
klog.Infof(volumeToMount.GenerateMsgDetailed("operationExecutor.AttachVolume started", ""))
}
}
// 當存儲attach后,但未mount給pod或者需要remount時,調用方法進行volume mount
} else if !volMounted || cache.IsRemountRequiredError(err) {
// Volume is not mounted, or is already mounted, but requires remounting
remountingLogStr := ""
isRemount := cache.IsRemountRequiredError(err)
if isRemount {
remountingLogStr = "Volume is already mounted to pod, but remount was requested."
}
klog.V(4).Infof(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.MountVolume", remountingLogStr))
err := rc.operationExecutor.MountVolume(
rc.waitForAttachTimeout,
volumeToMount.VolumeToMount,
rc.actualStateOfWorld,
isRemount)
if err != nil &&
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
klog.Errorf(volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.MountVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
}
if err == nil {
if remountingLogStr == "" {
klog.V(1).Infof(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr))
} else {
klog.V(5).Infof(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr))
}
}
// 當存儲需要擴容時,調用方法進行存儲擴容
} else if cache.IsFSResizeRequiredError(err) &&
utilfeature.DefaultFeatureGate.Enabled(features.ExpandInUsePersistentVolumes) {
klog.V(4).Infof(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.ExpandInUseVolume", ""))
err := rc.operationExecutor.ExpandInUseVolume(
volumeToMount.VolumeToMount,
rc.actualStateOfWorld)
if err != nil &&
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
klog.Errorf(volumeToMount.GenerateErrorDetailed("operationExecutor.ExpandInUseVolume failed", err).Error())
}
if err == nil {
klog.V(4).Infof(volumeToMount.GenerateMsgDetailed("operationExecutor.ExpandInUseVolume started", ""))
}
}
}
// 對比actualStateOfWorld,從desiredStateOfWorld中獲取需要detached的volomes(detached意思為把存儲從node上解除掛載)
// Ensure devices that should be detached/unmounted are detached/unmounted.
for _, attachedVolume := range rc.actualStateOfWorld.GetUnmountedVolumes() {
// Check IsOperationPending to avoid marking a volume as detached if it's in the process of mounting.
if !rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName) &&
!rc.operationExecutor.IsOperationPending(attachedVolume.VolumeName, nestedpendingoperations.EmptyUniquePodName) {
// 當volume已經從node上解除掛載后,GloballyMounted的值被賦值為false
if attachedVolume.GloballyMounted {
// Volume is globally mounted to device, unmount it
klog.V(5).Infof(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountDevice", ""))
err := rc.operationExecutor.UnmountDevice(
attachedVolume.AttachedVolume, rc.actualStateOfWorld, rc.hostutil)
if err != nil &&
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
klog.Errorf(attachedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.UnmountDevice failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
}
if err == nil {
klog.Infof(attachedVolume.GenerateMsgDetailed("operationExecutor.UnmountDevice started", ""))
}
} else {
// Volume is attached to node, detach it
// Kubelet not responsible for detaching or this volume has a non-attachable volume plugin.
if rc.controllerAttachDetachEnabled || !attachedVolume.PluginIsAttachable {
rc.actualStateOfWorld.MarkVolumeAsDetached(attachedVolume.VolumeName, attachedVolume.NodeName)
klog.Infof(attachedVolume.GenerateMsgDetailed("Volume detached", fmt.Sprintf("DevicePath %q", attachedVolume.DevicePath)))
} else {
// Only detach if kubelet detach is enabled
klog.V(5).Infof(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.DetachVolume", ""))
err := rc.operationExecutor.DetachVolume(
attachedVolume.AttachedVolume, false /* verifySafeToDetach */, rc.actualStateOfWorld)
if err != nil &&
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
klog.Errorf(attachedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.DetachVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
}
if err == nil {
klog.Infof(attachedVolume.GenerateMsgDetailed("operationExecutor.DetachVolume started", ""))
}
}
}
}
}
}
2.1.1 rc.operationExecutor.VerifyControllerAttachedVolume
因為attach/detach操作由AD controller來完成,所以volume manager只能通過node對象來獲取指定volume是否已經attach,如已經attach,則更新actualStateOfWorld。
主要邏輯:從node對象中獲取.Status.VolumesAttached,從而判斷volume是否已經attach,然后更新actualStateOfWorld。
// pkg/volume/util/operationexecutor/operation_executor.go
func (oe *operationExecutor) VerifyControllerAttachedVolume(
volumeToMount VolumeToMount,
nodeName types.NodeName,
actualStateOfWorld ActualStateOfWorldAttacherUpdater) error {
generatedOperations, err :=
oe.operationGenerator.GenerateVerifyControllerAttachedVolumeFunc(volumeToMount, nodeName, actualStateOfWorld)
if err != nil {
return err
}
return oe.pendingOperations.Run(
volumeToMount.VolumeName, "" /* podName */, generatedOperations)
}
func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc(
volumeToMount VolumeToMount,
nodeName types.NodeName,
actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
volumePlugin, err :=
og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec)
if err != nil || volumePlugin == nil {
return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("VerifyControllerAttachedVolume.FindPluginBySpec failed", err)
}
verifyControllerAttachedVolumeFunc := func() (error, error) {
if !volumeToMount.PluginIsAttachable {
// If the volume does not implement the attacher interface, it is
// assumed to be attached and the actual state of the world is
// updated accordingly.
addVolumeNodeErr := actualStateOfWorld.MarkVolumeAsAttached(
volumeToMount.VolumeName, volumeToMount.VolumeSpec, nodeName, "" /* devicePath */)
if addVolumeNodeErr != nil {
// On failure, return error. Caller will log and retry.
return volumeToMount.GenerateError("VerifyControllerAttachedVolume.MarkVolumeAsAttachedByUniqueVolumeName failed", addVolumeNodeErr)
}
return nil, nil
}
if !volumeToMount.ReportedInUse {
// If the given volume has not yet been added to the list of
// VolumesInUse in the node's volume status, do not proceed, return
// error. Caller will log and retry. The node status is updated
// periodically by kubelet, so it may take as much as 10 seconds
// before this clears.
// Issue #28141 to enable on demand status updates.
return volumeToMount.GenerateError("Volume has not been added to the list of VolumesInUse in the node's volume status", nil)
}
// Fetch current node object
node, fetchErr := og.kubeClient.CoreV1().Nodes().Get(string(nodeName), metav1.GetOptions{})
if fetchErr != nil {
// On failure, return error. Caller will log and retry.
return volumeToMount.GenerateError("VerifyControllerAttachedVolume failed fetching node from API server", fetchErr)
}
if node == nil {
// On failure, return error. Caller will log and retry.
return volumeToMount.GenerateError(
"VerifyControllerAttachedVolume failed",
fmt.Errorf("Node object retrieved from API server is nil"))
}
for _, attachedVolume := range node.Status.VolumesAttached {
if attachedVolume.Name == volumeToMount.VolumeName {
addVolumeNodeErr := actualStateOfWorld.MarkVolumeAsAttached(
v1.UniqueVolumeName(""), volumeToMount.VolumeSpec, nodeName, attachedVolume.DevicePath)
klog.Infof(volumeToMount.GenerateMsgDetailed("Controller attach succeeded", fmt.Sprintf("device path: %q", attachedVolume.DevicePath)))
if addVolumeNodeErr != nil {
// On failure, return error. Caller will log and retry.
return volumeToMount.GenerateError("VerifyControllerAttachedVolume.MarkVolumeAsAttached failed", addVolumeNodeErr)
}
return nil, nil
}
}
// Volume not attached, return error. Caller will log and retry.
return volumeToMount.GenerateError("Volume not attached according to node status", nil)
}
return volumetypes.GeneratedOperations{
OperationName: "verify_controller_attached_volume",
OperationFunc: verifyControllerAttachedVolumeFunc,
CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(volumePlugin.GetPluginName(), volumeToMount.VolumeSpec), "verify_controller_attached_volume"),
EventRecorderFunc: nil, // nil because we do not want to generate event on error
}, nil
}
2.2 rc.sync()
rc.sync()調用時機:在vm.desiredStateOfWorldPopulator.Run中已經將所有pod的volume信息更新到了desiredStateOfWorld中。
rc.sync()主要邏輯:掃描node上所有pod目錄下的volume目錄,來更新desiredStateOfWorld與actualStateOfWorld。
// pkg/kubelet/volumemanager/reconciler/reconciler.go
func (rc *reconciler) reconciliationLoopFunc() func() {
return func() {
rc.reconcile()
// Sync the state with the reality once after all existing pods are added to the desired state from all sources.
// Otherwise, the reconstruct process may clean up pods' volumes that are still in use because
// desired state of world does not contain a complete list of pods.
if rc.populatorHasAddedPods() && !rc.StatesHasBeenSynced() {
klog.Infof("Reconciler: start to sync state")
rc.sync()
}
}
}
// pkg/kubelet/volumemanager/reconciler/reconciler.go
// sync process tries to observe the real world by scanning all pods' volume directories from the disk.
// If the actual and desired state of worlds are not consistent with the observed world, it means that some
// mounted volumes are left out probably during kubelet restart. This process will reconstruct
// the volumes and update the actual and desired states. For the volumes that cannot support reconstruction,
// it will try to clean up the mount paths with operation executor.
func (rc *reconciler) sync() {
defer rc.updateLastSyncTime()
rc.syncStates()
}
// syncStates scans the volume directories under the given pod directory.
// If the volume is not in desired state of world, this function will reconstruct
// the volume related information and put it in both the actual and desired state of worlds.
// For some volume plugins that cannot support reconstruction, it will clean up the existing
// mount points since the volume is no long needed (removed from desired state)
func (rc *reconciler) syncStates() {
// Get volumes information by reading the pod's directory
podVolumes, err := getVolumesFromPodDir(rc.kubeletPodsDir)
if err != nil {
klog.Errorf("Cannot get volumes from disk %v", err)
return
}
volumesNeedUpdate := make(map[v1.UniqueVolumeName]*reconstructedVolume)
volumeNeedReport := []v1.UniqueVolumeName{}
for _, volume := range podVolumes {
if rc.actualStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName) {
klog.V(4).Infof("Volume exists in actual state (volume.SpecName %s, pod.UID %s), skip cleaning up mounts", volume.volumeSpecName, volume.podName)
// There is nothing to reconstruct
continue
}
volumeInDSW := rc.desiredStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName)
reconstructedVolume, err := rc.reconstructVolume(volume)
if err != nil {
if volumeInDSW {
// Some pod needs the volume, don't clean it up and hope that
// reconcile() calls SetUp and reconstructs the volume in ASW.
klog.V(4).Infof("Volume exists in desired state (volume.SpecName %s, pod.UID %s), skip cleaning up mounts", volume.volumeSpecName, volume.podName)
continue
}
// No pod needs the volume.
klog.Warningf("Could not construct volume information, cleanup the mounts. (pod.UID %s, volume.SpecName %s): %v", volume.podName, volume.volumeSpecName, err)
rc.cleanupMounts(volume)
continue
}
if volumeInDSW {
// Some pod needs the volume. And it exists on disk. Some previous
// kubelet must have created the directory, therefore it must have
// reported the volume as in use. Mark the volume as in use also in
// this new kubelet so reconcile() calls SetUp and re-mounts the
// volume if it's necessary.
volumeNeedReport = append(volumeNeedReport, reconstructedVolume.volumeName)
klog.V(4).Infof("Volume exists in desired state (volume.SpecName %s, pod.UID %s), marking as InUse", volume.volumeSpecName, volume.podName)
continue
}
// There is no pod that uses the volume.
if rc.operationExecutor.IsOperationPending(reconstructedVolume.volumeName, nestedpendingoperations.EmptyUniquePodName) {
klog.Warning("Volume is in pending operation, skip cleaning up mounts")
}
klog.V(2).Infof(
"Reconciler sync states: could not find pod information in desired state, update it in actual state: %+v",
reconstructedVolume)
volumesNeedUpdate[reconstructedVolume.volumeName] = reconstructedVolume
}
if len(volumesNeedUpdate) > 0 {
if err = rc.updateStates(volumesNeedUpdate); err != nil {
klog.Errorf("Error occurred during reconstruct volume from disk: %v", err)
}
}
if len(volumeNeedReport) > 0 {
rc.desiredStateOfWorld.MarkVolumesReportedInUse(volumeNeedReport)
}
}
總結
volume manager作用
volume manager存在於kubelet中,主要是管理卷的attach/detach(與AD controller作用相同,通過kubelet啟動參數控制哪個組件來做該操作,后續會詳細介紹)、mount/umount等操作。
volume manager中pod掛載存儲的調用流程
AttachVolume(csi-attacher.Attach) --> MountVolume(csi-attacher.MountDevice --> csi-mounter.SetUp)
volume manager中解除pod存儲掛載的調用流程
UnmountVolume(csi-mounter.TearDown) --> UnmountDevice(csi-attacher.UnmountDevice) --> DetachVolume(csi-attcher.Detach)
volume manager的vm.reconciler.Run中各個方法的調用鏈
(1)AttachVolume
Volume is not attached to node, kubelet attach is enabled
vm.reconciler.Run --> rc.operationExecutor.AttachVolume --> oe.operationGenerator.GenerateAttachVolumeFunc --> csi-attacher.Attach(pkg/volume/csi/csi_attacher.go)--> create VolumeAttachment
(2)MountVolume
Volume is not mounted, or is already mounted, but requires remounting
vm.reconciler.Run --> rc.operationExecutor.MountVolume --> oe.operationGenerator.GenerateMountVolumeFunc --> 1.csi-attacer.WaitForAttach(等待VolumeAttachment的.status.attached屬性值更新為true) 2.csi-attacer.MountDevice(--
csi.NodeStageVolume) 3.csi-mounter.SetUp(-->csi.NodePublishVolume)
(3)ExpandInUseVolume
Volume is mounted, but it needs to resize
vm.reconciler.Run --> rc.operationExecutor.ExpandInUseVolume --> oe.operationGenerator.GenerateExpandInUseVolumeFunc --> og.doOnlineExpansion --> og.nodeExpandVolume --> expander.NodeExpand (pkg/volume/csi/expander.go) --> csi.NodeExpandVolume
(4)UnmountVolume
Volume is mounted, unmount it
vm.reconciler.Run --> rc.operationExecutor.UnmountVolume --> oe.operationGenerator.GenerateUnmountVolumeFunc --> csi-mounter.TearDown(pkg/volume/csi/csi_mounter.go)--> csi.NodeUnpublishVolume
(5)UnmountDevice
Volume is globally mounted to device, unmount it
vm.reconciler.Run --> rc.operationExecutor.UnmountDevice --> oe.operationGenerator.GenerateUnmountVolumeFunc --> csi-attacher.UnmountDevice --> csi.NodeUnstageVolume
(6)DetachVolume
Volume is attached to node, detach it
vm.reconciler.Run --> rc.operationExecutor.DetachVolume --> oe.operationGenerator.GenerateDetachVolumeFunc --> csi-attacher.Detach --> delete VolumeAttachment