k8s device plugin 分析
- device plugin 一般以 daemonset 部署在 node 節點上;
- GRPC server,信道是 Unix socket,主要實現
ListAndWatch
、Allocate
rpc,收集 device 信息、准備 device 環境; - 被 kubelet 中 device manager 管理
device plugin 工作流程
device plugin 下文簡稱 DP,device manager 下文簡稱 DM
- 以 daemonset 啟動,立刻啟動 GRPC server,監聽獨特的 Unix Socket 地址;
- 執行 register,DP ---register----> DM (GRPC server) ----ListAndWatch----> DP ;
- DM send rpc
ListAndWatch
獲得 device 信息,faked device ID + is_health; - DM send rpc
Allocate
得到啟動容器所需的信息(env,mount,device,annotation);
技術細節
DP 啟動的入口函數?
-
制作 docker
Dockerfile 中編譯 go 代碼,得到gpushare-device-plugin-v2
,將其放置到/usr/bin/
,Entrypoint 為gpushare-device-plugin-v2 -logtostderr
-
daemonset 啟動該容器后,會執行
gpushare-device-plugin-v2 -logtostderr
啟動 -
主文件 cmd/nvidia/main.go
// 入口函數在此
ngm := nvidia.NewSharedGPUManager(*mps, *healthCheck, translatememoryUnits(*memoryUnit))
err := ngm.Run()
DP 流程
shareGPUManager ---run()---> NvidiaDevicePlugin ----Serve()----> Start GRPC server and Register
DP 如何知道 DM 的 Unix socket 地址?
這個地址是常量,不可改變
// gpushare-device-plugin/pkg/gpu/nvidia/server.go
import (
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1")
err = m.Register(pluginapi.KubeletSocket, resourceName)
pluginapi.KubeletSocket 是常量 kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1/constants.go
package v1beta1
const (
// Healthy means that the device is healthy
Healthy = "Healthy"
// UnHealthy means that the device is unhealthy
Unhealthy = "Unhealthy"
// Current version of the API supported by kubelet
Version = "v1beta1"
// DevicePluginPath is the folder the Device Plugin is expecting sockets to be on
// Only privileged pods have access to this path
// Note: Placeholder until we find a "standard path"
DevicePluginPath = "/var/lib/kubelet/device-plugins/"
// KubeletSocket is the path of the Kubelet registry socket
KubeletSocket = DevicePluginPath + "kubelet.sock"
// Timeout duration in secs for PreStartContainer RPC
KubeletPreStartContainerRPCTimeoutInSecs = 30
)
var SupportedVersions = [...]string{"v1beta1"}
resourceName 是常量 gpushare-device-plugin/pkg/gpu/nvidia/const.go
const (
resourceName = "aliyun.com/gpu-mem")
Register 的細節
參數包括:DM 與 DP 通信的版本號
、DP 的 Unix socket 名字
、資源的類型名
- 通過 kubelet Unix socket 創建 grpc 連接
conn, err := dial(kubeletEndpoint, 5*time.Second)
client := pluginapi.NewRegistrationClient(conn)
- 構造參數:
// 其中三者都是常量:
// Endpoint = aliyungpushare.sock
// pluginapi.Version = v1beta1
// resourceName = aliyun.com/gpu-mem
reqt := &pluginapi.RegisterRequest{
Version: pluginapi.Version,
Endpoint: path.Base(m.socket),
ResourceName: resourceName,
}
- 調用 DM 的 rpc register
_, err = client.Register(context.Background(), reqt)
- DM 的處理
檢查參數是否合法,創建 endpointImpl 來管理。
在 kubernetes/pkg/kubelet/cm/devicemanager/manager.go
中
go m.addEndpoint(r)
注意,register rpc 沒有返回值,但是第二個參數如果不是 nil 則就是有錯誤
// 正確
return &pluginapi.Empty{}, nil
// 出錯
return &pluginapi.Empty{}, fmt.Errorf(errorString)
詳細流程:
- 執行 m.addEndpoint(r) 增加 endpoint
- 創建 endpointImpl 主要是,建立與 DP 的 grpc 長連接,填入 callback 函數(genericDeviceUpdateCallback)
- 在 ManagerImpl 的 m.endpoints (dict 數據結構) 中加入 endpointImpl
- 執行
runEndpoint
,調用一次ListAndWatch
- 進入一個死循環:通過 grpc stream 不斷的獲取返回的狀態,用 callback 函數來處理
- callback 函數,將 DP 上報的所有 device 設備的 id + is_health 填入 m.healthyDevices[resourceName] 和 m.unhealthyDevices[resourceName]
綜上,當 DP 發來注冊消息后,DM 組織幾個結構:
m.endpoints[resourceName]
- 調用
ListAndWatch
后,立刻組織m.healthyDevices[resourceName]
、m.unhealthyDevices[resourceName]
- m.unhealthyDevices 應該有別的函數會定期處理,一旦有新值,會進入一系列的處理
DM 調用 DP ListAndWatch 的時機?
register 時候會調用一次,而且僅僅就這一次,因為 ListAndWatch 是個 GRPC 長連接,DP 可以通過這個長連接不停的反饋。
ListAndWatch 的參數
參見說明 kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1/api.proto
rpc ListAndWatch(Empty) returns (stream ListAndWatchResponse) {}
message ListAndWatchResponse {
repeated Device devices = 1;
}
message Device {
// A unique ID assigned by the device plugin used
// to identify devices during the communication
// Max length of this field is 63 characters
string ID = 1;
// Health of the device, can be healthy or unhealthy, see constants.go
string health = 2;
}
由 proto 文件中所示,DM 查詢無需參數,回復的內容是 repeated deviceID and is_health
注意 gpushare-device-plugin 返回的是 faked device ID,比如這個機器上插了 8 GPU 16GB 內存。
我想大家和我一樣,認為是 8 個 GPU 設備,其實這個理解是錯誤的,其實 gpushare-device-plugin 會上報 8*16 個設備,每個設備會有假的 deviceID
Allocate 詳解 in kubelet -> DM
這是一個很龐雜的問題,簡單來說,kubelet
初始化時,有一個步驟是初始化 DM,DM 本身提供了 Allocate
函數供 kubelet
調用。而 DM 中,又會調用 DP 的 rpc Allocate
方法。
kubelet 什么時候調用 DM 的 Allocate
我們根據 kubernetes/pkg/kubelet/cm/devicemanager/types.go
中可知,kubelet 肯定通過某種方式調用了下面的函數;
type Manager interface {
// Allocate configures and assigns devices to pods. The pods are provided
// through the pod admission attributes in the attrs argument. From the
// requested device resources, Allocate will communicate with the owning
// device plugin to allow setup procedures to take place, and for the
// device plugin to provide runtime settings to use the device (environment
// variables, mount points and device files). The node object is provided
// for the device manager to update the node capacity to reflect the
// currently available devices.
Allocate(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error
...
}
通過一系列分析,確實如同我們的猜測,只不過略顯復雜:
- kubelet 啟動后,
klet.admitHandlers
調用AddPodAdmitHandler
將一系列PodAdmitHandler
加入 kubelet 的核心結構klet.adminHandler
中 - 其中有一個叫
lifecycle.NewPredicateAdmitHandler(klet.getNodeAnyWay, criticalPodAdmissionHandler, klet.containerManager.UpdatePluginResources)
也被加入;
// 注意,PredicateAdmitHandler 實現了 Admit 方法,因此,PredicateAdmitHandler 的實例的指針就可以被 AddPodAdmitHandler 直接加入
type PodAdmitHandler interface {
// Admit evaluates if a pod can be admitted.
Admit(attrs *PodAdmitAttributes) PodAdmitResult
}
- 在 kubelet
syncLoop
中的syncLoopIteration
中,當出現新的 pod 申請時,會調用handler.HandlePodUpdates(u.Pods)
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
select {
case u, open := <-configCh:
// Update from a config source; dispatch it to the right handler
// callback.
if !open {
klog.Errorf("Update channel is closed. Exiting the sync loop.")
return false
}
switch u.Op {
case kubetypes.ADD:
klog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))
// After restarting, kubelet will get all existing pods through
// ADD as if they are new pods. These pods will then go through the
// admission process and *may* be rejected. This can be resolved
// once we have checkpointing.
// 看過源碼,這里的 handler 就是 kl
handler.HandlePodAdditions(u.Pods)
-
HandlePodAdditions
中調用kl.canAdmitPod(activePods, pod)
-
canAdmitPod
中:
// 會調用 podAdmitHandler.Admit,因為 podAdmitHandler 是 interface,因此根據實際的類,調用不同的方法
attrs := &lifecycle.PodAdmitAttributes{Pod: pod, OtherPods: pods}
for _, podAdmitHandler := range kl.admitHandlers {
if result := podAdmitHandler.Admit(attrs); !result.Admit {
return false, result.Reason, result.Message
}
}
-
對於
PredicateAdmitHandler.Admit
,會調用err = w.pluginResourceUpdateFunc(nodeInfo, attrs)
,pluginResourceUpdateFunc
是之前注冊的klet.containerManager.UpdatePluginResources
-
klet.containerManager.UpdatePluginResources
最終調用:
// 終於調用到了 DM 的 Allocate 方法
func (cm *containerManagerImpl) UpdatePluginResources(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
return cm.deviceManager.Allocate(node, attrs)
}
上文的行文邏輯是從 pod 創建后,是如果最終調用到 Allocate
函數的流程,但是探索的過程往往是相反的,是從 Allocate
開始,看誰調用的它。
這里最復雜的地方在於,UpdatePluginResources
是誰調用的,通過全局搜索,發現只有其被注冊的地方,沒有其被使用的地方。我們再來整理下思路:
UpdatePluginResources
被注冊到了PredicateAdmitHandler
,PredicateAdmitHandler
又被加入到klet.admitHandlers
- 當有 pod 創建的事件發生時,會遍歷
klet.admitHandlers
逐一調用podAdmitHandler.Admit
方法 - podAdmitHandler.Admit 中(實際上是 PredicateAdmitHandler.Admin)會調用
pluginResourceUpdateFunc
方法,這個方法正好是注冊的UpdatePluginResources
關於 kubelet 啟動后有哪幾個大 loop,又有哪些小 loop,我們可以以后再分析,至少,創建 pod 時確實會調用 Allocate
。
DM 的 Allocate 詳解
Allocate 代碼 in kubernetes/pkg/kubelet/cm/devicemanager/manager.go
func (m *ManagerImpl) Allocate(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
pod := attrs.Pod
err := m.allocatePodResources(pod)
if err != nil {
klog.Errorf("Failed to allocate device plugin resource for pod %s: %v", string(pod.UID), err)
return err
}
m.mutex.Lock()
defer m.mutex.Unlock()
// quick return if no pluginResources requested
if _, podRequireDevicePluginResource := m.podDevices[string(pod.UID)]; !podRequireDevicePluginResource {
return nil
}
m.sanitizeNodeAllocatable(node)
return nil
}
看完了 Allocate
函數和參數,感覺有三座大山:
err := m.allocatePodResources(pod)
m.allocDevices
填入預申請的 device- 請求 DP 的 allocate(rpc)方法將返回的信息填入
m.podDevices
m.podDevices
- 保存 pod 使用 device 的記錄結構,在 DM 中被管理
m.sanitizeNodeAllocatable(node)
- 保存
newAllocatableResource
到 node 結構體中
- 保存
總結,Allocate 並不返回什么信息,僅僅是做好預分配,將 DM 結構體中的數據結構進行變更。注意,這其中會調用 DP 的 Allocate 方法,但是返回的內容也僅僅先填寫到 m.podDevice
,再啰嗦一句,該函數就是預分配資源。具體怎么用,什么時候用,還需要更進一步的分析。
下面是對 Allocate
詳細分析:
m.allocatePodResources(pod) in kubernetes/pkg/kubelet/cm/devicemanager/manager.go
func (m *ManagerImpl) allocatePodResources(pod *v1.Pod) error {
// 注意,devicesToReuse 一開始為空,什么內容都沒有
devicesToReuse := make(map[string]sets.String)
// pod.Spec.InitContainers 參見:https://github.com/kubernetes/api/blob/master/core/v1/types.go
// InitContainers 是一個 pod 的最初的 container,鎖定網絡
for _, container := range pod.Spec.InitContainers {
// 將 `m.allocDevices` 填入預申請的 device
// 請求 DP 的 allocate(rpc)方法將返回的信息填入 `m.podDevices`
// 本質上說,到這一步,只改變了 DM 的兩個數據結構的值而已,什么都沒有變化
if err := m.allocateContainerResources(pod, &container, devicesToReuse); err != nil {
return err
}
// in kubernetes/pkg/kubelet/cm/devicemanager/pod_devices.go
// 將 `m.podDevices` 中的 pod + container 的 resource 內容賦值到 `devicesToReuse`,注意,這個操作在 for 循環中,因此 devicesToReuse 會產生影響
m.podDevices.addContainerAllocatedResources(string(pod.UID), container.Name, devicesToReuse)
}
for _, container := range pod.Spec.Containers {
// 同上,是針對真正的 pod 中的 container
if err := m.allocateContainerResources(pod, &container, devicesToReuse); err != nil {
return err
}
m.podDevices.removeContainerAllocatedResources(string(pod.UID), container.Name, devicesToReuse)
}
return nil
}
allocateContainerResources in kubernetes/pkg/kubelet/cm/devicemanager/manager.go
func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Container, devicesToReuse map[string]sets.String) error {
// 分配的資源在 allocDevices,且已經寫入了 m.allocatedDevices
allocDevices, err := m.devicesToAllocate(podUID, contName, resource, needed, devicesToReuse[resource])
// 取出 resouce(resourceName 為 aliyun.com/gpu-mem)的 GRPC server 地址
eI, ok := m.endpoints[resource]
// 調用 DP 的 allocate
resp, err := eI.e.allocate(devs)
// 將結果寫入 m.podDevices
m.podDevices.insert(podUID, contName, resource, allocDevices, resp.ContainerResponses[0])
devicesToAllocate,返回是需要被分配的 device。
- m.podDevices.containerDevices 是每一個 container 使用的資源,這個和下面三個 device 互不影響
- m.healthyDevices 是 DM 保存的健康的 device
- m.unhealthyDevices 是 DM 保存的不健康的 device
- m.allocatedDevices 是 DM 保存的已經分配了的資源
func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, required int, reusableDevices sets.String) (sets.String, error) {
// containerDevices in `kubernetes/pkg/kubelet/cm/devicemanager/pod_devices.go`
// 該函數把需要分配的 device list 返回出去,
// 首先嘗試使用 reusableDevices,如果足夠了,就返回,否則還會額外的返回全新的 device
// 如果沒有必要返回 device 列表,則返回 nil
devices := m.podDevices.containerDevices(podUID, contName, resource)
// reusableDevices
// Allocates from reusableDevices list first.
for device := range reusableDevices {
devices.Insert(device)
needed--
if needed == 0 {
return devices, nil
}
}
for _, device := range allocated {
m.allocatedDevices[resource].Insert(device)
//
devices.Insert(device)
}
return devices, nil
sanitizeNodeAllocatable in kubernetes/pkg/kubelet/cm/devicemanager/manager.go
func (m *ManagerImpl) sanitizeNodeAllocatable(node *schedulernodeinfo.NodeInfo) {
var newAllocatableResource *schedulernodeinfo.Resource
allocatableResource := node.AllocatableResource()
if allocatableResource.ScalarResources == nil {
allocatableResource.ScalarResources = make(map[v1.ResourceName]int64)
}
for resource, devices := range m.allocatedDevices {
needed := devices.Len()
quant, ok := allocatableResource.ScalarResources[v1.ResourceName(resource)]
if ok && int(quant) >= needed {
continue
}
// Needs to update nodeInfo.AllocatableResource to make sure
// NodeInfo.allocatableResource at least equal to the capacity already allocated.
if newAllocatableResource == nil {
newAllocatableResource = allocatableResource.Clone()
}
newAllocatableResource.ScalarResources[v1.ResourceName(resource)] = int64(needed)
}
// 將 allocatedDevices 寫入 node 結構中,即:n.allocatableResource = allocatableResource
if newAllocatableResource != nil {
node.SetAllocatableResource(newAllocatableResource)
}
}
Allocate 詳解 in DM -> DP
參數部分相對容易,只要查看 kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1/api.proto
即可
rpc Allocate(AllocateRequest) returns (AllocateResponse) {}
// 傳入的結構體 AllocateRequest,簡言之就是一堆 deviceID
message AllocateRequest {
repeated ContainerAllocateRequest container_requests = 1;
}
message ContainerAllocateRequest {
repeated string devicesIDs = 1;
}
// 返回的結構體 AllocateResponse,簡言之,是傳入容器的環境變量、文件夾映射、設備映射,和 annotations,其中 mount 和 device 均是 container_path、host_path、permission。
message AllocateResponse {
repeated ContainerAllocateResponse container_responses = 1;
}
message ContainerAllocateResponse {
// List of environment variable to be set in the container to access one of more devices.
map<string, string> envs = 1;
// Mounts for the container.
repeated Mount mounts = 2;
// Devices for the container.
repeated DeviceSpec devices = 3;
// Container annotations to pass to the container runtime
map<string, string> annotations = 4;
}
message Mount {
// Path of the mount within the container.
string container_path = 1;
// Path of the mount on the host.
string host_path = 2;
// If set, the mount is read-only.
bool read_only = 3;
}
// DeviceSpec specifies a host device to mount into a container.
message DeviceSpec {
// Path of the device within the container.
string container_path = 1;
// Path of the device on the host.
string host_path = 2;
// Cgroups permissions of the device, candidates are one or more of
// * r - allows container to read from the specified device.
// * w - allows container to write to the specified device.
// * m - allows container to create device files that do not yet exist.
string permissions = 3;
}
有一個核心的問題,DM 通過 DP 的 ListAndWatch 接口查到的是幾個 device?如果這個機器上插了 8 GPU 16GB 內存。
我想大家和我一樣,認為是 8 個 GPU 設備,其實這個理解是錯誤的,其實 gpushare-device-plugin 會上報 8*16 個設備,每個設備會有假的 deviceID:
// gpushare-device-plugin/pkg/gpu/nvidia/nvidia.go func getDevices()
fakeID := generateFakeDeviceID(d.UUID, j)
devs = append(devs, &pluginapi.Device{
ID: fakeID,
Health: pluginapi.Healthy,
}
明確這個問題后,可以再看看 DP 以 gpushare-device-plugin
為例是怎么處理的:
// Allocate which return list of devices.
func (m *NvidiaDevicePlugin) Allocate(ctx context.Context,
reqs *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
// 猜測:`Allocate` 的流程中,分配卡就變得容易,我們可以想象,一定是把多個 faked device ID 轉化為一張真正的卡,如果出現了 faked device IDs 不屬於同一張卡,那么一定是調度出現了問題!
// 實際上,DP 從來不做調度管理,是根據 scheduler 增加 annotation 來知道到底 faked device ID 轉化為哪一張真正的卡
responses := pluginapi.AllocateResponse{}
log.Infoln("----Allocating GPU for gpu mem is started----")
var (
podReqGPU uint
found bool
assumePod *v1.Pod
)
// podReqGPU = uint(0)
// 注意,Allocate 是 Pod 的請求,不是 Pod 下面的 container 的獨立請求,因此,podReqGPU 表示,該 pod 一共需要多少 faked 的卡
for _, req := range reqs.ContainerRequests {
podReqGPU += uint(len(req.DevicesIDs))
}
// 請求幾個 faked GPU 卡
log.Infof("RequestPodGPUs: %d", podReqGPU)
m.Lock()
defer m.Unlock()
log.Infoln("checking...")
// 需要仔細分析
// 通過查詢 k8s api,獲得所有的處於 pending 狀態的,並且需要被該 DP 所在 node 處理的 pod
// 確實是 getCandidatePods
pods, err := getCandidatePods()
if err != nil {
log.Infof("invalid allocation requst: Failed to find candidate pods due to %v", err)
return buildErrResponse(reqs, podReqGPU), nil
}
if log.V(4) {
for _, pod := range pods {
log.Infof("Pod %s in ns %s request GPU Memory %d with timestamp %v",
pod.Name,
pod.Namespace,
getGPUMemoryFromPodResource(pod),
getAssumeTimeFromPodAnnotation(pod))
}
}
for _, pod := range pods {
// 看下每一個 pod 需要的內存和本次請求的作比較,如果相同,則假設就是這個 pod 發來的請求(不夠嚴謹?)
// 我想,從 DM <-> DP 直接通信的參數本意上來說,DP 不需要知道我是為誰(哪個 pod)服務,但是為什么 gpushare-device-plugin 需要知道呢?
if getGPUMemoryFromPodResource(pod) == podReqGPU {
log.Infof("Found Assumed GPU shared Pod %s in ns %s with GPU Memory %d",
pod.Name,
pod.Namespace,
podReqGPU)
assumePod = pod
found = true
break
}
}
if found {
// 查找 Pod 中 annotation 的 `ALIYUN_COM_GPU_MEM_IDX` 內容是什么,比如 0 代表用第 0 塊卡
// 這個我猜測是在調度的時候被打上的
// 否則,分配失敗會報錯
id := getGPUIDFromPodAnnotation(assumePod)
if id < 0 {
log.Warningf("Failed to get the dev ", assumePod)
}
candidateDevID := ""
if id >= 0 {
ok := false
// 得到設備名稱
candidateDevID, ok = m.GetDeviceNameByIndex(uint(id))
if !ok {
log.Warningf("Failed to find the dev for pod %v because it's not able to find dev with index %d",
assumePod,
id)
id = -1
}
}
if id < 0 {
return buildErrResponse(reqs, podReqGPU), nil
}
// 1. Create container requests
// 構造返回的內容
for _, req := range reqs.ContainerRequests {
// reqGPU 是每一個 container 需要的卡,和 podReqGPU 要區分開
reqGPU := uint(len(req.DevicesIDs))
// 傳入的環境變量為:
/*
NVIDIA_VISIBLE_DEVICES: 也許是GPU0
ALIYUN_COM_GPU_MEM_IDX: 0 (GPU index)
ALIYUN_COM_GPU_MEM_POD: 10 (POD 總 GPU memory)
ALIYUN_COM_GPU_MEM_CONTAINER: 5 (POD 下面的其中一個 container 的 GPU memory)
ALIYUN_COM_GPU_MEM_DEV: 16 (這個卡總共能提供多少資源)
*/
response := pluginapi.ContainerAllocateResponse{
Envs: map[string]string{
envNVGPU: candidateDevID,
EnvResourceIndex: fmt.Sprintf("%d", id),
EnvResourceByPod: fmt.Sprintf("%d", podReqGPU),
EnvResourceByContainer: fmt.Sprintf("%d", reqGPU),
EnvResourceByDev: fmt.Sprintf("%d", getGPUMemory()),
},
}
responses.ContainerResponses = append(responses.ContainerResponses, &response)
}
// 2. Update Pod spec
// 直接改 pod 中的 annotations,生成新的數據結構
// ALIYUN_COM_GPU_MEM_ASSIGNED = true
// ALIYUN_COM_GPU_MEM_ASSUME_TIME = 時間戳
// 感覺很暴力!
newPod := updatePodAnnotations(assumePod)
// 使之生效
_, err = clientset.CoreV1().Pods(newPod.Namespace).Update(newPod)
if err != nil {
// 如果設置失敗,有重試機制
// the object has been modified; please apply your changes to the latest version and try again
if err.Error() == OptimisticLockErrorMsg {
// retry
pod, err := clientset.CoreV1().Pods(assumePod.Namespace).Get(assumePod.Name, metav1.GetOptions{})
if err != nil {
log.Warningf("Failed due to %v", err)
return buildErrResponse(reqs, podReqGPU), nil
}
newPod = updatePodAnnotations(pod)
_, err = clientset.CoreV1().Pods(newPod.Namespace).Update(newPod)
if err != nil {
log.Warningf("Failed due to %v", err)
return buildErrResponse(reqs, podReqGPU), nil
}
} else {
log.Warningf("Failed due to %v", err)
return buildErrResponse(reqs, podReqGPU), nil
}
}
} else {
log.Warningf("invalid allocation requst: request GPU memory %d can't be satisfied.",
podReqGPU)
// return &responses, fmt.Errorf("invalid allocation requst: request GPU memory %d can't be satisfied", reqGPU)
return buildErrResponse(reqs, podReqGPU), nil
}
log.Infof("new allocated GPUs info %v", &responses)
log.Infoln("----Allocating GPU for gpu mem is ended----")
// // Add this to make sure the container is created at least
// currentTime := time.Now()
// currentTime.Sub(lastAllocateTime)
// 將 1. Create container requests 中的內容返回了
return &responses, nil
}
我們需要總結一下這個流程:
- 猜測本次 Allocate 請求是哪個 Pod 提出的
- 通過 API 調用,獲得 Pod 的 Spec 配置,將多個 faked device ID 轉化為一張真正的 GPU 卡
- 組織返回的信息,包括多個(Pod 中有幾個 container 就有幾套):
NVIDIA_VISIBLE_DEVICES: 也許是GPU0
ALIYUN_COM_GPU_MEM_IDX: 0 (GPU index)
ALIYUN_COM_GPU_MEM_POD: 10 (POD 總 GPU memory)
ALIYUN_COM_GPU_MEM_CONTAINER: 5 (POD 下面的其中一個 container 的 GPU memory)
ALIYUN_COM_GPU_MEM_DEV: 16 (這個卡總共能提供多少資源)
- 直接修改 Pod 中的 annotations
ALIYUN_COM_GPU_MEM_ASSIGNED = true
ALIYUN_COM_GPU_MEM_ASSUME_TIME = 時間戳
至於 ALIYUN_COM_GPU_MEM_ASSIGNED
等有什么用,我們會另起文章分析。
Device 異常,DP 和 DM 如何反應?
首先,一定是 DP 先發現異常的。以 gpushare-device-plugin 為例,在啟動的時候,會增加監控 gpushare-device-plugin/pkg/gpu/nvidia/server.go
:
func (m *NvidiaDevicePlugin) Start() error {
...
go m.healthcheck()
...
}
func (m *NvidiaDevicePlugin) healthcheck() {
ctx, cancel := context.WithCancel(context.Background())
var xids chan *pluginapi.Device
if m.healthCheck {
xids = make(chan *pluginapi.Device)
// 監控所有的 dev
go watchXIDs(ctx, m.devs, xids)
}
for {
select {
case <-m.stop:
cancel()
return
case dev := <-xids:
// 如果異常,則調用 unhealthy 方法,該方法向信號參數 m.health 寫 dev 的內容
m.unhealthy(dev)
}
}
}
func watchXIDs(ctx context.Context, devs []*pluginapi.Device, xids chan<- *pluginapi.Device) {
通過 nvidia 庫 nvml RegisterEventForDevice,並監聽是否有異常出現,有異常則往管道里寫錯誤,一個用不停止的大循環
其次,如果 m.unhealthy 出現了異常設備,如何上報?或者是 DM 定期輪詢 ListAndWatch?
通過 ListAndWatch 代碼 gpushare-device-plugin/pkg/gpu/nvidia/server.go
可知,實際上都不是!
func (m *NvidiaDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error {
s.Send(&pluginapi.ListAndWatchResponse{Devices: m.devs})
// 之前未曾注意到這個循環,當 DM 在 DP register 后,會立刻調用該函數,當正常返回后,該函數進入一個死循環
// 當 m.health 有內容了,立刻向 DM 發送出錯的信息,把所有卡都置為失敗,這個確實值得商榷
for {
select {
case <-m.stop:
return nil
case d := <-m.health:
// FIXME: there is no way to recover from the Unhealthy state.
d.Health = pluginapi.Unhealthy
s.Send(&pluginapi.ListAndWatchResponse{Devices: m.devs})
}
}
}
最后,DM 真的有一個進程一直在等待 ListAndWatchResponse
么?
簡言之是的,因為 DM 調用 ListAndResponse
后,會從 stream 流中不斷的獲得返回的內容,沒有新的,則會卡在 stream.Recv()
中,當有異常的時候,調用 callback 函數后,會 m.unhealthyDevices[resourceName].Insert(dev.ID)
什么時候才會真正獲取 Allocate 后的環境變量等內容
我們知道 kubelet 調用 DM 的 Allocate
僅僅是將資源准備好(設置幾個 dict 而已)(資源是在 DM 中管理的,DP 不記錄資源的使用),但是什么時候 kubelet 才真正讓這些資源被用起來呢?
分析的過程:
- 我就知道
func (m *ManagerImpl) GetDeviceRunContainerOptions
這個函數是用來返回 options 的,其在kubernetes/pkg/kubelet/cm/devicemanager/manager.go
func (cm *containerManagerImpl) GetResources
inkubernetes/pkg/kubelet/cm/container_manager_linux.go
調用了,該函數返回所有的 optionsfunc (kl *Kubelet) GenerateRunContainerOptions
inkubernetes/pkg/kubelet/kubelet_pods.go
該函數調用了func (m *kubeGenericRuntimeManager) generateContainerConfig
inkubernetes/pkg/kubelet/kuberuntime/kuberuntime_container.go
func (m *kubeGenericRuntimeManager) startContainer
inkubernetes/pkg/kubelet/kuberuntime/kuberuntime_container.go
func (m *kubeGenericRuntimeManager) SyncPod
inkubernetes/pkg/kubelet/kuberuntime/kuberuntime_manager.go
func (kl *Kubelet) syncPod
inkubernetes/pkg/kubelet/kubelet.go
func (m *manager) Start()
inkubernetes/pkg/kubelet/status/status_manager.go
當有 m.podStatusChannel 事件時,SyncPod
被觸發func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
inkubernetes/pkg/kubelet/kubelet.go
Allocate
應該在先,然后才是 GetDeviceRunContainerOptions
,分析的過程中並沒有明確的先后順序,作為下次分析的主題
擴展
nvidia-docker 能夠使用 GPU 的原理
准備作為一個主題,下次分享 參考
k8s 啟動帶 GPU 的容器的原理
准備作為一個主題,下次分享 參考