mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-03 19:58:17 +00:00 
			
		
		
		
	Implement consistency checking
This commit is contained in:
		
				
					committed by
					
						
						Marek Siarkowicz
					
				
			
			
				
	
			
			
			
						parent
						
							c30b1eb09b
						
					
				
				
					commit
					e4d73c56cd
				
			@@ -71,15 +71,17 @@ func StorageWithCacher() generic.StorageDecorator {
 | 
				
			|||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			return nil, func() {}, err
 | 
								return nil, func() {}, err
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
							delegator := cacherstorage.NewCacheDelegator(cacher, s)
 | 
				
			||||||
		var once sync.Once
 | 
							var once sync.Once
 | 
				
			||||||
		destroyFunc := func() {
 | 
							destroyFunc := func() {
 | 
				
			||||||
			once.Do(func() {
 | 
								once.Do(func() {
 | 
				
			||||||
 | 
									delegator.Stop()
 | 
				
			||||||
				cacher.Stop()
 | 
									cacher.Stop()
 | 
				
			||||||
				d()
 | 
									d()
 | 
				
			||||||
			})
 | 
								})
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		return cacherstorage.NewCacheDelegator(cacher, s), destroyFunc, nil
 | 
							return delegator, destroyFunc, nil
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -2459,8 +2459,10 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE
 | 
				
			|||||||
			}
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		d := destroyFunc
 | 
							d := destroyFunc
 | 
				
			||||||
		s = cacherstorage.NewCacheDelegator(cacher, s)
 | 
							delegator := cacherstorage.NewCacheDelegator(cacher, s)
 | 
				
			||||||
 | 
							s = delegator
 | 
				
			||||||
		destroyFunc = func() {
 | 
							destroyFunc = func() {
 | 
				
			||||||
 | 
								delegator.Stop()
 | 
				
			||||||
			cacher.Stop()
 | 
								cacher.Stop()
 | 
				
			||||||
			d()
 | 
								d()
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -729,7 +729,7 @@ type listResp struct {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// GetList implements storage.Interface
 | 
					// GetList implements storage.Interface
 | 
				
			||||||
func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object, listRV uint64) error {
 | 
					func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
 | 
				
			||||||
	// For recursive lists, we need to make sure the key ended with "/" so that we only
 | 
						// For recursive lists, we need to make sure the key ended with "/" so that we only
 | 
				
			||||||
	// get children "directories". e.g. if we have key "/a", "/a/b", "/ab", getting keys
 | 
						// get children "directories". e.g. if we have key "/a", "/a/b", "/ab", getting keys
 | 
				
			||||||
	// with prefix "/a" will return all three, while with prefix "/a/" will return only
 | 
						// with prefix "/a" will return all three, while with prefix "/a/" will return only
 | 
				
			||||||
@@ -738,6 +738,10 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
 | 
				
			|||||||
	if opts.Recursive && !strings.HasSuffix(key, "/") {
 | 
						if opts.Recursive && !strings.HasSuffix(key, "/") {
 | 
				
			||||||
		preparedKey += "/"
 | 
							preparedKey += "/"
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						listRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	ctx, span := tracing.Start(ctx, "cacher.GetList",
 | 
						ctx, span := tracing.Start(ctx, "cacher.GetList",
 | 
				
			||||||
		attribute.String("audit-id", audit.GetAuditIDTruncated(ctx)),
 | 
							attribute.String("audit-id", audit.GetAuditIDTruncated(ctx)),
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -484,10 +484,6 @@ func testSetupWithEtcdServer(t testing.TB, opts ...setupOption) (context.Context
 | 
				
			|||||||
		t.Fatalf("Failed to initialize cacher: %v", err)
 | 
							t.Fatalf("Failed to initialize cacher: %v", err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	ctx := context.Background()
 | 
						ctx := context.Background()
 | 
				
			||||||
	terminate := func() {
 | 
					 | 
				
			||||||
		cacher.Stop()
 | 
					 | 
				
			||||||
		server.Terminate(t)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Since some tests depend on the fact that GetList shouldn't fail,
 | 
						// Since some tests depend on the fact that GetList shouldn't fail,
 | 
				
			||||||
	// we wait until the error from the underlying storage is consumed.
 | 
						// we wait until the error from the underlying storage is consumed.
 | 
				
			||||||
@@ -503,8 +499,14 @@ func testSetupWithEtcdServer(t testing.TB, opts ...setupOption) (context.Context
 | 
				
			|||||||
			t.Fatal(err)
 | 
								t.Fatal(err)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						delegator := NewCacheDelegator(cacher, wrappedStorage)
 | 
				
			||||||
 | 
						terminate := func() {
 | 
				
			||||||
 | 
							delegator.Stop()
 | 
				
			||||||
 | 
							cacher.Stop()
 | 
				
			||||||
 | 
							server.Terminate(t)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return ctx, NewCacheDelegator(cacher, wrappedStorage), server, terminate
 | 
						return ctx, delegator, server, terminate
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func testSetupWithEtcdAndCreateWrapper(t *testing.T, opts ...setupOption) (storage.Interface, tearDownFunc) {
 | 
					func testSetupWithEtcdAndCreateWrapper(t *testing.T, opts ...setupOption) (storage.Interface, tearDownFunc) {
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -207,6 +207,15 @@ func (d *dummyStorage) GetCurrentResourceVersion(ctx context.Context) (uint64, e
 | 
				
			|||||||
	return 100, nil
 | 
						return 100, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type dummyCacher struct {
 | 
				
			||||||
 | 
						dummyStorage
 | 
				
			||||||
 | 
						ready bool
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (d *dummyCacher) Ready() bool {
 | 
				
			||||||
 | 
						return d.ready
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestGetListCacheBypass(t *testing.T) {
 | 
					func TestGetListCacheBypass(t *testing.T) {
 | 
				
			||||||
	type opts struct {
 | 
						type opts struct {
 | 
				
			||||||
		ResourceVersion      string
 | 
							ResourceVersion      string
 | 
				
			||||||
@@ -326,6 +335,7 @@ func testGetListCacheBypass(t *testing.T, options storage.ListOptions, expectByp
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
	defer cacher.Stop()
 | 
						defer cacher.Stop()
 | 
				
			||||||
	delegator := NewCacheDelegator(cacher, backingStorage)
 | 
						delegator := NewCacheDelegator(cacher, backingStorage)
 | 
				
			||||||
 | 
						defer delegator.Stop()
 | 
				
			||||||
	result := &example.PodList{}
 | 
						result := &example.PodList{}
 | 
				
			||||||
	if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
 | 
						if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
 | 
				
			||||||
		if err := cacher.ready.wait(context.Background()); err != nil {
 | 
							if err := cacher.ready.wait(context.Background()); err != nil {
 | 
				
			||||||
@@ -450,6 +460,7 @@ apiserver_watch_cache_consistent_read_total{fallback="true", resource="pods", su
 | 
				
			|||||||
			}
 | 
								}
 | 
				
			||||||
			defer cacher.Stop()
 | 
								defer cacher.Stop()
 | 
				
			||||||
			delegator := NewCacheDelegator(cacher, backingStorage)
 | 
								delegator := NewCacheDelegator(cacher, backingStorage)
 | 
				
			||||||
 | 
								defer delegator.Stop()
 | 
				
			||||||
			if err := cacher.ready.wait(context.Background()); err != nil {
 | 
								if err := cacher.ready.wait(context.Background()); err != nil {
 | 
				
			||||||
				t.Fatalf("unexpected error waiting for the cache to be ready")
 | 
									t.Fatalf("unexpected error waiting for the cache to be ready")
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
@@ -533,6 +544,7 @@ func TestGetListNonRecursiveCacheBypass(t *testing.T) {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
	defer cacher.Stop()
 | 
						defer cacher.Stop()
 | 
				
			||||||
	delegator := NewCacheDelegator(cacher, backingStorage)
 | 
						delegator := NewCacheDelegator(cacher, backingStorage)
 | 
				
			||||||
 | 
						defer delegator.Stop()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	pred := storage.SelectionPredicate{
 | 
						pred := storage.SelectionPredicate{
 | 
				
			||||||
		Limit: 500,
 | 
							Limit: 500,
 | 
				
			||||||
@@ -572,6 +584,7 @@ func TestGetCacheBypass(t *testing.T) {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
	defer cacher.Stop()
 | 
						defer cacher.Stop()
 | 
				
			||||||
	delegator := NewCacheDelegator(cacher, backingStorage)
 | 
						delegator := NewCacheDelegator(cacher, backingStorage)
 | 
				
			||||||
 | 
						defer delegator.Stop()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	result := &example.Pod{}
 | 
						result := &example.Pod{}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -608,6 +621,7 @@ func TestWatchCacheBypass(t *testing.T) {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
	defer cacher.Stop()
 | 
						defer cacher.Stop()
 | 
				
			||||||
	delegator := NewCacheDelegator(cacher, backingStorage)
 | 
						delegator := NewCacheDelegator(cacher, backingStorage)
 | 
				
			||||||
 | 
						defer delegator.Stop()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
 | 
						if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
 | 
				
			||||||
		if err := cacher.ready.wait(context.Background()); err != nil {
 | 
							if err := cacher.ready.wait(context.Background()); err != nil {
 | 
				
			||||||
@@ -645,6 +659,7 @@ func TestTooManyRequestsNotReturned(t *testing.T) {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
	defer cacher.Stop()
 | 
						defer cacher.Stop()
 | 
				
			||||||
	delegator := NewCacheDelegator(cacher, backingStorage)
 | 
						delegator := NewCacheDelegator(cacher, backingStorage)
 | 
				
			||||||
 | 
						defer delegator.Stop()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	opts := storage.ListOptions{
 | 
						opts := storage.ListOptions{
 | 
				
			||||||
		ResourceVersion: "0",
 | 
							ResourceVersion: "0",
 | 
				
			||||||
@@ -890,6 +905,7 @@ func TestCacherDontAcceptRequestsStopped(t *testing.T) {
 | 
				
			|||||||
		t.Fatalf("Couldn't create cacher: %v", err)
 | 
							t.Fatalf("Couldn't create cacher: %v", err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	delegator := NewCacheDelegator(cacher, backingStorage)
 | 
						delegator := NewCacheDelegator(cacher, backingStorage)
 | 
				
			||||||
 | 
						defer delegator.Stop()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
 | 
						if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
 | 
				
			||||||
		if err := cacher.ready.wait(context.Background()); err != nil {
 | 
							if err := cacher.ready.wait(context.Background()); err != nil {
 | 
				
			||||||
@@ -2326,6 +2342,7 @@ func BenchmarkCacher_GetList(b *testing.B) {
 | 
				
			|||||||
				}
 | 
									}
 | 
				
			||||||
				defer cacher.Stop()
 | 
									defer cacher.Stop()
 | 
				
			||||||
				delegator := NewCacheDelegator(cacher, store)
 | 
									delegator := NewCacheDelegator(cacher, store)
 | 
				
			||||||
 | 
									defer delegator.Stop()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
				// prepare result and pred
 | 
									// prepare result and pred
 | 
				
			||||||
				parsedField, err := fields.ParseSelector("spec.nodeName=node-0")
 | 
									parsedField, err := fields.ParseSelector("spec.nodeName=node-0")
 | 
				
			||||||
@@ -3207,6 +3224,7 @@ func TestRetryAfterForUnreadyCache(t *testing.T) {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
	result := &example.PodList{}
 | 
						result := &example.PodList{}
 | 
				
			||||||
	delegator := NewCacheDelegator(cacher, backingStorage)
 | 
						delegator := NewCacheDelegator(cacher, backingStorage)
 | 
				
			||||||
 | 
						defer delegator.Stop()
 | 
				
			||||||
	err = delegator.GetList(context.TODO(), "/pods/ns", opts, result)
 | 
						err = delegator.GetList(context.TODO(), "/pods/ns", opts, result)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if !apierrors.IsTooManyRequests(err) {
 | 
						if !apierrors.IsTooManyRequests(err) {
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -18,12 +18,20 @@ package cacher
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"context"
 | 
						"context"
 | 
				
			||||||
 | 
						"fmt"
 | 
				
			||||||
 | 
						"hash"
 | 
				
			||||||
 | 
						"hash/fnv"
 | 
				
			||||||
 | 
						"os"
 | 
				
			||||||
 | 
						"strconv"
 | 
				
			||||||
 | 
						"sync"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"go.opentelemetry.io/otel/attribute"
 | 
						"go.opentelemetry.io/otel/attribute"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/api/errors"
 | 
						"k8s.io/apimachinery/pkg/api/errors"
 | 
				
			||||||
 | 
						"k8s.io/apimachinery/pkg/api/meta"
 | 
				
			||||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
						metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/runtime"
 | 
						"k8s.io/apimachinery/pkg/runtime"
 | 
				
			||||||
 | 
						"k8s.io/apimachinery/pkg/util/wait"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/watch"
 | 
						"k8s.io/apimachinery/pkg/watch"
 | 
				
			||||||
	"k8s.io/apiserver/pkg/audit"
 | 
						"k8s.io/apiserver/pkg/audit"
 | 
				
			||||||
	"k8s.io/apiserver/pkg/features"
 | 
						"k8s.io/apiserver/pkg/features"
 | 
				
			||||||
@@ -35,16 +43,45 @@ import (
 | 
				
			|||||||
	"k8s.io/klog/v2"
 | 
						"k8s.io/klog/v2"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					var (
 | 
				
			||||||
 | 
						// ConsistencyCheckPeriod is the period of checking consistency between etcd and cache.
 | 
				
			||||||
 | 
						// 5 minutes were proposed to match the default compaction period. It's magnitute higher than
 | 
				
			||||||
 | 
						// List latency SLO (30 seconds) and timeout (1 minute).
 | 
				
			||||||
 | 
						ConsistencyCheckPeriod = 5 * time.Minute
 | 
				
			||||||
 | 
						// ConsistencyCheckerEnabled enables the consistency checking mechanism for cache.
 | 
				
			||||||
 | 
						// Based on KUBE_WATCHCACHE_CONSISTANCY_CHECKER environment variable.
 | 
				
			||||||
 | 
						ConsistencyCheckerEnabled = false
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func init() {
 | 
				
			||||||
 | 
						ConsistencyCheckerEnabled, _ = strconv.ParseBool(os.Getenv("KUBE_WATCHCACHE_CONSISTANCY_CHECKER"))
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func NewCacheDelegator(cacher *Cacher, storage storage.Interface) *CacheDelegator {
 | 
					func NewCacheDelegator(cacher *Cacher, storage storage.Interface) *CacheDelegator {
 | 
				
			||||||
	return &CacheDelegator{
 | 
						d := &CacheDelegator{
 | 
				
			||||||
		cacher:  cacher,
 | 
							cacher:  cacher,
 | 
				
			||||||
		storage: storage,
 | 
							storage: storage,
 | 
				
			||||||
 | 
							stopCh:  make(chan struct{}),
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						if ConsistencyCheckerEnabled {
 | 
				
			||||||
 | 
							d.checker = newConsistencyChecker(cacher.resourcePrefix, cacher.newListFunc, cacher, storage)
 | 
				
			||||||
 | 
							d.wg.Add(1)
 | 
				
			||||||
 | 
							go func() {
 | 
				
			||||||
 | 
								defer d.wg.Done()
 | 
				
			||||||
 | 
								d.checker.startChecking(d.stopCh)
 | 
				
			||||||
 | 
							}()
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return d
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type CacheDelegator struct {
 | 
					type CacheDelegator struct {
 | 
				
			||||||
	cacher  *Cacher
 | 
						cacher  *Cacher
 | 
				
			||||||
	storage storage.Interface
 | 
						storage storage.Interface
 | 
				
			||||||
 | 
						checker *consistencyChecker
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						wg       sync.WaitGroup
 | 
				
			||||||
 | 
						stopOnce sync.Once
 | 
				
			||||||
 | 
						stopCh   chan struct{}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
var _ storage.Interface = (*CacheDelegator)(nil)
 | 
					var _ storage.Interface = (*CacheDelegator)(nil)
 | 
				
			||||||
@@ -168,14 +205,18 @@ func (c *CacheDelegator) GetList(ctx context.Context, key string, opts storage.L
 | 
				
			|||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			return err
 | 
								return err
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
							// Setting resource version for consistent read in cache based on current ResourceVersion in etcd.
 | 
				
			||||||
 | 
							opts.ResourceVersion = strconv.FormatInt(int64(listRV), 10)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	err = c.cacher.GetList(ctx, key, opts, listObj, listRV)
 | 
						err = c.cacher.GetList(ctx, key, opts, listObj)
 | 
				
			||||||
	success := "true"
 | 
						success := "true"
 | 
				
			||||||
	fallback := "false"
 | 
						fallback := "false"
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		if consistentRead {
 | 
							if consistentRead {
 | 
				
			||||||
			if storage.IsTooLargeResourceVersion(err) {
 | 
								if storage.IsTooLargeResourceVersion(err) {
 | 
				
			||||||
				fallback = "true"
 | 
									fallback = "true"
 | 
				
			||||||
 | 
									// Reset resourceVersion during fallback from consistent read.
 | 
				
			||||||
 | 
									opts.ResourceVersion = ""
 | 
				
			||||||
				err = c.storage.GetList(ctx, key, opts, listObj)
 | 
									err = c.storage.GetList(ctx, key, opts, listObj)
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
			if err != nil {
 | 
								if err != nil {
 | 
				
			||||||
@@ -258,3 +299,156 @@ func (c *CacheDelegator) ReadinessCheck() error {
 | 
				
			|||||||
func (c *CacheDelegator) RequestWatchProgress(ctx context.Context) error {
 | 
					func (c *CacheDelegator) RequestWatchProgress(ctx context.Context) error {
 | 
				
			||||||
	return c.storage.RequestWatchProgress(ctx)
 | 
						return c.storage.RequestWatchProgress(ctx)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (c *CacheDelegator) Stop() {
 | 
				
			||||||
 | 
						c.stopOnce.Do(func() {
 | 
				
			||||||
 | 
							close(c.stopCh)
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
						c.wg.Wait()
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func newConsistencyChecker(resourcePrefix string, newListFunc func() runtime.Object, cacher getListerReady, etcd getLister) *consistencyChecker {
 | 
				
			||||||
 | 
						return &consistencyChecker{
 | 
				
			||||||
 | 
							resourcePrefix: resourcePrefix,
 | 
				
			||||||
 | 
							newListFunc:    newListFunc,
 | 
				
			||||||
 | 
							cacher:         cacher,
 | 
				
			||||||
 | 
							etcd:           etcd,
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type consistencyChecker struct {
 | 
				
			||||||
 | 
						resourcePrefix string
 | 
				
			||||||
 | 
						newListFunc    func() runtime.Object
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						cacher getListerReady
 | 
				
			||||||
 | 
						etcd   getLister
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type getListerReady interface {
 | 
				
			||||||
 | 
						getLister
 | 
				
			||||||
 | 
						Ready() bool
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type getLister interface {
 | 
				
			||||||
 | 
						GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (c consistencyChecker) startChecking(stopCh <-chan struct{}) {
 | 
				
			||||||
 | 
						err := wait.PollUntilContextCancel(wait.ContextForChannel(stopCh), ConsistencyCheckPeriod, false, func(ctx context.Context) (done bool, err error) {
 | 
				
			||||||
 | 
							c.check(ctx)
 | 
				
			||||||
 | 
							return false, nil
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							klog.InfoS("Cache consistency check exiting", "resource", c.resourcePrefix, "err", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (c *consistencyChecker) check(ctx context.Context) {
 | 
				
			||||||
 | 
						digests, err := c.calculateDigests(ctx)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							klog.ErrorS(err, "Cache consistentency check error", "resource", c.resourcePrefix)
 | 
				
			||||||
 | 
							metrics.StorageConsistencyCheckTotal.WithLabelValues(c.resourcePrefix, "error").Inc()
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if digests.CacheDigest == digests.EtcdDigest {
 | 
				
			||||||
 | 
							klog.V(3).InfoS("Cache consistentency check passed", "resource", c.resourcePrefix, "resourceVersion", digests.ResourceVersion, "digest", digests.CacheDigest)
 | 
				
			||||||
 | 
							metrics.StorageConsistencyCheckTotal.WithLabelValues(c.resourcePrefix, "success").Inc()
 | 
				
			||||||
 | 
						} else {
 | 
				
			||||||
 | 
							klog.ErrorS(nil, "Cache consistentency check failed", "resource", c.resourcePrefix, "resourceVersion", digests.ResourceVersion, "etcdDigest", digests.EtcdDigest, "cacheDigest", digests.CacheDigest)
 | 
				
			||||||
 | 
							metrics.StorageConsistencyCheckTotal.WithLabelValues(c.resourcePrefix, "failure").Inc()
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (c *consistencyChecker) calculateDigests(ctx context.Context) (*storageDigest, error) {
 | 
				
			||||||
 | 
						if !c.cacher.Ready() {
 | 
				
			||||||
 | 
							return nil, fmt.Errorf("cache is not ready")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						cacheDigest, resourceVersion, err := c.calculateStoreDigest(ctx, c.cacher, storage.ListOptions{
 | 
				
			||||||
 | 
							ResourceVersion:      "0",
 | 
				
			||||||
 | 
							Predicate:            storage.Everything,
 | 
				
			||||||
 | 
							ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, fmt.Errorf("failed calculating cache digest: %w", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						etcdDigest, _, err := c.calculateStoreDigest(ctx, c.etcd, storage.ListOptions{
 | 
				
			||||||
 | 
							ResourceVersion:      resourceVersion,
 | 
				
			||||||
 | 
							Predicate:            storage.Everything,
 | 
				
			||||||
 | 
							ResourceVersionMatch: metav1.ResourceVersionMatchExact,
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, fmt.Errorf("failed calculating etcd digest: %w", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return &storageDigest{
 | 
				
			||||||
 | 
							ResourceVersion: resourceVersion,
 | 
				
			||||||
 | 
							CacheDigest:     cacheDigest,
 | 
				
			||||||
 | 
							EtcdDigest:      etcdDigest,
 | 
				
			||||||
 | 
						}, nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type storageDigest struct {
 | 
				
			||||||
 | 
						ResourceVersion string
 | 
				
			||||||
 | 
						CacheDigest     string
 | 
				
			||||||
 | 
						EtcdDigest      string
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (c *consistencyChecker) calculateStoreDigest(ctx context.Context, store getLister, opts storage.ListOptions) (digest, rv string, err error) {
 | 
				
			||||||
 | 
						// TODO: Implement pagination
 | 
				
			||||||
 | 
						resp := c.newListFunc()
 | 
				
			||||||
 | 
						err = store.GetList(ctx, c.resourcePrefix, opts, resp)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return "", "", err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						digest, err = listDigest(resp)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return "", "", err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						list, err := meta.ListAccessor(resp)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return "", "", err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return digest, list.GetResourceVersion(), nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func listDigest(list runtime.Object) (string, error) {
 | 
				
			||||||
 | 
						h := fnv.New64()
 | 
				
			||||||
 | 
						err := meta.EachListItem(list, func(obj runtime.Object) error {
 | 
				
			||||||
 | 
							objectMeta, err := meta.Accessor(obj)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								return err
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							err = addObjectToDigest(h, objectMeta)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								return err
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							return nil
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return "", err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return fmt.Sprintf("%x", h.Sum64()), nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func addObjectToDigest(h hash.Hash64, objectMeta metav1.Object) error {
 | 
				
			||||||
 | 
						_, err := h.Write([]byte(objectMeta.GetNamespace()))
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						_, err = h.Write([]byte("/"))
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						_, err = h.Write([]byte(objectMeta.GetName()))
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						_, err = h.Write([]byte("/"))
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						_, err = h.Write([]byte(objectMeta.GetResourceVersion()))
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -0,0 +1,194 @@
 | 
				
			|||||||
 | 
					/*
 | 
				
			||||||
 | 
					Copyright 2025 The Kubernetes Authors.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Licensed under the Apache License, Version 2.0 (the "License");
 | 
				
			||||||
 | 
					you may not use this file except in compliance with the License.
 | 
				
			||||||
 | 
					You may obtain a copy of the License at
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    http://www.apache.org/licenses/LICENSE-2.0
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Unless required by applicable law or agreed to in writing, software
 | 
				
			||||||
 | 
					distributed under the License is distributed on an "AS IS" BASIS,
 | 
				
			||||||
 | 
					WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
				
			||||||
 | 
					See the License for the specific language governing permissions and
 | 
				
			||||||
 | 
					limitations under the License.
 | 
				
			||||||
 | 
					*/
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					package cacher
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"context"
 | 
				
			||||||
 | 
						"testing"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
				
			||||||
 | 
						"k8s.io/apimachinery/pkg/runtime"
 | 
				
			||||||
 | 
						"k8s.io/apiserver/pkg/apis/example"
 | 
				
			||||||
 | 
						"k8s.io/apiserver/pkg/storage"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func TestCalculateDigest(t *testing.T) {
 | 
				
			||||||
 | 
						newListFunc := func() runtime.Object { return &example.PodList{} }
 | 
				
			||||||
 | 
						testCases := []struct {
 | 
				
			||||||
 | 
							desc            string
 | 
				
			||||||
 | 
							resourceVersion string
 | 
				
			||||||
 | 
							cacherReady     bool
 | 
				
			||||||
 | 
							cacherItems     []example.Pod
 | 
				
			||||||
 | 
							etcdItems       []example.Pod
 | 
				
			||||||
 | 
							resourcePrefix  string
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							expectListKey string
 | 
				
			||||||
 | 
							expectDigest  storageDigest
 | 
				
			||||||
 | 
							expectErr     bool
 | 
				
			||||||
 | 
						}{
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								desc:            "not ready",
 | 
				
			||||||
 | 
								cacherReady:     false,
 | 
				
			||||||
 | 
								resourceVersion: "1",
 | 
				
			||||||
 | 
								expectErr:       true,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								desc:            "empty",
 | 
				
			||||||
 | 
								resourceVersion: "1",
 | 
				
			||||||
 | 
								cacherReady:     true,
 | 
				
			||||||
 | 
								expectDigest: storageDigest{
 | 
				
			||||||
 | 
									ResourceVersion: "1",
 | 
				
			||||||
 | 
									CacheDigest:     "cbf29ce484222325",
 | 
				
			||||||
 | 
									EtcdDigest:      "cbf29ce484222325",
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								desc:            "with one element equal",
 | 
				
			||||||
 | 
								resourceVersion: "2",
 | 
				
			||||||
 | 
								cacherReady:     true,
 | 
				
			||||||
 | 
								cacherItems: []example.Pod{
 | 
				
			||||||
 | 
									{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "pod", ResourceVersion: "2"}},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								etcdItems: []example.Pod{
 | 
				
			||||||
 | 
									{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "pod", ResourceVersion: "2"}},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								expectDigest: storageDigest{
 | 
				
			||||||
 | 
									ResourceVersion: "2",
 | 
				
			||||||
 | 
									CacheDigest:     "86bf3a5e80d1c5cb",
 | 
				
			||||||
 | 
									EtcdDigest:      "86bf3a5e80d1c5cb",
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								desc:            "namespace changes digest",
 | 
				
			||||||
 | 
								resourceVersion: "2",
 | 
				
			||||||
 | 
								cacherReady:     true,
 | 
				
			||||||
 | 
								cacherItems: []example.Pod{
 | 
				
			||||||
 | 
									{ObjectMeta: metav1.ObjectMeta{Namespace: "kube-system", Name: "pod", ResourceVersion: "2"}},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								etcdItems: []example.Pod{
 | 
				
			||||||
 | 
									{ObjectMeta: metav1.ObjectMeta{Namespace: "kube-public", Name: "pod", ResourceVersion: "2"}},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								expectDigest: storageDigest{
 | 
				
			||||||
 | 
									ResourceVersion: "2",
 | 
				
			||||||
 | 
									CacheDigest:     "4ae4e750bd825b17",
 | 
				
			||||||
 | 
									EtcdDigest:      "f940a60af965b03",
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								desc:            "name changes digest",
 | 
				
			||||||
 | 
								resourceVersion: "2",
 | 
				
			||||||
 | 
								cacherReady:     true,
 | 
				
			||||||
 | 
								cacherItems: []example.Pod{
 | 
				
			||||||
 | 
									{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "pod2", ResourceVersion: "2"}},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								etcdItems: []example.Pod{
 | 
				
			||||||
 | 
									{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "pod3", ResourceVersion: "2"}},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								expectDigest: storageDigest{
 | 
				
			||||||
 | 
									ResourceVersion: "2",
 | 
				
			||||||
 | 
									CacheDigest:     "c9120494e4c1897d",
 | 
				
			||||||
 | 
									EtcdDigest:      "c9156494e4c46274",
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								desc:            "resourceVersion changes digest",
 | 
				
			||||||
 | 
								resourceVersion: "2",
 | 
				
			||||||
 | 
								cacherReady:     true,
 | 
				
			||||||
 | 
								cacherItems: []example.Pod{
 | 
				
			||||||
 | 
									{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "pod", ResourceVersion: "3"}},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								etcdItems: []example.Pod{
 | 
				
			||||||
 | 
									{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "pod", ResourceVersion: "4"}},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								expectDigest: storageDigest{
 | 
				
			||||||
 | 
									ResourceVersion: "2",
 | 
				
			||||||
 | 
									CacheDigest:     "86bf3a5e80d1c5ca",
 | 
				
			||||||
 | 
									EtcdDigest:      "86bf3a5e80d1c5cd",
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								desc:            "watch missed write event",
 | 
				
			||||||
 | 
								resourceVersion: "3",
 | 
				
			||||||
 | 
								cacherReady:     true,
 | 
				
			||||||
 | 
								cacherItems: []example.Pod{
 | 
				
			||||||
 | 
									{ObjectMeta: metav1.ObjectMeta{Namespace: "Default", Name: "pod", ResourceVersion: "2"}},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								etcdItems: []example.Pod{
 | 
				
			||||||
 | 
									{ObjectMeta: metav1.ObjectMeta{Namespace: "Default", Name: "pod", ResourceVersion: "2"}},
 | 
				
			||||||
 | 
									{ObjectMeta: metav1.ObjectMeta{Namespace: "Default", Name: "pod", ResourceVersion: "3"}},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								expectDigest: storageDigest{
 | 
				
			||||||
 | 
									ResourceVersion: "3",
 | 
				
			||||||
 | 
									CacheDigest:     "1859bac707c2cb2b",
 | 
				
			||||||
 | 
									EtcdDigest:      "11d147fc800df0e0",
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						for _, tc := range testCases {
 | 
				
			||||||
 | 
							t.Run(tc.desc, func(t *testing.T) {
 | 
				
			||||||
 | 
								etcd := &dummyStorage{
 | 
				
			||||||
 | 
									getListFn: func(_ context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
 | 
				
			||||||
 | 
										if key != tc.expectListKey {
 | 
				
			||||||
 | 
											t.Fatalf("Expect GetList key %q, got %q", tc.expectListKey, key)
 | 
				
			||||||
 | 
										}
 | 
				
			||||||
 | 
										if opts.ResourceVersion != tc.resourceVersion {
 | 
				
			||||||
 | 
											t.Fatalf("Expect GetList resourceVersion %q, got %q", tc.resourceVersion, opts.ResourceVersion)
 | 
				
			||||||
 | 
										}
 | 
				
			||||||
 | 
										if opts.ResourceVersionMatch != metav1.ResourceVersionMatchExact {
 | 
				
			||||||
 | 
											t.Fatalf("Expect GetList match exact, got %q", opts.ResourceVersionMatch)
 | 
				
			||||||
 | 
										}
 | 
				
			||||||
 | 
										podList := listObj.(*example.PodList)
 | 
				
			||||||
 | 
										podList.Items = tc.etcdItems
 | 
				
			||||||
 | 
										podList.ResourceVersion = tc.resourceVersion
 | 
				
			||||||
 | 
										return nil
 | 
				
			||||||
 | 
									},
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								cacher := &dummyCacher{
 | 
				
			||||||
 | 
									ready: tc.cacherReady,
 | 
				
			||||||
 | 
									dummyStorage: dummyStorage{
 | 
				
			||||||
 | 
										getListFn: func(_ context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
 | 
				
			||||||
 | 
											if key != tc.expectListKey {
 | 
				
			||||||
 | 
												t.Fatalf("Expect GetList key %q, got %q", tc.expectListKey, key)
 | 
				
			||||||
 | 
											}
 | 
				
			||||||
 | 
											if opts.ResourceVersion != "0" {
 | 
				
			||||||
 | 
												t.Fatalf("Expect GetList resourceVersion 0, got %q", opts.ResourceVersion)
 | 
				
			||||||
 | 
											}
 | 
				
			||||||
 | 
											if opts.ResourceVersionMatch != metav1.ResourceVersionMatchNotOlderThan {
 | 
				
			||||||
 | 
												t.Fatalf("Expect GetList match not older than, got %q", opts.ResourceVersionMatch)
 | 
				
			||||||
 | 
											}
 | 
				
			||||||
 | 
											podList := listObj.(*example.PodList)
 | 
				
			||||||
 | 
											podList.Items = tc.cacherItems
 | 
				
			||||||
 | 
											podList.ResourceVersion = tc.resourceVersion
 | 
				
			||||||
 | 
											return nil
 | 
				
			||||||
 | 
										},
 | 
				
			||||||
 | 
									},
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								checker := newConsistencyChecker(tc.resourcePrefix, newListFunc, cacher, etcd)
 | 
				
			||||||
 | 
								digest, err := checker.calculateDigests(context.Background())
 | 
				
			||||||
 | 
								if (err != nil) != tc.expectErr {
 | 
				
			||||||
 | 
									t.Fatalf("Expect error: %v, got: %v", tc.expectErr, err)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								if err != nil {
 | 
				
			||||||
 | 
									return
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								if *digest != tc.expectDigest {
 | 
				
			||||||
 | 
									t.Errorf("Expect: %+v Got: %+v", &tc.expectDigest, *digest)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							})
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@@ -176,6 +176,14 @@ var (
 | 
				
			|||||||
			Help:           "Counter for consistent reads from cache.",
 | 
								Help:           "Counter for consistent reads from cache.",
 | 
				
			||||||
			StabilityLevel: compbasemetrics.ALPHA,
 | 
								StabilityLevel: compbasemetrics.ALPHA,
 | 
				
			||||||
		}, []string{"resource", "success", "fallback"})
 | 
							}, []string{"resource", "success", "fallback"})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						StorageConsistencyCheckTotal = compbasemetrics.NewCounterVec(
 | 
				
			||||||
 | 
							&compbasemetrics.CounterOpts{
 | 
				
			||||||
 | 
								Namespace:      namespace,
 | 
				
			||||||
 | 
								Name:           "storage_consistency_checks_total",
 | 
				
			||||||
 | 
								Help:           "Counter for status of consistency checks between etcd and watch cache",
 | 
				
			||||||
 | 
								StabilityLevel: compbasemetrics.INTERNAL,
 | 
				
			||||||
 | 
							}, []string{"resource", "status"})
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
var registerMetrics sync.Once
 | 
					var registerMetrics sync.Once
 | 
				
			||||||
@@ -198,6 +206,7 @@ func Register() {
 | 
				
			|||||||
		legacyregistry.MustRegister(WatchCacheInitializations)
 | 
							legacyregistry.MustRegister(WatchCacheInitializations)
 | 
				
			||||||
		legacyregistry.MustRegister(WatchCacheReadWait)
 | 
							legacyregistry.MustRegister(WatchCacheReadWait)
 | 
				
			||||||
		legacyregistry.MustRegister(ConsistentReadTotal)
 | 
							legacyregistry.MustRegister(ConsistentReadTotal)
 | 
				
			||||||
 | 
							legacyregistry.MustRegister(StorageConsistencyCheckTotal)
 | 
				
			||||||
	})
 | 
						})
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -20,16 +20,22 @@ import (
 | 
				
			|||||||
	"context"
 | 
						"context"
 | 
				
			||||||
	"errors"
 | 
						"errors"
 | 
				
			||||||
	"fmt"
 | 
						"fmt"
 | 
				
			||||||
 | 
						"io"
 | 
				
			||||||
 | 
						"net/http"
 | 
				
			||||||
	"runtime"
 | 
						"runtime"
 | 
				
			||||||
	"slices"
 | 
						"slices"
 | 
				
			||||||
	"strings"
 | 
						"strings"
 | 
				
			||||||
	"testing"
 | 
						"testing"
 | 
				
			||||||
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						dto "github.com/prometheus/client_model/go"
 | 
				
			||||||
 | 
						"github.com/prometheus/common/expfmt"
 | 
				
			||||||
	"github.com/prometheus/common/model"
 | 
						"github.com/prometheus/common/model"
 | 
				
			||||||
	admissionregistrationv1beta1 "k8s.io/api/admissionregistration/v1beta1"
 | 
						admissionregistrationv1beta1 "k8s.io/api/admissionregistration/v1beta1"
 | 
				
			||||||
	v1 "k8s.io/api/core/v1"
 | 
						v1 "k8s.io/api/core/v1"
 | 
				
			||||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
						metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
				
			||||||
	"k8s.io/apiserver/pkg/endpoints/metrics"
 | 
						"k8s.io/apiserver/pkg/endpoints/metrics"
 | 
				
			||||||
 | 
						"k8s.io/apiserver/pkg/storage/cacher"
 | 
				
			||||||
	clientset "k8s.io/client-go/kubernetes"
 | 
						clientset "k8s.io/client-go/kubernetes"
 | 
				
			||||||
	restclient "k8s.io/client-go/rest"
 | 
						restclient "k8s.io/client-go/rest"
 | 
				
			||||||
	compbasemetrics "k8s.io/component-base/metrics"
 | 
						compbasemetrics "k8s.io/component-base/metrics"
 | 
				
			||||||
@@ -619,3 +625,108 @@ func sampleExistsInSamples(s *model.Sample, samples model.Samples) bool {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
	return false
 | 
						return false
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func TestWatchCacheConsistencyCheckMetrics(t *testing.T) {
 | 
				
			||||||
 | 
						period := time.Second
 | 
				
			||||||
 | 
						clean := overrideConsistencyCheckerTimings(period)
 | 
				
			||||||
 | 
						defer clean()
 | 
				
			||||||
 | 
						server := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd())
 | 
				
			||||||
 | 
						defer server.TearDownFn()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						rt, err := restclient.TransportFor(server.ClientConfig)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Fatal(err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						req, err := http.NewRequest(http.MethodGet, server.ClientConfig.Host+"/metrics", nil)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Fatal(err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						// Do at least 2 scrape cycles to require 2 successes
 | 
				
			||||||
 | 
						delay := 2 * period
 | 
				
			||||||
 | 
						time.Sleep(delay)
 | 
				
			||||||
 | 
						resp, err := rt.RoundTrip(req)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Fatal(err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						defer func() {
 | 
				
			||||||
 | 
							err := resp.Body.Close()
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								t.Fatal(err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}()
 | 
				
			||||||
 | 
						statuses, err := parseConsistencyCheckMetric(resp.Body)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Fatal(err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						resourceSuccesses := 0
 | 
				
			||||||
 | 
						for status, count := range statuses {
 | 
				
			||||||
 | 
							switch status.status {
 | 
				
			||||||
 | 
							case "success":
 | 
				
			||||||
 | 
								if count >= 2 {
 | 
				
			||||||
 | 
									resourceSuccesses++
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							case "failure":
 | 
				
			||||||
 | 
								t.Errorf("Failure checking consistency of resource %q", status.resource)
 | 
				
			||||||
 | 
							case "error":
 | 
				
			||||||
 | 
								t.Errorf("Error when checking consistency of resource %q", status.resource)
 | 
				
			||||||
 | 
							default:
 | 
				
			||||||
 | 
								t.Errorf("Unknown status of resource %q, status: %q", status.resource, status.status)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if resourceSuccesses <= 10 {
 | 
				
			||||||
 | 
							t.Errorf("Expected at least 10 resources with success, got: %d", resourceSuccesses)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func overrideConsistencyCheckerTimings(period time.Duration) func() {
 | 
				
			||||||
 | 
						tmpPeriod := cacher.ConsistencyCheckPeriod
 | 
				
			||||||
 | 
						tmpEnabled := cacher.ConsistencyCheckerEnabled
 | 
				
			||||||
 | 
						cacher.ConsistencyCheckPeriod = period
 | 
				
			||||||
 | 
						cacher.ConsistencyCheckerEnabled = true
 | 
				
			||||||
 | 
						return func() {
 | 
				
			||||||
 | 
							cacher.ConsistencyCheckPeriod = tmpPeriod
 | 
				
			||||||
 | 
							cacher.ConsistencyCheckerEnabled = tmpEnabled
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func parseConsistencyCheckMetric(r io.Reader) (map[consistencyCheckStatus]float64, error) {
 | 
				
			||||||
 | 
						statuses := map[consistencyCheckStatus]float64{}
 | 
				
			||||||
 | 
						metric, err := parseMetric(r, "apiserver_storage_consistency_checks_total")
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return statuses, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						for _, m := range metric.GetMetric() {
 | 
				
			||||||
 | 
							status := consistencyCheckStatus{}
 | 
				
			||||||
 | 
							for _, label := range m.GetLabel() {
 | 
				
			||||||
 | 
								switch label.GetName() {
 | 
				
			||||||
 | 
								case "resource":
 | 
				
			||||||
 | 
									status.resource = label.GetValue()
 | 
				
			||||||
 | 
								case "status":
 | 
				
			||||||
 | 
									status.status = label.GetValue()
 | 
				
			||||||
 | 
								default:
 | 
				
			||||||
 | 
									return statuses, fmt.Errorf("Unknown label: %v", label.GetName())
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							statuses[status] = m.GetCounter().GetValue()
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return statuses, nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type consistencyCheckStatus struct {
 | 
				
			||||||
 | 
						resource string
 | 
				
			||||||
 | 
						status   string
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func parseMetric(r io.Reader, name string) (*dto.MetricFamily, error) {
 | 
				
			||||||
 | 
						var parser expfmt.TextParser
 | 
				
			||||||
 | 
						mfs, err := parser.TextToMetricFamilies(r)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						for metricName, metric := range mfs {
 | 
				
			||||||
 | 
							if metricName == name {
 | 
				
			||||||
 | 
								return metric, nil
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return nil, fmt.Errorf("Metric not found %q", name)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user