From ec78b8305ad392f6faf4e5247ea33ceabb484c3f Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Fri, 13 Jun 2025 16:34:42 +0200 Subject: [PATCH] Estimate average size of objects in etcd and plug it into request cost estimator --- pkg/features/kube_features.go | 4 + .../apiserver/pkg/features/kube_features.go | 9 ++ .../pkg/registry/generic/registry/dryrun.go | 4 +- .../pkg/registry/generic/registry/store.go | 8 +- .../pkg/storage/cacher/cacher_test.go | 14 +- .../storage/cacher/cacher_whitebox_test.go | 4 +- .../apiserver/pkg/storage/cacher/delegator.go | 4 +- .../apiserver/pkg/storage/etcd3/stats.go | 133 ++++++++++++++++++ .../apiserver/pkg/storage/etcd3/stats_test.go | 120 ++++++++++++++++ .../apiserver/pkg/storage/etcd3/store.go | 43 ++++-- .../apiserver/pkg/storage/etcd3/store_test.go | 17 ++- .../apiserver/pkg/storage/etcd3/watcher.go | 11 ++ .../apiserver/pkg/storage/interfaces.go | 13 +- .../pkg/storage/testing/store_tests.go | 97 +++++++++---- .../request/list_work_estimator.go | 9 +- .../request/object_count_tracker.go | 33 ++--- .../request/object_count_tracker_test.go | 23 +-- .../pkg/util/flowcontrol/request/width.go | 7 +- .../util/flowcontrol/request/width_test.go | 5 +- .../reference/versioned_feature_list.yaml | 6 + 20 files changed, 464 insertions(+), 100 deletions(-) create mode 100644 staging/src/k8s.io/apiserver/pkg/storage/etcd3/stats.go create mode 100644 staging/src/k8s.io/apiserver/pkg/storage/etcd3/stats_test.go diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index b1aca7e2cb7..3b294186f72 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -1277,6 +1277,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate {Version: version.MustParse("1.33"), Default: false, PreRelease: featuregate.Deprecated}, }, + genericfeatures.SizeBasedListCostEstimate: { + {Version: version.MustParse("1.34"), Default: true, PreRelease: featuregate.Beta}, + }, + genericfeatures.StorageVersionAPI: { {Version: version.MustParse("1.20"), Default: false, PreRelease: featuregate.Alpha}, }, diff --git a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go index fd4a9255676..065d96f2fc1 100644 --- a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go +++ b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go @@ -176,6 +176,11 @@ const ( // if the generated name conflicts with an existing resource name, up to a maximum number of 7 retries. RetryGenerateName featuregate.Feature = "RetryGenerateName" + // owner: @serathius + // + // Enables APF to use size of objects for estimating request cost. + SizeBasedListCostEstimate featuregate.Feature = "SizeBasedListCostEstimate" + // owner: @cici37 // // StrictCostEnforcementForVAP is used to apply strict CEL cost validation for ValidatingAdmissionPolicy. @@ -362,6 +367,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate {Version: version.MustParse("1.33"), Default: false, PreRelease: featuregate.Deprecated}, }, + SizeBasedListCostEstimate: { + {Version: version.MustParse("1.34"), Default: true, PreRelease: featuregate.Beta}, + }, + StorageVersionAPI: { {Version: version.MustParse("1.20"), Default: false, PreRelease: featuregate.Alpha}, }, diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/dryrun.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/dryrun.go index dd6a2c78ec0..fe9bc48969b 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/dryrun.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/dryrun.go @@ -107,8 +107,8 @@ func (s *DryRunnableStorage) GuaranteedUpdate( return s.Storage.GuaranteedUpdate(ctx, key, destination, ignoreNotFound, preconditions, tryUpdate, cachedExistingObject) } -func (s *DryRunnableStorage) Count(ctx context.Context, key string) (int64, error) { - return s.Storage.Count(ctx, key) +func (s *DryRunnableStorage) Stats(ctx context.Context) (storage.Stats, error) { + return s.Storage.Stats(ctx) } func (s *DryRunnableStorage) copyInto(in, out runtime.Object) error { diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go index 422353a38d1..83644ce834c 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go @@ -1667,15 +1667,15 @@ func (e *Store) startObservingCount(period time.Duration, objectCountTracker flo klog.V(2).InfoS("Monitoring resource count at path", "resource", resourceName, "path", "/"+prefix) stopCh := make(chan struct{}) go wait.JitterUntil(func() { - count, err := e.Storage.Count(ctx, prefix) + stats, err := e.Storage.Stats(ctx) if err != nil { klog.V(5).InfoS("Failed to update storage count metric", "err", err) - count = -1 + stats.ObjectCount = -1 } - metrics.UpdateObjectCount(e.DefaultQualifiedResource, count) + metrics.UpdateObjectCount(e.DefaultQualifiedResource, stats.ObjectCount) if objectCountTracker != nil { - objectCountTracker.Set(resourceName, count) + objectCountTracker.Set(resourceName, stats) } }, period, resourceCountPollPeriodJitter, true, stopCh) return func() { close(stopCh) } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go index 5547bf065dc..db857a35038 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go @@ -38,6 +38,7 @@ import ( "k8s.io/apiserver/pkg/storage" etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing" storagetesting "k8s.io/apiserver/pkg/storage/testing" + "k8s.io/apiserver/pkg/storage/value/encrypt/identity" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/tools/cache" featuregatetesting "k8s.io/component-base/featuregate/testing" @@ -279,10 +280,15 @@ func TestTransformationFailure(t *testing.T) { // TODO(#109831): Enable use of this test and run it. } -func TestCount(t *testing.T) { - ctx, cacher, terminate := testSetup(t) - t.Cleanup(terminate) - storagetesting.RunTestCount(ctx, t, cacher) +func TestStats(t *testing.T) { + for _, sizeBasedListCostEstimate := range []bool{true, false} { + t.Run(fmt.Sprintf("SizeBasedListCostEstimate=%v", sizeBasedListCostEstimate), func(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SizeBasedListCostEstimate, sizeBasedListCostEstimate) + ctx, cacher, terminate := testSetup(t) + t.Cleanup(terminate) + storagetesting.RunTestStats(ctx, t, cacher, codecs.LegacyCodec(examplev1.SchemeGroupVersion), identity.NewEncryptCheckTransformer(), sizeBasedListCostEstimate) + }) + } } func TestWatch(t *testing.T) { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index 7f3ed9f13b4..c12b21bfb59 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -189,8 +189,8 @@ func (d *dummyStorage) GetList(ctx context.Context, resPrefix string, opts stora func (d *dummyStorage) GuaranteedUpdate(_ context.Context, _ string, _ runtime.Object, _ bool, _ *storage.Preconditions, _ storage.UpdateFunc, _ runtime.Object) error { return fmt.Errorf("unimplemented") } -func (d *dummyStorage) Count(_ context.Context, _ string) (int64, error) { - return 0, fmt.Errorf("unimplemented") +func (d *dummyStorage) Stats(_ context.Context) (storage.Stats, error) { + return storage.Stats{}, fmt.Errorf("unimplemented") } func (d *dummyStorage) ReadinessCheck() error { return nil diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go index a1dc0f57314..b65dffa8796 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go @@ -274,8 +274,8 @@ func (c *CacheDelegator) GuaranteedUpdate(ctx context.Context, key string, desti return c.storage.GuaranteedUpdate(ctx, key, destination, ignoreNotFound, preconditions, tryUpdate, nil) } -func (c *CacheDelegator) Count(ctx context.Context, pathPrefix string) (int64, error) { - return c.storage.Count(ctx, pathPrefix) +func (c *CacheDelegator) Stats(ctx context.Context) (storage.Stats, error) { + return c.storage.Stats(ctx) } func (c *CacheDelegator) ReadinessCheck() error { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/stats.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/stats.go new file mode 100644 index 00000000000..af7bae69377 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/stats.go @@ -0,0 +1,133 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package etcd3 + +import ( + "context" + "sync" + + "go.etcd.io/etcd/api/v3/mvccpb" + "k8s.io/apiserver/pkg/storage" +) + +type keysFunc func(context.Context) ([]string, error) + +func newStatsCache(getKeys keysFunc) *statsCache { + sc := &statsCache{ + getKeys: getKeys, + keys: make(map[string]sizeRevision), + } + return sc +} + +// statsCache efficiently estimates the average object size +// based on the last observed state of individual keys. +// By plugging statsCache into GetList and Watch functions, +// a fairly accurate estimate of object sizes can be maintained +// without additional requests to the underlying storage. +// To handle potential out-of-order or incomplete data, +// it uses a per-key revision to identify the newer state. +// This approach may leak keys if delete events are not observed, +// thus we run a background goroutine to periodically cleanup keys if needed. +type statsCache struct { + getKeys keysFunc + + lock sync.Mutex + keys map[string]sizeRevision +} + +type sizeRevision struct { + sizeBytes int64 + revision int64 +} + +func (sc *statsCache) Stats(ctx context.Context) (storage.Stats, error) { + keys, err := sc.getKeys(ctx) + if err != nil { + return storage.Stats{}, err + } + stats := storage.Stats{ + ObjectCount: int64(len(keys)), + } + sc.lock.Lock() + defer sc.lock.Unlock() + sc.cleanKeys(keys) + if len(sc.keys) != 0 { + stats.EstimatedAverageObjectSizeBytes = sc.keySizes() / int64(len(sc.keys)) + } + return stats, nil +} + +func (sc *statsCache) cleanKeys(keepKeys []string) { + newKeys := make(map[string]sizeRevision, len(keepKeys)) + for _, key := range keepKeys { + keySizeRevision, ok := sc.keys[key] + if !ok { + continue + } + newKeys[key] = keySizeRevision + } + sc.keys = newKeys +} + +func (sc *statsCache) keySizes() (totalSize int64) { + for _, sizeRevision := range sc.keys { + totalSize += sizeRevision.sizeBytes + } + return totalSize +} + +func (sc *statsCache) Update(kvs []*mvccpb.KeyValue) { + sc.lock.Lock() + defer sc.lock.Unlock() + for _, kv := range kvs { + sc.updateKey(kv) + } +} + +func (sc *statsCache) UpdateKey(kv *mvccpb.KeyValue) { + sc.lock.Lock() + defer sc.lock.Unlock() + + sc.updateKey(kv) +} + +func (sc *statsCache) updateKey(kv *mvccpb.KeyValue) { + key := string(kv.Key) + keySizeRevision := sc.keys[key] + if keySizeRevision.revision >= kv.ModRevision { + return + } + + sc.keys[key] = sizeRevision{ + sizeBytes: int64(len(kv.Value)), + revision: kv.ModRevision, + } +} + +func (sc *statsCache) DeleteKey(kv *mvccpb.KeyValue) { + sc.lock.Lock() + defer sc.lock.Unlock() + + key := string(kv.Key) + keySizeRevision := sc.keys[key] + if keySizeRevision.revision >= kv.ModRevision { + return + } + + delete(sc.keys, key) +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/stats_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/stats_test.go new file mode 100644 index 00000000000..bc64c9973db --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/stats_test.go @@ -0,0 +1,120 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package etcd3 + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.etcd.io/etcd/api/v3/mvccpb" +) + +func TestStatsCache(t *testing.T) { + ctx := t.Context() + store := newStatsCache(func(ctx context.Context) ([]string, error) { return []string{}, nil }) + + stats, err := store.Stats(ctx) + require.NoError(t, err) + assert.Equal(t, int64(0), stats.EstimatedAverageObjectSizeBytes) + + store.UpdateKey(&mvccpb.KeyValue{Key: []byte("foo1"), Value: []byte("0123456789"), ModRevision: 2}) + store.getKeys = func(ctx context.Context) ([]string, error) { return []string{"foo1"}, nil } + stats, err = store.Stats(ctx) + require.NoError(t, err) + assert.Equal(t, int64(10), stats.EstimatedAverageObjectSizeBytes) + + store.UpdateKey(&mvccpb.KeyValue{Key: []byte("foo2"), Value: []byte("01234567890123456789"), ModRevision: 3}) + store.getKeys = func(ctx context.Context) ([]string, error) { return []string{"foo1", "foo2"}, nil } + stats, err = store.Stats(ctx) + require.NoError(t, err) + assert.Equal(t, int64(15), stats.EstimatedAverageObjectSizeBytes) + + store.UpdateKey(&mvccpb.KeyValue{Key: []byte("foo1"), Value: []byte("012345678901234567890123456789"), ModRevision: 4}) + stats, err = store.Stats(ctx) + require.NoError(t, err) + assert.Equal(t, int64(25), stats.EstimatedAverageObjectSizeBytes) + + store.UpdateKey(&mvccpb.KeyValue{Key: []byte("foo2"), Value: []byte("0123456789"), ModRevision: 5}) + stats, err = store.Stats(ctx) + require.NoError(t, err) + assert.Equal(t, int64(20), stats.EstimatedAverageObjectSizeBytes) + + store.DeleteKey(&mvccpb.KeyValue{Key: []byte("foo1"), ModRevision: 6}) + store.getKeys = func(ctx context.Context) ([]string, error) { return []string{"foo2"}, nil } + stats, err = store.Stats(ctx) + require.NoError(t, err) + assert.Equal(t, int64(10), stats.EstimatedAverageObjectSizeBytes) + + // Snapshot revision from revision 3 + store.Update([]*mvccpb.KeyValue{ + {Key: []byte("foo1"), Value: []byte("0123456789"), ModRevision: 2}, + {Key: []byte("foo2"), Value: []byte("01234567890123456789"), ModRevision: 3}, + }) + stats, err = store.Stats(ctx) + require.NoError(t, err) + assert.Equal(t, int64(10), stats.EstimatedAverageObjectSizeBytes) + + // Replay from revision 2 + store.UpdateKey(&mvccpb.KeyValue{Key: []byte("foo1"), Value: []byte("0123456789"), ModRevision: 2}) + stats, err = store.Stats(ctx) + require.NoError(t, err) + assert.Equal(t, int64(10), stats.EstimatedAverageObjectSizeBytes) + + store.UpdateKey(&mvccpb.KeyValue{Key: []byte("foo2"), Value: []byte("01234567890123456789"), ModRevision: 3}) + stats, err = store.Stats(ctx) + require.NoError(t, err) + assert.Equal(t, int64(10), stats.EstimatedAverageObjectSizeBytes) + + store.UpdateKey(&mvccpb.KeyValue{Key: []byte("foo1"), Value: []byte("012345678901234567890123456789"), ModRevision: 4}) + stats, err = store.Stats(ctx) + require.NoError(t, err) + assert.Equal(t, int64(10), stats.EstimatedAverageObjectSizeBytes) + + store.UpdateKey(&mvccpb.KeyValue{Key: []byte("foo2"), Value: []byte("0123456789"), ModRevision: 5}) + stats, err = store.Stats(ctx) + require.NoError(t, err) + assert.Equal(t, int64(10), stats.EstimatedAverageObjectSizeBytes) + + store.DeleteKey(&mvccpb.KeyValue{Key: []byte("foo1"), ModRevision: 6}) + stats, err = store.Stats(ctx) + require.NoError(t, err) + assert.Equal(t, int64(10), stats.EstimatedAverageObjectSizeBytes) + + store.DeleteKey(&mvccpb.KeyValue{Key: []byte("foo1"), ModRevision: 7}) + store.getKeys = func(ctx context.Context) ([]string, error) { return []string{}, nil } + stats, err = store.Stats(ctx) + require.NoError(t, err) + assert.Equal(t, int64(0), stats.EstimatedAverageObjectSizeBytes) + + // Old snapshot might restore old revision if keys were recreated + store.getKeys = func(ctx context.Context) ([]string, error) { return []string{"foo1", "foo2"}, nil } + store.Update([]*mvccpb.KeyValue{ + {Key: []byte("foo1"), Value: []byte("0123456789"), ModRevision: 2}, + {Key: []byte("foo2"), Value: []byte("01234567890123456789"), ModRevision: 3}, + }) + stats, err = store.Stats(ctx) + require.NoError(t, err) + assert.Equal(t, int64(15), stats.EstimatedAverageObjectSizeBytes) + + // Cleanup if keys were deleted + store.getKeys = func(ctx context.Context) ([]string, error) { return []string{}, nil } + stats, err = store.Stats(ctx) + require.NoError(t, err) + assert.Equal(t, int64(0), stats.EstimatedAverageObjectSizeBytes) +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index e1350042fb2..dbd7f675d4b 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -88,6 +88,7 @@ type store struct { resourcePrefix string newListFunc func() runtime.Object + stats *statsCache } var _ storage.Interface = (*store)(nil) @@ -181,6 +182,11 @@ func New(c *kubernetes.Client, codec runtime.Codec, newFunc, newListFunc func() resourcePrefix: resourcePrefix, newListFunc: newListFunc, } + if utilfeature.DefaultFeatureGate.Enabled(features.SizeBasedListCostEstimate) { + stats := newStatsCache(s.getKeys) + s.stats = stats + w.stats = stats + } w.getCurrentStorageRV = func(ctx context.Context) (uint64, error) { return s.GetCurrentResourceVersion(ctx) @@ -606,26 +612,30 @@ func getNewItemFunc(listObj runtime.Object, v reflect.Value) func() runtime.Obje } } -func (s *store) Count(ctx context.Context, key string) (int64, error) { - preparedKey, err := s.prepareKey(key) - if err != nil { - return 0, err +func (s *store) Stats(ctx context.Context) (stats storage.Stats, err error) { + if s.stats != nil { + return s.stats.Stats(ctx) } - - // We need to make sure the key ended with "/" so that we only get children "directories". - // e.g. if we have key "/a", "/a/b", "/ab", getting keys with prefix "/a" will return all three, - // while with prefix "/a/" will return only "/a/b" which is the correct answer. - if !strings.HasSuffix(preparedKey, "/") { - preparedKey += "/" - } - startTime := time.Now() - count, err := s.client.Kubernetes.Count(ctx, preparedKey, kubernetes.CountOptions{}) + count, err := s.client.Kubernetes.Count(ctx, s.pathPrefix, kubernetes.CountOptions{}) metrics.RecordEtcdRequest("listWithCount", s.groupResource, err, startTime) if err != nil { - return 0, err + return storage.Stats{}, err } - return count, nil + return storage.Stats{ + ObjectCount: count, + }, nil +} + +func (s *store) getKeys(ctx context.Context) ([]string, error) { + startTime := time.Now() + resp, err := s.client.KV.Get(ctx, s.pathPrefix, clientv3.WithPrefix(), clientv3.WithKeysOnly()) + metrics.RecordEtcdRequest("listOnlyKeys", s.groupResource, err, startTime) + keys := make([]string, 0, len(resp.Kvs)) + for _, kv := range resp.Kvs { + keys = append(keys, string(kv.Key)) + } + return keys, nil } // ReadinessCheck implements storage.Interface. @@ -756,6 +766,9 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption } else { growSlice(v, 2048, len(getResp.Kvs)) } + if s.stats != nil { + s.stats.Update(getResp.Kvs) + } // take items from the response until the bucket is full, filtering as we go for i, kv := range getResp.Kvs { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index ae2a5530b8d..b91c2192a65 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -43,10 +43,13 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apiserver/pkg/apis/example" examplev1 "k8s.io/apiserver/pkg/apis/example/v1" + "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/etcd3/testserver" storagetesting "k8s.io/apiserver/pkg/storage/testing" "k8s.io/apiserver/pkg/storage/value" + utilfeature "k8s.io/apiserver/pkg/util/feature" + featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/klog/v2" ) @@ -343,9 +346,15 @@ func TestListResourceVersionMatch(t *testing.T) { storagetesting.RunTestListResourceVersionMatch(ctx, t, &storeWithPrefixTransformer{store}) } -func TestCount(t *testing.T) { - ctx, store, _ := testSetup(t) - storagetesting.RunTestCount(ctx, t, store) +func TestStats(t *testing.T) { + for _, sizeBasedListCostEstimate := range []bool{true, false} { + t.Run(fmt.Sprintf("SizeBasedListCostEstimate=%v", sizeBasedListCostEstimate), func(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SizeBasedListCostEstimate, sizeBasedListCostEstimate) + // Match transformer with cacher tests. + ctx, store, _ := testSetup(t) + storagetesting.RunTestStats(ctx, t, store, store.codec, store.transformer, sizeBasedListCostEstimate) + }) + } } // ======================================================================= @@ -666,8 +675,6 @@ func TestInvalidKeys(t *testing.T) { expectInvalidKey("Get", store.Get(ctx, invalidKey, storage.GetOptions{}, nil)) expectInvalidKey("GetList", store.GetList(ctx, invalidKey, storage.ListOptions{}, nil)) expectInvalidKey("GuaranteedUpdate", store.GuaranteedUpdate(ctx, invalidKey, nil, true, nil, nil, nil)) - _, countErr := store.Count(t.Context(), invalidKey) - expectInvalidKey("Count", countErr) } func BenchmarkStore_GetList(b *testing.B) { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go index 63c664a5329..76f65f160eb 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go @@ -77,6 +77,7 @@ type watcher struct { versioner storage.Versioner transformer value.Transformer getCurrentStorageRV func(context.Context) (uint64, error) + stats *statsCache } // watchChan implements watch.Interface. @@ -91,6 +92,7 @@ type watchChan struct { cancel context.CancelFunc incomingEventChan chan *event resultChan chan watch.Event + stats *statsCache } // Watch watches on a key and returns a watch.Interface that transfers relevant notifications. @@ -134,6 +136,7 @@ func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, re internalPred: pred, incomingEventChan: make(chan *event, incomingBufSize), resultChan: make(chan watch.Event, outgoingBufSize), + stats: w.stats, } if pred.Empty() { // The filter doesn't filter out any object. @@ -402,6 +405,14 @@ func (wc *watchChan) startWatching(watchClosedCh chan struct{}, initialEventsEnd } for _, e := range wres.Events { + if wc.stats != nil { + switch e.Type { + case clientv3.EventTypePut: + wc.stats.UpdateKey(e.Kv) + case clientv3.EventTypeDelete: + wc.stats.DeleteKey(e.Kv) + } + } metrics.RecordEtcdEvent(wc.watcher.groupResource) parsedEvent, err := parseEvent(e) if err != nil { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go b/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go index d1f7ce31cc7..582e369de9b 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go @@ -243,8 +243,8 @@ type Interface interface { ctx context.Context, key string, destination runtime.Object, ignoreNotFound bool, preconditions *Preconditions, tryUpdate UpdateFunc, cachedExistingObject runtime.Object) error - // Count returns number of different entries under the key (generally being path prefix). - Count(ctx context.Context, key string) (int64, error) + // Stats returns storage stats. + Stats(ctx context.Context) (Stats, error) // ReadinessCheck checks if the storage is ready for accepting requests. ReadinessCheck() error @@ -370,3 +370,12 @@ func ValidateListOptions(keyPrefix string, versioner Versioner, opts ListOptions } return withRev, "", nil } + +// Stats provides statistics information about storage. +type Stats struct { + // ObjectCount informs about number of objects stored in the storage. + ObjectCount int64 + // EstimatedAverageObjectSizeBytes informs about size of objects stored in the storage, based on size of serialized values. + // Value is an estimate, meaning it doesn't need to provide accurate nor consistent. + EstimatedAverageObjectSizeBytes int64 +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go index 6610edad14e..85e16eac17c 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go @@ -3127,43 +3127,84 @@ func RunTestTransformationFailure(ctx context.Context, t *testing.T, store Inter } } -func RunTestCount(ctx context.Context, t *testing.T, store storage.Interface) { - resourceA := "/foo.bar.io/abc" +func RunTestStats(ctx context.Context, t *testing.T, store storage.Interface, codec runtime.Codec, transformer value.Transformer, sizeEnabled bool) { + assertStats(t, store, sizeEnabled, storage.Stats{ObjectCount: 0, EstimatedAverageObjectSizeBytes: 0}) - // resourceA is intentionally a prefix of resourceB to ensure that the count - // for resourceA does not include any objects from resourceB. - resourceB := fmt.Sprintf("%sdef", resourceA) - - resourceACountExpected := 5 - for i := 1; i <= resourceACountExpected; i++ { - obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("foo-%d", i)}} - - key := fmt.Sprintf("%s/%d", resourceA, i) - if err := store.Create(ctx, key, obj, nil, 0); err != nil { - t.Fatalf("Create failed: %v", err) - } + foo := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} + fooKey := computePodKey(foo) + if err := store.Create(ctx, fooKey, foo, nil, 0); err != nil { + t.Fatalf("Create failed: %v", err) } + fooSize := objectSize(t, codec, foo, transformer) + assertStats(t, store, sizeEnabled, storage.Stats{ObjectCount: 1, EstimatedAverageObjectSizeBytes: fooSize}) - resourceBCount := 4 - for i := 1; i <= resourceBCount; i++ { - obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("foo-%d", i)}} - - key := fmt.Sprintf("%s/%d", resourceB, i) - if err := store.Create(ctx, key, obj, nil, 0); err != nil { - t.Fatalf("Create failed: %v", err) - } + bar := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}} + barKey := computePodKey(bar) + if err := store.Create(ctx, barKey, bar, nil, 0); err != nil { + t.Fatalf("Create failed: %v", err) } + barSize := objectSize(t, codec, bar, transformer) + assertStats(t, store, sizeEnabled, storage.Stats{ObjectCount: 2, EstimatedAverageObjectSizeBytes: (fooSize + barSize) / 2}) - resourceACountGot, err := store.Count(t.Context(), resourceA) + if err := store.GuaranteedUpdate(ctx, barKey, bar, false, nil, + storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) { + pod := obj.(*example.Pod) + pod.Labels = map[string]string{"foo": "bar"} + return pod, nil + }), nil); err != nil { + t.Errorf("Update failed: %v", err) + } + // ResourceVerson is not stored. + bar.ResourceVersion = "" + barSize = objectSize(t, codec, bar, transformer) + assertStats(t, store, sizeEnabled, storage.Stats{ObjectCount: 2, EstimatedAverageObjectSizeBytes: (fooSize + barSize) / 2}) + + if err := store.Delete(ctx, fooKey, foo, nil, storage.ValidateAllObjectFunc, nil, storage.DeleteOptions{}); err != nil { + t.Errorf("Delete failed: %v", err) + } + assertStats(t, store, sizeEnabled, storage.Stats{ObjectCount: 1, EstimatedAverageObjectSizeBytes: barSize}) + + if err := store.Delete(ctx, fooKey, foo, nil, storage.ValidateAllObjectFunc, nil, storage.DeleteOptions{}); err == nil { + t.Errorf("Delete expected to fail") + } + assertStats(t, store, sizeEnabled, storage.Stats{ObjectCount: 1, EstimatedAverageObjectSizeBytes: barSize}) + + if err := store.Delete(ctx, barKey, bar, nil, storage.ValidateAllObjectFunc, nil, storage.DeleteOptions{}); err != nil { + t.Errorf("Delete failed: %v", err) + } + assertStats(t, store, sizeEnabled, storage.Stats{ObjectCount: 0, EstimatedAverageObjectSizeBytes: 0}) +} + +func assertStats(t *testing.T, store storage.Interface, sizeEnabled bool, expectStats storage.Stats) { + t.Helper() + // Execute consistent LIST to refresh state of cache. + err := store.GetList(t.Context(), "/pods", storage.ListOptions{Recursive: true, Predicate: storage.Everything}, &example.PodList{}) if err != nil { - t.Fatalf("store.Count failed: %v", err) + t.Fatalf("GetList failed: %v", err) + } + stats, err := store.Stats(t.Context()) + if err != nil { + t.Fatalf("store.Stats failed: %v", err) } - // count for resourceA should not include the objects for resourceB - // even though resourceA is a prefix of resourceB. - if int64(resourceACountExpected) != resourceACountGot { - t.Fatalf("store.Count for resource %s: expected %d but got %d", resourceA, resourceACountExpected, resourceACountGot) + if !sizeEnabled { + expectStats.EstimatedAverageObjectSizeBytes = 0 } + if expectStats != stats { + t.Errorf("store.Stats: expected %+v but got %+v", expectStats, stats) + } +} + +func objectSize(t *testing.T, codec runtime.Codec, obj runtime.Object, transformer value.Transformer) int64 { + data, err := runtime.Encode(codec, obj) + if err != nil { + t.Fatalf("Encode failed: %v", err) + } + data, err = transformer.TransformToStorage(t.Context(), data, value.DefaultContext{}) + if err != nil { + t.Fatalf("Transform failed: %v", err) + } + return int64(len(data)) } func RunTestListPaging(ctx context.Context, t *testing.T, store storage.Interface) { diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go index 6e46e2d59fd..57d8a440bef 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go @@ -27,10 +27,10 @@ import ( "k8s.io/klog/v2" ) -func newListWorkEstimator(countFn objectCountGetterFunc, config *WorkEstimatorConfig, maxSeatsFn maxSeatsFunc) WorkEstimatorFunc { +func newListWorkEstimator(countFn statsGetterFunc, config *WorkEstimatorConfig, maxSeatsFn maxSeatsFunc) WorkEstimatorFunc { estimator := &listWorkEstimator{ config: config, - countGetterFn: countFn, + statsGetterFn: countFn, maxSeatsFn: maxSeatsFn, } return estimator.estimate @@ -38,7 +38,7 @@ func newListWorkEstimator(countFn objectCountGetterFunc, config *WorkEstimatorCo type listWorkEstimator struct { config *WorkEstimatorConfig - countGetterFn objectCountGetterFunc + statsGetterFn statsGetterFunc maxSeatsFn maxSeatsFunc } @@ -90,7 +90,8 @@ func (e *listWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLe listFromStorage := result.ShouldDelegate isListFromCache := requestInfo.Verb == "watch" || !listFromStorage - numStored, err := e.countGetterFn(key(requestInfo)) + stats, err := e.statsGetterFn(key(requestInfo)) + numStored := stats.ObjectCount switch { case err == ObjectCountStaleErr: // object count going stale is indicative of degradation, so we should diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/object_count_tracker.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/object_count_tracker.go index 62a5e4f2d4b..60d33d839ca 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/object_count_tracker.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/object_count_tracker.go @@ -22,6 +22,7 @@ import ( "time" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apiserver/pkg/storage" "k8s.io/klog/v2" "k8s.io/utils/clock" ) @@ -55,7 +56,7 @@ var ( type StorageObjectCountTracker interface { // Set is invoked to update the current number of total // objects for the given resource - Set(string, int64) + Set(string, storage.Stats) // Get returns the total number of objects for the given resource. // The following errors are returned: @@ -63,7 +64,7 @@ type StorageObjectCountTracker interface { // failures ObjectCountStaleErr is returned. // - if the given resource is not being tracked then // ObjectCountNotFoundErr is returned. - Get(string) (int64, error) + Get(string) (storage.Stats, error) // RunUntil starts all the necessary maintenance. RunUntil(stopCh <-chan struct{}) @@ -75,14 +76,14 @@ type StorageObjectCountTracker interface { func NewStorageObjectCountTracker() StorageObjectCountTracker { return &objectCountTracker{ clock: &clock.RealClock{}, - counts: map[string]*timestampedCount{}, + counts: map[string]*timestampedStats{}, } } -// timestampedCount stores the count of a given resource with a last updated +// timestampedStats stores the count of a given resource with a last updated // timestamp so we can prune it after it goes stale for certain threshold. -type timestampedCount struct { - count int64 +type timestampedStats struct { + storage.Stats lastUpdatedAt time.Time } @@ -92,11 +93,11 @@ type objectCountTracker struct { clock clock.PassiveClock lock sync.RWMutex - counts map[string]*timestampedCount + counts map[string]*timestampedStats } -func (t *objectCountTracker) Set(groupResource string, count int64) { - if count <= -1 { +func (t *objectCountTracker) Set(groupResource string, stats storage.Stats) { + if stats.ObjectCount <= -1 { // a value of -1 indicates that the 'Count' call failed to contact // the storage layer, in most cases this error can be transient. // we will continue to work with the count that is in the cache @@ -114,18 +115,18 @@ func (t *objectCountTracker) Set(groupResource string, count int64) { defer t.lock.Unlock() if item, ok := t.counts[groupResource]; ok { - item.count = count + item.Stats = stats item.lastUpdatedAt = now return } - t.counts[groupResource] = ×tampedCount{ - count: count, + t.counts[groupResource] = ×tampedStats{ + Stats: stats, lastUpdatedAt: now, } } -func (t *objectCountTracker) Get(groupResource string) (int64, error) { +func (t *objectCountTracker) Get(groupResource string) (storage.Stats, error) { staleThreshold := t.clock.Now().Add(-staleTolerationThreshold) t.lock.RLock() @@ -133,11 +134,11 @@ func (t *objectCountTracker) Get(groupResource string) (int64, error) { if item, ok := t.counts[groupResource]; ok { if item.lastUpdatedAt.Before(staleThreshold) { - return item.count, ObjectCountStaleErr + return item.Stats, ObjectCountStaleErr } - return item.count, nil + return item.Stats, nil } - return 0, ObjectCountNotFoundErr + return storage.Stats{}, ObjectCountNotFoundErr } // RunUntil runs all the necessary maintenance. diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/object_count_tracker_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/object_count_tracker_test.go index 06442e299e2..95d7a1bb11f 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/object_count_tracker_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/object_count_tracker_test.go @@ -22,6 +22,7 @@ import ( "time" "github.com/google/go-cmp/cmp" + "k8s.io/apiserver/pkg/storage" testclock "k8s.io/utils/clock/testing" ) @@ -69,21 +70,21 @@ func TestStorageObjectCountTracker(t *testing.T) { fakeClock := &testclock.FakePassiveClock{} tracker := &objectCountTracker{ clock: fakeClock, - counts: map[string]*timestampedCount{}, + counts: map[string]*timestampedStats{}, } key := "foo.bar.resource" now := time.Now() fakeClock.SetTime(now.Add(-test.lastUpdated)) - tracker.Set(key, test.count) + tracker.Set(key, storage.Stats{ObjectCount: test.count}) fakeClock.SetTime(now) - countGot, err := tracker.Get(key) + stats, err := tracker.Get(key) if test.errExpected != err { t.Errorf("Expected error: %v, but got: %v", test.errExpected, err) } - if test.countExpected != countGot { - t.Errorf("Expected count: %d, but got: %d", test.countExpected, countGot) + if test.countExpected != stats.ObjectCount { + t.Errorf("Expected count: %d, but got: %d", test.countExpected, stats.ObjectCount) } if test.count <= -1 && len(tracker.counts) > 0 { t.Errorf("Expected the cache to be empty, but got: %d", len(tracker.counts)) @@ -96,23 +97,23 @@ func TestStorageObjectCountTrackerWithPrune(t *testing.T) { fakeClock := &testclock.FakePassiveClock{} tracker := &objectCountTracker{ clock: fakeClock, - counts: map[string]*timestampedCount{}, + counts: map[string]*timestampedStats{}, } now := time.Now() fakeClock.SetTime(now.Add(-61 * time.Minute)) - tracker.Set("k1", 61) + tracker.Set("k1", storage.Stats{ObjectCount: 61}) fakeClock.SetTime(now.Add(-60 * time.Minute)) - tracker.Set("k2", 60) + tracker.Set("k2", storage.Stats{ObjectCount: 60}) // we are going to prune keys that are stale for >= 1h // so the above keys are expected to be pruned and the // key below should not be pruned. mostRecent := now.Add(-59 * time.Minute) fakeClock.SetTime(mostRecent) - tracker.Set("k3", 59) - expected := map[string]*timestampedCount{ + tracker.Set("k3", storage.Stats{ObjectCount: 59}) + expected := map[string]*timestampedStats{ "k3": { - count: 59, + Stats: storage.Stats{ObjectCount: 59}, lastUpdatedAt: mostRecent, }, } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go index 43bc29ab835..daefe051a3c 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go @@ -23,6 +23,7 @@ import ( apirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/features" + "k8s.io/apiserver/pkg/storage" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/klog/v2" @@ -56,9 +57,9 @@ func (we *WorkEstimate) MaxSeats() int { return int(we.FinalSeats) } -// objectCountGetterFunc represents a function that gets the total +// statsGetterFunc represents a function that gets the total // number of objects for a given resource. -type objectCountGetterFunc func(string) (int64, error) +type statsGetterFunc func(string) (storage.Stats, error) // watchCountGetterFunc represents a function that gets the total // number of watchers potentially interested in a given request. @@ -71,7 +72,7 @@ type maxSeatsFunc func(priorityLevelName string) uint64 // NewWorkEstimator estimates the work that will be done by a given request, // if no WorkEstimatorFunc matches the given request then the default // work estimate of 1 seat is allocated to the request. -func NewWorkEstimator(objectCountFn objectCountGetterFunc, watchCountFn watchCountGetterFunc, config *WorkEstimatorConfig, maxSeatsFn maxSeatsFunc) WorkEstimatorFunc { +func NewWorkEstimator(objectCountFn statsGetterFunc, watchCountFn watchCountGetterFunc, config *WorkEstimatorConfig, maxSeatsFn maxSeatsFunc) WorkEstimatorFunc { estimator := &workEstimator{ minimumSeats: config.MinimumSeats, maximumSeatsLimit: config.MaximumSeatsLimit, diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width_test.go index 29e3c2ca8de..0090a21371a 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width_test.go @@ -24,6 +24,7 @@ import ( apirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/features" + "k8s.io/apiserver/pkg/storage" utilfeature "k8s.io/apiserver/pkg/util/feature" featuregatetesting "k8s.io/component-base/featuregate/testing" ) @@ -547,8 +548,8 @@ func TestWorkEstimator(t *testing.T) { if len(counts) == 0 { counts = map[string]int64{} } - countsFn := func(key string) (int64, error) { - return counts[key], test.countErr + countsFn := func(key string) (storage.Stats, error) { + return storage.Stats{ObjectCount: counts[key]}, test.countErr } watchCountsFn := func(_ *apirequest.RequestInfo) int { return test.watchCount diff --git a/test/compatibility_lifecycle/reference/versioned_feature_list.yaml b/test/compatibility_lifecycle/reference/versioned_feature_list.yaml index fa4a76dc67c..f52877ce83e 100644 --- a/test/compatibility_lifecycle/reference/versioned_feature_list.yaml +++ b/test/compatibility_lifecycle/reference/versioned_feature_list.yaml @@ -1429,6 +1429,12 @@ lockToDefault: true preRelease: GA version: "1.33" +- name: SizeBasedListCostEstimate + versionedSpecs: + - default: true + lockToDefault: false + preRelease: Beta + version: "1.34" - name: SizeMemoryBackedVolumes versionedSpecs: - default: false