mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-10-31 02:08:13 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			655 lines
		
	
	
		
			24 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			655 lines
		
	
	
		
			24 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| /*
 | |
| Copyright 2019 The Kubernetes Authors.
 | |
| 
 | |
| Licensed under the Apache License, Version 2.0 (the "License");
 | |
| you may not use this file except in compliance with the License.
 | |
| You may obtain a copy of the License at
 | |
| 
 | |
|     http://www.apache.org/licenses/LICENSE-2.0
 | |
| 
 | |
| Unless required by applicable law or agreed to in writing, software
 | |
| distributed under the License is distributed on an "AS IS" BASIS,
 | |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| See the License for the specific language governing permissions and
 | |
| limitations under the License.
 | |
| */
 | |
| 
 | |
| package endpointslice
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"fmt"
 | |
| 	"time"
 | |
| 
 | |
| 	"golang.org/x/time/rate"
 | |
| 
 | |
| 	v1 "k8s.io/api/core/v1"
 | |
| 	discovery "k8s.io/api/discovery/v1"
 | |
| 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 | |
| 	"k8s.io/apimachinery/pkg/labels"
 | |
| 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 | |
| 	"k8s.io/apimachinery/pkg/util/wait"
 | |
| 	utilfeature "k8s.io/apiserver/pkg/util/feature"
 | |
| 	coreinformers "k8s.io/client-go/informers/core/v1"
 | |
| 	discoveryinformers "k8s.io/client-go/informers/discovery/v1"
 | |
| 	clientset "k8s.io/client-go/kubernetes"
 | |
| 	"k8s.io/client-go/kubernetes/scheme"
 | |
| 	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
 | |
| 	corelisters "k8s.io/client-go/listers/core/v1"
 | |
| 	discoverylisters "k8s.io/client-go/listers/discovery/v1"
 | |
| 	"k8s.io/client-go/tools/cache"
 | |
| 	"k8s.io/client-go/tools/record"
 | |
| 	"k8s.io/client-go/util/workqueue"
 | |
| 	endpointslicerec "k8s.io/endpointslice"
 | |
| 	endpointslicemetrics "k8s.io/endpointslice/metrics"
 | |
| 	"k8s.io/endpointslice/topologycache"
 | |
| 	endpointsliceutil "k8s.io/endpointslice/util"
 | |
| 	"k8s.io/klog/v2"
 | |
| 	"k8s.io/kubernetes/pkg/controller"
 | |
| 	endpointslicepkg "k8s.io/kubernetes/pkg/controller/util/endpointslice"
 | |
| 	"k8s.io/kubernetes/pkg/features"
 | |
| )
 | |
| 
 | |
| const (
 | |
| 	// maxRetries is the number of times a service will be retried before it is
 | |
| 	// dropped out of the queue. Any sync error, such as a failure to create or
 | |
| 	// update an EndpointSlice could trigger a retry. With the current
 | |
| 	// rate-limiter in use (1s*2^(numRetries-1)) the following numbers represent
 | |
| 	// the sequence of delays between successive queuings of a service.
 | |
| 	//
 | |
| 	// 1s, 2s, 4s, 8s, 16s, 32s, 64s, 128s, 256s, 512s, 1000s (max)
 | |
| 	maxRetries = 15
 | |
| 
 | |
| 	// endpointSliceChangeMinSyncDelay indicates the minimum delay before
 | |
| 	// queuing a syncService call after an EndpointSlice changes. If
 | |
| 	// endpointUpdatesBatchPeriod is greater than this value, it will be used
 | |
| 	// instead. This helps batch processing of changes to multiple
 | |
| 	// EndpointSlices.
 | |
| 	endpointSliceChangeMinSyncDelay = 1 * time.Second
 | |
| 
 | |
| 	// defaultSyncBackOff is the default backoff period for syncService calls.
 | |
| 	defaultSyncBackOff = 1 * time.Second
 | |
| 	// maxSyncBackOff is the max backoff period for syncService calls.
 | |
| 	maxSyncBackOff = 1000 * time.Second
 | |
| 
 | |
| 	// ControllerName is a unique value used with LabelManagedBy to indicated
 | |
| 	// the component managing an EndpointSlice.
 | |
| 	ControllerName = "endpointslice-controller.k8s.io"
 | |
| 
 | |
| 	// topologyQueueItemKey is the key for all items in the topologyQueue.
 | |
| 	topologyQueueItemKey = "topologyQueueItemKey"
 | |
| )
 | |
