mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-04 04:08:16 +00:00 
			
		
		
		
	Add benchmark for scheduling of pods with PVs
This commit is contained in:
		@@ -33,11 +33,17 @@ go_test(
 | 
			
		||||
    embed = [":go_default_library"],
 | 
			
		||||
    tags = ["integration"],
 | 
			
		||||
    deps = [
 | 
			
		||||
        "//pkg/features:go_default_library",
 | 
			
		||||
        "//pkg/scheduler/factory:go_default_library",
 | 
			
		||||
        "//pkg/volume/util:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/api/core/v1:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/api/storage/v1beta1:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/csi-translation-lib/plugins:go_default_library",
 | 
			
		||||
        "//test/integration/framework:go_default_library",
 | 
			
		||||
        "//test/utils:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/klog:go_default_library",
 | 
			
		||||
 
 | 
			
		||||
@@ -22,16 +22,27 @@ import (
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"k8s.io/api/core/v1"
 | 
			
		||||
	storagev1beta1 "k8s.io/api/storage/v1beta1"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/api/resource"
 | 
			
		||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/labels"
 | 
			
		||||
	utilfeature "k8s.io/apiserver/pkg/util/feature"
 | 
			
		||||
	featuregatetesting "k8s.io/component-base/featuregate/testing"
 | 
			
		||||
	"k8s.io/csi-translation-lib/plugins"
 | 
			
		||||
	csilibplugins "k8s.io/csi-translation-lib/plugins"
 | 
			
		||||
	"k8s.io/klog"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/features"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/volume/util"
 | 
			
		||||
	"k8s.io/kubernetes/test/integration/framework"
 | 
			
		||||
	testutils "k8s.io/kubernetes/test/utils"
 | 
			
		||||
 | 
			
		||||
	"k8s.io/klog"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var (
 | 
			
		||||
	defaultNodeStrategy = &testutils.TrivialNodePrepareStrategy{}
 | 
			
		||||
 | 
			
		||||
	testCSIDriver = plugins.AWSEBSDriverName
 | 
			
		||||
	// From PV controller
 | 
			
		||||
	annBindCompleted = "pv.kubernetes.io/bind-completed"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// BenchmarkScheduling benchmarks the scheduling rate when the cluster has
 | 
			
		||||
@@ -79,6 +90,134 @@ func BenchmarkSchedulingPodAntiAffinity(b *testing.B) {
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// BenchmarkSchedulingSecrets benchmarks the scheduling rate of pods with
 | 
			
		||||
// volumes that don't require any special handling, such as Secrets.
 | 
			
		||||
// It can be used to compare scheduler efficiency with the other benchmarks
 | 
			
		||||
// that use volume scheduling predicates.
 | 
			
		||||
func BenchmarkSchedulingSecrets(b *testing.B) {
 | 
			
		||||
	tests := []struct{ nodes, existingPods, minPods int }{
 | 
			
		||||
		{nodes: 500, existingPods: 250, minPods: 250},
 | 
			
		||||
		{nodes: 500, existingPods: 5000, minPods: 250},
 | 
			
		||||
		{nodes: 1000, existingPods: 1000, minPods: 500},
 | 
			
		||||
		{nodes: 5000, existingPods: 1000, minPods: 1000},
 | 
			
		||||
	}
 | 
			
		||||
	// The setup strategy creates pods with no volumes.
 | 
			
		||||
	setupStrategy := testutils.NewSimpleWithControllerCreatePodStrategy("setup")
 | 
			
		||||
	// The test strategy creates pods with a secret.
 | 
			
		||||
	testBasePod := makeBasePodWithSecret()
 | 
			
		||||
	testStrategy := testutils.NewCustomCreatePodStrategy(testBasePod)
 | 
			
		||||
	for _, test := range tests {
 | 
			
		||||
		name := fmt.Sprintf("%vNodes/%vPods", test.nodes, test.existingPods)
 | 
			
		||||
		b.Run(name, func(b *testing.B) {
 | 
			
		||||
			benchmarkScheduling(test.nodes, test.existingPods, test.minPods, defaultNodeStrategy, setupStrategy, testStrategy, b)
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// BenchmarkSchedulingInTreePVs benchmarks the scheduling rate of pods with
 | 
			
		||||
// in-tree volumes (used via PV/PVC). Nodes have default hardcoded attach limits
 | 
			
		||||
// (39 for AWS EBS).
 | 
			
		||||
func BenchmarkSchedulingInTreePVs(b *testing.B) {
 | 
			
		||||
	tests := []struct{ nodes, existingPods, minPods int }{
 | 
			
		||||
		{nodes: 500, existingPods: 250, minPods: 250},
 | 
			
		||||
		{nodes: 500, existingPods: 5000, minPods: 250},
 | 
			
		||||
		{nodes: 1000, existingPods: 1000, minPods: 500},
 | 
			
		||||
		{nodes: 5000, existingPods: 1000, minPods: 1000},
 | 
			
		||||
	}
 | 
			
		||||
	// The setup strategy creates pods with no volumes.
 | 
			
		||||
	setupStrategy := testutils.NewSimpleWithControllerCreatePodStrategy("setup")
 | 
			
		||||
 | 
			
		||||
	// The test strategy creates pods with AWS EBS volume used via PV.
 | 
			
		||||
	baseClaim := makeBasePersistentVolumeClaim()
 | 
			
		||||
	basePod := makeBasePod()
 | 
			
		||||
	testStrategy := testutils.NewCreatePodWithPersistentVolumeStrategy(baseClaim, awsVolumeFactory, basePod)
 | 
			
		||||
	for _, test := range tests {
 | 
			
		||||
		name := fmt.Sprintf("%vNodes/%vPods", test.nodes, test.existingPods)
 | 
			
		||||
		b.Run(name, func(b *testing.B) {
 | 
			
		||||
			benchmarkScheduling(test.nodes, test.existingPods, test.minPods, defaultNodeStrategy, setupStrategy, testStrategy, b)
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// BenchmarkSchedulingMigratedInTreePVs benchmarks the scheduling rate of pods with
 | 
			
		||||
// in-tree volumes (used via PV/PVC) that are migrated to CSI. CSINode instances exist
 | 
			
		||||
// for all nodes and have proper annotation that AWS is migrated.
 | 
			
		||||
func BenchmarkSchedulingMigratedInTreePVs(b *testing.B) {
 | 
			
		||||
	tests := []struct{ nodes, existingPods, minPods int }{
 | 
			
		||||
		{nodes: 500, existingPods: 250, minPods: 250},
 | 
			
		||||
		{nodes: 500, existingPods: 5000, minPods: 250},
 | 
			
		||||
		{nodes: 1000, existingPods: 1000, minPods: 500},
 | 
			
		||||
		{nodes: 5000, existingPods: 1000, minPods: 1000},
 | 
			
		||||
	}
 | 
			
		||||
	// The setup strategy creates pods with no volumes.
 | 
			
		||||
	setupStrategy := testutils.NewSimpleWithControllerCreatePodStrategy("setup")
 | 
			
		||||
 | 
			
		||||
	// The test strategy creates pods with AWS EBS volume used via PV.
 | 
			
		||||
	baseClaim := makeBasePersistentVolumeClaim()
 | 
			
		||||
	basePod := makeBasePod()
 | 
			
		||||
	testStrategy := testutils.NewCreatePodWithPersistentVolumeStrategy(baseClaim, awsVolumeFactory, basePod)
 | 
			
		||||
 | 
			
		||||
	// Each node can use the same amount of CSI volumes as in-tree AWS volume
 | 
			
		||||
	// plugin, so the results should be comparable with BenchmarkSchedulingInTreePVs.
 | 
			
		||||
	driverKey := util.GetCSIAttachLimitKey(testCSIDriver)
 | 
			
		||||
	allocatable := map[v1.ResourceName]string{
 | 
			
		||||
		v1.ResourceName(driverKey): fmt.Sprintf("%d", util.DefaultMaxEBSVolumes),
 | 
			
		||||
	}
 | 
			
		||||
	var count int32 = util.DefaultMaxEBSVolumes
 | 
			
		||||
	csiAllocatable := map[string]*storagev1beta1.VolumeNodeResources{
 | 
			
		||||
		testCSIDriver: {
 | 
			
		||||
			Count: &count,
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	nodeStrategy := testutils.NewNodeAllocatableStrategy(allocatable, csiAllocatable, []string{csilibplugins.AWSEBSInTreePluginName})
 | 
			
		||||
	for _, test := range tests {
 | 
			
		||||
		name := fmt.Sprintf("%vNodes/%vPods", test.nodes, test.existingPods)
 | 
			
		||||
		b.Run(name, func(b *testing.B) {
 | 
			
		||||
			defer featuregatetesting.SetFeatureGateDuringTest(b, utilfeature.DefaultFeatureGate, features.CSIMigration, true)()
 | 
			
		||||
			defer featuregatetesting.SetFeatureGateDuringTest(b, utilfeature.DefaultFeatureGate, features.CSIMigrationAWS, true)()
 | 
			
		||||
			benchmarkScheduling(test.nodes, test.existingPods, test.minPods, nodeStrategy, setupStrategy, testStrategy, b)
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// node.status.allocatable.
 | 
			
		||||
func BenchmarkSchedulingCSIPVs(b *testing.B) {
 | 
			
		||||
	tests := []struct{ nodes, existingPods, minPods int }{
 | 
			
		||||
		{nodes: 500, existingPods: 250, minPods: 250},
 | 
			
		||||
		{nodes: 500, existingPods: 5000, minPods: 250},
 | 
			
		||||
		{nodes: 1000, existingPods: 1000, minPods: 500},
 | 
			
		||||
		{nodes: 5000, existingPods: 1000, minPods: 1000},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// The setup strategy creates pods with no volumes.
 | 
			
		||||
	setupStrategy := testutils.NewSimpleWithControllerCreatePodStrategy("setup")
 | 
			
		||||
 | 
			
		||||
	// The test strategy creates pods with CSI volume via PV.
 | 
			
		||||
	baseClaim := makeBasePersistentVolumeClaim()
 | 
			
		||||
	basePod := makeBasePod()
 | 
			
		||||
	testStrategy := testutils.NewCreatePodWithPersistentVolumeStrategy(baseClaim, csiVolumeFactory, basePod)
 | 
			
		||||
 | 
			
		||||
	// Each node can use the same amount of CSI volumes as in-tree AWS volume
 | 
			
		||||
	// plugin, so the results should be comparable with BenchmarkSchedulingInTreePVs.
 | 
			
		||||
	driverKey := util.GetCSIAttachLimitKey(testCSIDriver)
 | 
			
		||||
	allocatable := map[v1.ResourceName]string{
 | 
			
		||||
		v1.ResourceName(driverKey): fmt.Sprintf("%d", util.DefaultMaxEBSVolumes),
 | 
			
		||||
	}
 | 
			
		||||
	var count int32 = util.DefaultMaxEBSVolumes
 | 
			
		||||
	csiAllocatable := map[string]*storagev1beta1.VolumeNodeResources{
 | 
			
		||||
		testCSIDriver: {
 | 
			
		||||
			Count: &count,
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	nodeStrategy := testutils.NewNodeAllocatableStrategy(allocatable, csiAllocatable, []string{})
 | 
			
		||||
	for _, test := range tests {
 | 
			
		||||
		name := fmt.Sprintf("%vNodes/%vPods", test.nodes, test.existingPods)
 | 
			
		||||
		b.Run(name, func(b *testing.B) {
 | 
			
		||||
			benchmarkScheduling(test.nodes, test.existingPods, test.minPods, nodeStrategy, setupStrategy, testStrategy, b)
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// BenchmarkSchedulingPodAffinity benchmarks the scheduling rate of pods with
 | 
			
		||||
// PodAffinity rules when the cluster has various quantities of nodes and
 | 
			
		||||
// scheduled pods.
 | 
			
		||||
@@ -265,8 +404,110 @@ func benchmarkScheduling(numNodes, numExistingPods, minPods int,
 | 
			
		||||
		if len(scheduled) >= numExistingPods+b.N {
 | 
			
		||||
			break
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// Note: This might introduce slight deviation in accuracy of benchmark results.
 | 
			
		||||
		// Since the total amount of time is relatively large, it might not be a concern.
 | 
			
		||||
		time.Sleep(100 * time.Millisecond)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// makeBasePodWithSecrets creates a Pod object to be used as a template.
 | 
			
		||||
// The pod uses a single Secrets volume.
 | 
			
		||||
func makeBasePodWithSecret() *v1.Pod {
 | 
			
		||||
	basePod := &v1.Pod{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
			GenerateName: "secret-volume-",
 | 
			
		||||
		},
 | 
			
		||||
		Spec: testutils.MakePodSpec(),
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	volumes := []v1.Volume{
 | 
			
		||||
		{
 | 
			
		||||
			Name: "secret",
 | 
			
		||||
			VolumeSource: v1.VolumeSource{
 | 
			
		||||
				Secret: &v1.SecretVolumeSource{
 | 
			
		||||
					SecretName: "secret",
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	basePod.Spec.Volumes = volumes
 | 
			
		||||
	return basePod
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// makeBasePod creates a Pod object to be used as a template.
 | 
			
		||||
func makeBasePod() *v1.Pod {
 | 
			
		||||
	basePod := &v1.Pod{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
			GenerateName: "pod-",
 | 
			
		||||
		},
 | 
			
		||||
		Spec: testutils.MakePodSpec(),
 | 
			
		||||
	}
 | 
			
		||||
	return basePod
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func makeBasePersistentVolumeClaim() *v1.PersistentVolumeClaim {
 | 
			
		||||
	return &v1.PersistentVolumeClaim{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
			// Name is filled in NewCreatePodWithPersistentVolumeStrategy
 | 
			
		||||
			Annotations: map[string]string{
 | 
			
		||||
				annBindCompleted: "true",
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		Spec: v1.PersistentVolumeClaimSpec{
 | 
			
		||||
			AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany},
 | 
			
		||||
			Resources: v1.ResourceRequirements{
 | 
			
		||||
				Requests: v1.ResourceList{
 | 
			
		||||
					v1.ResourceName(v1.ResourceStorage): resource.MustParse("1Gi"),
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func awsVolumeFactory(id int) *v1.PersistentVolume {
 | 
			
		||||
	return &v1.PersistentVolume{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
			Name: fmt.Sprintf("vol-%d", id),
 | 
			
		||||
		},
 | 
			
		||||
		Spec: v1.PersistentVolumeSpec{
 | 
			
		||||
			AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany},
 | 
			
		||||
			Capacity: v1.ResourceList{
 | 
			
		||||
				v1.ResourceName(v1.ResourceStorage): resource.MustParse("1Gi"),
 | 
			
		||||
			},
 | 
			
		||||
			PersistentVolumeReclaimPolicy: v1.PersistentVolumeReclaimRetain,
 | 
			
		||||
			PersistentVolumeSource: v1.PersistentVolumeSource{
 | 
			
		||||
				AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{
 | 
			
		||||
					// VolumeID must be unique for each PV, so every PV is
 | 
			
		||||
					// counted as a separate volume in MaxPDVolumeCountChecker
 | 
			
		||||
					// predicate.
 | 
			
		||||
					VolumeID: fmt.Sprintf("vol-%d", id),
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func csiVolumeFactory(id int) *v1.PersistentVolume {
 | 
			
		||||
	return &v1.PersistentVolume{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
			Name: fmt.Sprintf("vol-%d", id),
 | 
			
		||||
		},
 | 
			
		||||
		Spec: v1.PersistentVolumeSpec{
 | 
			
		||||
			AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany},
 | 
			
		||||
			Capacity: v1.ResourceList{
 | 
			
		||||
				v1.ResourceName(v1.ResourceStorage): resource.MustParse("1Gi"),
 | 
			
		||||
			},
 | 
			
		||||
			PersistentVolumeReclaimPolicy: v1.PersistentVolumeReclaimRetain,
 | 
			
		||||
			PersistentVolumeSource: v1.PersistentVolumeSource{
 | 
			
		||||
				CSI: &v1.CSIPersistentVolumeSource{
 | 
			
		||||
					// Handle must be unique for each PV, so every PV is
 | 
			
		||||
					// counted as a separate volume in CSIMaxVolumeLimitChecker
 | 
			
		||||
					// predicate.
 | 
			
		||||
					VolumeHandle: fmt.Sprintf("vol-%d", id),
 | 
			
		||||
					Driver:       testCSIDriver,
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -38,6 +38,7 @@ go_library(
 | 
			
		||||
        "//staging/src/k8s.io/api/auditregistration/v1alpha1:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/api/batch/v1:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/api/core/v1:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/api/storage/v1beta1:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
 | 
			
		||||
@@ -47,8 +48,10 @@ go_library(
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/util/json:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
 | 
			
		||||
 
 | 
			
		||||
@@ -232,3 +232,37 @@ func CreateResourceQuotaWithRetries(c clientset.Interface, namespace string, obj
 | 
			
		||||
	}
 | 
			
		||||
	return RetryWithExponentialBackOff(createFunc)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func CreatePersistentVolumeWithRetries(c clientset.Interface, obj *v1.PersistentVolume) error {
 | 
			
		||||
	if obj == nil {
 | 
			
		||||
		return fmt.Errorf("Object provided to create is empty")
 | 
			
		||||
	}
 | 
			
		||||
	createFunc := func() (bool, error) {
 | 
			
		||||
		_, err := c.CoreV1().PersistentVolumes().Create(obj)
 | 
			
		||||
		if err == nil || apierrs.IsAlreadyExists(err) {
 | 
			
		||||
			return true, nil
 | 
			
		||||
		}
 | 
			
		||||
		if IsRetryableAPIError(err) {
 | 
			
		||||
			return false, nil
 | 
			
		||||
		}
 | 
			
		||||
		return false, fmt.Errorf("Failed to create object with non-retriable error: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	return RetryWithExponentialBackOff(createFunc)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func CreatePersistentVolumeClaimWithRetries(c clientset.Interface, namespace string, obj *v1.PersistentVolumeClaim) error {
 | 
			
		||||
	if obj == nil {
 | 
			
		||||
		return fmt.Errorf("Object provided to create is empty")
 | 
			
		||||
	}
 | 
			
		||||
	createFunc := func() (bool, error) {
 | 
			
		||||
		_, err := c.CoreV1().PersistentVolumeClaims(namespace).Create(obj)
 | 
			
		||||
		if err == nil || apierrs.IsAlreadyExists(err) {
 | 
			
		||||
			return true, nil
 | 
			
		||||
		}
 | 
			
		||||
		if IsRetryableAPIError(err) {
 | 
			
		||||
			return false, nil
 | 
			
		||||
		}
 | 
			
		||||
		return false, fmt.Errorf("Failed to create object with non-retriable error: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	return RetryWithExponentialBackOff(createFunc)
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -28,6 +28,7 @@ import (
 | 
			
		||||
	apps "k8s.io/api/apps/v1"
 | 
			
		||||
	batch "k8s.io/api/batch/v1"
 | 
			
		||||
	v1 "k8s.io/api/core/v1"
 | 
			
		||||
	storagev1beta1 "k8s.io/api/storage/v1beta1"
 | 
			
		||||
	apiequality "k8s.io/apimachinery/pkg/api/equality"
 | 
			
		||||
	apierrs "k8s.io/apimachinery/pkg/api/errors"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/api/resource"
 | 
			
		||||
@@ -36,7 +37,9 @@ import (
 | 
			
		||||
	"k8s.io/apimachinery/pkg/labels"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/runtime/schema"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/types"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/json"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/sets"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/strategicpatch"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/uuid"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/wait"
 | 
			
		||||
	clientset "k8s.io/client-go/kubernetes"
 | 
			
		||||
@@ -908,12 +911,22 @@ type TestNodePreparer interface {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type PrepareNodeStrategy interface {
 | 
			
		||||
	// Modify pre-created Node objects before the test starts.
 | 
			
		||||
	PreparePatch(node *v1.Node) []byte
 | 
			
		||||
	// Create or modify any objects that depend on the node before the test starts.
 | 
			
		||||
	// Caller will re-try when http.StatusConflict error is returned.
 | 
			
		||||
	PrepareDependentObjects(node *v1.Node, client clientset.Interface) error
 | 
			
		||||
	// Clean up any node modifications after the test finishes.
 | 
			
		||||
	CleanupNode(node *v1.Node) *v1.Node
 | 
			
		||||
	// Clean up any objects that depend on the node after the test finishes.
 | 
			
		||||
	// Caller will re-try when http.StatusConflict error is returned.
 | 
			
		||||
	CleanupDependentObjects(nodeName string, client clientset.Interface) error
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type TrivialNodePrepareStrategy struct{}
 | 
			
		||||
 | 
			
		||||
var _ PrepareNodeStrategy = &TrivialNodePrepareStrategy{}
 | 
			
		||||
 | 
			
		||||
func (*TrivialNodePrepareStrategy) PreparePatch(*v1.Node) []byte {
 | 
			
		||||
	return []byte{}
 | 
			
		||||
}
 | 
			
		||||
@@ -923,11 +936,21 @@ func (*TrivialNodePrepareStrategy) CleanupNode(node *v1.Node) *v1.Node {
 | 
			
		||||
	return &nodeCopy
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (*TrivialNodePrepareStrategy) PrepareDependentObjects(node *v1.Node, client clientset.Interface) error {
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (*TrivialNodePrepareStrategy) CleanupDependentObjects(nodeName string, client clientset.Interface) error {
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type LabelNodePrepareStrategy struct {
 | 
			
		||||
	labelKey   string
 | 
			
		||||
	labelValue string
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var _ PrepareNodeStrategy = &LabelNodePrepareStrategy{}
 | 
			
		||||
 | 
			
		||||
func NewLabelNodePrepareStrategy(labelKey string, labelValue string) *LabelNodePrepareStrategy {
 | 
			
		||||
	return &LabelNodePrepareStrategy{
 | 
			
		||||
		labelKey:   labelKey,
 | 
			
		||||
@@ -949,6 +972,148 @@ func (s *LabelNodePrepareStrategy) CleanupNode(node *v1.Node) *v1.Node {
 | 
			
		||||
	return nodeCopy
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (*LabelNodePrepareStrategy) PrepareDependentObjects(node *v1.Node, client clientset.Interface) error {
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (*LabelNodePrepareStrategy) CleanupDependentObjects(nodeName string, client clientset.Interface) error {
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NodeAllocatableStrategy fills node.status.allocatable and csiNode.spec.drivers[*].allocatable.
 | 
			
		||||
// csiNode is created if it does not exist. On cleanup, any csiNode.spec.drivers[*].allocatable is
 | 
			
		||||
// set to nil.
 | 
			
		||||
type NodeAllocatableStrategy struct {
 | 
			
		||||
	// Node.status.allocatable to fill to all nodes.
 | 
			
		||||
	nodeAllocatable map[v1.ResourceName]string
 | 
			
		||||
	// Map <driver_name> -> VolumeNodeResources to fill into csiNode.spec.drivers[<driver_name>].
 | 
			
		||||
	csiNodeAllocatable map[string]*storagev1beta1.VolumeNodeResources
 | 
			
		||||
	// List of in-tree volume plugins migrated to CSI.
 | 
			
		||||
	migratedPlugins []string
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var _ PrepareNodeStrategy = &NodeAllocatableStrategy{}
 | 
			
		||||
 | 
			
		||||
func NewNodeAllocatableStrategy(nodeAllocatable map[v1.ResourceName]string, csiNodeAllocatable map[string]*storagev1beta1.VolumeNodeResources, migratedPlugins []string) *NodeAllocatableStrategy {
 | 
			
		||||
	return &NodeAllocatableStrategy{nodeAllocatable, csiNodeAllocatable, migratedPlugins}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *NodeAllocatableStrategy) PreparePatch(node *v1.Node) []byte {
 | 
			
		||||
	newNode := node.DeepCopy()
 | 
			
		||||
	for name, value := range s.nodeAllocatable {
 | 
			
		||||
		newNode.Status.Allocatable[name] = resource.MustParse(value)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	oldJSON, err := json.Marshal(node)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
	newJSON, err := json.Marshal(newNode)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	patch, err := strategicpatch.CreateTwoWayMergePatch(oldJSON, newJSON, v1.Node{})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
	return patch
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *NodeAllocatableStrategy) CleanupNode(node *v1.Node) *v1.Node {
 | 
			
		||||
	nodeCopy := node.DeepCopy()
 | 
			
		||||
	for name := range s.nodeAllocatable {
 | 
			
		||||
		delete(nodeCopy.Status.Allocatable, name)
 | 
			
		||||
	}
 | 
			
		||||
	return nodeCopy
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *NodeAllocatableStrategy) createCSINode(nodeName string, client clientset.Interface) error {
 | 
			
		||||
	csiNode := &storagev1beta1.CSINode{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
			Name: nodeName,
 | 
			
		||||
			Annotations: map[string]string{
 | 
			
		||||
				v1.MigratedPluginsAnnotationKey: strings.Join(s.migratedPlugins, ","),
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		Spec: storagev1beta1.CSINodeSpec{
 | 
			
		||||
			Drivers: []storagev1beta1.CSINodeDriver{},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for driver, allocatable := range s.csiNodeAllocatable {
 | 
			
		||||
		d := storagev1beta1.CSINodeDriver{
 | 
			
		||||
			Name:        driver,
 | 
			
		||||
			Allocatable: allocatable,
 | 
			
		||||
			NodeID:      nodeName,
 | 
			
		||||
		}
 | 
			
		||||
		csiNode.Spec.Drivers = append(csiNode.Spec.Drivers, d)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	_, err := client.StorageV1beta1().CSINodes().Create(csiNode)
 | 
			
		||||
	if apierrs.IsAlreadyExists(err) {
 | 
			
		||||
		// Something created CSINode instance after we checked it did not exist.
 | 
			
		||||
		// Make the caller to re-try PrepareDependentObjects by returning Conflict error
 | 
			
		||||
		err = apierrs.NewConflict(storagev1beta1.Resource("csinodes"), nodeName, err)
 | 
			
		||||
	}
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *NodeAllocatableStrategy) updateCSINode(csiNode *storagev1beta1.CSINode, client clientset.Interface) error {
 | 
			
		||||
	for driverName, allocatable := range s.csiNodeAllocatable {
 | 
			
		||||
		found := false
 | 
			
		||||
		for i, driver := range csiNode.Spec.Drivers {
 | 
			
		||||
			if driver.Name == driverName {
 | 
			
		||||
				found = true
 | 
			
		||||
				csiNode.Spec.Drivers[i].Allocatable = allocatable
 | 
			
		||||
				break
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		if !found {
 | 
			
		||||
			d := storagev1beta1.CSINodeDriver{
 | 
			
		||||
				Name:        driverName,
 | 
			
		||||
				Allocatable: allocatable,
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			csiNode.Spec.Drivers = append(csiNode.Spec.Drivers, d)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	csiNode.Annotations[v1.MigratedPluginsAnnotationKey] = strings.Join(s.migratedPlugins, ",")
 | 
			
		||||
 | 
			
		||||
	_, err := client.StorageV1beta1().CSINodes().Update(csiNode)
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *NodeAllocatableStrategy) PrepareDependentObjects(node *v1.Node, client clientset.Interface) error {
 | 
			
		||||
	csiNode, err := client.StorageV1beta1().CSINodes().Get(node.Name, metav1.GetOptions{})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		if apierrs.IsNotFound(err) {
 | 
			
		||||
			return s.createCSINode(node.Name, client)
 | 
			
		||||
		}
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	return s.updateCSINode(csiNode, client)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *NodeAllocatableStrategy) CleanupDependentObjects(nodeName string, client clientset.Interface) error {
 | 
			
		||||
	csiNode, err := client.StorageV1beta1().CSINodes().Get(nodeName, metav1.GetOptions{})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		if apierrs.IsNotFound(err) {
 | 
			
		||||
			return nil
 | 
			
		||||
		}
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for driverName := range s.csiNodeAllocatable {
 | 
			
		||||
		for i, driver := range csiNode.Spec.Drivers {
 | 
			
		||||
			if driver.Name == driverName {
 | 
			
		||||
				csiNode.Spec.Drivers[i].Allocatable = nil
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return s.updateCSINode(csiNode, client)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func DoPrepareNode(client clientset.Interface, node *v1.Node, strategy PrepareNodeStrategy) error {
 | 
			
		||||
	var err error
 | 
			
		||||
	patch := strategy.PreparePatch(node)
 | 
			
		||||
@@ -957,17 +1122,34 @@ func DoPrepareNode(client clientset.Interface, node *v1.Node, strategy PrepareNo
 | 
			
		||||
	}
 | 
			
		||||
	for attempt := 0; attempt < retries; attempt++ {
 | 
			
		||||
		if _, err = client.CoreV1().Nodes().Patch(node.Name, types.MergePatchType, []byte(patch)); err == nil {
 | 
			
		||||
			return nil
 | 
			
		||||
			break
 | 
			
		||||
		}
 | 
			
		||||
		if !apierrs.IsConflict(err) {
 | 
			
		||||
			return fmt.Errorf("Error while applying patch %v to Node %v: %v", string(patch), node.Name, err)
 | 
			
		||||
		}
 | 
			
		||||
		time.Sleep(100 * time.Millisecond)
 | 
			
		||||
	}
 | 
			
		||||
	return fmt.Errorf("To many conflicts when applying patch %v to Node %v", string(patch), node.Name)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return fmt.Errorf("Too many conflicts when applying patch %v to Node %v: %s", string(patch), node.Name, err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for attempt := 0; attempt < retries; attempt++ {
 | 
			
		||||
		if err = strategy.PrepareDependentObjects(node, client); err == nil {
 | 
			
		||||
			break
 | 
			
		||||
		}
 | 
			
		||||
		if !apierrs.IsConflict(err) {
 | 
			
		||||
			return fmt.Errorf("Error while preparing objects for node %s: %s", node.Name, err)
 | 
			
		||||
		}
 | 
			
		||||
		time.Sleep(100 * time.Millisecond)
 | 
			
		||||
	}
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return fmt.Errorf("Too many conflicts when creating objects for node %s: %s", node.Name, err)
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func DoCleanupNode(client clientset.Interface, nodeName string, strategy PrepareNodeStrategy) error {
 | 
			
		||||
	var err error
 | 
			
		||||
	for attempt := 0; attempt < retries; attempt++ {
 | 
			
		||||
		node, err := client.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
 | 
			
		||||
		if err != nil {
 | 
			
		||||
@@ -978,14 +1160,31 @@ func DoCleanupNode(client clientset.Interface, nodeName string, strategy Prepare
 | 
			
		||||
			return nil
 | 
			
		||||
		}
 | 
			
		||||
		if _, err = client.CoreV1().Nodes().Update(updatedNode); err == nil {
 | 
			
		||||
			return nil
 | 
			
		||||
			break
 | 
			
		||||
		}
 | 
			
		||||
		if !apierrs.IsConflict(err) {
 | 
			
		||||
			return fmt.Errorf("Error when updating Node %v: %v", nodeName, err)
 | 
			
		||||
		}
 | 
			
		||||
		time.Sleep(100 * time.Millisecond)
 | 
			
		||||
	}
 | 
			
		||||
	return fmt.Errorf("To many conflicts when trying to cleanup Node %v", nodeName)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return fmt.Errorf("Too many conflicts when trying to cleanup Node %v: %s", nodeName, err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for attempt := 0; attempt < retries; attempt++ {
 | 
			
		||||
		err = strategy.CleanupDependentObjects(nodeName, client)
 | 
			
		||||
		if err == nil {
 | 
			
		||||
			break
 | 
			
		||||
		}
 | 
			
		||||
		if !apierrs.IsConflict(err) {
 | 
			
		||||
			return fmt.Errorf("Error when cleaning up Node %v objects: %v", nodeName, err)
 | 
			
		||||
		}
 | 
			
		||||
		time.Sleep(100 * time.Millisecond)
 | 
			
		||||
	}
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return fmt.Errorf("Too many conflicts when trying to cleanup Node %v objects: %s", nodeName, err)
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type TestPodCreateStrategy func(client clientset.Interface, namespace string, podCount int) error
 | 
			
		||||
@@ -1077,6 +1276,70 @@ func CreatePod(client clientset.Interface, namespace string, podCount int, podTe
 | 
			
		||||
	return createError
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func CreatePodWithPersistentVolume(client clientset.Interface, namespace string, claimTemplate *v1.PersistentVolumeClaim, factory volumeFactory, podTemplate *v1.Pod, count int) error {
 | 
			
		||||
	var createError error
 | 
			
		||||
	lock := sync.Mutex{}
 | 
			
		||||
	createPodFunc := func(i int) {
 | 
			
		||||
		pvcName := fmt.Sprintf("pvc-%d", i)
 | 
			
		||||
 | 
			
		||||
		// pv
 | 
			
		||||
		pv := factory(i)
 | 
			
		||||
		// bind to "pvc-$i"
 | 
			
		||||
		pv.Spec.ClaimRef = &v1.ObjectReference{
 | 
			
		||||
			Kind:       "PersistentVolumeClaim",
 | 
			
		||||
			Namespace:  namespace,
 | 
			
		||||
			Name:       pvcName,
 | 
			
		||||
			APIVersion: "v1",
 | 
			
		||||
		}
 | 
			
		||||
		pv.Status.Phase = v1.VolumeBound
 | 
			
		||||
		if err := CreatePersistentVolumeWithRetries(client, pv); err != nil {
 | 
			
		||||
			lock.Lock()
 | 
			
		||||
			defer lock.Unlock()
 | 
			
		||||
			createError = fmt.Errorf("error creating PV: %s", err)
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// pvc
 | 
			
		||||
		pvc := claimTemplate.DeepCopy()
 | 
			
		||||
		pvc.Name = pvcName
 | 
			
		||||
		// bind to "pv-$i"
 | 
			
		||||
		pvc.Spec.VolumeName = pv.Name
 | 
			
		||||
		pvc.Status.Phase = v1.ClaimBound
 | 
			
		||||
		if err := CreatePersistentVolumeClaimWithRetries(client, namespace, pvc); err != nil {
 | 
			
		||||
			lock.Lock()
 | 
			
		||||
			defer lock.Unlock()
 | 
			
		||||
			createError = fmt.Errorf("error creating PVC: %s", err)
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// pod
 | 
			
		||||
		pod := podTemplate.DeepCopy()
 | 
			
		||||
		pod.Spec.Volumes = []v1.Volume{
 | 
			
		||||
			{
 | 
			
		||||
				Name: "vol",
 | 
			
		||||
				VolumeSource: v1.VolumeSource{
 | 
			
		||||
					PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
 | 
			
		||||
						ClaimName: pvcName,
 | 
			
		||||
					},
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
		}
 | 
			
		||||
		if err := makeCreatePod(client, namespace, pod); err != nil {
 | 
			
		||||
			lock.Lock()
 | 
			
		||||
			defer lock.Unlock()
 | 
			
		||||
			createError = err
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if count < 30 {
 | 
			
		||||
		workqueue.ParallelizeUntil(context.TODO(), count, count, createPodFunc)
 | 
			
		||||
	} else {
 | 
			
		||||
		workqueue.ParallelizeUntil(context.TODO(), 30, count, createPodFunc)
 | 
			
		||||
	}
 | 
			
		||||
	return createError
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func createController(client clientset.Interface, controllerName, namespace string, podCount int, podTemplate *v1.Pod) error {
 | 
			
		||||
	rc := &v1.ReplicationController{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
@@ -1105,6 +1368,14 @@ func NewCustomCreatePodStrategy(podTemplate *v1.Pod) TestPodCreateStrategy {
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// volumeFactory creates an unique PersistentVolume for given integer.
 | 
			
		||||
type volumeFactory func(uniqueID int) *v1.PersistentVolume
 | 
			
		||||
 | 
			
		||||
func NewCreatePodWithPersistentVolumeStrategy(claimTemplate *v1.PersistentVolumeClaim, factory volumeFactory, podTemplate *v1.Pod) TestPodCreateStrategy {
 | 
			
		||||
	return func(client clientset.Interface, namespace string, podCount int) error {
 | 
			
		||||
		return CreatePodWithPersistentVolume(client, namespace, claimTemplate, factory, podTemplate, podCount)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
func NewSimpleCreatePodStrategy() TestPodCreateStrategy {
 | 
			
		||||
	basePod := &v1.Pod{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user