mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-10-31 18:28:13 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			334 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			334 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| /*
 | |
| Copyright 2024 The Kubernetes Authors.
 | |
| 
 | |
| Licensed under the Apache License, Version 2.0 (the "License");
 | |
| you may not use this file except in compliance with the License.
 | |
| You may obtain a copy of the License at
 | |
| 
 | |
|     http://www.apache.org/licenses/LICENSE-2.0
 | |
| 
 | |
| Unless required by applicable law or agreed to in writing, software
 | |
| distributed under the License is distributed on an "AS IS" BASIS,
 | |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| See the License for the specific language governing permissions and
 | |
| limitations under the License.
 | |
| */
 | |
| 
 | |
| package apiserver
 | |
| 
 | |
| import (
 | |
| 	"fmt"
 | |
| 	"net/http"
 | |
| 	"strings"
 | |
| 	"sync"
 | |
| 
 | |
| 	apiextensionsinformers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions/apiextensions/v1"
 | |
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | |
| 	"k8s.io/apimachinery/pkg/runtime"
 | |
| 	"k8s.io/apimachinery/pkg/runtime/schema"
 | |
| 	"k8s.io/apimachinery/pkg/util/sets"
 | |
| 	"k8s.io/apiserver/pkg/admission"
 | |
| 	genericfeatures "k8s.io/apiserver/pkg/features"
 | |
| 	genericapiserver "k8s.io/apiserver/pkg/server"
 | |
| 	"k8s.io/apiserver/pkg/server/healthz"
 | |
| 	utilfeature "k8s.io/apiserver/pkg/util/feature"
 | |
| 	utilpeerproxy "k8s.io/apiserver/pkg/util/peerproxy"
 | |
| 	kubeexternalinformers "k8s.io/client-go/informers"
 | |
| 	"k8s.io/client-go/tools/cache"
 | |
| 	"k8s.io/klog/v2"
 | |
| 	v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
 | |
| 	v1helper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper"
 | |
| 	"k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1"
 | |
| 	aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver"
 | |
| 	aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme"
 | |
| 	apiregistrationclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/typed/apiregistration/v1"
 | |
| 	informers "k8s.io/kube-aggregator/pkg/client/informers/externalversions/apiregistration/v1"
 | |
| 	"k8s.io/kube-aggregator/pkg/controllers/autoregister"
 | |
| 
 | |
| 	"k8s.io/kubernetes/pkg/controlplane/apiserver/options"
 | |
| 	"k8s.io/kubernetes/pkg/controlplane/controller/crdregistration"
 | |
| )
 | |
| 
 | |
| func CreateAggregatorConfig(
 | |
| 	kubeAPIServerConfig genericapiserver.Config,
 | |
| 	commandOptions options.CompletedOptions,
 | |
| 	externalInformers kubeexternalinformers.SharedInformerFactory,
 | |
| 	serviceResolver aggregatorapiserver.ServiceResolver,
 | |
| 	proxyTransport *http.Transport,
 | |
| 	peerProxy utilpeerproxy.Interface,
 | |
| 	pluginInitializers []admission.PluginInitializer,
 | |
| ) (*aggregatorapiserver.Config, error) {
 | |
| 	// make a shallow copy to let us twiddle a few things
 | |
| 	// most of the config actually remains the same.  We only need to mess with a couple items related to the particulars of the aggregator
 | |
| 	genericConfig := kubeAPIServerConfig
 | |
| 	genericConfig.PostStartHooks = map[string]genericapiserver.PostStartHookConfigEntry{}
 | |
| 	genericConfig.RESTOptionsGetter = nil
 | |
| 	// prevent generic API server from installing the OpenAPI handler. Aggregator server
 | |
| 	// has its own customized OpenAPI handler.
 | |
| 	genericConfig.SkipOpenAPIInstallation = true
 | |
| 
 | |
| 	if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) &&
 | |
