基於 gpushare-device-plugin 的 k8s device plugin 分析


k8s device plugin 分析

示例為 gpushare-device-plugin

  • device plugin 一般以 daemonset 部署在 node 節點上;
  • GRPC server,信道是 Unix socket,主要實現 ListAndWatchAllocate rpc,收集 device 信息、准備 device 環境;
  • 被 kubelet 中 device manager 管理

device plugin 工作流程

device plugin 下文簡稱 DP,device manager 下文簡稱 DM

  1. 以 daemonset 啟動,立刻啟動 GRPC server,監聽獨特的 Unix Socket 地址;
  2. 執行 register,DP ---register----> DM (GRPC server) ----ListAndWatch----> DP ;
  3. DM send rpc ListAndWatch 獲得 device 信息,faked device ID + is_health;
  4. DM send rpc Allocate 得到啟動容器所需的信息(env,mount,device,annotation);

技術細節

DP 啟動的入口函數?

  1. 制作 docker
    Dockerfile 中編譯 go 代碼,得到 gpushare-device-plugin-v2,將其放置到 /usr/bin/,Entrypoint 為 gpushare-device-plugin-v2 -logtostderr

  2. daemonset 啟動該容器后,會執行 gpushare-device-plugin-v2 -logtostderr 啟動

  3. 主文件 cmd/nvidia/main.go

	// 入口函數在此
	ngm := nvidia.NewSharedGPUManager(*mps, *healthCheck, translatememoryUnits(*memoryUnit))
	err := ngm.Run()

DP 流程

shareGPUManager ---run()---> NvidiaDevicePlugin ----Serve()----> Start GRPC server and Register

DP 如何知道 DM 的 Unix socket 地址?

這個地址是常量,不可改變

