mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-03 19:58:17 +00:00 
			
		
		
		
	Implement resilient watchcache initialization post-start-hook
This commit is contained in:
		@@ -145,6 +145,7 @@ func TestAddFlags(t *testing.T) {
 | 
			
		||||
				MaxMutatingRequestsInFlight:  200,
 | 
			
		||||
				RequestTimeout:               time.Duration(2) * time.Minute,
 | 
			
		||||
				MinRequestTimeout:            1800,
 | 
			
		||||
				StorageInitializationTimeout: time.Minute,
 | 
			
		||||
				JSONPatchMaxCopyBytes:        int64(3 * 1024 * 1024),
 | 
			
		||||
				MaxRequestBodyBytes:          int64(3 * 1024 * 1024),
 | 
			
		||||
				ComponentGlobalsRegistry:     componentGlobalsRegistry,
 | 
			
		||||
 
 | 
			
		||||
@@ -129,6 +129,7 @@ func TestAddFlags(t *testing.T) {
 | 
			
		||||
			MaxMutatingRequestsInFlight:  200,
 | 
			
		||||
			RequestTimeout:               time.Duration(2) * time.Minute,
 | 
			
		||||
			MinRequestTimeout:            1800,
 | 
			
		||||
			StorageInitializationTimeout: time.Minute,
 | 
			
		||||
			JSONPatchMaxCopyBytes:        int64(3 * 1024 * 1024),
 | 
			
		||||
			MaxRequestBodyBytes:          int64(3 * 1024 * 1024),
 | 
			
		||||
			ComponentGlobalsRegistry:     componentGlobalsRegistry,
 | 
			
		||||
 
 | 
			
		||||
@@ -245,6 +245,10 @@ func (c completedConfig) New(name string, delegationTarget genericapiserver.Dele
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if utilfeature.DefaultFeatureGate.Enabled(apiserverfeatures.WatchCacheInitializationPostStartHook) {
 | 
			
		||||
		s.GenericAPIServer.AddPostStartHookOrDie("storage-readiness", s.GenericAPIServer.StorageReadinessHook.Hook)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	s.GenericAPIServer.AddPostStartHookOrDie("start-legacy-token-tracking-controller", func(hookContext genericapiserver.PostStartHookContext) error {
 | 
			
		||||
		go legacytokentracking.NewController(client).Run(hookContext.StopCh)
 | 
			
		||||
		return nil
 | 
			
		||||
 
 | 
			
		||||
@@ -1275,6 +1275,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
 | 
			
		||||
 | 
			
		||||
	genericfeatures.WatchBookmark: {Default: true, PreRelease: featuregate.GA, LockToDefault: true},
 | 
			
		||||
 | 
			
		||||
	genericfeatures.WatchCacheInitializationPostStartHook: {Default: false, PreRelease: featuregate.Beta},
 | 
			
		||||
 | 
			
		||||
	genericfeatures.WatchFromStorageWithoutResourceVersion: {Default: false, PreRelease: featuregate.Beta},
 | 
			
		||||
 | 
			
		||||
	genericfeatures.WatchList: {Default: false, PreRelease: featuregate.Alpha},
 | 
			
		||||
 
 | 
			
		||||
@@ -290,6 +290,12 @@ const (
 | 
			
		||||
	// Enables support for watch bookmark events.
 | 
			
		||||
	WatchBookmark featuregate.Feature = "WatchBookmark"
 | 
			
		||||
 | 
			
		||||
	// owner: @wojtek-t
 | 
			
		||||
	// beta: v1.31
 | 
			
		||||
	//
 | 
			
		||||
	// Enables post-start-hook for storage readiness
 | 
			
		||||
	WatchCacheInitializationPostStartHook featuregate.Feature = "WatchCacheInitializationPostStartHook"
 | 
			
		||||
 | 
			
		||||
	// owner: @serathius
 | 
			
		||||
	// beta: 1.30
 | 
			
		||||
	// Enables watches without resourceVersion to be served from storage.
 | 
			
		||||
@@ -407,6 +413,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
 | 
			
		||||
 | 
			
		||||
	WatchBookmark: {Default: true, PreRelease: featuregate.GA, LockToDefault: true},
 | 
			
		||||
 | 
			
		||||
	WatchCacheInitializationPostStartHook: {Default: false, PreRelease: featuregate.Beta},
 | 
			
		||||
 | 
			
		||||
	WatchFromStorageWithoutResourceVersion: {Default: false, PreRelease: featuregate.Beta},
 | 
			
		||||
 | 
			
		||||
	InPlacePodVerticalScaling: {Default: false, PreRelease: featuregate.Alpha},
 | 
			
		||||
 
 | 
			
		||||
@@ -226,6 +226,10 @@ type Store struct {
 | 
			
		||||
	// storageVersionHash as empty in the discovery document.
 | 
			
		||||
	StorageVersioner runtime.GroupVersioner
 | 
			
		||||
 | 
			
		||||
	// ReadinessCheckFunc checks if the storage is ready for accepting requests.
 | 
			
		||||
	// The field is optional, if set needs to be thread-safe.
 | 
			
		||||
	ReadinessCheckFunc func() error
 | 
			
		||||
 | 
			
		||||
	// DestroyFunc cleans up clients used by the underlying Storage; optional.
 | 
			
		||||
	// If set, DestroyFunc has to be implemented in thread-safe way and
 | 
			
		||||
	// be prepared for being called more than once.
 | 
			
		||||
@@ -234,6 +238,7 @@ type Store struct {
 | 
			
		||||
 | 
			
		||||
// Note: the rest.StandardStorage interface aggregates the common REST verbs
 | 
			
		||||
var _ rest.StandardStorage = &Store{}
 | 
			
		||||
var _ rest.StorageWithReadiness = &Store{}
 | 
			
		||||
var _ rest.TableConvertor = &Store{}
 | 
			
		||||
var _ GenericStore = &Store{}
 | 
			
		||||
 | 
			
		||||
@@ -292,6 +297,14 @@ func (e *Store) New() runtime.Object {
 | 
			
		||||
	return e.NewFunc()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ReadinessCheck checks if the storage is ready for accepting requests.
 | 
			
		||||
func (e *Store) ReadinessCheck() error {
 | 
			
		||||
	if e.ReadinessCheckFunc != nil {
 | 
			
		||||
		return e.ReadinessCheckFunc()
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Destroy cleans up its resources on shutdown.
 | 
			
		||||
func (e *Store) Destroy() {
 | 
			
		||||
	if e.DestroyFunc != nil {
 | 
			
		||||
@@ -1614,6 +1627,9 @@ func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	if e.Storage.Storage != nil {
 | 
			
		||||
		e.ReadinessCheckFunc = e.Storage.Storage.ReadinessCheck
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -52,6 +52,8 @@ import (
 | 
			
		||||
// Storage is a generic interface for RESTful storage services.
 | 
			
		||||
// Resources which are exported to the RESTful API of apiserver need to implement this interface. It is expected
 | 
			
		||||
// that objects may implement any of the below interfaces.
 | 
			
		||||
//
 | 
			
		||||
// Consider using StorageWithReadiness whenever possible.
 | 
			
		||||
type Storage interface {
 | 
			
		||||
	// New returns an empty object that can be used with Create and Update after request data has been put into it.
 | 
			
		||||
	// This object must be a pointer type for use with Codec.DecodeInto([]byte, runtime.Object)
 | 
			
		||||
@@ -63,6 +65,14 @@ type Storage interface {
 | 
			
		||||
	Destroy()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// StorageWithReadiness extends Storage interface with the readiness check.
 | 
			
		||||
type StorageWithReadiness interface {
 | 
			
		||||
	Storage
 | 
			
		||||
 | 
			
		||||
	// ReadinessCheck allows for checking storage readiness.
 | 
			
		||||
	ReadinessCheck() error
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Scoper indicates what scope the resource is at. It must be specified.
 | 
			
		||||
// It is usually provided automatically based on your strategy.
 | 
			
		||||
type Scoper interface {
 | 
			
		||||
 
 | 
			
		||||
@@ -216,6 +216,10 @@ type Config struct {
 | 
			
		||||
	// twice this value.  Note that it is up to the request handlers to ignore or honor this timeout. In seconds.
 | 
			
		||||
	MinRequestTimeout int
 | 
			
		||||
 | 
			
		||||
	// StorageInitializationTimeout defines the maximum amount of time to wait for storage initialization
 | 
			
		||||
	// before declaring apiserver ready.
 | 
			
		||||
	StorageInitializationTimeout time.Duration
 | 
			
		||||
 | 
			
		||||
	// This represents the maximum amount of time it should take for apiserver to complete its startup
 | 
			
		||||
	// sequence and become healthy. From apiserver's start time to when this amount of time has
 | 
			
		||||
	// elapsed, /livez will assume that unfinished post-start hooks will complete successfully and
 | 
			
		||||
@@ -426,6 +430,7 @@ func NewConfig(codecs serializer.CodecFactory) *Config {
 | 
			
		||||
		MaxMutatingRequestsInFlight:    200,
 | 
			
		||||
		RequestTimeout:                 time.Duration(60) * time.Second,
 | 
			
		||||
		MinRequestTimeout:              1800,
 | 
			
		||||
		StorageInitializationTimeout:   time.Minute,
 | 
			
		||||
		LivezGracePeriod:               time.Duration(0),
 | 
			
		||||
		ShutdownDelayDuration:          time.Duration(0),
 | 
			
		||||
		// 1.5MB is the default client request size in bytes
 | 
			
		||||
@@ -824,6 +829,7 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
 | 
			
		||||
		ShutdownSendRetryAfter: c.ShutdownSendRetryAfter,
 | 
			
		||||
 | 
			
		||||
		APIServerID:           c.APIServerID,
 | 
			
		||||
		StorageReadinessHook:  NewStorageReadinessHook(c.StorageInitializationTimeout),
 | 
			
		||||
		StorageVersionManager: c.StorageVersionManager,
 | 
			
		||||
 | 
			
		||||
		EffectiveVersion: c.EffectiveVersion,
 | 
			
		||||
 
 | 
			
		||||
@@ -233,6 +233,10 @@ type GenericAPIServer struct {
 | 
			
		||||
	// APIServerID is the ID of this API server
 | 
			
		||||
	APIServerID string
 | 
			
		||||
 | 
			
		||||
	// StorageReadinessHook implements post-start-hook functionality for checking readiness
 | 
			
		||||
	// of underlying storage for registered resources.
 | 
			
		||||
	StorageReadinessHook *StorageReadinessHook
 | 
			
		||||
 | 
			
		||||
	// StorageVersionManager holds the storage versions of the API resources installed by this server.
 | 
			
		||||
	StorageVersionManager storageversion.Manager
 | 
			
		||||
 | 
			
		||||
@@ -844,6 +848,7 @@ func (s *GenericAPIServer) InstallLegacyAPIGroup(apiPrefix string, apiGroupInfo
 | 
			
		||||
	} else {
 | 
			
		||||
		s.Handler.GoRestfulContainer.Add(legacyRootAPIHandler.WebService())
 | 
			
		||||
	}
 | 
			
		||||
	s.registerStorageReadinessCheck("", apiGroupInfo)
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
@@ -902,10 +907,28 @@ func (s *GenericAPIServer) InstallAPIGroups(apiGroupInfos ...*APIGroupInfo) erro
 | 
			
		||||
 | 
			
		||||
		s.DiscoveryGroupManager.AddGroup(apiGroup)
 | 
			
		||||
		s.Handler.GoRestfulContainer.Add(discovery.NewAPIGroupHandler(s.Serializer, apiGroup).WebService())
 | 
			
		||||
		s.registerStorageReadinessCheck(apiGroupInfo.PrioritizedVersions[0].Group, apiGroupInfo)
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// registerStorageReadinessCheck registers the readiness checks for all underlying storages
 | 
			
		||||
// for a given APIGroup.
 | 
			
		||||
func (s *GenericAPIServer) registerStorageReadinessCheck(groupName string, apiGroupInfo *APIGroupInfo) {
 | 
			
		||||
	for version, storageMap := range apiGroupInfo.VersionedResourcesStorageMap {
 | 
			
		||||
		for resource, storage := range storageMap {
 | 
			
		||||
			if withReadiness, ok := storage.(rest.StorageWithReadiness); ok {
 | 
			
		||||
				gvr := metav1.GroupVersionResource{
 | 
			
		||||
					Group:    groupName,
 | 
			
		||||
					Version:  version,
 | 
			
		||||
					Resource: resource,
 | 
			
		||||
				}
 | 
			
		||||
				s.StorageReadinessHook.RegisterStorage(gvr, withReadiness)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// InstallAPIGroup exposes the given api group in the API.
 | 
			
		||||
// The <apiGroupInfo> passed into this function shouldn't be used elsewhere as the
 | 
			
		||||
// underlying storage will be destroyed on this servers shutdown.
 | 
			
		||||
 
 | 
			
		||||
@@ -56,6 +56,7 @@ type ServerRunOptions struct {
 | 
			
		||||
	GoawayChance                 float64
 | 
			
		||||
	LivezGracePeriod             time.Duration
 | 
			
		||||
	MinRequestTimeout            int
 | 
			
		||||
	StorageInitializationTimeout time.Duration
 | 
			
		||||
	ShutdownDelayDuration        time.Duration
 | 
			
		||||
	// We intentionally did not add a flag for this option. Users of the
 | 
			
		||||
	// apiserver library can wire it to a flag.
 | 
			
		||||
@@ -116,6 +117,7 @@ func NewServerRunOptionsForComponent(componentName string, componentGlobalsRegis
 | 
			
		||||
		RequestTimeout:                      defaults.RequestTimeout,
 | 
			
		||||
		LivezGracePeriod:                    defaults.LivezGracePeriod,
 | 
			
		||||
		MinRequestTimeout:                   defaults.MinRequestTimeout,
 | 
			
		||||
		StorageInitializationTimeout:        defaults.StorageInitializationTimeout,
 | 
			
		||||
		ShutdownDelayDuration:               defaults.ShutdownDelayDuration,
 | 
			
		||||
		ShutdownWatchTerminationGracePeriod: defaults.ShutdownWatchTerminationGracePeriod,
 | 
			
		||||
		JSONPatchMaxCopyBytes:               defaults.JSONPatchMaxCopyBytes,
 | 
			
		||||
@@ -140,6 +142,7 @@ func (s *ServerRunOptions) ApplyTo(c *server.Config) error {
 | 
			
		||||
	c.RequestTimeout = s.RequestTimeout
 | 
			
		||||
	c.GoawayChance = s.GoawayChance
 | 
			
		||||
	c.MinRequestTimeout = s.MinRequestTimeout
 | 
			
		||||
	c.StorageInitializationTimeout = s.StorageInitializationTimeout
 | 
			
		||||
	c.ShutdownDelayDuration = s.ShutdownDelayDuration
 | 
			
		||||
	c.JSONPatchMaxCopyBytes = s.JSONPatchMaxCopyBytes
 | 
			
		||||
	c.MaxRequestBodyBytes = s.MaxRequestBodyBytes
 | 
			
		||||
@@ -197,6 +200,10 @@ func (s *ServerRunOptions) Validate() []error {
 | 
			
		||||
		errors = append(errors, fmt.Errorf("--min-request-timeout can not be negative value"))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if s.StorageInitializationTimeout < 0 {
 | 
			
		||||
		errors = append(errors, fmt.Errorf("--storage-initialization-timeout can not be negative value"))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if s.ShutdownDelayDuration < 0 {
 | 
			
		||||
		errors = append(errors, fmt.Errorf("--shutdown-delay-duration can not be negative value"))
 | 
			
		||||
	}
 | 
			
		||||
@@ -350,6 +357,9 @@ func (s *ServerRunOptions) AddUniversalFlags(fs *pflag.FlagSet) {
 | 
			
		||||
		"handler, which picks a randomized value above this number as the connection timeout, "+
 | 
			
		||||
		"to spread out load.")
 | 
			
		||||
 | 
			
		||||
	fs.DurationVar(&s.StorageInitializationTimeout, "storage-initialization-timeout", s.StorageInitializationTimeout,
 | 
			
		||||
		"Maximum amount of time to wait for storage initialization before declaring apiserver ready. Defaults to 1m.")
 | 
			
		||||
 | 
			
		||||
	fs.DurationVar(&s.ShutdownDelayDuration, "shutdown-delay-duration", s.ShutdownDelayDuration, ""+
 | 
			
		||||
		"Time to delay the termination. During that time the server keeps serving requests normally. The endpoints /healthz and /livez "+
 | 
			
		||||
		"will return success, but /readyz immediately returns failure. Graceful termination starts after this delay "+
 | 
			
		||||
 
 | 
			
		||||
@@ -0,0 +1,91 @@
 | 
			
		||||
/*
 | 
			
		||||
Copyright 2024 The Kubernetes Authors.
 | 
			
		||||
 | 
			
		||||
Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
you may not use this file except in compliance with the License.
 | 
			
		||||
You may obtain a copy of the License at
 | 
			
		||||
 | 
			
		||||
    http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 | 
			
		||||
Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
See the License for the specific language governing permissions and
 | 
			
		||||
limitations under the License.
 | 
			
		||||
*/
 | 
			
		||||
 | 
			
		||||
package server
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"errors"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/wait"
 | 
			
		||||
	"k8s.io/apiserver/pkg/registry/rest"
 | 
			
		||||
	"k8s.io/klog/v2"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// StorageReadinessHook implements PostStartHook functionality for checking readiness
 | 
			
		||||
// of underlying storage for registered resources.
 | 
			
		||||
type StorageReadinessHook struct {
 | 
			
		||||
	timeout time.Duration
 | 
			
		||||
 | 
			
		||||
	lock   sync.Mutex
 | 
			
		||||
	checks map[string]func() error
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewStorageReadinessHook created new StorageReadinessHook.
 | 
			
		||||
func NewStorageReadinessHook(timeout time.Duration) *StorageReadinessHook {
 | 
			
		||||
	return &StorageReadinessHook{
 | 
			
		||||
		checks:  make(map[string]func() error),
 | 
			
		||||
		timeout: timeout,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (h *StorageReadinessHook) RegisterStorage(gvr metav1.GroupVersionResource, storage rest.StorageWithReadiness) {
 | 
			
		||||
	h.lock.Lock()
 | 
			
		||||
	defer h.lock.Unlock()
 | 
			
		||||
 | 
			
		||||
	if _, ok := h.checks[gvr.String()]; !ok {
 | 
			
		||||
		h.checks[gvr.String()] = storage.ReadinessCheck
 | 
			
		||||
	} else {
 | 
			
		||||
		klog.Errorf("Registering storage readiness hook for %s again: ", gvr.String())
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (h *StorageReadinessHook) check() bool {
 | 
			
		||||
	h.lock.Lock()
 | 
			
		||||
	defer h.lock.Unlock()
 | 
			
		||||
 | 
			
		||||
	failedChecks := []string{}
 | 
			
		||||
	for gvr, check := range h.checks {
 | 
			
		||||
		if err := check(); err != nil {
 | 
			
		||||
			failedChecks = append(failedChecks, gvr)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	if len(failedChecks) == 0 {
 | 
			
		||||
		klog.Infof("Storage is ready for all registered resources")
 | 
			
		||||
		return true
 | 
			
		||||
	}
 | 
			
		||||
	klog.V(4).Infof("Storage is not ready for: %v", failedChecks)
 | 
			
		||||
	return false
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (h *StorageReadinessHook) Hook(ctx PostStartHookContext) error {
 | 
			
		||||
	deadlineCtx, cancel := context.WithTimeout(ctx, h.timeout)
 | 
			
		||||
	defer cancel()
 | 
			
		||||
	err := wait.PollUntilContextCancel(deadlineCtx, 100*time.Millisecond, true,
 | 
			
		||||
		func(_ context.Context) (bool, error) {
 | 
			
		||||
			if ok := h.check(); ok {
 | 
			
		||||
				return true, nil
 | 
			
		||||
			}
 | 
			
		||||
			return false, nil
 | 
			
		||||
		})
 | 
			
		||||
	if errors.Is(err, context.DeadlineExceeded) {
 | 
			
		||||
		klog.Warningf("Deadline exceeded while waiting for storage readiness... ignoring")
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
@@ -0,0 +1,85 @@
 | 
			
		||||
/*
 | 
			
		||||
Copyright 2024 The Kubernetes Authors.
 | 
			
		||||
 | 
			
		||||
Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
you may not use this file except in compliance with the License.
 | 
			
		||||
You may obtain a copy of the License at
 | 
			
		||||
 | 
			
		||||
    http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 | 
			
		||||
Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
See the License for the specific language governing permissions and
 | 
			
		||||
limitations under the License.
 | 
			
		||||
*/
 | 
			
		||||
 | 
			
		||||
package server
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"testing"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/runtime"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type fakeReadinessStorage struct {
 | 
			
		||||
	result error
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *fakeReadinessStorage) New() runtime.Object   { return nil }
 | 
			
		||||
func (s *fakeReadinessStorage) Destroy()              {}
 | 
			
		||||
func (s *fakeReadinessStorage) ReadinessCheck() error { return s.result }
 | 
			
		||||
 | 
			
		||||
func testGVR(index int) metav1.GroupVersionResource {
 | 
			
		||||
	return metav1.GroupVersionResource{
 | 
			
		||||
		Group:    "group",
 | 
			
		||||
		Version:  "version",
 | 
			
		||||
		Resource: fmt.Sprintf("resource-%d", index),
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestStorageReadinessHook(t *testing.T) {
 | 
			
		||||
	h := NewStorageReadinessHook(time.Second)
 | 
			
		||||
 | 
			
		||||
	numChecks := 5
 | 
			
		||||
	storages := make([]*fakeReadinessStorage, numChecks)
 | 
			
		||||
	for i := 0; i < numChecks; i++ {
 | 
			
		||||
		storages[i] = &fakeReadinessStorage{
 | 
			
		||||
			result: fmt.Errorf("failed"),
 | 
			
		||||
		}
 | 
			
		||||
		h.RegisterStorage(testGVR(i), storages[i])
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for i := 0; i < numChecks; i++ {
 | 
			
		||||
		if ok := h.check(); ok {
 | 
			
		||||
			t.Errorf("%d: unexpected check pass", i)
 | 
			
		||||
		}
 | 
			
		||||
		storages[i].result = nil
 | 
			
		||||
	}
 | 
			
		||||
	if ok := h.check(); !ok {
 | 
			
		||||
		t.Errorf("unexpected check failure")
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestStorageReadinessHookTimeout(t *testing.T) {
 | 
			
		||||
	h := NewStorageReadinessHook(time.Second)
 | 
			
		||||
 | 
			
		||||
	storage := &fakeReadinessStorage{
 | 
			
		||||
		result: fmt.Errorf("failed"),
 | 
			
		||||
	}
 | 
			
		||||
	h.RegisterStorage(testGVR(0), storage)
 | 
			
		||||
 | 
			
		||||
	ctx := context.Background()
 | 
			
		||||
	hookCtx := PostStartHookContext{
 | 
			
		||||
		LoopbackClientConfig: nil,
 | 
			
		||||
		StopCh:               ctx.Done(),
 | 
			
		||||
		Context:              ctx,
 | 
			
		||||
	}
 | 
			
		||||
	if err := h.Hook(hookCtx); err != nil {
 | 
			
		||||
		t.Errorf("unexpected hook failure on timeout")
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
@@ -962,6 +962,14 @@ func (c *Cacher) Count(pathPrefix string) (int64, error) {
 | 
			
		||||
	return c.storage.Count(pathPrefix)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ReadinessCheck implements storage.Interface.
 | 
			
		||||
func (c *Cacher) ReadinessCheck() error {
 | 
			
		||||
	if !c.ready.check() {
 | 
			
		||||
		return storage.ErrStorageNotReady
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// baseObjectThreadUnsafe omits locking for cachingObject.
 | 
			
		||||
func baseObjectThreadUnsafe(object runtime.Object) runtime.Object {
 | 
			
		||||
	if co, ok := object.(*cachingObject); ok {
 | 
			
		||||
 
 | 
			
		||||
@@ -181,6 +181,9 @@ func (d *dummyStorage) GuaranteedUpdate(_ context.Context, _ string, _ runtime.O
 | 
			
		||||
func (d *dummyStorage) Count(_ string) (int64, error) {
 | 
			
		||||
	return 0, fmt.Errorf("unimplemented")
 | 
			
		||||
}
 | 
			
		||||
func (d *dummyStorage) ReadinessCheck() error {
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
func (d *dummyStorage) injectError(err error) {
 | 
			
		||||
	d.Lock()
 | 
			
		||||
	defer d.Unlock()
 | 
			
		||||
 
 | 
			
		||||
@@ -25,7 +25,10 @@ import (
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/validation/field"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var ErrResourceVersionSetOnCreate = errors.New("resourceVersion should not be set on objects to be created")
 | 
			
		||||
var (
 | 
			
		||||
	ErrResourceVersionSetOnCreate = errors.New("resourceVersion should not be set on objects to be created")
 | 
			
		||||
	ErrStorageNotReady            = errors.New("storage not ready")
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	ErrCodeKeyNotFound int = iota + 1
 | 
			
		||||
 
 | 
			
		||||
@@ -591,6 +591,11 @@ func (s *store) Count(key string) (int64, error) {
 | 
			
		||||
	return getResp.Count, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ReadinessCheck implements storage.Interface.
 | 
			
		||||
func (s *store) ReadinessCheck() error {
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// resolveGetListRev is used by GetList to resolve the rev to use in the client.KV.Get request.
 | 
			
		||||
func (s *store) resolveGetListRev(continueKey string, continueRV int64, opts storage.ListOptions) (int64, error) {
 | 
			
		||||
	var withRev int64
 | 
			
		||||
 
 | 
			
		||||
@@ -243,6 +243,9 @@ type Interface interface {
 | 
			
		||||
	// Count returns number of different entries under the key (generally being path prefix).
 | 
			
		||||
	Count(key string) (int64, error)
 | 
			
		||||
 | 
			
		||||
	// ReadinessCheck checks if the storage is ready for accepting requests.
 | 
			
		||||
	ReadinessCheck() error
 | 
			
		||||
 | 
			
		||||
	// RequestWatchProgress requests the a watch stream progress status be sent in the
 | 
			
		||||
	// watch response stream as soon as possible.
 | 
			
		||||
	// Used for monitor watch progress even if watching resources with no changes.
 | 
			
		||||
 
 | 
			
		||||
@@ -24,6 +24,8 @@ import (
 | 
			
		||||
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/sets"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/wait"
 | 
			
		||||
	apiserverfeatures "k8s.io/apiserver/pkg/features"
 | 
			
		||||
	utilfeature "k8s.io/apiserver/pkg/util/feature"
 | 
			
		||||
	clientset "k8s.io/client-go/kubernetes"
 | 
			
		||||
	restclient "k8s.io/client-go/rest"
 | 
			
		||||
	"k8s.io/kubernetes/test/e2e/framework"
 | 
			
		||||
@@ -126,6 +128,13 @@ var _ = SIGDescribe("health handlers", func() {
 | 
			
		||||
	f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
 | 
			
		||||
 | 
			
		||||
	ginkgo.It("should contain necessary checks", func(ctx context.Context) {
 | 
			
		||||
		if utilfeature.DefaultFeatureGate.Enabled(apiserverfeatures.WatchCacheInitializationPostStartHook) {
 | 
			
		||||
			storageReadinessCheck := "[+]poststarthook/storage-readiness ok"
 | 
			
		||||
			requiredHealthzChecks.Insert(storageReadinessCheck)
 | 
			
		||||
			requiredLivezChecks.Insert(storageReadinessCheck)
 | 
			
		||||
			requiredReadyzChecks.Insert(storageReadinessCheck)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		ginkgo.By("/health")
 | 
			
		||||
		err := testPath(ctx, f.ClientSet, "/healthz?verbose=1", requiredHealthzChecks)
 | 
			
		||||
		framework.ExpectNoError(err)
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user