mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-04 12:18:16 +00:00 
			
		
		
		
	Init etcd and apiserver per test case in scheduler_perf integration tests
This commit is contained in:
		@@ -24,7 +24,6 @@ import (
 | 
			
		||||
	"os"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	apierrors "k8s.io/apimachinery/pkg/api/errors"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/api/meta"
 | 
			
		||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 | 
			
		||||
@@ -142,18 +141,6 @@ func (c *createAny) create(tCtx ktesting.TContext, env map[string]any) {
 | 
			
		||||
			}
 | 
			
		||||
			_, err = resourceClient.Create(tCtx, obj, options)
 | 
			
		||||
		}
 | 
			
		||||
		if err == nil && shouldCleanup(tCtx) {
 | 
			
		||||
			tCtx.CleanupCtx(func(tCtx ktesting.TContext) {
 | 
			
		||||
				del := resourceClient.Delete
 | 
			
		||||
				if mapping.Scope.Name() != meta.RESTScopeNameNamespace {
 | 
			
		||||
					del = resourceClient.Namespace(c.Namespace).Delete
 | 
			
		||||
				}
 | 
			
		||||
				err := del(tCtx, obj.GetName(), metav1.DeleteOptions{})
 | 
			
		||||
				if !apierrors.IsNotFound(err) {
 | 
			
		||||
					tCtx.ExpectNoError(err, fmt.Sprintf("deleting %s.%s %s", obj.GetKind(), obj.GetAPIVersion(), klog.KObj(obj)))
 | 
			
		||||
				}
 | 
			
		||||
			})
 | 
			
		||||
		}
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	// Retry, some errors (like CRD just created and type not ready for use yet) are temporary.
 | 
			
		||||
 
 | 
			
		||||