| 		utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerIdentity) {
 | |
| 		// Add StorageVersionPrecondition handler to aggregator-apiserver.
 | |
| 		// The handler will block write requests to built-in resources until the
 | |
| 		// target resources' storage versions are up-to-date.
 | |
| 		genericConfig.BuildHandlerChainFunc = genericapiserver.BuildHandlerChainWithStorageVersionPrecondition
 | |
| 	}
 | |
| 
 | |
| 	if peerProxy != nil {
 | |
| 		originalHandlerChainBuilder := genericConfig.BuildHandlerChainFunc
 | |
| 		genericConfig.BuildHandlerChainFunc = func(apiHandler http.Handler, c *genericapiserver.Config) http.Handler {
 | |
| 			// Add peer proxy handler to aggregator-apiserver.
 | |
| 			// wrap the peer proxy handler first.
 | |
| 			apiHandler = peerProxy.WrapHandler(apiHandler)
 | |
| 			return originalHandlerChainBuilder(apiHandler, c)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// copy the etcd options so we don't mutate originals.
 | |
| 	// we assume that the etcd options have been completed already.  avoid messing with anything outside
 | |
| 	// of changes to StorageConfig as that may lead to unexpected behavior when the options are applied.
 | |
| 	etcdOptions := *commandOptions.Etcd
 | |
| 	etcdOptions.StorageConfig.Codec = aggregatorscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion, v1beta1.SchemeGroupVersion)
 | |
| 	etcdOptions.StorageConfig.EncodeVersioner = runtime.NewMultiGroupVersioner(v1.SchemeGroupVersion, schema.GroupKind{Group: v1beta1.GroupName})
 | |
| 	etcdOptions.SkipHealthEndpoints = true // avoid double wiring of health checks
 | |
| 	if err := etcdOptions.ApplyTo(&genericConfig); err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	// override MergedResourceConfig with aggregator defaults and registry
 | |
| 	if err := commandOptions.APIEnablement.ApplyTo(
 | |
| 		&genericConfig,
 | |
| 		aggregatorapiserver.DefaultAPIResourceConfigSource(),
 | |
| 		aggregatorscheme.Scheme); err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	aggregatorConfig := &aggregatorapiserver.Config{
 | |
| 		GenericConfig: &genericapiserver.RecommendedConfig{
 | |
| 			Config:                genericConfig,
 | |
| 			SharedInformerFactory: externalInformers,
 | |
| 		},
 | |
| 		ExtraConfig: aggregatorapiserver.ExtraConfig{
 | |
| 			ProxyClientCertFile:       commandOptions.ProxyClientCertFile,
 | |
| 			ProxyClientKeyFile:        commandOptions.ProxyClientKeyFile,
 | |
| 			PeerAdvertiseAddress:      commandOptions.PeerAdvertiseAddress,
 | |
| 			ServiceResolver:           serviceResolver,
 | |
| 			ProxyTransport:            proxyTransport,
 | |
| 			RejectForwardingRedirects: commandOptions.AggregatorRejectForwardingRedirects,
 | |
| 		},
 | |
| 	}
 | |
| 
 | |
| 	// we need to clear the poststarthooks so we don't add them multiple times to all the servers (that fails)
 | |
| 	aggregatorConfig.GenericConfig.PostStartHooks = map[string]genericapiserver.PostStartHookConfigEntry{}
 | |
| 
 | |
| 	return aggregatorConfig, nil
 | |
| }
 | |
| 
 | |
