mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-04 04:08:16 +00:00 
			
		
		
		
	scheduler: Improve CSILimits plugin accuracy by using VolumeAttachments
Signed-off-by: torredil <torredil@amazon.com>
This commit is contained in:
		@@ -23,6 +23,7 @@ import (
 | 
			
		||||
	v1 "k8s.io/api/core/v1"
 | 
			
		||||
	storagev1 "k8s.io/api/storage/v1"
 | 
			
		||||
	apierrors "k8s.io/apimachinery/pkg/api/errors"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/labels"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/runtime"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/rand"
 | 
			
		||||
	corelisters "k8s.io/client-go/listers/core/v1"
 | 
			
		||||
@@ -60,6 +61,7 @@ type CSILimits struct {
 | 
			
		||||
	pvLister      corelisters.PersistentVolumeLister
 | 
			
		||||
	pvcLister     corelisters.PersistentVolumeClaimLister
 | 
			
		||||
	scLister      storagelisters.StorageClassLister
 | 
			
		||||
	vaLister      storagelisters.VolumeAttachmentLister
 | 
			
		||||
 | 
			
		||||
	randomVolumeIDPrefix string
 | 
			
		||||
 | 
			
		||||
@@ -183,6 +185,7 @@ func (pl *CSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v
 | 
			
		||||
		logger.V(5).Info("Could not get a CSINode object for the node", "node", klog.KObj(node), "err", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Count CSI volumes from the new pod
 | 
			
		||||
	newVolumes := make(map[string]string)
 | 
			
		||||
	if err := pl.filterAttachableVolumes(logger, pod, csiNode, true /* new pod */, newVolumes); err != nil {
 | 
			
		||||
		if apierrors.IsNotFound(err) {
 | 
			
		||||
@@ -203,6 +206,7 @@ func (pl *CSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Count CSI volumes from existing pods
 | 
			
		||||
	attachedVolumes := make(map[string]string)
 | 
			
		||||
	for _, existingPod := range nodeInfo.Pods {
 | 
			
		||||
		if err := pl.filterAttachableVolumes(logger, existingPod.Pod, csiNode, false /* existing pod */, attachedVolumes); err != nil {
 | 
			
		||||
@@ -217,6 +221,19 @@ func (pl *CSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v
 | 
			
		||||
		attachedVolumeCount[driverName]++
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Count CSI volumes from VolumeAttachments
 | 
			
		||||
	volumeAttachments, err := pl.getNodeVolumeAttachmentInfo(logger, node.Name)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return framework.AsStatus(err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for volumeUniqueName, driverName := range volumeAttachments {
 | 
			
		||||
		// Avoid double-counting volumes already used by existing pods
 | 
			
		||||
		if _, exists := attachedVolumes[volumeUniqueName]; !exists {
 | 
			
		||||
			attachedVolumeCount[driverName]++
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Count the new volumes count per driver
 | 
			
		||||
	newVolumeCount := map[string]int{}
 | 
			
		||||
	for _, driverName := range newVolumes {
 | 
			
		||||
@@ -303,7 +320,7 @@ func (pl *CSILimits) filterAttachableVolumes(
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		volumeUniqueName := fmt.Sprintf("%s/%s", driverName, volumeHandle)
 | 
			
		||||
		volumeUniqueName := getVolumeUniqueName(driverName, volumeHandle)
 | 
			
		||||
		result[volumeUniqueName] = driverName
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
@@ -344,7 +361,7 @@ func (pl *CSILimits) checkAttachableInlineVolume(logger klog.Logger, vol *v1.Vol
 | 
			
		||||
	if translatedPV.Spec.PersistentVolumeSource.CSI == nil {
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
	volumeUniqueName := fmt.Sprintf("%s/%s", driverName, translatedPV.Spec.PersistentVolumeSource.CSI.VolumeHandle)
 | 
			
		||||
	volumeUniqueName := getVolumeUniqueName(driverName, translatedPV.Spec.PersistentVolumeSource.CSI.VolumeHandle)
 | 
			
		||||
	result[volumeUniqueName] = driverName
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
@@ -453,6 +470,7 @@ func NewCSI(_ context.Context, _ runtime.Object, handle framework.Handle, fts fe
 | 
			
		||||
	pvcLister := informerFactory.Core().V1().PersistentVolumeClaims().Lister()
 | 
			
		||||
	csiNodesLister := informerFactory.Storage().V1().CSINodes().Lister()
 | 
			
		||||
	scLister := informerFactory.Storage().V1().StorageClasses().Lister()
 | 
			
		||||
	vaLister := informerFactory.Storage().V1().VolumeAttachments().Lister()
 | 
			
		||||
	csiTranslator := csitrans.New()
 | 
			
		||||
 | 
			
		||||
	return &CSILimits{
 | 
			
		||||
@@ -460,6 +478,7 @@ func NewCSI(_ context.Context, _ runtime.Object, handle framework.Handle, fts fe
 | 
			
		||||
		pvLister:             pvLister,
 | 
			
		||||
		pvcLister:            pvcLister,
 | 
			
		||||
		scLister:             scLister,
 | 
			
		||||
		vaLister:             vaLister,
 | 
			
		||||
		randomVolumeIDPrefix: rand.String(32),
 | 
			
		||||
		translator:           csiTranslator,
 | 
			
		||||
	}, nil
 | 
			
		||||
@@ -480,3 +499,40 @@ func getVolumeLimits(csiNode *storagev1.CSINode) map[string]int64 {
 | 
			
		||||
	}
 | 
			
		||||
	return nodeVolumeLimits
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// getNodeVolumeAttachmentInfo returns a map of volumeID to driver name for the given node.
 | 
			
		||||
func (pl *CSILimits) getNodeVolumeAttachmentInfo(logger klog.Logger, nodeName string) (map[string]string, error) {
 | 
			
		||||
	volumeAttachments := make(map[string]string)
 | 
			
		||||
	vas, err := pl.vaLister.List(labels.Everything())
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	for _, va := range vas {
 | 
			
		||||
		if va.Spec.NodeName == nodeName {
 | 
			
		||||
			if va.Spec.Attacher == "" {
 | 
			
		||||
				logger.V(5).Info("VolumeAttachment has no attacher", "VolumeAttachment", klog.KObj(va))
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
			if va.Spec.Source.PersistentVolumeName == nil {
 | 
			
		||||
				logger.V(5).Info("VolumeAttachment has no PV name", "VolumeAttachment", klog.KObj(va))
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
			pv, err := pl.pvLister.Get(*va.Spec.Source.PersistentVolumeName)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				logger.V(5).Info("Unable to get PV for VolumeAttachment", "VolumeAttachment", klog.KObj(va), "err", err)
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
			if pv.Spec.CSI == nil {
 | 
			
		||||
				logger.V(5).Info("PV is not a CSI volume", "PV", klog.KObj(pv))
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
			volumeID := getVolumeUniqueName(va.Spec.Attacher, pv.Spec.CSI.VolumeHandle)
 | 
			
		||||
			volumeAttachments[volumeID] = va.Spec.Attacher
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return volumeAttachments, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// getVolumeUniqueName builds the canonical "<driver>/<handle>" key used to
// uniquely identify a CSI volume across the plugin's counting maps.
func getVolumeUniqueName(driverName, volumeHandle string) string {
	// Plain concatenation avoids fmt's reflection/boxing on this hot
	// scheduling path; the output is identical to fmt.Sprintf("%s/%s", ...).
	return driverName + "/" + volumeHandle
}
 | 
			
		||||
 
 | 
			
		||||
@@ -265,6 +265,7 @@ func TestCSILimits(t *testing.T) {
 | 
			
		||||
		extraClaims         []v1.PersistentVolumeClaim
 | 
			
		||||
		filterName          string
 | 
			
		||||
		maxVols             int32
 | 
			
		||||
		vaCount             int
 | 
			
		||||
		driverNames         []string
 | 
			
		||||
		test                string
 | 
			
		||||
		migrationEnabled    bool
 | 
			
		||||
@@ -273,6 +274,27 @@ func TestCSILimits(t *testing.T) {
 | 
			
		||||
		wantStatus          *framework.Status
 | 
			
		||||
		wantPreFilterStatus *framework.Status
 | 
			
		||||
	}{
 | 
			
		||||
		{
 | 
			
		||||
			newPod:       csiEBSOneVolPod,
 | 
			
		||||
			existingPods: []*v1.Pod{},
 | 
			
		||||
			filterName:   "csi",
 | 
			
		||||
			maxVols:      2,
 | 
			
		||||
			driverNames:  []string{ebsCSIDriverName},
 | 
			
		||||
			vaCount:      2,
 | 
			
		||||
			test:         "should count VolumeAttachments towards volume limit when no pods exist",
 | 
			
		||||
			limitSource:  "csinode",
 | 
			
		||||
			wantStatus:   framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded),
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			newPod:       csiEBSOneVolPod,
 | 
			
		||||
			existingPods: []*v1.Pod{},
 | 
			
		||||
			filterName:   "csi",
 | 
			
		||||
			maxVols:      2,
 | 
			
		||||
			driverNames:  []string{ebsCSIDriverName},
 | 
			
		||||
			vaCount:      1,
 | 
			
		||||
			test:         "should schedule pod when VolumeAttachments count does not exceed limit",
 | 
			
		||||
			limitSource:  "csinode",
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			newPod:       csiEBSOneVolPod,
 | 
			
		||||
			existingPods: []*v1.Pod{runningPod, csiEBSTwoVolPod},
 | 
			
		||||
@@ -609,6 +631,7 @@ func TestCSILimits(t *testing.T) {
 | 
			
		||||
				pvLister:             getFakeCSIPVLister(test.filterName, test.driverNames...),
 | 
			
		||||
				pvcLister:            append(getFakeCSIPVCLister(test.filterName, scName, test.driverNames...), test.extraClaims...),
 | 
			
		||||
				scLister:             getFakeCSIStorageClassLister(scName, test.driverNames[0]),
 | 
			
		||||
				vaLister:             getFakeVolumeAttachmentLister(test.vaCount, test.driverNames...),
 | 
			
		||||
				randomVolumeIDPrefix: rand.String(32),
 | 
			
		||||
				translator:           csiTranslator,
 | 
			
		||||
			}
 | 
			
		||||
@@ -769,6 +792,28 @@ func TestCSILimitsAddedPVCQHint(t *testing.T) {
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func getFakeVolumeAttachmentLister(count int, driverNames ...string) tf.VolumeAttachmentLister {
 | 
			
		||||
	vaLister := tf.VolumeAttachmentLister{}
 | 
			
		||||
	for _, driver := range driverNames {
 | 
			
		||||
		for j := 0; j < count; j++ {
 | 
			
		||||
			pvName := fmt.Sprintf("csi-%s-%d", driver, j)
 | 
			
		||||
			va := storagev1.VolumeAttachment{
 | 
			
		||||
				ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
					Name: fmt.Sprintf("va-%s-%d", driver, j),
 | 
			
		||||
				},
 | 
			
		||||
				Spec: storagev1.VolumeAttachmentSpec{
 | 
			
		||||
					NodeName: "node-for-max-pd-test-1",
 | 
			
		||||
					Attacher: driver,
 | 
			
		||||
					Source: storagev1.VolumeAttachmentSource{
 | 
			
		||||
						PersistentVolumeName: &pvName,
 | 
			
		||||
					},
 | 
			
		||||
				},
 | 
			
		||||
			}
 | 
			
		||||
			vaLister = append(vaLister, va)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return vaLister
 | 
			
		||||
}
 | 
			
		||||
func getFakeCSIPVLister(volumeName string, driverNames ...string) tf.PersistentVolumeLister {
 | 
			
		||||
	pvLister := tf.PersistentVolumeLister{}
 | 
			
		||||
	for _, driver := range driverNames {
 | 
			
		||||
 
 | 
			
		||||
@@ -313,3 +313,27 @@ func (classes StorageClassLister) Get(name string) (*storagev1.StorageClass, err
 | 
			
		||||
func (classes StorageClassLister) List(selector labels.Selector) ([]*storagev1.StorageClass, error) {
 | 
			
		||||
	return nil, fmt.Errorf("not implemented")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// VolumeAttachmentLister declares a []storagev1.VolumeAttachment type for testing.
 | 
			
		||||
type VolumeAttachmentLister []storagev1.VolumeAttachment
 | 
			
		||||
 | 
			
		||||
var _ storagelisters.VolumeAttachmentLister = VolumeAttachmentLister{}
 | 
			
		||||
 | 
			
		||||
// List lists all VolumeAttachments in the indexer.
 | 
			
		||||
func (val VolumeAttachmentLister) List(selector labels.Selector) (ret []*storagev1.VolumeAttachment, err error) {
 | 
			
		||||
	var list []*storagev1.VolumeAttachment
 | 
			
		||||
	for i := range val {
 | 
			
		||||
		list = append(list, &val[i])
 | 
			
		||||
	}
 | 
			
		||||
	return list, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Get returns a fake VolumeAttachment object from the fake VolumeAttachments by name.
 | 
			
		||||
func (val VolumeAttachmentLister) Get(name string) (*storagev1.VolumeAttachment, error) {
 | 
			
		||||
	for _, va := range val {
 | 
			
		||||
		if va.Name == name {
 | 
			
		||||
			return &va, nil
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return nil, errors.NewNotFound(storagev1.Resource("volumeattachments"), name)
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -572,6 +572,7 @@ func ClusterRoles() []rbacv1.ClusterRole {
 | 
			
		||||
		rbacv1helpers.NewRule("create").Groups(authorizationGroup).Resources("subjectaccessreviews").RuleOrDie(),
 | 
			
		||||
		// Needed for volume limits
 | 
			
		||||
		rbacv1helpers.NewRule(Read...).Groups(storageGroup).Resources("csinodes").RuleOrDie(),
 | 
			
		||||
		rbacv1helpers.NewRule("get", "list", "watch").Groups(storageGroup).Resources("volumeattachments").RuleOrDie(),
 | 
			
		||||
		// Needed for namespaceSelector feature in pod affinity
 | 
			
		||||
		rbacv1helpers.NewRule(Read...).Groups(legacyGroup).Resources("namespaces").RuleOrDie(),
 | 
			
		||||
		rbacv1helpers.NewRule(Read...).Groups(storageGroup).Resources("csidrivers").RuleOrDie(),
 | 
			
		||||
 
 | 
			
		||||
@@ -851,6 +851,14 @@ items:
 | 
			
		||||
    - get
 | 
			
		||||
    - list
 | 
			
		||||
    - watch
 | 
			
		||||
  - apiGroups:
 | 
			
		||||
    - storage.k8s.io
 | 
			
		||||
    resources:
 | 
			
		||||
    - volumeattachments
 | 
			
		||||
    verbs:
 | 
			
		||||
    - get
 | 
			
		||||
    - list
 | 
			
		||||
    - watch
 | 
			
		||||
  - apiGroups:
 | 
			
		||||
    - ""
 | 
			
		||||
    resources:
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user