Introduce a churnOp to scheduler perf testing framework

- support two modes: recreate and create
- use DynmaicClient to create API objects
This commit is contained in:
Wei Huang
2021-02-08 16:53:13 -08:00
parent 6404eda8de
commit 1e5878b910
9 changed files with 276 additions and 13 deletions

View File

@@ -21,16 +21,25 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"math"
"strings"
"sync"
"testing"
"time"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
cacheddiscovery "k8s.io/client-go/discovery/cached"
"k8s.io/client-go/dynamic"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/restmapper"
"k8s.io/component-base/featuregate"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/klog/v2"
@@ -43,7 +52,15 @@ const (
configFile = "config/performance-config.yaml"
createNodesOpcode = "createNodes"
createPodsOpcode = "createPods"
churnOpcode = "churn"
barrierOpcode = "barrier"
// Two modes supported in "churn" operator.
// Recreate creates a number of API objects and then delete them, and repeat the iteration.
Recreate = "recreate"
// Create continuously create API objects without deleting them.
Create = "create"
)
var (
@@ -90,7 +107,7 @@ func (tc *testCase) collectsMetrics() bool {
// workload is a subtest under a testCase that tests the scheduler performance
// for a certain ordering of ops. The set of nodes created and pods scheduled
// in a workload may be heterogenous.
// in a workload may be heterogeneous.
type workload struct {
// Name of the workload.
Name string
@@ -109,6 +126,7 @@ func (op *op) UnmarshalJSON(b []byte) error {
possibleOps := []realOp{
&createNodesOp{},
&createPodsOp{},
&churnOp{},
&barrierOp{},
// TODO(#93793): add a sleep timer op to simulate waiting?
// TODO(#94601): add a delete nodes op to simulate scaling behaviour?
@@ -252,6 +270,55 @@ func (cpo createPodsOp) patchParams(w *workload) (realOp, error) {
return &cpo, (&cpo).isValid(false)
}
// churnOp defines an op where services are created as a part of a workload.
type churnOp struct {
// Must be "churnOp".
Opcode string
// Value must be one of the followings:
// - recreate. In this mode, API objects will be created for N cycles, and then
// deleted in the next N cycles. N is specified by the "Number" field.
// - create. In this mode, API objects will be created (without deletion) until
// reaching a threshold - which is specified by the "Number" field.
Mode string
// Maximum number of API objects to be created.
// Defaults to 0, which means unlimited.
Number int
// Intervals of churning. Defaults to 500 millisecond.
IntervalMilliseconds int64
// Namespace the churning objects should be created in. Optional, defaults to a unique
// namespace of the format "namespace-<number>".
Namespace *string
// Path of API spec files.
TemplatePaths []string
}
func (co *churnOp) isValid(_ bool) error {
if co.Opcode != churnOpcode {
return fmt.Errorf("invalid opcode")
}
if co.Mode != Recreate && co.Mode != Create {
return fmt.Errorf("invalid mode: %v. must be one of %v", co.Mode, []string{Recreate, Create})
}
if co.Number < 0 {
return fmt.Errorf("number (%v) cannot be negative", co.Number)
}
if co.Mode == Recreate && co.Number == 0 {
return fmt.Errorf("number cannot be 0 when mode is %v", Recreate)
}
if len(co.TemplatePaths) == 0 {
return fmt.Errorf("at least one template spec file needs to be specified")
}
return nil
}
func (*churnOp) collectsMetrics() bool {
return false
}
func (co churnOp) patchParams(w *workload) (realOp, error) {
return &co, nil
}
// barrierOp defines an op that can be used to wait until all scheduled pods of
// one or many namespaces have been bound to nodes. This is useful when pods
// were scheduled with SkipWaitToCompletion set to true.
@@ -309,7 +376,7 @@ func runWorkload(b *testing.B, tc *testCase, w *workload) []DataItem {
// 30 minutes should be plenty enough even for the 5000-node tests.
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
defer cancel()
finalFunc, podInformer, clientset := mustSetupScheduler()
finalFunc, podInformer, client, dynClient := mustSetupScheduler()
b.Cleanup(finalFunc)
var mu sync.Mutex
@@ -329,7 +396,7 @@ func runWorkload(b *testing.B, tc *testCase, w *workload) []DataItem {
}
switch concreteOp := realOp.(type) {
case *createNodesOp:
nodePreparer, err := getNodePreparer(fmt.Sprintf("node-%d-", opIndex), concreteOp, clientset)
nodePreparer, err := getNodePreparer(fmt.Sprintf("node-%d-", opIndex), concreteOp, client)
if err != nil {
b.Fatalf("op %d: %v", opIndex, err)
}
@@ -359,7 +426,7 @@ func runWorkload(b *testing.B, tc *testCase, w *workload) []DataItem {
go collector.run(collectorCtx)
}
}
if err := createPods(namespace, concreteOp, clientset); err != nil {
if err := createPods(namespace, concreteOp, client); err != nil {
b.Fatalf("op %d: %v", opIndex, err)
}
if concreteOp.SkipWaitToCompletion {
@@ -387,6 +454,103 @@ func runWorkload(b *testing.B, tc *testCase, w *workload) []DataItem {
mu.Unlock()
}
case *churnOp:
var namespace string
if concreteOp.Namespace != nil {
namespace = *concreteOp.Namespace
} else {
namespace = fmt.Sprintf("namespace-%d", opIndex)
}
restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cacheddiscovery.NewMemCacheClient(client.Discovery()))
// Ensure the namespace exists.
nsObj := &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}
if _, err := client.CoreV1().Namespaces().Create(ctx, nsObj, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) {
b.Fatalf("op %d: unable to create namespace %v: %v", opIndex, namespace, err)
}
var churnFns []func(name string) string
for i, path := range concreteOp.TemplatePaths {
unstructuredObj, gvk, err := getUnstructuredFromFile(path)
if err != nil {
b.Fatalf("op %d: unable to parse the %v-th template path: %v", opIndex, i, err)
}
// Obtain GVR.
mapping, err := restMapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
b.Fatalf("op %d: unable to find GVR for %v: %v", opIndex, gvk, err)
}
gvr := mapping.Resource
// Distinguish cluster-scoped with namespaced API objects.
var dynRes dynamic.ResourceInterface
if mapping.Scope.Name() == meta.RESTScopeNameNamespace {
dynRes = dynClient.Resource(gvr).Namespace(namespace)
} else {
dynRes = dynClient.Resource(gvr)
}
churnFns = append(churnFns, func(name string) string {
if name != "" {
dynRes.Delete(ctx, name, metav1.DeleteOptions{})
return ""
}
live, err := dynRes.Create(ctx, unstructuredObj, metav1.CreateOptions{})
if err != nil {
return ""
}
return live.GetName()
})
}
var interval int64 = 500
if concreteOp.IntervalMilliseconds != 0 {
interval = concreteOp.IntervalMilliseconds
}
ticker := time.NewTicker(time.Duration(interval) * time.Millisecond)
defer ticker.Stop()
if concreteOp.Mode == Recreate {
go func() {
retVals := make([][]string, len(churnFns))
// For each churn function, instantiate a slice of strings with length "concreteOp.Number".
for i := range retVals {
retVals[i] = make([]string, concreteOp.Number)
}
count := 0
for {
select {
case <-ticker.C:
for i := range churnFns {
retVals[i][count%concreteOp.Number] = churnFns[i](retVals[i][count%concreteOp.Number])
}
count++
case <-ctx.Done():
return
}
}
}()
} else if concreteOp.Mode == Create {
go func() {
count, threshold := 0, concreteOp.Number
if threshold == 0 {
threshold = math.MaxInt32
}
for count < threshold {
select {
case <-ticker.C:
for i := range churnFns {
churnFns[i]("")
}
count++
case <-ctx.Done():
return
}
}
}()
}
case *barrierOp:
for _, namespace := range concreteOp.Namespaces {
if _, ok := numPodsScheduledPerNamespace[namespace]; !ok {
@@ -525,6 +689,28 @@ func getSpecFromFile(path *string, spec interface{}) error {
return yaml.UnmarshalStrict(bytes, spec)
}
func getUnstructuredFromFile(path string) (*unstructured.Unstructured, *schema.GroupVersionKind, error) {
bytes, err := ioutil.ReadFile(path)
if err != nil {
return nil, nil, err
}
bytes, err = yaml.YAMLToJSONStrict(bytes)
if err != nil {
return nil, nil, fmt.Errorf("cannot covert YAML to JSON: %v", err)
}
obj, gvk, err := unstructured.UnstructuredJSONScheme.Decode(bytes, nil, nil)
if err != nil {
return nil, nil, err
}
unstructuredObj, ok := obj.(*unstructured.Unstructured)
if !ok {
return nil, nil, fmt.Errorf("cannot convert spec file in %v to an unstructured obj", path)
}
return unstructuredObj, gvk, nil
}
func getTestCases(path string) ([]*testCase, error) {
testCases := make([]*testCase, 0)
if err := getSpecFromFile(&path, &testCases); err != nil {