	Merge pull request #77397 from jsafrane/scheduler-benchmark-volumes
Add benchmark for scheduling of pods with PVs
		| @@ -33,11 +33,17 @@ go_test( | ||||
|     embed = [":go_default_library"], | ||||
|     tags = ["integration"], | ||||
|     deps = [ | ||||
|         "//pkg/features:go_default_library", | ||||
|         "//pkg/scheduler/factory:go_default_library", | ||||
|         "//pkg/volume/util:go_default_library", | ||||
|         "//staging/src/k8s.io/api/core/v1:go_default_library", | ||||
|         "//staging/src/k8s.io/api/storage/v1beta1:go_default_library", | ||||
|         "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", | ||||
|         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", | ||||
|         "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", | ||||
|         "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", | ||||
|         "//staging/src/k8s.io/component-base/featuregate/testing:go_default_library", | ||||
|         "//staging/src/k8s.io/csi-translation-lib/plugins:go_default_library", | ||||
|         "//test/integration/framework:go_default_library", | ||||
|         "//test/utils:go_default_library", | ||||
|         "//vendor/k8s.io/klog:go_default_library", | ||||
|   | ||||
| @@ -22,16 +22,27 @@ import ( | ||||
| 	"time" | ||||
|  | ||||
| 	"k8s.io/api/core/v1" | ||||
| 	storagev1beta1 "k8s.io/api/storage/v1beta1" | ||||
| 	"k8s.io/apimachinery/pkg/api/resource" | ||||
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||
| 	"k8s.io/apimachinery/pkg/labels" | ||||
| 	utilfeature "k8s.io/apiserver/pkg/util/feature" | ||||
| 	featuregatetesting "k8s.io/component-base/featuregate/testing" | ||||
| 	"k8s.io/csi-translation-lib/plugins" | ||||
| 	csilibplugins "k8s.io/csi-translation-lib/plugins" | ||||
| 	"k8s.io/klog" | ||||
| 	"k8s.io/kubernetes/pkg/features" | ||||
| 	"k8s.io/kubernetes/pkg/volume/util" | ||||
| 	"k8s.io/kubernetes/test/integration/framework" | ||||
| 	testutils "k8s.io/kubernetes/test/utils" | ||||
| ) | ||||
|  | ||||
| var ( | ||||
| 	defaultNodeStrategy = &testutils.TrivialNodePrepareStrategy{} | ||||
|  | ||||
| 	testCSIDriver = plugins.AWSEBSDriverName | ||||
| 	// From PV controller | ||||
| 	annBindCompleted = "pv.kubernetes.io/bind-completed" | ||||
| ) | ||||
|  | ||||
| // BenchmarkScheduling benchmarks the scheduling rate when the cluster has | ||||
| @@ -79,6 +90,134 @@ func BenchmarkSchedulingPodAntiAffinity(b *testing.B) { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // BenchmarkSchedulingSecrets benchmarks the scheduling rate of pods with | ||||
| // volumes that don't require any special handling, such as Secrets. | ||||
| // It can be used to compare scheduler efficiency with the other benchmarks | ||||
| // that use volume scheduling predicates. | ||||
| func BenchmarkSchedulingSecrets(b *testing.B) { | ||||
| 	tests := []struct{ nodes, existingPods, minPods int }{ | ||||
| 		{nodes: 500, existingPods: 250, minPods: 250}, | ||||
| 		{nodes: 500, existingPods: 5000, minPods: 250}, | ||||
| 		{nodes: 1000, existingPods: 1000, minPods: 500}, | ||||
| 		{nodes: 5000, existingPods: 1000, minPods: 1000}, | ||||
| 	} | ||||
| 	// The setup strategy creates pods with no volumes. | ||||
| 	setupStrategy := testutils.NewSimpleWithControllerCreatePodStrategy("setup") | ||||
| 	// The test strategy creates pods with a secret. | ||||
| 	testBasePod := makeBasePodWithSecret() | ||||
| 	testStrategy := testutils.NewCustomCreatePodStrategy(testBasePod) | ||||
| 	for _, test := range tests { | ||||
| 		name := fmt.Sprintf("%vNodes/%vPods", test.nodes, test.existingPods) | ||||
| 		b.Run(name, func(b *testing.B) { | ||||
| 			benchmarkScheduling(test.nodes, test.existingPods, test.minPods, defaultNodeStrategy, setupStrategy, testStrategy, b) | ||||
| 		}) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // BenchmarkSchedulingInTreePVs benchmarks the scheduling rate of pods with | ||||
| // in-tree volumes (used via PV/PVC). Nodes have default hardcoded attach limits | ||||
| // (39 for AWS EBS). | ||||
| func BenchmarkSchedulingInTreePVs(b *testing.B) { | ||||
| 	tests := []struct{ nodes, existingPods, minPods int }{ | ||||
| 		{nodes: 500, existingPods: 250, minPods: 250}, | ||||
| 		{nodes: 500, existingPods: 5000, minPods: 250}, | ||||
| 		{nodes: 1000, existingPods: 1000, minPods: 500}, | ||||
| 		{nodes: 5000, existingPods: 1000, minPods: 1000}, | ||||
| 	} | ||||
| 	// The setup strategy creates pods with no volumes. | ||||
| 	setupStrategy := testutils.NewSimpleWithControllerCreatePodStrategy("setup") | ||||
|  | ||||
| 	// The test strategy creates pods with AWS EBS volume used via PV. | ||||
| 	baseClaim := makeBasePersistentVolumeClaim() | ||||
| 	basePod := makeBasePod() | ||||
| 	testStrategy := testutils.NewCreatePodWithPersistentVolumeStrategy(baseClaim, awsVolumeFactory, basePod) | ||||
| 	for _, test := range tests { | ||||
| 		name := fmt.Sprintf("%vNodes/%vPods", test.nodes, test.existingPods) | ||||
| 		b.Run(name, func(b *testing.B) { | ||||
| 			benchmarkScheduling(test.nodes, test.existingPods, test.minPods, defaultNodeStrategy, setupStrategy, testStrategy, b) | ||||
| 		}) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // BenchmarkSchedulingMigratedInTreePVs benchmarks the scheduling rate of pods with | ||||
| // in-tree volumes (used via PV/PVC) that are migrated to CSI. CSINode instances exist | ||||
| // for all nodes and are annotated to mark the AWS EBS plugin as migrated. | ||||
| func BenchmarkSchedulingMigratedInTreePVs(b *testing.B) { | ||||
| 	tests := []struct{ nodes, existingPods, minPods int }{ | ||||
| 		{nodes: 500, existingPods: 250, minPods: 250}, | ||||
| 		{nodes: 500, existingPods: 5000, minPods: 250}, | ||||
| 		{nodes: 1000, existingPods: 1000, minPods: 500}, | ||||
| 		{nodes: 5000, existingPods: 1000, minPods: 1000}, | ||||
| 	} | ||||
| 	// The setup strategy creates pods with no volumes. | ||||
| 	setupStrategy := testutils.NewSimpleWithControllerCreatePodStrategy("setup") | ||||
|  | ||||
| 	// The test strategy creates pods with AWS EBS volume used via PV. | ||||
| 	baseClaim := makeBasePersistentVolumeClaim() | ||||
| 	basePod := makeBasePod() | ||||
| 	testStrategy := testutils.NewCreatePodWithPersistentVolumeStrategy(baseClaim, awsVolumeFactory, basePod) | ||||
|  | ||||
| 	// Each node can use the same amount of CSI volumes as in-tree AWS volume | ||||
| 	// plugin, so the results should be comparable with BenchmarkSchedulingInTreePVs. | ||||
| 	driverKey := util.GetCSIAttachLimitKey(testCSIDriver) | ||||
| 	allocatable := map[v1.ResourceName]string{ | ||||
| 		v1.ResourceName(driverKey): fmt.Sprintf("%d", util.DefaultMaxEBSVolumes), | ||||
| 	} | ||||
| 	var count int32 = util.DefaultMaxEBSVolumes | ||||
| 	csiAllocatable := map[string]*storagev1beta1.VolumeNodeResources{ | ||||
| 		testCSIDriver: { | ||||
| 			Count: &count, | ||||
| 		}, | ||||
| 	} | ||||
| 	nodeStrategy := testutils.NewNodeAllocatableStrategy(allocatable, csiAllocatable, []string{csilibplugins.AWSEBSInTreePluginName}) | ||||
| 	for _, test := range tests { | ||||
| 		name := fmt.Sprintf("%vNodes/%vPods", test.nodes, test.existingPods) | ||||
| 		b.Run(name, func(b *testing.B) { | ||||
| 			defer featuregatetesting.SetFeatureGateDuringTest(b, utilfeature.DefaultFeatureGate, features.CSIMigration, true)() | ||||
| 			defer featuregatetesting.SetFeatureGateDuringTest(b, utilfeature.DefaultFeatureGate, features.CSIMigrationAWS, true)() | ||||
| 			benchmarkScheduling(test.nodes, test.existingPods, test.minPods, nodeStrategy, setupStrategy, testStrategy, b) | ||||
| 		}) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // BenchmarkSchedulingCSIPVs benchmarks the scheduling rate of pods with | ||||
| // CSI volumes (used via PV/PVC). The number of volumes per node is limited by | ||||
| // node.status.allocatable. | ||||
| func BenchmarkSchedulingCSIPVs(b *testing.B) { | ||||
| 	tests := []struct{ nodes, existingPods, minPods int }{ | ||||
| 		{nodes: 500, existingPods: 250, minPods: 250}, | ||||
| 		{nodes: 500, existingPods: 5000, minPods: 250}, | ||||
| 		{nodes: 1000, existingPods: 1000, minPods: 500}, | ||||
| 		{nodes: 5000, existingPods: 1000, minPods: 1000}, | ||||
| 	} | ||||
|  | ||||
| 	// The setup strategy creates pods with no volumes. | ||||
| 	setupStrategy := testutils.NewSimpleWithControllerCreatePodStrategy("setup") | ||||
|  | ||||
| 	// The test strategy creates pods with CSI volume via PV. | ||||
| 	baseClaim := makeBasePersistentVolumeClaim() | ||||
| 	basePod := makeBasePod() | ||||
| 	testStrategy := testutils.NewCreatePodWithPersistentVolumeStrategy(baseClaim, csiVolumeFactory, basePod) | ||||
|  | ||||
| 	// Each node can use the same amount of CSI volumes as in-tree AWS volume | ||||
| 	// plugin, so the results should be comparable with BenchmarkSchedulingInTreePVs. | ||||
| 	driverKey := util.GetCSIAttachLimitKey(testCSIDriver) | ||||
| 	allocatable := map[v1.ResourceName]string{ | ||||
| 		v1.ResourceName(driverKey): fmt.Sprintf("%d", util.DefaultMaxEBSVolumes), | ||||
| 	} | ||||
| 	var count int32 = util.DefaultMaxEBSVolumes | ||||
| 	csiAllocatable := map[string]*storagev1beta1.VolumeNodeResources{ | ||||
| 		testCSIDriver: { | ||||
| 			Count: &count, | ||||
| 		}, | ||||
| 	} | ||||
| 	nodeStrategy := testutils.NewNodeAllocatableStrategy(allocatable, csiAllocatable, []string{}) | ||||
| 	for _, test := range tests { | ||||
| 		name := fmt.Sprintf("%vNodes/%vPods", test.nodes, test.existingPods) | ||||
| 		b.Run(name, func(b *testing.B) { | ||||
| 			benchmarkScheduling(test.nodes, test.existingPods, test.minPods, nodeStrategy, setupStrategy, testStrategy, b) | ||||
| 		}) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // BenchmarkSchedulingPodAffinity benchmarks the scheduling rate of pods with | ||||
| // PodAffinity rules when the cluster has various quantities of nodes and | ||||
| // scheduled pods. | ||||
| @@ -265,8 +404,110 @@ func benchmarkScheduling(numNodes, numExistingPods, minPods int, | ||||
| 		if len(scheduled) >= numExistingPods+b.N { | ||||
| 			break | ||||
| 		} | ||||
|  | ||||
| 		// Note: the sleep below may introduce a slight inaccuracy into the benchmark results. | ||||
| 		// Since the total measured time is relatively large, this should not be a concern. | ||||
| 		time.Sleep(100 * time.Millisecond) | ||||
| 	} | ||||
| } | ||||
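
The loop above only shows the tail of benchmarkScheduling: it repeatedly counts scheduled pods and sleeps for 100ms until enough of them have landed on nodes. A minimal, self-contained sketch of the same poll-and-sleep pattern — waitForCount and countFn are hypothetical names, not helpers from this PR:

    package main

    import (
    	"fmt"
    	"time"
    )

    // waitForCount polls countFn until it reports at least target items or the
    // timeout expires. Like the benchmark loop above, it trades a little timing
    // accuracy for simplicity by sleeping a fixed 100ms between polls.
    func waitForCount(countFn func() int, target int, timeout time.Duration) error {
    	deadline := time.Now().Add(timeout)
    	for countFn() < target {
    		if time.Now().After(deadline) {
    			return fmt.Errorf("timed out waiting for %d items", target)
    		}
    		time.Sleep(100 * time.Millisecond)
    	}
    	return nil
    }

    func main() {
    	scheduled := 0
    	// Stand-in for "list pods and count the ones with a node assigned".
    	countFn := func() int { scheduled += 100; return scheduled }
    	if err := waitForCount(countFn, 500, time.Minute); err != nil {
    		fmt.Println(err)
    		return
    	}
    	fmt.Println("scheduled:", scheduled)
    }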
|  | ||||
| // makeBasePodWithSecret creates a Pod object to be used as a template. | ||||
| // The pod uses a single Secret volume. | ||||
| func makeBasePodWithSecret() *v1.Pod { | ||||
| 	basePod := &v1.Pod{ | ||||
| 		ObjectMeta: metav1.ObjectMeta{ | ||||
| 			GenerateName: "secret-volume-", | ||||
| 		}, | ||||
| 		Spec: testutils.MakePodSpec(), | ||||
| 	} | ||||
|  | ||||
| 	volumes := []v1.Volume{ | ||||
| 		{ | ||||
| 			Name: "secret", | ||||
| 			VolumeSource: v1.VolumeSource{ | ||||
| 				Secret: &v1.SecretVolumeSource{ | ||||
| 					SecretName: "secret", | ||||
| 				}, | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
| 	basePod.Spec.Volumes = volumes | ||||
| 	return basePod | ||||
| } | ||||
|  | ||||
| // makeBasePod creates a Pod object to be used as a template. | ||||
| func makeBasePod() *v1.Pod { | ||||
| 	basePod := &v1.Pod{ | ||||
| 		ObjectMeta: metav1.ObjectMeta{ | ||||
| 			GenerateName: "pod-", | ||||
| 		}, | ||||
| 		Spec: testutils.MakePodSpec(), | ||||
| 	} | ||||
| 	return basePod | ||||
| } | ||||
|  | ||||
| func makeBasePersistentVolumeClaim() *v1.PersistentVolumeClaim { | ||||
| 	return &v1.PersistentVolumeClaim{ | ||||
| 		ObjectMeta: metav1.ObjectMeta{ | ||||
| 			// Name is filled in by CreatePodWithPersistentVolume. | ||||
| 			Annotations: map[string]string{ | ||||
| 				annBindCompleted: "true", | ||||
| 			}, | ||||
| 		}, | ||||
| 		Spec: v1.PersistentVolumeClaimSpec{ | ||||
| 			AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany}, | ||||
| 			Resources: v1.ResourceRequirements{ | ||||
| 				Requests: v1.ResourceList{ | ||||
| 					v1.ResourceName(v1.ResourceStorage): resource.MustParse("1Gi"), | ||||
| 				}, | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func awsVolumeFactory(id int) *v1.PersistentVolume { | ||||
| 	return &v1.PersistentVolume{ | ||||
| 		ObjectMeta: metav1.ObjectMeta{ | ||||
| 			Name: fmt.Sprintf("vol-%d", id), | ||||
| 		}, | ||||
| 		Spec: v1.PersistentVolumeSpec{ | ||||
| 			AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany}, | ||||
| 			Capacity: v1.ResourceList{ | ||||
| 				v1.ResourceName(v1.ResourceStorage): resource.MustParse("1Gi"), | ||||
| 			}, | ||||
| 			PersistentVolumeReclaimPolicy: v1.PersistentVolumeReclaimRetain, | ||||
| 			PersistentVolumeSource: v1.PersistentVolumeSource{ | ||||
| 				AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{ | ||||
| 					// VolumeID must be unique for each PV, so every PV is | ||||
| 					// counted as a separate volume in MaxPDVolumeCountChecker | ||||
| 					// predicate. | ||||
| 					VolumeID: fmt.Sprintf("vol-%d", id), | ||||
| 				}, | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
| } | ||||
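
awsVolumeFactory produces one PV per index with a unique VolumeID, so the scheduler's volume-count predicate counts every PV separately. A factory for another in-tree plugin would follow the same shape; the GCE PD variant below is purely hypothetical (it is not part of this change) and assumes the same imports as the file above:

    // gcePDVolumeFactory is a hypothetical volumeFactory for GCE PD volumes,
    // mirroring awsVolumeFactory: each PV gets a unique PDName so it is counted
    // as a separate volume by the volume-count predicate.
    func gcePDVolumeFactory(id int) *v1.PersistentVolume {
    	return &v1.PersistentVolume{
    		ObjectMeta: metav1.ObjectMeta{
    			Name: fmt.Sprintf("vol-%d", id),
    		},
    		Spec: v1.PersistentVolumeSpec{
    			AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany},
    			Capacity: v1.ResourceList{
    				v1.ResourceName(v1.ResourceStorage): resource.MustParse("1Gi"),
    			},
    			PersistentVolumeReclaimPolicy: v1.PersistentVolumeReclaimRetain,
    			PersistentVolumeSource: v1.PersistentVolumeSource{
    				GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
    					PDName: fmt.Sprintf("vol-%d", id),
    					FSType: "ext4",
    				},
    			},
    		},
    	}
    }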
|  | ||||
| func csiVolumeFactory(id int) *v1.PersistentVolume { | ||||
| 	return &v1.PersistentVolume{ | ||||
| 		ObjectMeta: metav1.ObjectMeta{ | ||||
| 			Name: fmt.Sprintf("vol-%d", id), | ||||
| 		}, | ||||
| 		Spec: v1.PersistentVolumeSpec{ | ||||
| 			AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany}, | ||||
| 			Capacity: v1.ResourceList{ | ||||
| 				v1.ResourceName(v1.ResourceStorage): resource.MustParse("1Gi"), | ||||
| 			}, | ||||
| 			PersistentVolumeReclaimPolicy: v1.PersistentVolumeReclaimRetain, | ||||
| 			PersistentVolumeSource: v1.PersistentVolumeSource{ | ||||
| 				CSI: &v1.CSIPersistentVolumeSource{ | ||||
| 					// Handle must be unique for each PV, so every PV is | ||||
| 					// counted as a separate volume in CSIMaxVolumeLimitChecker | ||||
| 					// predicate. | ||||
| 					VolumeHandle: fmt.Sprintf("vol-%d", id), | ||||
| 					Driver:       testCSIDriver, | ||||
| 				}, | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -38,6 +38,7 @@ go_library( | ||||
|         "//staging/src/k8s.io/api/auditregistration/v1alpha1:go_default_library", | ||||
|         "//staging/src/k8s.io/api/batch/v1:go_default_library", | ||||
|         "//staging/src/k8s.io/api/core/v1:go_default_library", | ||||
|         "//staging/src/k8s.io/api/storage/v1beta1:go_default_library", | ||||
|         "//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library", | ||||
|         "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", | ||||
|         "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", | ||||
| @@ -47,8 +48,10 @@ go_library( | ||||
|         "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", | ||||
|         "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", | ||||
|         "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", | ||||
|         "//staging/src/k8s.io/apimachinery/pkg/util/json:go_default_library", | ||||
|         "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", | ||||
|         "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", | ||||
|         "//staging/src/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library", | ||||
|         "//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library", | ||||
|         "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", | ||||
|         "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", | ||||
|   | ||||
| @@ -232,3 +232,37 @@ func CreateResourceQuotaWithRetries(c clientset.Interface, namespace string, obj | ||||
| 	} | ||||
| 	return RetryWithExponentialBackOff(createFunc) | ||||
| } | ||||
|  | ||||
| func CreatePersistentVolumeWithRetries(c clientset.Interface, obj *v1.PersistentVolume) error { | ||||
| 	if obj == nil { | ||||
| 		return fmt.Errorf("Object provided to create is empty") | ||||
| 	} | ||||
| 	createFunc := func() (bool, error) { | ||||
| 		_, err := c.CoreV1().PersistentVolumes().Create(obj) | ||||
| 		if err == nil || apierrs.IsAlreadyExists(err) { | ||||
| 			return true, nil | ||||
| 		} | ||||
| 		if IsRetryableAPIError(err) { | ||||
| 			return false, nil | ||||
| 		} | ||||
| 		return false, fmt.Errorf("Failed to create object with non-retriable error: %v", err) | ||||
| 	} | ||||
| 	return RetryWithExponentialBackOff(createFunc) | ||||
| } | ||||
|  | ||||
| func CreatePersistentVolumeClaimWithRetries(c clientset.Interface, namespace string, obj *v1.PersistentVolumeClaim) error { | ||||
| 	if obj == nil { | ||||
| 		return fmt.Errorf("Object provided to create is empty") | ||||
| 	} | ||||
| 	createFunc := func() (bool, error) { | ||||
| 		_, err := c.CoreV1().PersistentVolumeClaims(namespace).Create(obj) | ||||
| 		if err == nil || apierrs.IsAlreadyExists(err) { | ||||
| 			return true, nil | ||||
| 		} | ||||
| 		if IsRetryableAPIError(err) { | ||||
| 			return false, nil | ||||
| 		} | ||||
| 		return false, fmt.Errorf("Failed to create object with non-retriable error: %v", err) | ||||
| 	} | ||||
| 	return RetryWithExponentialBackOff(createFunc) | ||||
| } | ||||
|   | ||||
| @@ -28,6 +28,7 @@ import ( | ||||
| 	apps "k8s.io/api/apps/v1" | ||||
| 	batch "k8s.io/api/batch/v1" | ||||
| 	v1 "k8s.io/api/core/v1" | ||||
| 	storagev1beta1 "k8s.io/api/storage/v1beta1" | ||||
| 	apiequality "k8s.io/apimachinery/pkg/api/equality" | ||||
| 	apierrs "k8s.io/apimachinery/pkg/api/errors" | ||||
| 	"k8s.io/apimachinery/pkg/api/resource" | ||||
| @@ -36,7 +37,9 @@ import ( | ||||
| 	"k8s.io/apimachinery/pkg/labels" | ||||
| 	"k8s.io/apimachinery/pkg/runtime/schema" | ||||
| 	"k8s.io/apimachinery/pkg/types" | ||||
| 	"k8s.io/apimachinery/pkg/util/json" | ||||
| 	"k8s.io/apimachinery/pkg/util/sets" | ||||
| 	"k8s.io/apimachinery/pkg/util/strategicpatch" | ||||
| 	"k8s.io/apimachinery/pkg/util/uuid" | ||||
| 	"k8s.io/apimachinery/pkg/util/wait" | ||||
| 	clientset "k8s.io/client-go/kubernetes" | ||||
| @@ -908,12 +911,22 @@ type TestNodePreparer interface { | ||||
| } | ||||
|  | ||||
| type PrepareNodeStrategy interface { | ||||
| 	// Modify pre-created Node objects before the test starts. | ||||
| 	PreparePatch(node *v1.Node) []byte | ||||
| 	// Create or modify any objects that depend on the node before the test starts. | ||||
| 	// Caller will re-try when http.StatusConflict error is returned. | ||||
| 	PrepareDependentObjects(node *v1.Node, client clientset.Interface) error | ||||
| 	// Clean up any node modifications after the test finishes. | ||||
| 	CleanupNode(node *v1.Node) *v1.Node | ||||
| 	// Clean up any objects that depend on the node after the test finishes. | ||||
| 	// Caller will re-try when http.StatusConflict error is returned. | ||||
| 	CleanupDependentObjects(nodeName string, client clientset.Interface) error | ||||
| } | ||||
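
DoPrepareNode and DoCleanupNode (further down) drive these four methods and retry whenever a Conflict error is returned. A sketch of what another implementation could look like — the per-node ConfigMap is invented purely for illustration, it is not part of this PR, and the code assumes the same package and imports as the file above:

    // configMapNodePrepareStrategy is a hypothetical strategy that leaves the
    // Node object alone and maintains one ConfigMap per node as a dependent
    // object.
    type configMapNodePrepareStrategy struct {
    	namespace string
    }

    var _ PrepareNodeStrategy = &configMapNodePrepareStrategy{}

    func (*configMapNodePrepareStrategy) PreparePatch(*v1.Node) []byte {
    	return []byte{}
    }

    func (*configMapNodePrepareStrategy) CleanupNode(node *v1.Node) *v1.Node {
    	return node.DeepCopy()
    }

    func (s *configMapNodePrepareStrategy) PrepareDependentObjects(node *v1.Node, client clientset.Interface) error {
    	cm := &v1.ConfigMap{
    		ObjectMeta: metav1.ObjectMeta{Name: "node-" + node.Name},
    		Data:       map[string]string{"node": node.Name},
    	}
    	_, err := client.CoreV1().ConfigMaps(s.namespace).Create(cm)
    	if apierrs.IsAlreadyExists(err) {
    		return nil
    	}
    	return err
    }

    func (s *configMapNodePrepareStrategy) CleanupDependentObjects(nodeName string, client clientset.Interface) error {
    	err := client.CoreV1().ConfigMaps(s.namespace).Delete("node-"+nodeName, nil)
    	if apierrs.IsNotFound(err) {
    		return nil
    	}
    	return err
    }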
|  | ||||
| type TrivialNodePrepareStrategy struct{} | ||||
|  | ||||
| var _ PrepareNodeStrategy = &TrivialNodePrepareStrategy{} | ||||
|  | ||||
| func (*TrivialNodePrepareStrategy) PreparePatch(*v1.Node) []byte { | ||||
| 	return []byte{} | ||||
| } | ||||
| @@ -923,11 +936,21 @@ func (*TrivialNodePrepareStrategy) CleanupNode(node *v1.Node) *v1.Node { | ||||
| 	return &nodeCopy | ||||
| } | ||||
|  | ||||
| func (*TrivialNodePrepareStrategy) PrepareDependentObjects(node *v1.Node, client clientset.Interface) error { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (*TrivialNodePrepareStrategy) CleanupDependentObjects(nodeName string, client clientset.Interface) error { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| type LabelNodePrepareStrategy struct { | ||||
| 	labelKey   string | ||||
| 	labelValue string | ||||
| } | ||||
|  | ||||
| var _ PrepareNodeStrategy = &LabelNodePrepareStrategy{} | ||||
|  | ||||
| func NewLabelNodePrepareStrategy(labelKey string, labelValue string) *LabelNodePrepareStrategy { | ||||
| 	return &LabelNodePrepareStrategy{ | ||||
| 		labelKey:   labelKey, | ||||
| @@ -949,6 +972,148 @@ func (s *LabelNodePrepareStrategy) CleanupNode(node *v1.Node) *v1.Node { | ||||
| 	return nodeCopy | ||||
| } | ||||
|  | ||||
| func (*LabelNodePrepareStrategy) PrepareDependentObjects(node *v1.Node, client clientset.Interface) error { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (*LabelNodePrepareStrategy) CleanupDependentObjects(nodeName string, client clientset.Interface) error { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // NodeAllocatableStrategy fills node.status.allocatable and csiNode.spec.drivers[*].allocatable. | ||||
| // csiNode is created if it does not exist. On cleanup, any csiNode.spec.drivers[*].allocatable is | ||||
| // set to nil. | ||||
| type NodeAllocatableStrategy struct { | ||||
| 	// Node.status.allocatable to fill to all nodes. | ||||
| 	nodeAllocatable map[v1.ResourceName]string | ||||
| 	// Map <driver_name> -> VolumeNodeResources to fill into csiNode.spec.drivers[<driver_name>]. | ||||
| 	csiNodeAllocatable map[string]*storagev1beta1.VolumeNodeResources | ||||
| 	// List of in-tree volume plugins migrated to CSI. | ||||
| 	migratedPlugins []string | ||||
| } | ||||
|  | ||||
| var _ PrepareNodeStrategy = &NodeAllocatableStrategy{} | ||||
|  | ||||
| func NewNodeAllocatableStrategy(nodeAllocatable map[v1.ResourceName]string, csiNodeAllocatable map[string]*storagev1beta1.VolumeNodeResources, migratedPlugins []string) *NodeAllocatableStrategy { | ||||
| 	return &NodeAllocatableStrategy{nodeAllocatable, csiNodeAllocatable, migratedPlugins} | ||||
| } | ||||
|  | ||||
| func (s *NodeAllocatableStrategy) PreparePatch(node *v1.Node) []byte { | ||||
| 	newNode := node.DeepCopy() | ||||
| 	for name, value := range s.nodeAllocatable { | ||||
| 		newNode.Status.Allocatable[name] = resource.MustParse(value) | ||||
| 	} | ||||
|  | ||||
| 	oldJSON, err := json.Marshal(node) | ||||
| 	if err != nil { | ||||
| 		panic(err) | ||||
| 	} | ||||
| 	newJSON, err := json.Marshal(newNode) | ||||
| 	if err != nil { | ||||
| 		panic(err) | ||||
| 	} | ||||
|  | ||||
| 	patch, err := strategicpatch.CreateTwoWayMergePatch(oldJSON, newJSON, v1.Node{}) | ||||
| 	if err != nil { | ||||
| 		panic(err) | ||||
| 	} | ||||
| 	return patch | ||||
| } | ||||
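
PreparePatch serializes the node before and after the allocatable update and lets strategicpatch compute the two-way merge patch; panicking is acceptable here because a marshalling failure is a bug in the test setup rather than a runtime condition. A small standalone sketch of the same call — the extended resource name is made up, the benchmark uses the CSI attach-limit key instead:

    package main

    import (
    	"fmt"

    	v1 "k8s.io/api/core/v1"
    	"k8s.io/apimachinery/pkg/api/resource"
    	"k8s.io/apimachinery/pkg/util/json"
    	"k8s.io/apimachinery/pkg/util/strategicpatch"
    )

    func main() {
    	oldNode := &v1.Node{}
    	newNode := oldNode.DeepCopy()
    	newNode.Status.Allocatable = v1.ResourceList{
    		"example.com/fake-volumes": resource.MustParse("39"),
    	}

    	oldJSON, err := json.Marshal(oldNode)
    	if err != nil {
    		panic(err)
    	}
    	newJSON, err := json.Marshal(newNode)
    	if err != nil {
    		panic(err)
    	}
    	patch, err := strategicpatch.CreateTwoWayMergePatch(oldJSON, newJSON, v1.Node{})
    	if err != nil {
    		panic(err)
    	}
    	// Prints something like:
    	// {"status":{"allocatable":{"example.com/fake-volumes":"39"}}}
    	fmt.Println(string(patch))
    }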
|  | ||||
| func (s *NodeAllocatableStrategy) CleanupNode(node *v1.Node) *v1.Node { | ||||
| 	nodeCopy := node.DeepCopy() | ||||
| 	for name := range s.nodeAllocatable { | ||||
| 		delete(nodeCopy.Status.Allocatable, name) | ||||
| 	} | ||||
| 	return nodeCopy | ||||
| } | ||||
|  | ||||
| func (s *NodeAllocatableStrategy) createCSINode(nodeName string, client clientset.Interface) error { | ||||
| 	csiNode := &storagev1beta1.CSINode{ | ||||
| 		ObjectMeta: metav1.ObjectMeta{ | ||||
| 			Name: nodeName, | ||||
| 			Annotations: map[string]string{ | ||||
| 				v1.MigratedPluginsAnnotationKey: strings.Join(s.migratedPlugins, ","), | ||||
| 			}, | ||||
| 		}, | ||||
| 		Spec: storagev1beta1.CSINodeSpec{ | ||||
| 			Drivers: []storagev1beta1.CSINodeDriver{}, | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
| 	for driver, allocatable := range s.csiNodeAllocatable { | ||||
| 		d := storagev1beta1.CSINodeDriver{ | ||||
| 			Name:        driver, | ||||
| 			Allocatable: allocatable, | ||||
| 			NodeID:      nodeName, | ||||
| 		} | ||||
| 		csiNode.Spec.Drivers = append(csiNode.Spec.Drivers, d) | ||||
| 	} | ||||
|  | ||||
| 	_, err := client.StorageV1beta1().CSINodes().Create(csiNode) | ||||
| 	if apierrs.IsAlreadyExists(err) { | ||||
| 		// Something else created the CSINode instance after we checked that it did not exist. | ||||
| 		// Make the caller retry PrepareDependentObjects by returning a Conflict error. | ||||
| 		err = apierrs.NewConflict(storagev1beta1.Resource("csinodes"), nodeName, err) | ||||
| 	} | ||||
| 	return err | ||||
| } | ||||
|  | ||||
| func (s *NodeAllocatableStrategy) updateCSINode(csiNode *storagev1beta1.CSINode, client clientset.Interface) error { | ||||
| 	for driverName, allocatable := range s.csiNodeAllocatable { | ||||
| 		found := false | ||||
| 		for i, driver := range csiNode.Spec.Drivers { | ||||
| 			if driver.Name == driverName { | ||||
| 				found = true | ||||
| 				csiNode.Spec.Drivers[i].Allocatable = allocatable | ||||
| 				break | ||||
| 			} | ||||
| 		} | ||||
| 		if !found { | ||||
| 			d := storagev1beta1.CSINodeDriver{ | ||||
| 				Name:        driverName, | ||||
| 				Allocatable: allocatable, | ||||
| 			} | ||||
|  | ||||
| 			csiNode.Spec.Drivers = append(csiNode.Spec.Drivers, d) | ||||
| 		} | ||||
| 	} | ||||
| 	csiNode.Annotations[v1.MigratedPluginsAnnotationKey] = strings.Join(s.migratedPlugins, ",") | ||||
|  | ||||
| 	_, err := client.StorageV1beta1().CSINodes().Update(csiNode) | ||||
| 	return err | ||||
| } | ||||
|  | ||||
| func (s *NodeAllocatableStrategy) PrepareDependentObjects(node *v1.Node, client clientset.Interface) error { | ||||
| 	csiNode, err := client.StorageV1beta1().CSINodes().Get(node.Name, metav1.GetOptions{}) | ||||
| 	if err != nil { | ||||
| 		if apierrs.IsNotFound(err) { | ||||
| 			return s.createCSINode(node.Name, client) | ||||
| 		} | ||||
| 		return err | ||||
| 	} | ||||
| 	return s.updateCSINode(csiNode, client) | ||||
| } | ||||
|  | ||||
| func (s *NodeAllocatableStrategy) CleanupDependentObjects(nodeName string, client clientset.Interface) error { | ||||
| 	csiNode, err := client.StorageV1beta1().CSINodes().Get(nodeName, metav1.GetOptions{}) | ||||
| 	if err != nil { | ||||
| 		if apierrs.IsNotFound(err) { | ||||
| 			return nil | ||||
| 		} | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	for driverName := range s.csiNodeAllocatable { | ||||
| 		for i, driver := range csiNode.Spec.Drivers { | ||||
| 			if driver.Name == driverName { | ||||
| 				csiNode.Spec.Drivers[i].Allocatable = nil | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 	return s.updateCSINode(csiNode, client) | ||||
| } | ||||
|  | ||||
| func DoPrepareNode(client clientset.Interface, node *v1.Node, strategy PrepareNodeStrategy) error { | ||||
| 	var err error | ||||
| 	patch := strategy.PreparePatch(node) | ||||
| @@ -957,17 +1122,34 @@ func DoPrepareNode(client clientset.Interface, node *v1.Node, strategy PrepareNo | ||||
| 	} | ||||
| 	for attempt := 0; attempt < retries; attempt++ { | ||||
| 		if _, err = client.CoreV1().Nodes().Patch(node.Name, types.MergePatchType, []byte(patch)); err == nil { | ||||
| 			break | ||||
| 		} | ||||
| 		if !apierrs.IsConflict(err) { | ||||
| 			return fmt.Errorf("Error while applying patch %v to Node %v: %v", string(patch), node.Name, err) | ||||
| 		} | ||||
| 		time.Sleep(100 * time.Millisecond) | ||||
| 	} | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("Too many conflicts when applying patch %v to Node %v: %s", string(patch), node.Name, err) | ||||
| 	} | ||||
|  | ||||
| 	for attempt := 0; attempt < retries; attempt++ { | ||||
| 		if err = strategy.PrepareDependentObjects(node, client); err == nil { | ||||
| 			break | ||||
| 		} | ||||
| 		if !apierrs.IsConflict(err) { | ||||
| 			return fmt.Errorf("Error while preparing objects for node %s: %s", node.Name, err) | ||||
| 		} | ||||
| 		time.Sleep(100 * time.Millisecond) | ||||
| 	} | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("Too many conflicts when creating objects for node %s: %s", node.Name, err) | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func DoCleanupNode(client clientset.Interface, nodeName string, strategy PrepareNodeStrategy) error { | ||||
| 	var err error | ||||
| 	for attempt := 0; attempt < retries; attempt++ { | ||||
| 		node, err := client.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{}) | ||||
| 		if err != nil { | ||||
| @@ -978,14 +1160,31 @@ func DoCleanupNode(client clientset.Interface, nodeName string, strategy Prepare | ||||
| 			return nil | ||||
| 		} | ||||
| 		if _, err = client.CoreV1().Nodes().Update(updatedNode); err == nil { | ||||
| 			break | ||||
| 		} | ||||
| 		if !apierrs.IsConflict(err) { | ||||
| 			return fmt.Errorf("Error when updating Node %v: %v", nodeName, err) | ||||
| 		} | ||||
| 		time.Sleep(100 * time.Millisecond) | ||||
| 	} | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("Too many conflicts when trying to cleanup Node %v: %s", nodeName, err) | ||||
| 	} | ||||
|  | ||||
| 	for attempt := 0; attempt < retries; attempt++ { | ||||
| 		err = strategy.CleanupDependentObjects(nodeName, client) | ||||
| 		if err == nil { | ||||
| 			break | ||||
| 		} | ||||
| 		if !apierrs.IsConflict(err) { | ||||
| 			return fmt.Errorf("Error when cleaning up Node %v objects: %v", nodeName, err) | ||||
| 		} | ||||
| 		time.Sleep(100 * time.Millisecond) | ||||
| 	} | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("Too many conflicts when trying to cleanup Node %v objects: %s", nodeName, err) | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
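
Both DoPrepareNode and DoCleanupNode now repeat the same "retry while the error is a Conflict" loop twice each. A possible refactor, sketched under the assumption that the package-level retries constant and the 100ms backoff stay as above (client-go's k8s.io/client-go/util/retry package offers a similar RetryOnConflict helper with exponential backoff):

    // retryOnConflict runs fn up to retries times, sleeping briefly between
    // attempts as long as fn keeps returning a Conflict error. Any other error
    // aborts immediately; the last Conflict error is returned if all attempts
    // are used up.
    func retryOnConflict(fn func() error) error {
    	var err error
    	for attempt := 0; attempt < retries; attempt++ {
    		if err = fn(); err == nil {
    			return nil
    		}
    		if !apierrs.IsConflict(err) {
    			return err
    		}
    		time.Sleep(100 * time.Millisecond)
    	}
    	return err
    }

The dependent-object phase of DoPrepareNode would then shrink to retryOnConflict(func() error { return strategy.PrepareDependentObjects(node, client) }).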
|  | ||||
| type TestPodCreateStrategy func(client clientset.Interface, namespace string, podCount int) error | ||||
| @@ -1077,6 +1276,70 @@ func CreatePod(client clientset.Interface, namespace string, podCount int, podTe | ||||
| 	return createError | ||||
| } | ||||
|  | ||||
| func CreatePodWithPersistentVolume(client clientset.Interface, namespace string, claimTemplate *v1.PersistentVolumeClaim, factory volumeFactory, podTemplate *v1.Pod, count int) error { | ||||
| 	var createError error | ||||
| 	lock := sync.Mutex{} | ||||
| 	createPodFunc := func(i int) { | ||||
| 		pvcName := fmt.Sprintf("pvc-%d", i) | ||||
|  | ||||
| 		// pv | ||||
| 		pv := factory(i) | ||||
| 		// bind to "pvc-$i" | ||||
| 		pv.Spec.ClaimRef = &v1.ObjectReference{ | ||||
| 			Kind:       "PersistentVolumeClaim", | ||||
| 			Namespace:  namespace, | ||||
| 			Name:       pvcName, | ||||
| 			APIVersion: "v1", | ||||
| 		} | ||||
| 		pv.Status.Phase = v1.VolumeBound | ||||
| 		if err := CreatePersistentVolumeWithRetries(client, pv); err != nil { | ||||
| 			lock.Lock() | ||||
| 			defer lock.Unlock() | ||||
| 			createError = fmt.Errorf("error creating PV: %s", err) | ||||
| 			return | ||||
| 		} | ||||
|  | ||||
| 		// pvc | ||||
| 		pvc := claimTemplate.DeepCopy() | ||||
| 		pvc.Name = pvcName | ||||
| 		// bind to "pv-$i" | ||||
| 		pvc.Spec.VolumeName = pv.Name | ||||
| 		pvc.Status.Phase = v1.ClaimBound | ||||
| 		if err := CreatePersistentVolumeClaimWithRetries(client, namespace, pvc); err != nil { | ||||
| 			lock.Lock() | ||||
| 			defer lock.Unlock() | ||||
| 			createError = fmt.Errorf("error creating PVC: %s", err) | ||||
| 			return | ||||
| 		} | ||||
|  | ||||
| 		// pod | ||||
| 		pod := podTemplate.DeepCopy() | ||||
| 		pod.Spec.Volumes = []v1.Volume{ | ||||
| 			{ | ||||
| 				Name: "vol", | ||||
| 				VolumeSource: v1.VolumeSource{ | ||||
| 					PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ | ||||
| 						ClaimName: pvcName, | ||||
| 					}, | ||||
| 				}, | ||||
| 			}, | ||||
| 		} | ||||
| 		if err := makeCreatePod(client, namespace, pod); err != nil { | ||||
| 			lock.Lock() | ||||
| 			defer lock.Unlock() | ||||
| 			createError = err | ||||
| 			return | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	if count < 30 { | ||||
| 		workqueue.ParallelizeUntil(context.TODO(), count, count, createPodFunc) | ||||
| 	} else { | ||||
| 		workqueue.ParallelizeUntil(context.TODO(), 30, count, createPodFunc) | ||||
| 	} | ||||
| 	return createError | ||||
| } | ||||
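
CreatePodWithPersistentVolume fans the per-index work out over at most 30 workers; each work piece creates one PV, one PVC pre-bound to it, and one pod using the claim. A self-contained sketch of the same workqueue.ParallelizeUntil call with a toy work function, to show how the worker cap and the piece count relate:

    package main

    import (
    	"context"
    	"fmt"
    	"sync/atomic"

    	"k8s.io/client-go/util/workqueue"
    )

    func main() {
    	const pieces = 100
    	var done int32
    	// At most 30 workers process the 100 pieces, mirroring the cap used by
    	// CreatePodWithPersistentVolume; each piece would create one PV/PVC/pod
    	// triple in the real code.
    	workqueue.ParallelizeUntil(context.TODO(), 30, pieces, func(i int) {
    		atomic.AddInt32(&done, 1)
    	})
    	fmt.Println("pieces processed:", done)
    }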
|  | ||||
| func createController(client clientset.Interface, controllerName, namespace string, podCount int, podTemplate *v1.Pod) error { | ||||
| 	rc := &v1.ReplicationController{ | ||||
| 		ObjectMeta: metav1.ObjectMeta{ | ||||
| @@ -1105,6 +1368,14 @@ func NewCustomCreatePodStrategy(podTemplate *v1.Pod) TestPodCreateStrategy { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // volumeFactory creates a unique PersistentVolume for a given integer. | ||||
| type volumeFactory func(uniqueID int) *v1.PersistentVolume | ||||
|  | ||||
| func NewCreatePodWithPersistentVolumeStrategy(claimTemplate *v1.PersistentVolumeClaim, factory volumeFactory, podTemplate *v1.Pod) TestPodCreateStrategy { | ||||
| 	return func(client clientset.Interface, namespace string, podCount int) error { | ||||
| 		return CreatePodWithPersistentVolume(client, namespace, claimTemplate, factory, podTemplate, podCount) | ||||
| 	} | ||||
| } | ||||
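
NewCreatePodWithPersistentVolumeStrategy only closes over the claim template, the volume factory and the pod template; the returned TestPodCreateStrategy does the actual work when called. A usage sketch, assuming the helpers from the benchmark file above and an already-constructed clientset (the wrapper function is hypothetical):

    // createBenchmarkPods shows how the strategy is meant to be driven; the
    // namespace is assumed to exist already.
    func createBenchmarkPods(client clientset.Interface, namespace string) error {
    	strategy := testutils.NewCreatePodWithPersistentVolumeStrategy(
    		makeBasePersistentVolumeClaim(), awsVolumeFactory, makeBasePod())
    	// A TestPodCreateStrategy is just a function: this call creates 100 pods,
    	// each with its own pre-bound PV/PVC pair.
    	return strategy(client, namespace, 100)
    }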
| func NewSimpleCreatePodStrategy() TestPodCreateStrategy { | ||||
| 	basePod := &v1.Pod{ | ||||
| 		ObjectMeta: metav1.ObjectMeta{ | ||||
|   | ||||