| 
 | |
| // NewController creates and initializes a new Controller
 | |
| func NewController(ctx context.Context, podInformer coreinformers.PodInformer,
 | |
| 	serviceInformer coreinformers.ServiceInformer,
 | |
| 	nodeInformer coreinformers.NodeInformer,
 | |
| 	endpointSliceInformer discoveryinformers.EndpointSliceInformer,
 | |
| 	maxEndpointsPerSlice int32,
 | |
| 	client clientset.Interface,
 | |
| 	endpointUpdatesBatchPeriod time.Duration,
 | |
| ) *Controller {
 | |
| 	broadcaster := record.NewBroadcaster(record.WithContext(ctx))
 | |
| 	recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "endpoint-slice-controller"})
 | |
| 
 | |
| 	endpointslicemetrics.RegisterMetrics()
 | |
| 
 | |
| 	c := &Controller{
 | |
| 		client: client,
 | |
| 		// This is similar to the DefaultControllerRateLimiter, just with a
 | |
| 		// significantly higher default backoff (1s vs 5ms). This controller
 | |
| 		// processes events that can require significant EndpointSlice changes,
 | |
| 		// such as an update to a Service or Deployment. A more significant
 | |
| 		// rate limit back off here helps ensure that the Controller does not
 | |
| 		// overwhelm the API Server.
 | |
| 		serviceQueue: workqueue.NewTypedRateLimitingQueueWithConfig(
 | |
| 			workqueue.NewTypedMaxOfRateLimiter(
 | |
| 				workqueue.NewTypedItemExponentialFailureRateLimiter[string](defaultSyncBackOff, maxSyncBackOff),
 | |
| 				// 10 qps, 100 bucket size. This is only for retry speed and its
 | |
| 				// only the overall factor (not per item).
 | |
| 				&workqueue.TypedBucketRateLimiter[string]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
 | |
| 			),
 | |
| 			workqueue.TypedRateLimitingQueueConfig[string]{
 | |
| 				Name: "endpoint_slice",
 | |
| 			},
 | |
| 		),
 | |
| 		topologyQueue: workqueue.NewTypedRateLimitingQueue[string](
 | |
| 			workqueue.DefaultTypedControllerRateLimiter[string](),
 | |
| 		),
 | |
| 		workerLoopPeriod: time.Second,
 | |
| 	}
 | |
| 
 | |
| 	serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 | |
| 		AddFunc: c.onServiceUpdate,
 | |
| 		UpdateFunc: func(old, cur interface{}) {
 | |
| 			c.onServiceUpdate(cur)
 | |
| 		},
 | |
| 		DeleteFunc: c.onServiceDelete,
 | |
| 	})
 | |
| 	c.serviceLister = serviceInformer.Lister()
 | |
| 	c.servicesSynced = serviceInformer.Informer().HasSynced
 | |
| 
 | |
| 	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 | |
| 		AddFunc:    c.addPod,
 | |
| 		UpdateFunc: c.updatePod,
 | |
| 		DeleteFunc: c.deletePod,
 | |
| 	})
 | |
| 	c.podLister = podInformer.Lister()
 | |
| 	c.podsSynced = podInformer.Informer().HasSynced
 | |
| 
 | |
| 	c.nodeLister = nodeInformer.Lister()
 | |
| 	c.nodesSynced = nodeInformer.Informer().HasSynced
 | |
| 
 | |
| 	logger := klog.FromContext(ctx)
 | |
| 	endpointSliceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 | |
| 		AddFunc: c.onEndpointSliceAdd,
 | |
| 		UpdateFunc: func(oldObj, newObj interface{}) {
 | |
| 			c.onEndpointSliceUpdate(logger, oldObj, newObj)
 | |
| 		},
 | |
| 		DeleteFunc: c.onEndpointSliceDelete,
 | |
| 	})
 | |
| 
 | |
| 	c.endpointSliceLister = endpointSliceInformer.Lister()
 | |