| func CreateAggregatorServer(aggregatorConfig aggregatorapiserver.CompletedConfig, delegateAPIServer genericapiserver.DelegationTarget, crds apiextensionsinformers.CustomResourceDefinitionInformer, crdAPIEnabled bool, apiVersionPriorities map[schema.GroupVersion]APIServicePriority) (*aggregatorapiserver.APIAggregator, error) {
 | |
| 	aggregatorServer, err := aggregatorConfig.NewWithDelegate(delegateAPIServer)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	// create controllers for auto-registration
 | |
| 	apiRegistrationClient, err := apiregistrationclient.NewForConfig(aggregatorConfig.GenericConfig.LoopbackClientConfig)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	autoRegistrationController := autoregister.NewAutoRegisterController(aggregatorServer.APIRegistrationInformers.Apiregistration().V1().APIServices(), apiRegistrationClient)
 | |
| 	apiServices := apiServicesToRegister(delegateAPIServer, autoRegistrationController, apiVersionPriorities)
 | |
| 
 | |
| 	type controller interface {
 | |
| 		Run(workers int, stopCh <-chan struct{})
 | |
| 		WaitForInitialSync()
 | |
| 	}
 | |
| 	var crdRegistrationController controller
 | |
| 	if crdAPIEnabled {
 | |
| 		crdRegistrationController = crdregistration.NewCRDRegistrationController(
 | |
| 			crds,
 | |
| 			autoRegistrationController)
 | |
| 	}
 | |
| 
 | |
| 	// Imbue all builtin group-priorities onto the aggregated discovery
 | |
| 	if aggregatorConfig.GenericConfig.AggregatedDiscoveryGroupManager != nil {
 | |
| 		for gv, entry := range apiVersionPriorities {
 | |
| 			aggregatorConfig.GenericConfig.AggregatedDiscoveryGroupManager.SetGroupVersionPriority(metav1.GroupVersion(gv), int(entry.Group), int(entry.Version))
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	err = aggregatorServer.GenericAPIServer.AddPostStartHook("kube-apiserver-autoregistration", func(context genericapiserver.PostStartHookContext) error {
 | |
| 		if crdAPIEnabled {
 | |
| 			go crdRegistrationController.Run(5, context.Done())
 | |
| 		}
 | |
| 		go func() {
 | |
| 			// let the CRD controller process the initial set of CRDs before starting the autoregistration controller.
 | |
| 			// this prevents the autoregistration controller's initial sync from deleting APIServices for CRDs that still exist.
 | |
| 			// we only need to do this if CRDs are enabled on this server.  We can't use discovery because we are the source for discovery.
 | |
| 			if crdAPIEnabled {
 | |
| 				klog.Infof("waiting for initial CRD sync...")
 | |
| 				crdRegistrationController.WaitForInitialSync()
 | |
| 				klog.Infof("initial CRD sync complete...")
 | |
| 			} else {
 | |
| 				klog.Infof("CRD API not enabled, starting APIService registration without waiting for initial CRD sync")
 | |
| 			}
 | |
| 			autoRegistrationController.Run(5, context.Done())
 | |
| 		}()
 | |
| 		return nil
 | |
| 	})
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	err = aggregatorServer.GenericAPIServer.AddBootSequenceHealthChecks(
 | |
| 		makeAPIServiceAvailableHealthCheck(
 | |
| 			"autoregister-completion",
 | |
| 			apiServices,
 | |
| 			aggregatorServer.APIRegistrationInformers.Apiregistration().V1().APIServices(),
 | |
| 		),
 | |
| 	)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	return aggregatorServer, nil
 | |
| }
 | |
| 
 | |
| func makeAPIService(gv schema.GroupVersion, apiVersionPriorities map[schema.GroupVersion]APIServicePriority) *v1.APIService {
 | |
| 	apiServicePriority, ok := apiVersionPriorities[gv]
 | |
| 	if !ok {
 | |
| 		// if we aren't found, then we shouldn't register ourselves because it could result in a CRD group version
 | |
| 		// being permanently stuck in the APIServices list.
 | |
| 		klog.Infof("Skipping APIService creation for %v", gv)
 | |
| 		return nil
 | |
| 	}
 | |
| 	return &v1.APIService{
 | |
| 		ObjectMeta: metav1.ObjectMeta{Name: gv.Version + "." + gv.Group},
 | |
| 		Spec: v1.APIServiceSpec{
 | |
| 			Group:                gv.Group,
 | |
| 			Version:              gv.Version,
 | |
| 			GroupPriorityMinimum: apiServicePriority.Group,
 | |
| 			VersionPriority:      apiServicePriority.Version,
 | |
| 		},
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // makeAPIServiceAvailableHealthCheck returns a healthz check that returns healthy
 | |
| // once all of the specified services have been observed to be available at least once.
 | |
| func makeAPIServiceAvailableHealthCheck(name string, apiServices []*v1.APIService, apiServiceInformer informers.APIServiceInformer) healthz.HealthChecker {
 | |
| 	// Track the auto-registered API services that have not been observed to be available yet
 | |
| 	pendingServiceNamesLock := &sync.RWMutex{}
 | |
| 	pendingServiceNames := sets.NewString()
 | |
| 	for _, service := range apiServices {
 | |
| 		pendingServiceNames.Insert(service.Name)
 | |
| 	}
 | |
| 
 | |
| 	// When an APIService in the list is seen as available, remove it from the pending list
 | |
| 	handleAPIServiceChange := func(service *v1.APIService) {
 | |
| 		pendingServiceNamesLock.Lock()
 | |
| 		defer pendingServiceNamesLock.Unlock()
 | |
| 		if !pendingServiceNames.Has(service.Name) {
 | |
| 			return
 | |
| 		}
 | |
| 		if v1helper.IsAPIServiceConditionTrue(service, v1.Available) {
 | |
| 			pendingServiceNames.Delete(service.Name)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Watch add/update events for APIServices
 | |
| 	apiServiceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ //nolint:errcheck // no way to return error
 | |
| 		AddFunc:    func(obj interface{}) { handleAPIServiceChange(obj.(*v1.APIService)) },
 | |
| 		UpdateFunc: func(old, new interface{}) { handleAPIServiceChange(new.(*v1.APIService)) },
 | |
| 	})
 | |
| 
 | |
| 	// Don't return healthy until the pending list is empty
 | |
| 	return healthz.NamedCheck(name, func(r *http.Request) error {
 | |
| 		pendingServiceNamesLock.RLock()
 | |
| 		defer pendingServiceNamesLock.RUnlock()
 | |
| 		if pendingServiceNames.Len() > 0 {
 | |
| 			return fmt.Errorf("missing APIService: %v", pendingServiceNames.List())
 | |
| 		}
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
// APIServicePriority holds the discovery ordering for one API group/version.
// The ordering determines where the group appears in kubectl output.
type APIServicePriority struct {
	// Group orders this group relative to other groups.
	Group int32
	// Version orders this version relative to the other versions of the same group.
	Version int32
}
 | |
| 
 | |
| // DefaultGenericAPIServicePriorities returns the APIService priorities for generic APIs
 | |
| func DefaultGenericAPIServicePriorities() map[schema.GroupVersion]APIServicePriority {
 | |
| 	// The proper way to resolve this letting the aggregator know the desired group and version-within-group order of the underlying servers
 | |
| 	// is to refactor the genericapiserver.DelegationTarget to include a list of priorities based on which APIs were installed.
 | |
| 	// This requires the APIGroupInfo struct to evolve and include the concept of priorities and to avoid mistakes, the core storage map there needs to be updated.
 | |
| 	// That ripples out every bit as far as you'd expect, so for 1.7 we'll include the list here instead of being built up during storage.
 | |
| 	return map[schema.GroupVersion]APIServicePriority{
 | |
| 		{Group: "", Version: "v1"}: {Group: 18000, Version: 1},
 | |
| 		// to my knowledge, nothing below here collides
 | |
| 		{Group: "events.k8s.io", Version: "v1"}:                      {Group: 17750, Version: 15},
 | |
| 		{Group: "events.k8s.io", Version: "v1beta1"}:                 {Group: 17750, Version: 5},
 | |
| 		{Group: "authentication.k8s.io", Version: "v1"}:              {Group: 17700, Version: 15},
 | |
| 		{Group: "authentication.k8s.io", Version: "v1beta1"}:         {Group: 17700, Version: 9},
 | |
| 		{Group: "authentication.k8s.io", Version: "v1alpha1"}:        {Group: 17700, Version: 1},
 | |
| 		{Group: "authorization.k8s.io", Version: "v1"}:               {Group: 17600, Version: 15},
 | |
| 		{Group: "certificates.k8s.io", Version: "v1"}:                {Group: 17300, Version: 15},
 | |
| 		{Group: "certificates.k8s.io", Version: "v1alpha1"}:          {Group: 17300, Version: 1},
 | |
| 		{Group: "rbac.authorization.k8s.io", Version: "v1"}:          {Group: 17000, Version: 15},
 | |
| 		{Group: "apiextensions.k8s.io", Version: "v1"}:               {Group: 16700, Version: 15},
 | |
| 		{Group: "admissionregistration.k8s.io", Version: "v1"}:       {Group: 16700, Version: 15},
 | |
| 		{Group: "admissionregistration.k8s.io", Version: "v1beta1"}:  {Group: 16700, Version: 12},
 | |
| 		{Group: "admissionregistration.k8s.io", Version: "v1alpha1"}: {Group: 16700, Version: 9},
 | |
| 		{Group: "coordination.k8s.io", Version: "v1"}:                {Group: 16500, Version: 15},
 | |
| 		{Group: "coordination.k8s.io", Version: "v1alpha1"}:          {Group: 16500, Version: 9},
 | |
| 		{Group: "discovery.k8s.io", Version: "v1"}:                   {Group: 16200, Version: 15},
 | |
| 		{Group: "discovery.k8s.io", Version: "v1beta1"}:              {Group: 16200, Version: 12},
 | |
| 		{Group: "flowcontrol.apiserver.k8s.io", Version: "v1"}:       {Group: 16100, Version: 21},
 | |
| 		{Group: "flowcontrol.apiserver.k8s.io", Version: "v1beta3"}:  {Group: 16100, Version: 18},
 | |
| 		{Group: "flowcontrol.apiserver.k8s.io", Version: "v1beta2"}:  {Group: 16100, Version: 15},
 | |
| 		{Group: "flowcontrol.apiserver.k8s.io", Version: "v1beta1"}:  {Group: 16100, Version: 12},
 | |
| 		{Group: "flowcontrol.apiserver.k8s.io", Version: "v1alpha1"}: {Group: 16100, Version: 9},
 | |
| 		{Group: "internal.apiserver.k8s.io", Version: "v1alpha1"}:    {Group: 16000, Version: 9},
 | |
| 		{Group: "resource.k8s.io", Version: "v1alpha3"}:              {Group: 15900, Version: 9},
 | |
| 		{Group: "storagemigration.k8s.io", Version: "v1alpha1"}:      {Group: 15800, Version: 9},
 | |
| 		// Append a new group to the end of the list if unsure.
 | |
| 		// You can use min(existing group)-100 as the initial value for a group.
 | |
| 		// Version can be set to 9 (to have space around) for a new group.
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func apiServicesToRegister(delegateAPIServer genericapiserver.DelegationTarget, registration autoregister.AutoAPIServiceRegistration, apiVersionPriorities map[schema.GroupVersion]APIServicePriority) []*v1.APIService {
 | |
| 	apiServices := []*v1.APIService{}
 | |
| 
 | |
| 	for _, curr := range delegateAPIServer.ListedPaths() {
 | |
| 		if curr == "/api/v1" {
 | |
| 			apiService := makeAPIService(schema.GroupVersion{Group: "", Version: "v1"}, apiVersionPriorities)
 | |
| 			registration.AddAPIServiceToSyncOnStart(apiService)
 | |
| 			apiServices = append(apiServices, apiService)
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		if !strings.HasPrefix(curr, "/apis/") {
 | |
| 			continue
 | |
| 		}
 | |
| 		// this comes back in a list that looks like /apis/rbac.authorization.k8s.io/v1alpha1
 | |
| 		tokens := strings.Split(curr, "/")
 | |
| 		if len(tokens) != 4 {
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		apiService := makeAPIService(schema.GroupVersion{Group: tokens[2], Version: tokens[3]}, apiVersionPriorities)
 | |
| 		if apiService == nil {
 | |
| 			continue
 | |
| 		}
 | |
| 		registration.AddAPIServiceToSyncOnStart(apiService)
 | |
| 		apiServices = append(apiServices, apiService)
 | |
| 	}
 | |
| 
 | |
| 	return apiServices
 | |
| }
 | 