@@ -31,8 +31,6 @@ import (
 | 
			
		||||
	"testing"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/google/go-cmp/cmp"
 | 
			
		||||
 | 
			
		||||
	v1 "k8s.io/api/core/v1"
 | 
			
		||||
	apierrors "k8s.io/apimachinery/pkg/api/errors"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/api/meta"
 | 
			
		||||
@@ -764,32 +762,35 @@ func initTestOutput(tb testing.TB) io.Writer {
 | 
			
		||||
	return output
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type cleanupKeyType struct{}
 | 
			
		||||
 | 
			
		||||
var cleanupKey = cleanupKeyType{}
 | 
			
		||||
 | 
			
		||||
// shouldCleanup returns true if a function should clean up resource in the
 | 
			
		||||
// apiserver when the test is done. This is true for unit tests (etcd and
 | 
			
		||||
// apiserver get reused) and false for benchmarks (each benchmark starts with a
 | 
			
		||||
// clean state, so cleaning up just wastes time).
 | 
			
		||||
//
 | 
			
		||||
// The default if not explicitly set in the context is true.
 | 
			
		||||
func shouldCleanup(ctx context.Context) bool {
 | 
			
		||||
	val := ctx.Value(cleanupKey)
 | 
			
		||||
	if enabled, ok := val.(bool); ok {
 | 
			
		||||
		return enabled
 | 
			
		||||
	}
 | 
			
		||||
	return true
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// withCleanup sets whether cleaning up resources in the apiserver
 | 
			
		||||
// should be done. The default is true.
 | 
			
		||||
func withCleanup(tCtx ktesting.TContext, enabled bool) ktesting.TContext {
 | 
			
		||||
	return ktesting.WithValue(tCtx, cleanupKey, enabled)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var perfSchedulingLabelFilter = flag.String("perf-scheduling-label-filter", "performance", "comma-separated list of labels which a testcase must have (no prefix or +) or must not have (-), used by BenchmarkPerfScheduling")
 | 
			
		||||
 | 
			
		||||
func setupTestCase(t testing.TB, tc *testCase, output io.Writer, outOfTreePluginRegistry frameworkruntime.Registry) (informers.SharedInformerFactory, ktesting.TContext) {
 | 
			
		||||
	tCtx := ktesting.Init(t, initoption.PerTestOutput(*useTestingLog))
 | 
			
		||||
 | 
			
		||||
	// Ensure that there are no leaked
 | 
			
		||||
	// goroutines.  They could influence
 | 
			
		||||
	// performance of the next benchmark.
 | 
			
		||||
	// This must *after* RedirectKlog
 | 
			
		||||
	// because then during cleanup, the
 | 
			
		||||
	// test will wait for goroutines to
 | 
			
		||||
	// quit *before* restoring klog settings.
 | 
			
		||||
	framework.GoleakCheck(t)
 | 
			
		||||
 | 
			
		||||
	// Now that we are ready to run, start
 | 
			
		||||
	// etcd.
 | 
			
		||||
	framework.StartEtcd(t, output)
 | 
			
		||||
 | 
			
		||||
	for feature, flag := range tc.FeatureGates {
 | 
			
		||||
		featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, feature, flag)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// 30 minutes should be plenty enough even for the 5000-node tests.
 | 
			
		||||
	timeout := 30 * time.Minute
 | 
			
		||||
	tCtx = ktesting.WithTimeout(tCtx, timeout, fmt.Sprintf("timed out after the %s per-test timeout", timeout))
 | 
			
		||||
 | 
			
		||||
	return setupClusterForWorkload(tCtx, tc.SchedulerConfigPath, tc.FeatureGates, outOfTreePluginRegistry)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// RunBenchmarkPerfScheduling runs the scheduler performance tests.
 | 
			
		||||
//
 | 
			
		||||
// You can pass your own scheduler plugins via outOfTreePluginRegistry.
 | 
			
		||||
@@ -821,33 +822,8 @@ func RunBenchmarkPerfScheduling(b *testing.B, outOfTreePluginRegistry frameworkr
 | 
			
		||||
					if !enabled(*perfSchedulingLabelFilter, append(tc.Labels, w.Labels...)...) {
 | 
			
		||||
						b.Skipf("disabled by label filter %q", *perfSchedulingLabelFilter)
 | 
			
		||||
					}
 | 
			
		||||
					tCtx := ktesting.Init(b, initoption.PerTestOutput(*useTestingLog))
 | 
			
		||||
 | 
			
		||||
					// Ensure that there are no leaked
 | 
			
		||||
					// goroutines.  They could influence
 | 
			
		||||
					// performance of the next benchmark.
 | 
			
		||||
					// This must *after* RedirectKlog
 | 
			
		||||
					// because then during cleanup, the
 | 
			
		||||
					// test will wait for goroutines to
 | 
			
		||||
					// quit *before* restoring klog settings.
 | 
			
		||||
					framework.GoleakCheck(b)
 | 
			
		||||
 | 
			
		||||
					// Now that we are ready to run, start
 | 
			
		||||
					// etcd.
 | 
			
		||||
					framework.StartEtcd(b, output)
 | 
			
		||||
 | 
			
		||||
					// 30 minutes should be plenty enough even for the 5000-node tests.
 | 
			
		||||
					timeout := 30 * time.Minute
 | 
			
		||||
					tCtx = ktesting.WithTimeout(tCtx, timeout, fmt.Sprintf("timed out after the %s per-test timeout", timeout))
 | 
			
		||||
 | 
			
		||||
					for feature, flag := range tc.FeatureGates {
 | 
			
		||||
						featuregatetesting.SetFeatureGateDuringTest(b, utilfeature.DefaultFeatureGate, feature, flag)
 | 
			
		||||
					}
 | 
			
		||||
					informerFactory, tCtx := setupClusterForWorkload(tCtx, tc.SchedulerConfigPath, tc.FeatureGates, outOfTreePluginRegistry)
 | 
			
		||||
 | 
			
		||||
					// No need to clean up, each benchmark testcase starts with an empty
 | 
			
		||||
					// etcd database.
 | 
			
		||||
					tCtx = withCleanup(tCtx, false)
 | 
			
		||||
					informerFactory, tCtx := setupTestCase(b, tc, output, outOfTreePluginRegistry)
 | 
			
		||||
 | 
			
		||||
					results := runWorkload(tCtx, tc, w, informerFactory)
 | 
			
		||||
					dataItems.DataItems = append(dataItems.DataItems, results...)
 | 
			
		||||
@@ -889,16 +865,6 @@ func RunBenchmarkPerfScheduling(b *testing.B, outOfTreePluginRegistry frameworkr
 | 
			
		||||
 | 
			
		||||
var testSchedulingLabelFilter = flag.String("test-scheduling-label-filter", "integration-test", "comma-separated list of labels which a testcase must have (no prefix or +) or must not have (-), used by TestScheduling")
 | 
			
		||||
 | 
			
		||||
type schedulerConfig struct {
 | 
			
		||||
	schedulerConfigPath string
 | 
			
		||||
	featureGates        map[featuregate.Feature]bool
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c schedulerConfig) equals(tc *testCase) bool {
 | 
			
		||||
	return c.schedulerConfigPath == tc.SchedulerConfigPath &&
 | 
			
		||||
		cmp.Equal(c.featureGates, tc.FeatureGates)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func loadSchedulerConfig(file string) (*config.KubeSchedulerConfiguration, error) {
 | 
			
		||||
	data, err := os.ReadFile(file)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
@@ -997,7 +963,6 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact
 | 
			
		||||
			b.ReportMetric(duration.Seconds(), "runtime_seconds")
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
	cleanup := shouldCleanup(tCtx)
 | 
			
		||||
 | 
			
		||||
	// Disable error checking of the sampling interval length in the
 | 
			
		||||
	// throughput collector by default. When running benchmarks, report
 | 
			
		||||
@@ -1028,11 +993,6 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact
 | 
			
		||||
	// All namespaces listed in numPodsScheduledPerNamespace will be cleaned up.
 | 
			
		||||
	numPodsScheduledPerNamespace := make(map[string]int)
 | 
			
		||||
 | 
			
		||||
	if cleanup {
 | 
			
		||||
		// This must run before controllers get shut down.
 | 
			
		||||
		defer cleanupWorkload(tCtx, tc, numPodsScheduledPerNamespace)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for opIndex, op := range unrollWorkloadTemplate(tCtx, tc.WorkloadTemplate, w) {
 | 
			
		||||
		realOp, err := op.realOp.patchParams(w)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
@@ -1052,13 +1012,6 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact
 | 
			
		||||
			if err := nodePreparer.PrepareNodes(tCtx, nextNodeIndex); err != nil {
 | 
			
		||||
				tCtx.Fatalf("op %d: %v", opIndex, err)
 | 
			
		||||
			}
 | 
			
		||||
			if cleanup {
 | 
			
		||||
				defer func() {
 | 
			
		||||
					if err := nodePreparer.CleanupNodes(tCtx); err != nil {
 | 
			
		||||
						tCtx.Fatalf("failed to clean up nodes, error: %v", err)
 | 
			
		||||
					}
 | 
			
		||||
				}()
 | 
			
		||||
			}
 | 
			
		||||
			nextNodeIndex += concreteOp.Count
 | 
			
		||||
 | 
			
		||||
		case *createNamespacesOp:
 | 
			
		||||
@@ -1333,51 +1286,6 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact
 | 
			
		||||
	return dataItems
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// cleanupWorkload ensures that everything is removed from the API server that
 | 
			
		||||
// might have been created by runWorkload. This must be done before starting
 | 
			
		||||
// the next workload because otherwise it might stumble over previously created
 | 
			
		||||
// objects. For example, the namespaces are the same in different workloads, so
 | 
			
		||||
// not deleting them would cause the next one to fail with "cannot create
 | 
			
		||||
// namespace: already exists".
 | 
			
		||||
//
 | 
			
		||||
// Calling cleanupWorkload can be skipped if it is known that the next workload
 | 
			
		||||
// will run with a fresh etcd instance.
 | 
			
		||||
func cleanupWorkload(tCtx ktesting.TContext, tc *testCase, numPodsScheduledPerNamespace map[string]int) {
 | 
			
		||||
	deleteNow := *metav1.NewDeleteOptions(0)
 | 
			
		||||
	for namespace := range numPodsScheduledPerNamespace {
 | 
			
		||||
		// Pods have to be deleted explicitly, with no grace period. Normally
 | 
			
		||||
		// kubelet will set the DeletionGracePeriodSeconds to zero when it's okay
 | 
			
		||||
		// to remove a deleted pod, but we don't run kubelet...
 | 
			
		||||
		if err := tCtx.Client().CoreV1().Pods(namespace).DeleteCollection(tCtx, deleteNow, metav1.ListOptions{}); err != nil {
 | 
			
		||||
			tCtx.Fatalf("failed to delete pods in namespace %q: %v", namespace, err)
 | 
			
		||||
		}
 | 
			
		||||
		if err := tCtx.Client().CoreV1().Namespaces().Delete(tCtx, namespace, deleteNow); err != nil {
 | 
			
		||||
			tCtx.Fatalf("Deleting Namespace %q in numPodsScheduledPerNamespace: %v", namespace, err)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// We need to wait here because even with deletion timestamp set,
 | 
			
		||||
	// actually removing a namespace can take some time (garbage collecting
 | 
			
		||||
	// other generated object like secrets, etc.) and we don't want to
 | 
			
		||||
	// start the next workloads while that cleanup is still going on.
 | 
			
		||||
	if err := wait.PollUntilContextTimeout(tCtx, time.Second, 5*time.Minute, false, func(ctx context.Context) (bool, error) {
 | 
			
		||||
		namespaces, err := tCtx.Client().CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return false, err
 | 
			
		||||
		}
 | 
			
		||||
		for _, namespace := range namespaces.Items {
 | 
			
		||||
			if _, ok := numPodsScheduledPerNamespace[namespace.Name]; ok {
 | 
			
		||||
				// A namespace created by the workload, need to wait.
 | 
			
		||||
				return false, nil
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		// All namespaces gone.
 | 
			
		||||
		return true, nil
 | 
			
		||||
	}); err != nil {
 | 
			
		||||
		tCtx.Fatalf("failed while waiting for namespace removal: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func createNamespaceIfNotPresent(tCtx ktesting.TContext, namespace string, podsPerNamespace *map[string]int) {
 | 
			
		||||
	if _, ok := (*podsPerNamespace)[namespace]; !ok {
 | 
			
		||||
		// The namespace has not created yet.
 | 
			
		||||
 
 | 
			
		||||
@@ -18,11 +18,6 @@ package benchmark
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"testing"
 | 
			
		||||
 | 
			
		||||
	utilfeature "k8s.io/apiserver/pkg/util/feature"
 | 
			
		||||
	featuregatetesting "k8s.io/component-base/featuregate/testing"
 | 
			
		||||
	"k8s.io/kubernetes/test/integration/framework"
 | 
			
		||||
	"k8s.io/kubernetes/test/utils/ktesting"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func TestScheduling(t *testing.T) {
 | 
			
		||||
@@ -34,69 +29,18 @@ func TestScheduling(t *testing.T) {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Check for leaks at the very end.
 | 
			
		||||
	framework.GoleakCheck(t)
 | 
			
		||||
 | 
			
		||||
	// All integration test cases share the same etcd, similar to
 | 
			
		||||
	// https://github.com/kubernetes/kubernetes/blob/18d05b646d09b2971dc5400bc288062b0414e8cf/test/integration/framework/etcd.go#L186-L222.
 | 
			
		||||
	framework.StartEtcd(t, nil)
 | 
			
		||||
 | 
			
		||||
	// Workloads with the same configuration share the same apiserver. For that
 | 
			
		||||
	// we first need to determine what those different configs are.
 | 
			
		||||
	var configs []schedulerConfig
 | 
			
		||||
	for _, tc := range testCases {
 | 
			
		||||
		tcEnabled := false
 | 
			
		||||
		for _, w := range tc.Workloads {
 | 
			
		||||
			if enabled(*testSchedulingLabelFilter, append(tc.Labels, w.Labels...)...) {
 | 
			
		||||
				tcEnabled = true
 | 
			
		||||
				break
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		if !tcEnabled {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		exists := false
 | 
			
		||||
		for _, config := range configs {
 | 
			
		||||
			if config.equals(tc) {
 | 
			
		||||
				exists = true
 | 
			
		||||
				break
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		if !exists {
 | 
			
		||||
			configs = append(configs, schedulerConfig{schedulerConfigPath: tc.SchedulerConfigPath, featureGates: tc.FeatureGates})
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	for _, config := range configs {
 | 
			
		||||
		// Not a sub test because we don't have a good name for it.
 | 
			
		||||
		func() {
 | 
			
		||||
			tCtx := ktesting.Init(t)
 | 
			
		||||
 | 
			
		||||
			// No timeout here because the `go test -timeout` will ensure that
 | 
			
		||||
			// the test doesn't get stuck forever.
 | 
			
		||||
 | 
			
		||||
			for feature, flag := range config.featureGates {
 | 
			
		||||
				featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, feature, flag)
 | 
			
		||||
			}
 | 
			
		||||
			informerFactory, tCtx := setupClusterForWorkload(tCtx, config.schedulerConfigPath, config.featureGates, nil)
 | 
			
		||||
 | 
			
		||||
			for _, tc := range testCases {
 | 
			
		||||
				if !config.equals(tc) {
 | 
			
		||||
					// Runs with some other config.
 | 
			
		||||
					continue
 | 
			
		||||
				}
 | 
			
		||||
 | 
			
		||||
				t.Run(tc.Name, func(t *testing.T) {
 | 
			
		||||
					for _, w := range tc.Workloads {
 | 
			
		||||
						t.Run(w.Name, func(t *testing.T) {
 | 
			
		||||
							if !enabled(*testSchedulingLabelFilter, append(tc.Labels, w.Labels...)...) {
 | 
			
		||||
								t.Skipf("disabled by label filter %q", *testSchedulingLabelFilter)
 | 
			
		||||
							}
 | 
			
		||||
							tCtx := ktesting.WithTB(tCtx, t)
 | 
			
		||||
							runWorkload(tCtx, tc, w, informerFactory)
 | 
			
		||||
						})
 | 
			
		||||
		t.Run(tc.Name, func(t *testing.T) {
 | 
			
		||||
			for _, w := range tc.Workloads {
 | 
			
		||||
				t.Run(w.Name, func(t *testing.T) {
 | 
			
		||||
					if !enabled(*testSchedulingLabelFilter, append(tc.Labels, w.Labels...)...) {
 | 
			
		||||
						t.Skipf("disabled by label filter %q", *testSchedulingLabelFilter)
 | 
			
		||||
					}
 | 
			
		||||
					informerFactory, tCtx := setupTestCase(t, tc, nil, nil)
 | 
			
		||||
 | 
			
		||||
					runWorkload(tCtx, tc, w, informerFactory)
 | 
			
		||||
				})
 | 
			
		||||
			}
 | 
			
		||||
		}()
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user