| 	c.endpointSlicesSynced = endpointSliceInformer.Informer().HasSynced
 | |
| 	c.endpointSliceTracker = endpointsliceutil.NewEndpointSliceTracker()
 | |
| 
 | |
| 	c.maxEndpointsPerSlice = maxEndpointsPerSlice
 | |
| 
 | |
| 	c.triggerTimeTracker = endpointsliceutil.NewTriggerTimeTracker()
 | |
| 
 | |
| 	c.eventBroadcaster = broadcaster
 | |
| 	c.eventRecorder = recorder
 | |
| 
 | |
| 	c.endpointUpdatesBatchPeriod = endpointUpdatesBatchPeriod
 | |
| 
 | |
| 	if utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) {
 | |
| 		nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 | |
| 			AddFunc: func(_ interface{}) {
 | |
| 				c.addNode()
 | |
| 			},
 | |
| 			UpdateFunc: func(oldObj, newObj interface{}) {
 | |
| 				c.updateNode(oldObj, newObj)
 | |
| 			},
 | |
| 			DeleteFunc: func(_ interface{}) {
 | |
| 				c.deleteNode()
 | |
| 			},
 | |
| 		})
 | |
| 
 | |
| 		c.topologyCache = topologycache.NewTopologyCache()
 | |
| 	}
 | |
| 
 | |
| 	c.reconciler = endpointslicerec.NewReconciler(
 | |
| 		c.client,
 | |
| 		c.nodeLister,
 | |
| 		c.maxEndpointsPerSlice,
 | |
| 		c.endpointSliceTracker,
 | |
| 		c.topologyCache,
 | |
| 		c.eventRecorder,
 | |
| 		ControllerName,
 | |
| 		endpointslicerec.WithTrafficDistributionEnabled(utilfeature.DefaultFeatureGate.Enabled(features.ServiceTrafficDistribution)),
 | |
| 	)
 | |
| 
 | |
| 	return c
 | |
| }
 | |
| 
 | |
| // Controller manages selector-based service endpoint slices
 | |
| type Controller struct {
 | |
| 	client           clientset.Interface
 | |
| 	eventBroadcaster record.EventBroadcaster
 | |
| 	eventRecorder    record.EventRecorder
 | |
| 
 | |
| 	// serviceLister is able to list/get services and is populated by the
 | |
| 	// shared informer passed to NewController
 | |
| 	serviceLister corelisters.ServiceLister
 | |
| 	// servicesSynced returns true if the service shared informer has been synced at least once.
 | |
| 	// Added as a member to the struct to allow injection for testing.
 | |
| 	servicesSynced cache.InformerSynced
 | |
| 
 | |
| 	// podLister is able to list/get pods and is populated by the
 | |
| 	// shared informer passed to NewController
 | |
| 	podLister corelisters.PodLister
 | |
| 	// podsSynced returns true if the pod shared informer has been synced at least once.
 | |
| 	// Added as a member to the struct to allow injection for testing.
 | |
| 	podsSynced cache.InformerSynced
 | |
| 
 | |
| 	// endpointSliceLister is able to list/get endpoint slices and is populated by the
 | |
| 	// shared informer passed to NewController
 | |
| 	endpointSliceLister discoverylisters.EndpointSliceLister
 | |
| 	// endpointSlicesSynced returns true if the endpoint slice shared informer has been synced at least once.
 | |
| 	// Added as a member to the struct to allow injection for testing.
 | |
| 	endpointSlicesSynced cache.InformerSynced
 | |
| 	// endpointSliceTracker tracks the list of EndpointSlices and associated
 | |
| 	// resource versions expected for each Service. It can help determine if a
 | |
| 	// cached EndpointSlice is out of date.
 | |
| 	endpointSliceTracker *endpointsliceutil.EndpointSliceTracker
 | |
| 
 | |
| 	// nodeLister is able to list/get nodes and is populated by the
 | |
| 	// shared informer passed to NewController
 | |
| 	nodeLister corelisters.NodeLister
 | |
| 	// nodesSynced returns true if the node shared informer has been synced at least once.
 | |
| 	// Added as a member to the struct to allow injection for testing.
 | |
| 	nodesSynced cache.InformerSynced
 | |
| 
 | |
| 	// reconciler is an util used to reconcile EndpointSlice changes.
 | |
| 	reconciler *endpointslicerec.Reconciler
 | |
| 
 | |
| 	// triggerTimeTracker is an util used to compute and export the
 | |
| 	// EndpointsLastChangeTriggerTime annotation.
 | |
| 	triggerTimeTracker *endpointsliceutil.TriggerTimeTracker
 | |
| 
 | |
| 	// Services that need to be updated. A channel is inappropriate here,
 | |
| 	// because it allows services with lots of pods to be serviced much
 | |
| 	// more often than services with few pods; it also would cause a
 | |
| 	// service that's inserted multiple times to be processed more than
 | |
| 	// necessary.
 | |
| 	serviceQueue workqueue.TypedRateLimitingInterface[string]
 | |
| 
 | |
| 	// topologyQueue is used to trigger a topology cache update and checking node
 | |
| 	// topology distribution.
 | |
| 	topologyQueue workqueue.TypedRateLimitingInterface[string]
 | |
| 
 | |
| 	// maxEndpointsPerSlice references the maximum number of endpoints that
 | |
| 	// should be added to an EndpointSlice
 | |
| 	maxEndpointsPerSlice int32
 | |
| 
 | |
| 	// workerLoopPeriod is the time between worker runs. The workers
 | |
| 	// process the queue of service and pod changes
 | |
| 	workerLoopPeriod time.Duration
 | |
| 
 | |
| 	// endpointUpdatesBatchPeriod is an artificial delay added to all service syncs triggered by pod changes.
 | |
| 	// This can be used to reduce overall number of all endpoint slice updates.
 | |
| 	endpointUpdatesBatchPeriod time.Duration
 | |
| 
 | |
| 	// topologyCache tracks the distribution of Nodes and endpoints across zones
 | |
| 	// to enable TopologyAwareHints.
 | |
| 	topologyCache *topologycache.TopologyCache
 | |
| }
 | |
| 
 | |
| // Run will not return until stopCh is closed.
 | |
| func (c *Controller) Run(ctx context.Context, workers int) {
 | |
| 	defer utilruntime.HandleCrash()
 | |
| 
 | |
| 	// Start events processing pipeline.
 | |
| 	c.eventBroadcaster.StartLogging(klog.Infof)
 | |
| 	c.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.client.CoreV1().Events("")})
 | |
| 	defer c.eventBroadcaster.Shutdown()
 | |
| 
 | |
| 	defer c.serviceQueue.ShutDown()
 | |
| 	defer c.topologyQueue.ShutDown()
 | |
| 
 | |
| 	logger := klog.FromContext(ctx)
 | |
| 	logger.Info("Starting endpoint slice controller")
 | |
| 	defer logger.Info("Shutting down endpoint slice controller")
 | |
| 
 | |
| 	if !cache.WaitForNamedCacheSync("endpoint_slice", ctx.Done(), c.podsSynced, c.servicesSynced, c.endpointSlicesSynced, c.nodesSynced) {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	logger.V(2).Info("Starting service queue worker threads", "total", workers)
 | |
| 	for i := 0; i < workers; i++ {
 | |
| 		go wait.Until(func() { c.serviceQueueWorker(logger) }, c.workerLoopPeriod, ctx.Done())
 | |
| 	}
 | |
| 	logger.V(2).Info("Starting topology queue worker threads", "total", 1)
 | |
| 	go wait.Until(func() { c.topologyQueueWorker(logger) }, c.workerLoopPeriod, ctx.Done())
 | |
| 
 | |
| 	<-ctx.Done()
 | |
| }
 | |
| 
 | |
| // serviceQueueWorker runs a worker thread that just dequeues items, processes
 | |
| // them, and marks them done. You may run as many of these in parallel as you
 | |
| // wish; the workqueue guarantees that they will not end up processing the same
 | |
| // service at the same time
 | |
| func (c *Controller) serviceQueueWorker(logger klog.Logger) {
 | |
| 	for c.processNextServiceWorkItem(logger) {
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (c *Controller) processNextServiceWorkItem(logger klog.Logger) bool {
 | |
| 	cKey, quit := c.serviceQueue.Get()
 | |
| 	if quit {
 | |
| 		return false
 | |
| 	}
 | |
| 	defer c.serviceQueue.Done(cKey)
 | |
| 
 | |
| 	err := c.syncService(logger, cKey)
 | |
| 	c.handleErr(logger, err, cKey)
 | |
| 
 | |
| 	return true
 | |
| }
 | |
| 
 | |
| func (c *Controller) topologyQueueWorker(logger klog.Logger) {
 | |
| 	for c.processNextTopologyWorkItem(logger) {
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (c *Controller) processNextTopologyWorkItem(logger klog.Logger) bool {
 | |
| 	key, quit := c.topologyQueue.Get()
 | |
| 	if quit {
 | |
| 		return false
 | |
| 	}
 | |
| 	defer c.topologyQueue.Done(key)
 | |
| 	c.checkNodeTopologyDistribution(logger)
 | |
| 	return true
 | |
| }
 | |
| 
 | |
| func (c *Controller) handleErr(logger klog.Logger, err error, key string) {
 | |
| 	trackSync(err)
 | |
| 
 | |
| 	if err == nil {
 | |
| 		c.serviceQueue.Forget(key)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	if c.serviceQueue.NumRequeues(key) < maxRetries {
 | |
| 		logger.Info("Error syncing endpoint slices for service, retrying", "key", key, "err", err)
 | |
| 		c.serviceQueue.AddRateLimited(key)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	logger.Info("Retry budget exceeded, dropping service out of the queue", "key", key, "err", err)
 | |
| 	c.serviceQueue.Forget(key)
 | |
| 	utilruntime.HandleError(err)
 | |
| }
 | |
| 
 | |
| func (c *Controller) syncService(logger klog.Logger, key string) error {
 | |
| 	startTime := time.Now()
 | |
| 	defer func() {
 | |
| 		logger.V(4).Info("Finished syncing service endpoint slices", "key", key, "elapsedTime", time.Since(startTime))
 | |
| 	}()
 | |
| 
 | |
| 	namespace, name, err := cache.SplitMetaNamespaceKey(key)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	service, err := c.serviceLister.Services(namespace).Get(name)
 | |
| 	if err != nil {
 | |
| 		if !apierrors.IsNotFound(err) {
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		c.triggerTimeTracker.DeleteService(namespace, name)
 | |
| 		c.reconciler.DeleteService(namespace, name)
 | |
| 		c.endpointSliceTracker.DeleteService(namespace, name)
 | |
| 		// The service has been deleted, return nil so that it won't be retried.
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	if service.Spec.Type == v1.ServiceTypeExternalName {
 | |
| 		// services with Type ExternalName receive no endpoints from this controller;
 | |
| 		// Ref: https://issues.k8s.io/105986
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	if service.Spec.Selector == nil {
 | |
| 		// services without a selector receive no endpoint slices from this controller;
 | |
| 		// these services will receive endpoint slices that are created out-of-band via the REST API.
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	logger.V(5).Info("About to update endpoint slices for service", "key", key)
 | |
| 
 | |
| 	podLabelSelector := labels.Set(service.Spec.Selector).AsSelectorPreValidated()
 | |
| 	pods, err := c.podLister.Pods(service.Namespace).List(podLabelSelector)
 | |
| 	if err != nil {
 | |
| 		// Since we're getting stuff from a local cache, it is basically
 | |
| 		// impossible to get this error.
 | |
| 		c.eventRecorder.Eventf(service, v1.EventTypeWarning, "FailedToListPods",
 | |
| 			"Error listing Pods for Service %s/%s: %v", service.Namespace, service.Name, err)
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	esLabelSelector := labels.Set(map[string]string{
 | |
| 		discovery.LabelServiceName: service.Name,
 | |
| 		discovery.LabelManagedBy:   c.reconciler.GetControllerName(),
 | |
| 	}).AsSelectorPreValidated()
 | |
| 	endpointSlices, err := c.endpointSliceLister.EndpointSlices(service.Namespace).List(esLabelSelector)
 | |
| 
 | |
| 	if err != nil {
 | |
| 		// Since we're getting stuff from a local cache, it is basically
 | |
| 		// impossible to get this error.
 | |
| 		c.eventRecorder.Eventf(service, v1.EventTypeWarning, "FailedToListEndpointSlices",
 | |
| 			"Error listing Endpoint Slices for Service %s/%s: %v", service.Namespace, service.Name, err)
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	// Drop EndpointSlices that have been marked for deletion to prevent the controller from getting stuck.
 | |
| 	endpointSlices = dropEndpointSlicesPendingDeletion(endpointSlices)
 | |
| 
 | |
| 	if c.endpointSliceTracker.StaleSlices(service, endpointSlices) {
 | |
| 		return endpointslicepkg.NewStaleInformerCache("EndpointSlice informer cache is out of date")
 | |
| 	}
 | |
| 
 | |
| 	// We call ComputeEndpointLastChangeTriggerTime here to make sure that the
 | |
| 	// state of the trigger time tracker gets updated even if the sync turns out
 | |
| 	// to be no-op and we don't update the EndpointSlice objects.
 | |
| 	lastChangeTriggerTime := c.triggerTimeTracker.
 | |
| 		ComputeEndpointLastChangeTriggerTime(namespace, service, pods)
 | |
| 
 | |
| 	err = c.reconciler.Reconcile(logger, service, pods, endpointSlices, lastChangeTriggerTime)
 | |
| 	if err != nil {
 | |
| 		c.eventRecorder.Eventf(service, v1.EventTypeWarning, "FailedToUpdateEndpointSlices",
 | |
| 			"Error updating Endpoint Slices for Service %s/%s: %v", service.Namespace, service.Name, err)
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // onServiceUpdate updates the Service Selector in the cache and queues the Service for processing.
 | |
| func (c *Controller) onServiceUpdate(obj interface{}) {
 | |
| 	key, err := controller.KeyFunc(obj)
 | |
| 	if err != nil {
 | |
| 		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	c.serviceQueue.Add(key)
 | |
| }
 | |
| 
 | |
| // onServiceDelete removes the Service Selector from the cache and queues the Service for processing.
 | |
| func (c *Controller) onServiceDelete(obj interface{}) {
 | |
| 	key, err := controller.KeyFunc(obj)
 | |
| 	if err != nil {
 | |
| 		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	c.serviceQueue.Add(key)
 | |
| }
 | |
| 
 | |
| // onEndpointSliceAdd queues a sync for the relevant Service for a sync if the
 | |
| // EndpointSlice resource version does not match the expected version in the
 | |
| // endpointSliceTracker.
 | |
| func (c *Controller) onEndpointSliceAdd(obj interface{}) {
 | |
| 	endpointSlice := obj.(*discovery.EndpointSlice)
 | |
| 	if endpointSlice == nil {
 | |
| 		utilruntime.HandleError(fmt.Errorf("Invalid EndpointSlice provided to onEndpointSliceAdd()"))
 | |
| 		return
 | |
| 	}
 | |
| 	if c.reconciler.ManagedByController(endpointSlice) && c.endpointSliceTracker.ShouldSync(endpointSlice) {
 | |
| 		c.queueServiceForEndpointSlice(endpointSlice)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // onEndpointSliceUpdate queues a sync for the relevant Service for a sync if
 | |
| // the EndpointSlice resource version does not match the expected version in the
 | |
| // endpointSliceTracker or the managed-by value of the EndpointSlice has changed
 | |
| // from or to this controller.
 | |
| func (c *Controller) onEndpointSliceUpdate(logger klog.Logger, prevObj, obj interface{}) {
 | |
| 	prevEndpointSlice := prevObj.(*discovery.EndpointSlice)
 | |
| 	endpointSlice := obj.(*discovery.EndpointSlice)
 | |
| 	if endpointSlice == nil || prevEndpointSlice == nil {
 | |
| 		utilruntime.HandleError(fmt.Errorf("Invalid EndpointSlice provided to onEndpointSliceUpdate()"))
 | |
| 		return
 | |
| 	}
 | |
| 	// EndpointSlice generation does not change when labels change. Although the
 | |
| 	// controller will never change LabelServiceName, users might. This check
 | |
| 	// ensures that we handle changes to this label.
 | |
| 	svcName := endpointSlice.Labels[discovery.LabelServiceName]
 | |
| 	prevSvcName := prevEndpointSlice.Labels[discovery.LabelServiceName]
 | |
| 	if svcName != prevSvcName {
 | |
| 		logger.Info("label changed", "label", discovery.LabelServiceName, "oldService", prevSvcName, "newService", svcName, "endpointslice", klog.KObj(endpointSlice))
 | |
| 		c.queueServiceForEndpointSlice(endpointSlice)
 | |
| 		c.queueServiceForEndpointSlice(prevEndpointSlice)
 | |
| 		return
 | |
| 	}
 | |
| 	if c.reconciler.ManagedByChanged(prevEndpointSlice, endpointSlice) || (c.reconciler.ManagedByController(endpointSlice) && c.endpointSliceTracker.ShouldSync(endpointSlice)) {
 | |
| 		c.queueServiceForEndpointSlice(endpointSlice)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // onEndpointSliceDelete queues a sync for the relevant Service for a sync if the
 | |
| // EndpointSlice resource version does not match the expected version in the
 | |
| // endpointSliceTracker.
 | |
| func (c *Controller) onEndpointSliceDelete(obj interface{}) {
 | |
| 	endpointSlice := getEndpointSliceFromDeleteAction(obj)
 | |
| 	if endpointSlice != nil && c.reconciler.ManagedByController(endpointSlice) && c.endpointSliceTracker.Has(endpointSlice) {
 | |
| 		// This returns false if we didn't expect the EndpointSlice to be
 | |
| 		// deleted. If that is the case, we queue the Service for another sync.
 | |
| 		if !c.endpointSliceTracker.HandleDeletion(endpointSlice) {
 | |
| 			c.queueServiceForEndpointSlice(endpointSlice)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // queueServiceForEndpointSlice attempts to queue the corresponding Service for
 | |
| // the provided EndpointSlice.
 | |
| func (c *Controller) queueServiceForEndpointSlice(endpointSlice *discovery.EndpointSlice) {
 | |
| 	key, err := endpointslicerec.ServiceControllerKey(endpointSlice)
 | |
| 	if err != nil {
 | |
| 		utilruntime.HandleError(fmt.Errorf("Couldn't get key for EndpointSlice %+v: %v", endpointSlice, err))
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	// queue after the max of endpointSliceChangeMinSyncDelay and
 | |
| 	// endpointUpdatesBatchPeriod.
 | |
| 	delay := endpointSliceChangeMinSyncDelay
 | |
| 	if c.endpointUpdatesBatchPeriod > delay {
 | |
| 		delay = c.endpointUpdatesBatchPeriod
 | |
| 	}
 | |
| 	c.serviceQueue.AddAfter(key, delay)
 | |
| }
 | |
| 
 | |
| func (c *Controller) addPod(obj interface{}) {
 | |
| 	pod := obj.(*v1.Pod)
 | |
| 	services, err := endpointsliceutil.GetPodServiceMemberships(c.serviceLister, pod)
 | |
| 	if err != nil {
 | |
| 		utilruntime.HandleError(fmt.Errorf("Unable to get pod %s/%s's service memberships: %v", pod.Namespace, pod.Name, err))
 | |
| 		return
 | |
| 	}
 | |
| 	for key := range services {
 | |
| 		c.serviceQueue.AddAfter(key, c.endpointUpdatesBatchPeriod)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (c *Controller) updatePod(old, cur interface{}) {
 | |
| 	services := endpointsliceutil.GetServicesToUpdateOnPodChange(c.serviceLister, old, cur)
 | |
| 	for key := range services {
 | |
| 		c.serviceQueue.AddAfter(key, c.endpointUpdatesBatchPeriod)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // When a pod is deleted, enqueue the services the pod used to be a member of
 | |
| // obj could be an *v1.Pod, or a DeletionFinalStateUnknown marker item.
 | |
| func (c *Controller) deletePod(obj interface{}) {
 | |
| 	pod := endpointsliceutil.GetPodFromDeleteAction(obj)
 | |
| 	if pod != nil {
 | |
| 		c.addPod(pod)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (c *Controller) addNode() {
 | |
| 	c.topologyQueue.Add(topologyQueueItemKey)
 | |
| }
 | |
| 
 | |
| func (c *Controller) updateNode(old, cur interface{}) {
 | |
| 	oldNode := old.(*v1.Node)
 | |
| 	curNode := cur.(*v1.Node)
 | |
| 
 | |
| 	// LabelTopologyZone may be added by cloud provider asynchronously after the Node is created.
 | |
| 	// The topology cache should be updated in this case.
 | |
| 	if isNodeReady(oldNode) != isNodeReady(curNode) ||
 | |
| 		oldNode.Labels[v1.LabelTopologyZone] != curNode.Labels[v1.LabelTopologyZone] {
 | |
| 		c.topologyQueue.Add(topologyQueueItemKey)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (c *Controller) deleteNode() {
 | |
| 	c.topologyQueue.Add(topologyQueueItemKey)
 | |
| }
 | |
| 
 | |
| // checkNodeTopologyDistribution updates Nodes in the topology cache and then
 | |
| // queues any Services that are past the threshold.
 | |
| func (c *Controller) checkNodeTopologyDistribution(logger klog.Logger) {
 | |
| 	if c.topologyCache == nil {
 | |
| 		return
 | |
| 	}
 | |
| 	nodes, err := c.nodeLister.List(labels.Everything())
 | |
| 	if err != nil {
 | |
| 		logger.Error(err, "Error listing Nodes")
 | |
| 		return
 | |
| 	}
 | |
| 	c.topologyCache.SetNodes(logger, nodes)
 | |
| 	serviceKeys := c.topologyCache.GetOverloadedServices()
 | |
| 	for _, serviceKey := range serviceKeys {
 | |
| 		logger.V(2).Info("Queuing Service after Node change due to overloading", "key", serviceKey)
 | |
| 		c.serviceQueue.Add(serviceKey)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // trackSync increments the EndpointSliceSyncs metric with the result of a sync.
 | |
| func trackSync(err error) {
 | |
| 	metricLabel := "success"
 | |
| 	if err != nil {
 | |
| 		if endpointslicepkg.IsStaleInformerCacheErr(err) {
 | |
| 			metricLabel = "stale"
 | |
| 		} else {
 | |
| 			metricLabel = "error"
 | |
| 		}
 | |
| 	}
 | |
| 	endpointslicemetrics.EndpointSliceSyncs.WithLabelValues(metricLabel).Inc()
 | |
| }
 | |
| 
 | |
| func dropEndpointSlicesPendingDeletion(endpointSlices []*discovery.EndpointSlice) []*discovery.EndpointSlice {
 | |
| 	n := 0
 | |
| 	for _, endpointSlice := range endpointSlices {
 | |
| 		if endpointSlice.DeletionTimestamp == nil {
 | |
| 			endpointSlices[n] = endpointSlice
 | |
| 			n++
 | |
| 		}
 | |
| 	}
 | |
| 	return endpointSlices[:n]
 | |
| }
 | |
| 
 | |
| // getEndpointSliceFromDeleteAction parses an EndpointSlice from a delete action.
 | |
| func getEndpointSliceFromDeleteAction(obj interface{}) *discovery.EndpointSlice {
 | |
| 	if endpointSlice, ok := obj.(*discovery.EndpointSlice); ok {
 | |
| 		// Enqueue all the services that the pod used to be a member of.
 | |
| 		// This is the same thing we do when we add a pod.
 | |
| 		return endpointSlice
 | |
| 	}
 | |
| 	// If we reached here it means the pod was deleted but its final state is unrecorded.
 | |
| 	tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
 | |
| 	if !ok {
 | |
| 		utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
 | |
| 		return nil
 | |
| 	}
 | |
| 	endpointSlice, ok := tombstone.Obj.(*discovery.EndpointSlice)
 | |
| 	if !ok {
 | |
| 		utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a EndpointSlice: %#v", obj))
 | |
| 		return nil
 | |
| 	}
 | |
| 	return endpointSlice
 | |
| }
 | |
| 
 | |
| // isNodeReady returns true if a node is ready; false otherwise.
 | |
| func isNodeReady(node *v1.Node) bool {
 | |
| 	for _, c := range node.Status.Conditions {
 | |
| 		if c.Type == v1.NodeReady {
 | |
| 			return c.Status == v1.ConditionTrue
 | |
| 		}
 | |
| 	}
 | |
| 	return false
 | |
| }
 | 
