【博客503】kubelet device plugin如何管理与分配device
kubelet device plugin如何分配device
kubelet如何管理与持久化device信息
kubelet ManagerImp的创建:
pkg/kubelet/cm/devicemanager/manager.go:97
// NewManagerImpl creates a new manager.
func NewManagerImpl() (*ManagerImpl, error) {
// 通过/var/lib/kubelet/device-plugins/kubelet.sock与device plugin交互
return newManagerImpl(pluginapi.KubeletSocket)
}
func newManagerImpl(socketPath string) (*ManagerImpl, error) {
glog.V(2).Infof("Creating Device Plugin manager at %s", socketPath)
if socketPath == "" || !filepath.IsAbs(socketPath) {
return nil, fmt.Errorf(errBadSocket+" %v", socketPath)
}
dir, file := filepath.Split(socketPath)
manager := &ManagerImpl{
endpoints: make(map[string]endpoint),
socketname: file,
socketdir: dir,
healthyDevices: make(map[string]sets.String),
unhealthyDevices: make(map[string]sets.String),
allocatedDevices: make(map[string]sets.String),
pluginOpts: make(map[string]*pluginapi.DevicePluginOptions),
podDevices: make(podDevices),
}
manager.callback = manager.genericDeviceUpdateCallback
// The following structs are populated with real implementations in manager.Start()
// Before that, initializes them to perform no-op operations.
manager.activePods = func() []*v1.Pod { return []*v1.Pod{} }
manager.sourcesReady = &sourcesReadyStub{}
var err error
// 在/var/lib/kubelet/device-plugins/目录下创建file store类型的key-value存储文件kubelet_internal_checkpoint,用来作为kubelet的device plugin的checkpoint。
manager.store, err = utilstore.NewFileStore(dir, utilfs.DefaultFs{})
if err != nil {
return nil, fmt.Errorf("failed to initialize device plugin checkpointing store: %+v", err)
}
return manager, nil
}
解析:
1、kubelet Device Manager通过/var/lib/kubelet/device-plugins/kubelet.sock与device plugin交互。
2、注册callback为genericDeviceUpdateCallback,用来处理对应devices的add,delete,update事件。
3、在/var/lib/kubelet/device-plugins/目录下创建file store类型的key-value存储文kubelet_internal_checkpoint,用来作为kubelet的device plugin的checkpoint。
当监听到devices add/delete/update事件发生时,会更新到kubelet_internal_checkpoint文件中。
4、当device plugin的stop time超过grace period time(代码写死为5min,不可配置),会从checkpoint中删除对应的devices。在这个时间范围内,Device Manager会继续缓存该endpoint及对应的devices。
5、为Container Allocate Devices后,也会将PodDevices更新到checkpoint中。
callback的实现即:genericDeviceUpdateCallback的实现,从而了解Device Manager是如何处理devices的add/delete/update消息的。
func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, added, updated, deleted []pluginapi.Device) {
kept := append(updated, added...)
m.mutex.Lock()
if _, ok := m.healthyDevices[resourceName]; !ok {
m.healthyDevices[resourceName] = sets.NewString()
}
if _, ok := m.unhealthyDevices[resourceName]; !ok {
m.unhealthyDevices[resourceName] = sets.NewString()
}
for _, dev := range kept {
if dev.Health == pluginapi.Healthy {
m.healthyDevices[resourceName].Insert(dev.ID)
m.unhealthyDevices[resourceName].Delete(dev.ID)
} else {
m.unhealthyDevices[resourceName].Insert(dev.ID)
m.healthyDevices[resourceName].Delete(dev.ID)
}
}
for _, dev := range deleted {
m.healthyDevices[resourceName].Delete(dev.ID)
m.unhealthyDevices[resourceName].Delete(dev.ID)
}
m.mutex.Unlock()
m.writeCheckpoint()
}
逻辑总结:
1、将callback中收到的devices状态是Healthy,那么将device ID插入到ManagerImpl中healthDevices中,并从unhealthyDevices中删除。
2、将callback中收到的devices状态是Unhealthy,那么将device ID插入到ManagerImpl中unhealthDevices中,并从healthyDevices中删除。
3、将device plugin反馈的需要delete的devices从healthDevices和unhealthDevices中一并删除。
4、将ManagerImpl中的数据更新到checkpoint文件中。
kubelet每次启动会清空所有的socket文件,但不清空checkpiont文件
kubelet每次启动后将/var/lib/kubelet/device-plugins/下面的所有文件清空,当然checkpiont文件除外,也就是清空所有的socket文件,包括自己的kubelet.sock,以及其他所有之前的device plugin的socket文件。device plugin会监控kubelet.sock文件是否被删除,如果删除,则会触发自己的向kubelet重新注册自己
kubelet plugin device如何维护与分配device
func (m *ManagerImpl) Allocate(pod *v1.Pod, container *v1.Container) error {
if _, ok := m.devicesToReuse[string(pod.UID)]; !ok {
m.devicesToReuse[string(pod.UID)] = make(map[string]sets.String)
}
// If pod entries to m.devicesToReuse other than the current pod exist, delete them.
for podUID := range m.devicesToReuse {
if podUID != string(pod.UID) {
delete(m.devicesToReuse, podUID)
}
}
// 为initContainer分配设备
for _, initContainer := range pod.Spec.InitContainers {
if container.Name == initContainer.Name {
if err := m.allocateContainerResources(pod, container, m.devicesToReuse[string(pod.UID)]); err != nil {
return err
}
// 对于initContainer,将所分配的device不断地加入到可重用设置列表中,以便后续回收提供给initContainer使用
m.podDevices.addContainerAllocatedResources(string(pod.UID), container.Name, m.devicesToReuse[string(pod.UID)])
return nil
}
}
// 为非initContainer分配设备
if err := m.allocateContainerResources(pod, container, m.devicesToReuse[string(pod.UID)]); err != nil {
return err
}
// 而对于非initContainer,则不断地从可重用设置列表中将分配出去的设备删除,因为他们非initContainer,会一直跑,使用的设备不能reuse
m.podDevices.removeContainerAllocatedResources(string(pod.UID), container.Name, m.devicesToReuse[string(pod.UID)])
return nil
}
// 原因是k8s中有initContainer,initContainer可以有多个,先于container执行,
// 每个initContainer按顺序依次执行完毕后container才会开始创建,
// 而在为container或initContainer分配设备的时候会优先利用deviceToReuse的设备,
// 这样可避免资源浪费
逻辑解析:
1、从devicesToReuse中查看是否有该pod的reuse信息,如果devicesToReuse信息不属于pod则把devicesToReuse信息删除;
2、判断container是否为initContainer,如果是则通过allocateContainerResources为container分配所有的device资源,针对扩展资源只能是request=limit;
2-1、UpdateAllocatedDevices,获取activePods,把podDevices中不是active的pod分配信息从podDevices取出并从allocatedDevices删除;
2-2、执行devicesToAllocate
----2-2-1、先获取这个pod的container的这个resource资源的device信息,如果已经分配的部分device,还需要分配的资源量就需要减去已经分配的部分;
----2-2-2、为container分配device,默认情况下是取通过available.UnsortedList()[:needed]取可用的前几个device,如果有topologyAffinityStore的配置,则需要按照topology原则分配device;
----2-2-3、将上面分配的device加入到allocatedDevices;
2-3、根据devicesToAllocate分配的devices调用endpoint的allocategRPC接口分配对应的设备,然后将返回结果和设备分配信息添加到podDevices;
2-4、writeCheckpoint,将分配资源写入到kubelet_internal_checkpoint文件;
3、如果是InitContainer还需要把分配出去的device添加到devicesToReuse;
4如果不是InitContainer则直接调用allocateContainerResources进行device分配,只不过不需要将device信息添加到devicesToReuse;
向kubelet注册Device的时候,ID是我们自己来决定的,且kubelet后续也会使用我们申请的ID来向我们发起某块设备的申请
我们向kubelet注册设备:
2022/09/21 07:42:04 Registered device plugin with Kubelet
exposing devices: [&Device{ID:test-id-0,Health:Healthy,Topology:nil,} &Device{ID:test-id-1,Health:Healthy,Topology:nil,} &Device{ID:test-id-2,Health:Healthy,Topology:nil,} &Device{ID:test-id-3,Health:Healthy,Topology:nil,} &Device{ID:test-id-4,Health:Healthy,Topology:nil,} &Device{ID:test-id-5,Health:Healthy,Topology:nil,} &Device{ID:test-id-6,Health:Healthy,Topology:nil,} &Device{ID:test-id-7,Health:Healthy,Topology:nil,} &Device{ID:test-id-8,Health:Healthy,Topology:nil,} &Device{ID:test-id-9,Health:Healthy,Topology:nil,} &Device{ID:test-id-10,Health:Healthy,Topology:nil,} &Device{ID:test-id-11,Health:Healthy,Topology:nil,} &Device{ID:test-id-12,Health:Healthy,Topology:nil,} &Device{ID:test-id-13,Health:Healthy,Topology:nil,} &Device{ID:test-id-14,Health:Healthy,Topology:nil,} &Device{ID:test-id-15,Health:Healthy,Topology:nil,} &Device{ID:test-id-16,Health:Healthy,Topology:nil,} &Device{ID:test-id-17,Health:Healthy,Topology:nil,} &Device{ID:test-id-18,Health:Healthy,Topology:nil,} &Device{ID:test-id-19,Health:Healthy,Topology:nil,} &Device{ID:test-id-20,Health:Healthy,Topology:nil,} &Device{ID:test-id-21,Health:Healthy,Topology:nil,} &Device{ID:test-id-22,Health:Healthy,Topology:nil,} &Device{ID:test-id-23,Health:Healthy,Topology:nil,} &Device{ID:test-id-24,Health:Healthy,Topology:nil,} &Device{ID:test-id-25,Health:Healthy,Topology:nil,} &Device{ID:test-id-26,Health:Healthy,Topology:nil,} &Device{ID:test-id-27,Health:Healthy,Topology:nil,} &Device{ID:test-id-28,Health:Healthy,Topology:nil,} &Device{ID:test-id-29,Health:Healthy,Topology:nil,} &Device{ID:test-id-30,Health:Healthy,Topology:nil,} &Device{ID:test-id-31,Health:Healthy,Topology:nil,} &Device{ID:test-id-32,Health:Healthy,Topology:nil,} &Device{ID:test-id-33,Health:Healthy,Topology:nil,} &Device{ID:test-id-34,Health:Healthy,Topology:nil,} &Device{ID:test-id-35,Health:Healthy,Topology:nil,} &Device{ID:test-id-36,Health:Healthy,Topology:nil,} &Device{ID:test-id-37,Health:Healthy,Topology:nil,} &Device{ID:test-id-38,Health:Healthy,Topology:nil,} &Device{ID:test-id-39,Health:Healthy,Topology:nil,} &Device{ID:test-id-40,Health:Healthy,Topology:nil,} &Device{ID:test-id-41,Health:Healthy,Topology:nil,} &Device{ID:test-id-42,Health:Healthy,Topology:nil,} &Device{ID:test-id-43,Health:Healthy,Topology:nil,} &Device{ID:test-id-44,Health:Healthy,Topology:nil,} &Device{ID:test-id-45,Health:Healthy,Topology:nil,} &Device{ID:test-id-46,Health:Healthy,Topology:nil,} &Device{ID:test-id-47,Health:Healthy,Topology:nil,} &Device{ID:test-id-48,Health:Healthy,Topology:nil,} &Device{ID:test-id-49,Health:Healthy,Topology:nil,} &Device{ID:test-id-50,Health:Healthy,Topology:nil,} &Device{ID:test-id-51,Health:Healthy,Topology:nil,} &Device{ID:test-id-52,Health:Healthy,Topology:nil,} &Device{ID:test-id-53,Health:Healthy,Topology:nil,} &Device{ID:test-id-54,Health:Healthy,Topology:nil,} &Device{ID:test-id-55,Health:Healthy,Topology:nil,} &Device{ID:test-id-56,Health:Healthy,Topology:nil,} &Device{ID:test-id-57,Health:Healthy,Topology:nil,} &Device{ID:test-id-58,Health:Healthy,Topology:nil,} &Device{ID:test-id-59,Health:Healthy,Topology:nil,} &Device{ID:test-id-60,Health:Healthy,Topology:nil,} &Device{ID:test-id-61,Health:Healthy,Topology:nil,} &Device{ID:test-id-62,Health:Healthy,Topology:nil,} &Device{ID:test-id-63,Health:Healthy,Topology:nil,} &Device{ID:test-id-64,Health:Healthy,Topology:nil,} &Device{ID:test-id-65,Health:Healthy,Topology:nil,} &Device{ID:test-id-66,Health:Healthy,Topology:nil,} &Device{ID:test-id-67,Health:Healthy,Topology:nil,} &Device{ID:test-id-68,Health:Healthy,Topology:nil,} &Device{ID:test-id-69,Health:Healthy,Topology:nil,} &Device{ID:test-id-70,Health:Healthy,Topology:nil,} &Device{ID:test-id-71,Health:Healthy,Topology:nil,} &Device{ID:test-id-72,Health:Healthy,Topology:nil,} &Device{ID:test-id-73,Health:Healthy,Topology:nil,} &Device{ID:test-id-74,Health:Healthy,Topology:nil,} &Device{ID:test-id-75,Health:Healthy,Topology:nil,} &Device{ID:test-id-76,Health:Healthy,Topology:nil,} &Device{ID:test-id-77,Health:Healthy,Topology:nil,} &Device{ID:test-id-78,Health:Healthy,Topology:nil,} &Device{ID:test-id-79,Health:Healthy,Topology:nil,} &Device{ID:test-id-80,Health:Healthy,Topology:nil,} &Device{ID:test-id-81,Health:Healthy,Topology:nil,} &Device{ID:test-id-82,Health:Healthy,Topology:nil,} &Device{ID:test-id-83,Health:Healthy,Topology:nil,} &Device{ID:test-id-84,Health:Healthy,Topology:nil,} &Device{ID:test-id-85,Health:Healthy,Topology:nil,} &Device{ID:test-id-86,Health:Healthy,Topology:nil,} &Device{ID:test-id-87,Health:Healthy,Topology:nil,} &Device{ID:test-id-88,Health:Healthy,Topology:nil,} &Device{ID:test-id-89,Health:Healthy,Topology:nil,} &Device{ID:test-id-90,Health:Healthy,Topology:nil,} &Device{ID:test-id-91,Health:Healthy,Topology:nil,} &Device{ID:test-id-92,Health:Healthy,Topology:nil,} &Device{ID:test-id-93,Health:Healthy,Topology:nil,} &Device{ID:test-id-94,Health:Healthy,Topology:nil,} &Device{ID:test-id-95,Health:Healthy,Topology:nil,} &Device{ID:test-id-96,Health:Healthy,Topology:nil,} &Device{ID:test-id-97,Health:Healthy,Topology:nil,} &Device{ID:test-id-98,Health:Healthy,Topology:nil,} &Device{ID:test-id-99,Health:Healthy,Topology:nil,}]
看到kubelet调用我们allocate的时候,会随机从设备ID列表中取出一个,然后向我们申请那个设备,使用的ID就是我们注册时的那些设备ID
2022/09/21 07:42:13 allocate request: &AllocateRequest{ContainerRequests:[]*ContainerAllocateRequest{&ContainerAllocateRequest{DevicesIDs:[test-id-69],},},}
2022/09/21 07:42:13 allocate response: {[&ContainerAllocateResponse{Envs:map[string]string{},Mounts:[]*Mount{},Devices:[]*DeviceSpec{&DeviceSpec{ContainerPath:/dev/mydevs/mynull0,HostPath:/dev/mydevs/mynull0,Permissions:rwm,},&DeviceSpec{ContainerPath:/dev/mydevs/mynull1,HostPath:/dev/mydevs/mynull1,Permissions:rwm,},},Annotations:map[string]string{},}] {} 0}
kubelet会在本地维护device的分配情况
kubelet在/var/lib/kubelet/device-plugins路径下,有一个kubelet_internal_checkpoint,里面维护了设备的使用情况
可以看到这里的test-id-69,也就是上述kubelet申请的时候带了的id,已经显示分配给了host-device-test这个容器,
// 所属的podUid为:f3ca736e-ee62-40e8-ab8a-1870820b09f8。
root@10 device-plugins]# ls
DEPRECATION everpeace-mydevs.sock kubelet_internal_checkpoint kubelet.sock
[root@10 device-plugins]# cat kubelet_internal_checkpoint
{“Data”:{“PodDeviceEntries”:[{“PodUID”:“f3ca736e-ee62-40e8-ab8a-1870820b09f8”,“ContainerName”:“host-device-test-ctr”,“ResourceName”:“github.com/everpeace-mydevs”,“DeviceIDs”:{“0”:[“test-id-69”]},“AllocResp”:“Gi8KEy9kZXYvbXlkZXZzL215bnVsbDASEy9kZXYvbXlkZXZzL215bnVsbDAaA3J3bRovChMvZGV2L215ZGV2cy9teW51bGwxEhMvZGV2L215ZGV2cy9teW51bGwxGgNyd20=”}],“RegisteredDevices”:{“github.com/everpeace-mydevs”:[“test-id-38”,“test-id-49”,“test-id-95”,“test-id-28”,“test-id-60”,“test-id-67”,“test-id-71”,“test-id-79”,“test-id-1”,“test-id-6”,“test-id-50”,“test-id-59”,“test-id-88”,“test-id-17”,“test-id-37”,“test-id-45”,“test-id-69”,“test-id-75”,“test-id-87”,“test-id-98”,“test-id-16”,“test-id-26”,“test-id-70”,“test-id-77”,“test-id-84”,“test-id-21”,“test-id-22”,“test-id-53”,“test-id-92”,“test-id-57”,“test-id-76”,“test-id-94”,“test-id-0”,“test-id-24”,“test-id-25”,“test-id-42”,“test-id-46”,“test-id-99”,“test-id-9”,“test-id-33”,“test-id-43”,“test-id-64”,“test-id-86”,“test-id-52”,“test-id-72”,“test-id-19”,“test-id-20”,“test-id-32”,“test-id-44”,“test-id-47”,“test-id-40”,“test-id-41”,“test-id-58”,“test-id-73”,“test-id-2”,“test-id-4”,“test-id-8”,“test-id-13”,“test-id-14”,“test-id-80”,“test-id-91”,“test-id-55”,“test-id-65”,“test-id-85”,“test-id-11”,“test-id-12”,“test-id-34”,“test-id-48”,“test-id-54”,“test-id-96”,“test-id-97”,“test-id-39”,“test-id-83”,“test-id-3”,“test-id-18”,“test-id-29”,“test-id-30”,“test-id-35”,“test-id-36”,“test-id-56”,“test-id-61”,“test-id-5”,“test-id-7”,“test-id-10”,“test-id-27”,“test-id-31”,“test-id-63”,“test-id-74”,“test-id-81”,“test-id-93”,“test-id-51”,“test-id-68”,“test-id-82”,“test-id-89”,“test-id-90”,“test-id-15”,“test-id-23”,“test-id-62”,“test-id-66”,“test-id-78”]}},“Checksum”:2583529994}
kubelet checkpointData数据格式解析
注意:
1、device plugin提供的Resource属于Kubernetes Extended Resources,所以其Resource QoS只能是Guaranted。
2、每次在为Pod分配devices之前,都去检查一下此时的active pods,并与podDevices缓存中的pods进行比对,将已经terminated的Pods的devices从podDevices中删除,即进行了devices的GC操作。
3、从healthyDevices中随机分配对应数量的devices给该Pod,并注意更新allocatedDevices,否则会导致一个device被分配给多个Pod。
4、拿到devices后,就调用Endpoint的Allocate方法(进而调用对应device plugin的Allocate gRPC Service),device plugin返回ContainerAllocateResponse(包括注入的环境变量、挂载信息、Annotations)。
更新podDevices缓存信息,并将ManagerImpl中缓存数据更新到checkpoint文件中。
5、当init container结束后,对应分配的devices会被释放吗? 目前还不会释放devices,在Allocate前只会回收Terminated Pods的devices,并没有回收init container的devices。要优化这个也是比较简单的,只要修改上面代码中updateAllocatedDevices方法内的逻辑就行了,增加init container的devices回收逻辑。所以当前版本最好不会要在init container中使用devices,虽然这种场景几乎不存在。
kebelet维护NodeStatus中Device Plugin管理的Resource Capacity,即:资源上报
当kubelet更新node status时会调用GetCapacity更新device plugins对应的Resource信息。
func (m *ManagerImpl) GetCapacity() (v1.ResourceList, v1.ResourceList, []string) {
needsUpdateCheckpoint := false
var capacity = v1.ResourceList{}
var allocatable = v1.ResourceList{}
deletedResources := sets.NewString()
m.mutex.Lock()
for resourceName, devices := range m.healthyDevices {
e, ok := m.endpoints[resourceName]
if (ok && e.stopGracePeriodExpired()) || !ok {
if !ok {
glog.Errorf("unexpected: healthyDevices and endpoints are out of sync")
}
delete(m.endpoints, resourceName)
delete(m.healthyDevices, resourceName)
deletedResources.Insert(resourceName)
needsUpdateCheckpoint = true
} else {
capacity[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)
allocatable[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)
}
}
for resourceName, devices := range m.unhealthyDevices {
e, ok := m.endpoints[resourceName]
if (ok && e.stopGracePeriodExpired()) || !ok {
if !ok {
glog.Errorf("unexpected: unhealthyDevices and endpoints are out of sync")
}
delete(m.endpoints, resourceName)
delete(m.unhealthyDevices, resourceName)
deletedResources.Insert(resourceName)
needsUpdateCheckpoint = true
} else {
capacityCount := capacity[v1.ResourceName(resourceName)]
unhealthyCount := *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)
capacityCount.Add(unhealthyCount)
capacity[v1.ResourceName(resourceName)] = capacityCount
}
}
m.mutex.Unlock()
if needsUpdateCheckpoint {
m.writeCheckpoint()
}
return capacity, allocatable, deletedResources.UnsortedList()
}
逻辑解析:
1、先查看healthDevices资源量,把资源量添加到capacity和allocatable;
2、查看unhealthDevices资源量,更新capacity总量,最后返回数据;
3、如果device已经删除时间超过5分钟,则需要将资源从endpoint, healthyDevices中删除相关的device信息,最后会将信息更新到kubelet_internal_checkpoint。
GetCapacity更新NodeStatus如下数据:
1、registered device plugin resource Capacity
2、registered device plugin resource Allocatable
3、previously registered resources that are no longer active