// gpushare-device-plugin/pkg/gpu/nvidia/server.go
import (
	pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1")

err = m.Register(pluginapi.KubeletSocket, resourceName)

pluginapi.KubeletSocket 是常量 kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1/constants.go

package v1beta1

const (
	// Healthy means that the device is healthy
	Healthy = "Healthy"
	// UnHealthy means that the device is unhealthy
	Unhealthy = "Unhealthy"

	// Current version of the API supported by kubelet
	Version = "v1beta1"
	// DevicePluginPath is the folder the Device Plugin is expecting sockets to be on
	// Only privileged pods have access to this path
	// Note: Placeholder until we find a "standard path"
	DevicePluginPath = "/var/lib/kubelet/device-plugins/"
	// KubeletSocket is the path of the Kubelet registry socket
	KubeletSocket = DevicePluginPath + "kubelet.sock"
	// Timeout duration in secs for PreStartContainer RPC
	KubeletPreStartContainerRPCTimeoutInSecs = 30
)

var SupportedVersions = [...]string{"v1beta1"}

resourceName 是常量 gpushare-device-plugin/pkg/gpu/nvidia/const.go

const (
	resourceName  = "aliyun.com/gpu-mem")

Register 的細節

參數包括:DM 與 DP 通信的版本號DP 的 Unix socket 名字資源的類型名

  1. 通過 kubelet Unix socket 創建 grpc 連接
conn, err := dial(kubeletEndpoint, 5*time.Second)
client := pluginapi.NewRegistrationClient(conn)
  1. 構造參數:
// 其中三者都是常量:
// Endpoint = aliyungpushare.sock
// pluginapi.Version = v1beta1
// resourceName = aliyun.com/gpu-mem

reqt := &pluginapi.RegisterRequest{
	Version:      pluginapi.Version,
	Endpoint:     path.Base(m.socket),
	ResourceName: resourceName,
}
  1. 調用 DM 的 rpc register
_, err = client.Register(context.Background(), reqt)
  1. DM 的處理

檢查參數是否合法,創建 endpointImpl 來管理。

kubernetes/pkg/kubelet/cm/devicemanager/manager.go

go m.addEndpoint(r)

注意,register rpc 沒有返回值,但是第二個參數如果不是 nil 則就是有錯誤

// 正確
return &pluginapi.Empty{}, nil

// 出錯
return &pluginapi.Empty{}, fmt.Errorf(errorString)

詳細流程:

  • 執行 m.addEndpoint(r) 增加 endpoint
  • 創建 endpointImpl 主要是,建立與 DP 的 grpc 長連接,填入 callback 函數(genericDeviceUpdateCallback)
  • 在 ManagerImpl 的 m.endpoints (dict 數據結構) 中加入 endpointImpl
  • 執行 runEndpoint,調用一次 ListAndWatch
  • 進入一個死循環:通過 grpc stream 不斷的獲取返回的狀態,用 callback 函數來處理
  • callback 函數,將 DP 上報的所有 device 設備的 id + is_health 填入 m.healthyDevices[resourceName] 和 m.unhealthyDevices[resourceName]

綜上,當 DP 發來注冊消息后,DM 組織幾個結構:

  1. m.endpoints[resourceName]
  2. 調用 ListAndWatch 后,立刻組織 m.healthyDevices[resourceName]m.unhealthyDevices[resourceName]
  3. m.unhealthyDevices 應該有別的函數會定期處理,一旦有新值,會進入一系列的處理

DM 調用 DP ListAndWatch 的時機?

register 時候會調用一次,而且僅僅就這一次,因為 ListAndWatch 是個 GRPC 長連接,DP 可以通過這個長連接不停的反饋。

ListAndWatch 的參數

參見說明 kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1/api.proto

rpc ListAndWatch(Empty) returns (stream ListAndWatchResponse) {}

message ListAndWatchResponse {
	repeated Device devices = 1;
}

message Device {
	// A unique ID assigned by the device plugin used
	// to identify devices during the communication
	// Max length of this field is 63 characters
	string ID = 1;
	// Health of the device, can be healthy or unhealthy, see constants.go
	string health = 2;
}

由 proto 文件中所示,DM 查詢無需參數,回復的內容是 repeated deviceID and is_health

注意 gpushare-device-plugin 返回的是 faked device ID,比如這個機器上插了 8 GPU 16GB 內存。

我想大家和我一樣,認為是 8 個 GPU 設備,其實這個理解是錯誤的,其實 gpushare-device-plugin 會上報 8*16 個設備,每個設備會有假的 deviceID

Allocate 詳解 in kubelet -> DM

這是一個很龐雜的問題,簡單來說,kubelet 初始化時,有一個步驟是初始化 DM,DM 本身提供了 Allocate 函數供 kubelet 調用。而 DM 中,又會調用 DP 的 rpc Allocate 方法。

kubelet 什么時候調用 DM 的 Allocate

我們根據 kubernetes/pkg/kubelet/cm/devicemanager/types.go 中可知,kubelet 肯定通過某種方式調用了下面的函數;

type Manager interface {
	// Allocate configures and assigns devices to pods. The pods are provided
	// through the pod admission attributes in the attrs argument. From the
	// requested device resources, Allocate will communicate with the owning
	// device plugin to allow setup procedures to take place, and for the
	// device plugin to provide runtime settings to use the device (environment
	// variables, mount points and device files). The node object is provided
	// for the device manager to update the node capacity to reflect the
	// currently available devices.
	Allocate(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error

	...
}

通過一系列分析,確實如同我們的猜測,只不過略顯復雜:

  1. kubelet 啟動后,klet.admitHandlers 調用 AddPodAdmitHandler 將一系列 PodAdmitHandler 加入 kubelet 的核心結構 klet.adminHandler
  2. 其中有一個叫 lifecycle.NewPredicateAdmitHandler(klet.getNodeAnyWay, criticalPodAdmissionHandler, klet.containerManager.UpdatePluginResources) 也被加入;
// 注意,PredicateAdmitHandler 實現了 Admit 方法,因此,PredicateAdmitHandler 的實例的指針就可以被 AddPodAdmitHandler 直接加入
type PodAdmitHandler interface {
// Admit evaluates if a pod can be admitted.
Admit(attrs *PodAdmitAttributes) PodAdmitResult
}
  1. 在 kubelet syncLoop 中的 syncLoopIteration 中,當出現新的 pod 申請時,會調用 handler.HandlePodUpdates(u.Pods)
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
	case u, open := <-configCh:
		// Update from a config source; dispatch it to the right handler
		// callback.
		if !open {
			klog.Errorf("Update channel is closed. Exiting the sync loop.")
			return false
		}

		switch u.Op {
		case kubetypes.ADD:
			klog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))
			// After restarting, kubelet will get all existing pods through
			// ADD as if they are new pods. These pods will then go through the
			// admission process and *may* be rejected. This can be resolved
			// once we have checkpointing.
			// 看過源碼,這里的 handler 就是 kl
			handler.HandlePodAdditions(u.Pods)
  1. HandlePodAdditions 中調用 kl.canAdmitPod(activePods, pod)

  2. canAdmitPod 中:

// 會調用 podAdmitHandler.Admit,因為 podAdmitHandler 是 interface,因此根據實際的類,調用不同的方法
attrs := &lifecycle.PodAdmitAttributes{Pod: pod, OtherPods: pods}
for _, podAdmitHandler := range kl.admitHandlers {
	if result := podAdmitHandler.Admit(attrs); !result.Admit {
		return false, result.Reason, result.Message
	}
}
  1. 對於 PredicateAdmitHandler.Admit,會調用 err = w.pluginResourceUpdateFunc(nodeInfo, attrs), pluginResourceUpdateFunc 是之前注冊的 klet.containerManager.UpdatePluginResources

  2. klet.containerManager.UpdatePluginResources 最終調用:

// 終於調用到了 DM 的 Allocate 方法
func (cm *containerManagerImpl) UpdatePluginResources(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
	return cm.deviceManager.Allocate(node, attrs)
}

上文的行文邏輯是從 pod 創建后,是如果最終調用到 Allocate 函數的流程,但是探索的過程往往是相反的,是從 Allocate 開始,看誰調用的它。

這里最復雜的地方在於,UpdatePluginResources 是誰調用的,通過全局搜索,發現只有其被注冊的地方,沒有其被使用的地方。我們再來整理下思路:

  • UpdatePluginResources 被注冊到了 PredicateAdmitHandlerPredicateAdmitHandler 又被加入到 klet.admitHandlers
  • 當有 pod 創建的事件發生時,會遍歷 klet.admitHandlers 逐一調用 podAdmitHandler.Admit 方法
  • podAdmitHandler.Admit 中(實際上是 PredicateAdmitHandler.Admin)會調用 pluginResourceUpdateFunc 方法,這個方法正好是注冊的 UpdatePluginResources

關於 kubelet 啟動后有哪幾個大 loop,又有哪些小 loop,我們可以以后再分析,至少,創建 pod 時確實會調用 Allocate

DM 的 Allocate 詳解

Allocate 代碼 in kubernetes/pkg/kubelet/cm/devicemanager/manager.go

func (m *ManagerImpl) Allocate(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
	pod := attrs.Pod
	err := m.allocatePodResources(pod)
	if err != nil {
		klog.Errorf("Failed to allocate device plugin resource for pod %s: %v", string(pod.UID), err)
		return err
	}

	m.mutex.Lock()
	defer m.mutex.Unlock()

	// quick return if no pluginResources requested
	if _, podRequireDevicePluginResource := m.podDevices[string(pod.UID)]; !podRequireDevicePluginResource {
		return nil
	}

	m.sanitizeNodeAllocatable(node)
	return nil
}

看完了 Allocate 函數和參數,感覺有三座大山:

  • err := m.allocatePodResources(pod)
    • m.allocDevices 填入預申請的 device
    • 請求 DP 的 allocate(rpc)方法將返回的信息填入 m.podDevices
  • m.podDevices
    • 保存 pod 使用 device 的記錄結構,在 DM 中被管理
  • m.sanitizeNodeAllocatable(node)
    • 保存 newAllocatableResource 到 node 結構體中

總結,Allocate 並不返回什么信息,僅僅是做好預分配,將 DM 結構體中的數據結構進行變更。注意,這其中會調用 DP 的 Allocate 方法,但是返回的內容也僅僅先填寫到 m.podDevice,再啰嗦一句,該函數就是預分配資源。具體怎么用,什么時候用,還需要更進一步的分析。

下面是對 Allocate 詳細分析:

m.allocatePodResources(pod) in kubernetes/pkg/kubelet/cm/devicemanager/manager.go

func (m *ManagerImpl) allocatePodResources(pod *v1.Pod) error {
	// 注意,devicesToReuse 一開始為空,什么內容都沒有
	devicesToReuse := make(map[string]sets.String)
	// pod.Spec.InitContainers 參見:https://github.com/kubernetes/api/blob/master/core/v1/types.go
	// InitContainers 是一個 pod 的最初的 container,鎖定網絡
	for _, container := range pod.Spec.InitContainers {
		// 將 `m.allocDevices` 填入預申請的 device
		// 請求 DP 的 allocate(rpc)方法將返回的信息填入 `m.podDevices`
		// 本質上說,到這一步,只改變了 DM 的兩個數據結構的值而已,什么都沒有變化
		if err := m.allocateContainerResources(pod, &container, devicesToReuse); err != nil {
			return err
		}
		// in kubernetes/pkg/kubelet/cm/devicemanager/pod_devices.go
		// 將 `m.podDevices` 中的 pod + container 的 resource 內容賦值到 `devicesToReuse`,注意,這個操作在 for 循環中,因此 devicesToReuse 會產生影響
		m.podDevices.addContainerAllocatedResources(string(pod.UID), container.Name, devicesToReuse)
	}
	for _, container := range pod.Spec.Containers {
		// 同上,是針對真正的 pod 中的 container
		if err := m.allocateContainerResources(pod, &container, devicesToReuse); err != nil {
			return err
		}
		m.podDevices.removeContainerAllocatedResources(string(pod.UID), container.Name, devicesToReuse)
	}
	return nil
}

allocateContainerResources in kubernetes/pkg/kubelet/cm/devicemanager/manager.go

func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Container, devicesToReuse map[string]sets.String) error {
	// 分配的資源在 allocDevices,且已經寫入了 m.allocatedDevices
	allocDevices, err := m.devicesToAllocate(podUID, contName, resource, needed, devicesToReuse[resource])
	// 取出 resouce(resourceName 為 aliyun.com/gpu-mem)的 GRPC server 地址
	eI, ok := m.endpoints[resource]
	// 調用 DP 的 allocate
	resp, err := eI.e.allocate(devs)
	// 將結果寫入 m.podDevices
	m.podDevices.insert(podUID, contName, resource, allocDevices, resp.ContainerResponses[0])

devicesToAllocate,返回是需要被分配的 device。

  • m.podDevices.containerDevices 是每一個 container 使用的資源,這個和下面三個 device 互不影響
  • m.healthyDevices 是 DM 保存的健康的 device
  • m.unhealthyDevices 是 DM 保存的不健康的 device
  • m.allocatedDevices 是 DM 保存的已經分配了的資源
func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, required int, reusableDevices sets.String) (sets.String, error) {
    // containerDevices in `kubernetes/pkg/kubelet/cm/devicemanager/pod_devices.go`
	// 該函數把需要分配的 device list 返回出去,
	// 首先嘗試使用 reusableDevices,如果足夠了,就返回,否則還會額外的返回全新的 device
	// 如果沒有必要返回 device 列表,則返回 nil
	devices := m.podDevices.containerDevices(podUID, contName, resource)
	// reusableDevices
	// Allocates from reusableDevices list first.
	for device := range reusableDevices {
		devices.Insert(device)
		needed--
		if needed == 0 {
			return devices, nil
		}
	}

	for _, device := range allocated {
		m.allocatedDevices[resource].Insert(device)
		//
		devices.Insert(device)
	}
	return devices, nil

sanitizeNodeAllocatable in kubernetes/pkg/kubelet/cm/devicemanager/manager.go

func (m *ManagerImpl) sanitizeNodeAllocatable(node *schedulernodeinfo.NodeInfo) {
	var newAllocatableResource *schedulernodeinfo.Resource
	allocatableResource := node.AllocatableResource()
	if allocatableResource.ScalarResources == nil {
		allocatableResource.ScalarResources = make(map[v1.ResourceName]int64)
	}
	for resource, devices := range m.allocatedDevices {
		needed := devices.Len()
		quant, ok := allocatableResource.ScalarResources[v1.ResourceName(resource)]
		if ok && int(quant) >= needed {
			continue
		}
		// Needs to update nodeInfo.AllocatableResource to make sure
		// NodeInfo.allocatableResource at least equal to the capacity already allocated.
		if newAllocatableResource == nil {
			newAllocatableResource = allocatableResource.Clone()
		}
		newAllocatableResource.ScalarResources[v1.ResourceName(resource)] = int64(needed)
	}
	// 將 allocatedDevices 寫入 node 結構中,即:n.allocatableResource = allocatableResource
	if newAllocatableResource != nil {
		node.SetAllocatableResource(newAllocatableResource)
	}
}

Allocate 詳解 in DM -> DP

參數部分相對容易,只要查看 kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1/api.proto 即可

rpc Allocate(AllocateRequest) returns (AllocateResponse) {}

// 傳入的結構體 AllocateRequest,簡言之就是一堆 deviceID
message AllocateRequest {
	repeated ContainerAllocateRequest container_requests = 1;
}

message ContainerAllocateRequest {
	repeated string devicesIDs = 1;
}

// 返回的結構體 AllocateResponse,簡言之,是傳入容器的環境變量、文件夾映射、設備映射,和 annotations,其中 mount 和 device 均是 container_path、host_path、permission。

message AllocateResponse {
	repeated ContainerAllocateResponse container_responses = 1;
}

message ContainerAllocateResponse {
  	// List of environment variable to be set in the container to access one of more devices.
	map<string, string> envs = 1;
	// Mounts for the container.
	repeated Mount mounts = 2;
	// Devices for the container.
	repeated DeviceSpec devices = 3;
	// Container annotations to pass to the container runtime
	map<string, string> annotations = 4;
}

message Mount {
	// Path of the mount within the container.
	string container_path = 1;
	// Path of the mount on the host.
	string host_path = 2;
	// If set, the mount is read-only.
	bool read_only = 3;
}

// DeviceSpec specifies a host device to mount into a container.
message DeviceSpec {
    // Path of the device within the container.
    string container_path = 1;
    // Path of the device on the host.
    string host_path = 2;
    // Cgroups permissions of the device, candidates are one or more of
    // * r - allows container to read from the specified device.
    // * w - allows container to write to the specified device.
    // * m - allows container to create device files that do not yet exist.
    string permissions = 3;
}

有一個核心的問題,DM 通過 DP 的 ListAndWatch 接口查到的是幾個 device?如果這個機器上插了 8 GPU 16GB 內存。

我想大家和我一樣,認為是 8 個 GPU 設備,其實這個理解是錯誤的,其實 gpushare-device-plugin 會上報 8*16 個設備,每個設備會有假的 deviceID:

// gpushare-device-plugin/pkg/gpu/nvidia/nvidia.go func getDevices()
fakeID := generateFakeDeviceID(d.UUID, j)
devs = append(devs, &pluginapi.Device{
				ID:     fakeID,
				Health: pluginapi.Healthy,
}

明確這個問題后,可以再看看 DP 以 gpushare-device-plugin 為例是怎么處理的:

// Allocate which return list of devices.
func (m *NvidiaDevicePlugin) Allocate(ctx context.Context,
	reqs *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
	// 猜測:`Allocate` 的流程中,分配卡就變得容易,我們可以想象,一定是把多個 faked device ID 轉化為一張真正的卡,如果出現了 faked device IDs 不屬於同一張卡,那么一定是調度出現了問題!
	// 實際上,DP 從來不做調度管理,是根據 scheduler 增加 annotation 來知道到底 faked device ID 轉化為哪一張真正的卡
	responses := pluginapi.AllocateResponse{}

	log.Infoln("----Allocating GPU for gpu mem is started----")
	var (
		podReqGPU uint
		found     bool
		assumePod *v1.Pod
	)

	// podReqGPU = uint(0)
	// 注意,Allocate 是 Pod 的請求,不是 Pod 下面的 container 的獨立請求,因此,podReqGPU 表示,該 pod 一共需要多少 faked 的卡
	for _, req := range reqs.ContainerRequests {
		podReqGPU += uint(len(req.DevicesIDs))
	}
	// 請求幾個 faked GPU 卡
	log.Infof("RequestPodGPUs: %d", podReqGPU)

	m.Lock()
	defer m.Unlock()
	log.Infoln("checking...")
	// 需要仔細分析
	// 通過查詢 k8s api,獲得所有的處於 pending 狀態的,並且需要被該 DP 所在 node 處理的 pod
	// 確實是 getCandidatePods
	pods, err := getCandidatePods()
	if err != nil {
		log.Infof("invalid allocation requst: Failed to find candidate pods due to %v", err)
		return buildErrResponse(reqs, podReqGPU), nil
	}

	if log.V(4) {
		for _, pod := range pods {
			log.Infof("Pod %s in ns %s request GPU Memory %d with timestamp %v",
				pod.Name,
				pod.Namespace,
				getGPUMemoryFromPodResource(pod),
				getAssumeTimeFromPodAnnotation(pod))
		}
	}

	for _, pod := range pods {
		// 看下每一個 pod 需要的內存和本次請求的作比較,如果相同,則假設就是這個 pod 發來的請求(不夠嚴謹?)
		// 我想,從 DM <-> DP 直接通信的參數本意上來說,DP 不需要知道我是為誰(哪個 pod)服務,但是為什么 gpushare-device-plugin 需要知道呢?
		if getGPUMemoryFromPodResource(pod) == podReqGPU {
			log.Infof("Found Assumed GPU shared Pod %s in ns %s with GPU Memory %d",
				pod.Name,
				pod.Namespace,
				podReqGPU)
			assumePod = pod
			found = true
			break
		}
	}

	if found {
		// 查找 Pod 中 annotation 的 `ALIYUN_COM_GPU_MEM_IDX` 內容是什么,比如 0 代表用第 0 塊卡
		// 這個我猜測是在調度的時候被打上的
		// 否則,分配失敗會報錯
		id := getGPUIDFromPodAnnotation(assumePod)
		if id < 0 {
			log.Warningf("Failed to get the dev ", assumePod)
		}

		candidateDevID := ""
		if id >= 0 {
			ok := false
			// 得到設備名稱
			candidateDevID, ok = m.GetDeviceNameByIndex(uint(id))
			if !ok {
				log.Warningf("Failed to find the dev for pod %v because it's not able to find dev with index %d",
					assumePod,
					id)
				id = -1
			}
		}

		if id < 0 {
			return buildErrResponse(reqs, podReqGPU), nil
		}

		// 1. Create container requests
		// 構造返回的內容
		for _, req := range reqs.ContainerRequests {
			// reqGPU 是每一個 container 需要的卡,和 podReqGPU 要區分開
			reqGPU := uint(len(req.DevicesIDs))
			// 傳入的環境變量為:
			/*
			NVIDIA_VISIBLE_DEVICES: 也許是GPU0
			ALIYUN_COM_GPU_MEM_IDX: 0 (GPU index)
			ALIYUN_COM_GPU_MEM_POD: 10 (POD 總 GPU memory)
			ALIYUN_COM_GPU_MEM_CONTAINER: 5 (POD 下面的其中一個 container 的 GPU memory)
			ALIYUN_COM_GPU_MEM_DEV: 16 (這個卡總共能提供多少資源)
			*/
			response := pluginapi.ContainerAllocateResponse{
				Envs: map[string]string{
					envNVGPU:               candidateDevID,
					EnvResourceIndex:       fmt.Sprintf("%d", id),
					EnvResourceByPod:       fmt.Sprintf("%d", podReqGPU),
					EnvResourceByContainer: fmt.Sprintf("%d", reqGPU),
					EnvResourceByDev:       fmt.Sprintf("%d", getGPUMemory()),
				},
			}
			responses.ContainerResponses = append(responses.ContainerResponses, &response)
		}

		// 2. Update Pod spec
		// 直接改 pod 中的 annotations,生成新的數據結構
		// ALIYUN_COM_GPU_MEM_ASSIGNED = true
		// ALIYUN_COM_GPU_MEM_ASSUME_TIME = 時間戳
		// 感覺很暴力!
		newPod := updatePodAnnotations(assumePod)
		// 使之生效
		_, err = clientset.CoreV1().Pods(newPod.Namespace).Update(newPod)
		if err != nil {
			// 如果設置失敗,有重試機制
			// the object has been modified; please apply your changes to the latest version and try again
			if err.Error() == OptimisticLockErrorMsg {
				// retry
				pod, err := clientset.CoreV1().Pods(assumePod.Namespace).Get(assumePod.Name, metav1.GetOptions{})
				if err != nil {
					log.Warningf("Failed due to %v", err)
					return buildErrResponse(reqs, podReqGPU), nil
				}
				newPod = updatePodAnnotations(pod)
				_, err = clientset.CoreV1().Pods(newPod.Namespace).Update(newPod)
				if err != nil {
					log.Warningf("Failed due to %v", err)
					return buildErrResponse(reqs, podReqGPU), nil
				}
			} else {
				log.Warningf("Failed due to %v", err)
				return buildErrResponse(reqs, podReqGPU), nil
			}
		}

	} else {
		log.Warningf("invalid allocation requst: request GPU memory %d can't be satisfied.",
			podReqGPU)
		// return &responses, fmt.Errorf("invalid allocation requst: request GPU memory %d can't be satisfied", reqGPU)
		return buildErrResponse(reqs, podReqGPU), nil
	}

	log.Infof("new allocated GPUs info %v", &responses)
	log.Infoln("----Allocating GPU for gpu mem is ended----")
	// // Add this to make sure the container is created at least
	// currentTime := time.Now()

	// currentTime.Sub(lastAllocateTime)

	// 將 1. Create container requests 中的內容返回了
	return &responses, nil
}

我們需要總結一下這個流程:

  1. 猜測本次 Allocate 請求是哪個 Pod 提出的
  2. 通過 API 調用,獲得 Pod 的 Spec 配置,將多個 faked device ID 轉化為一張真正的 GPU 卡
  3. 組織返回的信息,包括多個(Pod 中有幾個 container 就有幾套):
NVIDIA_VISIBLE_DEVICES: 也許是GPU0
ALIYUN_COM_GPU_MEM_IDX: 0 (GPU index)
ALIYUN_COM_GPU_MEM_POD: 10 (POD 總 GPU memory)
ALIYUN_COM_GPU_MEM_CONTAINER: 5 (POD 下面的其中一個 container 的 GPU memory)
ALIYUN_COM_GPU_MEM_DEV: 16 (這個卡總共能提供多少資源)
  1. 直接修改 Pod 中的 annotations
ALIYUN_COM_GPU_MEM_ASSIGNED = true
ALIYUN_COM_GPU_MEM_ASSUME_TIME = 時間戳

至於 ALIYUN_COM_GPU_MEM_ASSIGNED 等有什么用,我們會另起文章分析。

Device 異常,DP 和 DM 如何反應?

首先,一定是 DP 先發現異常的。以 gpushare-device-plugin 為例,在啟動的時候,會增加監控 gpushare-device-plugin/pkg/gpu/nvidia/server.go

func (m *NvidiaDevicePlugin) Start() error {
	...
	go m.healthcheck()
	...
	}
func (m *NvidiaDevicePlugin) healthcheck() {
	ctx, cancel := context.WithCancel(context.Background())

	var xids chan *pluginapi.Device
	if m.healthCheck {
		xids = make(chan *pluginapi.Device)
		// 監控所有的 dev
		go watchXIDs(ctx, m.devs, xids)
	}

	for {
		select {
		case <-m.stop:
			cancel()
			return
		case dev := <-xids:
			// 如果異常,則調用 unhealthy 方法,該方法向信號參數 m.health 寫 dev 的內容
			m.unhealthy(dev)
		}
	}
}
func watchXIDs(ctx context.Context, devs []*pluginapi.Device, xids chan<- *pluginapi.Device) {
	通過 nvidia 庫 nvml RegisterEventForDevice,並監聽是否有異常出現,有異常則往管道里寫錯誤,一個用不停止的大循環

其次,如果 m.unhealthy 出現了異常設備,如何上報?或者是 DM 定期輪詢 ListAndWatch?

通過 ListAndWatch 代碼 gpushare-device-plugin/pkg/gpu/nvidia/server.go 可知,實際上都不是!

func (m *NvidiaDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error {
	s.Send(&pluginapi.ListAndWatchResponse{Devices: m.devs})

	// 之前未曾注意到這個循環,當 DM 在 DP register 后,會立刻調用該函數,當正常返回后,該函數進入一個死循環
	// 當 m.health 有內容了,立刻向 DM 發送出錯的信息,把所有卡都置為失敗,這個確實值得商榷
	for {
		select {
		case <-m.stop:
			return nil
		case d := <-m.health:
			// FIXME: there is no way to recover from the Unhealthy state.
			d.Health = pluginapi.Unhealthy
			s.Send(&pluginapi.ListAndWatchResponse{Devices: m.devs})
		}
	}
}

最后,DM 真的有一個進程一直在等待 ListAndWatchResponse 么?

簡言之是的,因為 DM 調用 ListAndResponse 后,會從 stream 流中不斷的獲得返回的內容,沒有新的,則會卡在 stream.Recv() 中,當有異常的時候,調用 callback 函數后,會 m.unhealthyDevices[resourceName].Insert(dev.ID)

什么時候才會真正獲取 Allocate 后的環境變量等內容

我們知道 kubelet 調用 DM 的 Allocate 僅僅是將資源准備好(設置幾個 dict 而已)(資源是在 DM 中管理的,DP 不記錄資源的使用),但是什么時候 kubelet 才真正讓這些資源被用起來呢?

分析的過程:

  1. 我就知道 func (m *ManagerImpl) GetDeviceRunContainerOptions 這個函數是用來返回 options 的,其在 kubernetes/pkg/kubelet/cm/devicemanager/manager.go
  2. func (cm *containerManagerImpl) GetResources in kubernetes/pkg/kubelet/cm/container_manager_linux.go 調用了,該函數返回所有的 options
  3. func (kl *Kubelet) GenerateRunContainerOptions in kubernetes/pkg/kubelet/kubelet_pods.go 該函數調用了
  4. func (m *kubeGenericRuntimeManager) generateContainerConfig in kubernetes/pkg/kubelet/kuberuntime/kuberuntime_container.go
  5. func (m *kubeGenericRuntimeManager) startContainer in kubernetes/pkg/kubelet/kuberuntime/kuberuntime_container.go
  6. func (m *kubeGenericRuntimeManager) SyncPod in kubernetes/pkg/kubelet/kuberuntime/kuberuntime_manager.go
  7. func (kl *Kubelet) syncPod in kubernetes/pkg/kubelet/kubelet.go
  8. func (m *manager) Start() in kubernetes/pkg/kubelet/status/status_manager.go 當有 m.podStatusChannel 事件時,SyncPod 被觸發
  9. func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) { in kubernetes/pkg/kubelet/kubelet.go

Allocate 應該在先,然后才是 GetDeviceRunContainerOptions,分析的過程中並沒有明確的先后順序,作為下次分析的主題

擴展

nvidia-docker 能夠使用 GPU 的原理

准備作為一個主題,下次分享 參考

k8s 啟動帶 GPU 的容器的原理

准備作為一個主題,下次分享 參考


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM