mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-04 12:18:16 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			204 lines
		
	
	
		
			7.3 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			204 lines
		
	
	
		
			7.3 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
 | 
						|
 | 
						|
package proxy
 | 
						|
 | 
						|
import (
 | 
						|
	v1 "k8s.io/api/core/v1"
 | 
						|
	utilfeature "k8s.io/apiserver/pkg/util/feature"
 | 
						|
	"k8s.io/klog/v2"
 | 
						|
	"k8s.io/kubernetes/pkg/features"
 | 
						|
)
 | 
						|
 | 
						|
// CategorizeEndpoints returns:
 | 
						|
//
 | 
						|
//   - The service's usable Cluster-traffic-policy endpoints (taking topology into account, if
 | 
						|
//     relevant). This will be nil if the service does not ever use Cluster traffic policy.
 | 
						|
//
 | 
						|
//   - The service's usable Local-traffic-policy endpoints (including terminating endpoints, if
 | 
						|
//     relevant). This will be nil if the service does not ever use Local traffic policy.
 | 
						|
//
 | 
						|
//   - The combined list of all endpoints reachable from this node (which is the union of the
 | 
						|
//     previous two lists, but in the case where it is identical to one or the other, we avoid
 | 
						|
//     allocating a separate list).
 | 
						|
//
 | 
						|
//   - An indication of whether the service has any endpoints reachable from anywhere in the
 | 
						|
//     cluster. (This may be true even if allReachableEndpoints is empty.)
 | 
						|
func CategorizeEndpoints(endpoints []Endpoint, svcInfo ServicePort, nodeLabels map[string]string) (clusterEndpoints, localEndpoints, allReachableEndpoints []Endpoint, hasAnyEndpoints bool) {
 | 
						|
	var useTopology, useServingTerminatingEndpoints bool
 | 
						|
 | 
						|
	if svcInfo.UsesClusterEndpoints() {
 | 
						|
		useTopology = canUseTopology(endpoints, svcInfo, nodeLabels)
 | 
						|
		clusterEndpoints = filterEndpoints(endpoints, func(ep Endpoint) bool {
 | 
						|
			if !ep.IsReady() {
 | 
						|
				return false
 | 
						|
			}
 | 
						|
			if useTopology && !availableForTopology(ep, nodeLabels) {
 | 
						|
				return false
 | 
						|
			}
 | 
						|
			return true
 | 
						|
		})
 | 
						|
 | 
						|
		// if there are 0 cluster-wide endpoints, we can try to fallback to any terminating endpoints that are ready.
 | 
						|
		// When falling back to terminating endpoints, we do NOT consider topology aware routing since this is a best
 | 
						|
		// effort attempt to avoid dropping connections.
 | 
						|
		if len(clusterEndpoints) == 0 && utilfeature.DefaultFeatureGate.Enabled(features.ProxyTerminatingEndpoints) {
 | 
						|
			clusterEndpoints = filterEndpoints(endpoints, func(ep Endpoint) bool {
 | 
						|
				if ep.IsServing() && ep.IsTerminating() {
 | 
						|
					return true
 | 
						|
				}
 | 
						|
 | 
						|
				return false
 | 
						|
			})
 | 
						|
		}
 | 
						|
 | 
						|
		// If there are any Ready endpoints anywhere in the cluster, we are
 | 
						|
		// guaranteed to get one in clusterEndpoints.
 | 
						|
		if len(clusterEndpoints) > 0 {
 | 
						|
			hasAnyEndpoints = true
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	if !svcInfo.UsesLocalEndpoints() {
 | 
						|
		allReachableEndpoints = clusterEndpoints
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	// Pre-scan the endpoints, to figure out which type of endpoint Local
 | 
						|
	// traffic policy will use, and also to see if there are any usable
 | 
						|
	// endpoints anywhere in the cluster.
 | 
						|
	var hasLocalReadyEndpoints, hasLocalServingTerminatingEndpoints bool
 | 
						|
	for _, ep := range endpoints {
 | 
						|
		if ep.IsReady() {
 | 
						|
			hasAnyEndpoints = true
 | 
						|
			if ep.GetIsLocal() {
 | 
						|
				hasLocalReadyEndpoints = true
 | 
						|
			}
 | 
						|
		} else if ep.IsServing() && ep.IsTerminating() && utilfeature.DefaultFeatureGate.Enabled(features.ProxyTerminatingEndpoints) {
 | 
						|
			hasAnyEndpoints = true
 | 
						|
			if ep.GetIsLocal() {
 | 
						|
				hasLocalServingTerminatingEndpoints = true
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	if hasLocalReadyEndpoints {
 | 
						|
		localEndpoints = filterEndpoints(endpoints, func(ep Endpoint) bool {
 | 
						|
			return ep.GetIsLocal() && ep.IsReady()
 | 
						|
		})
 | 
						|
	} else if hasLocalServingTerminatingEndpoints {
 | 
						|
		useServingTerminatingEndpoints = true
 | 
						|
		localEndpoints = filterEndpoints(endpoints, func(ep Endpoint) bool {
 | 
						|
			return ep.GetIsLocal() && ep.IsServing() && ep.IsTerminating()
 | 
						|
		})
 | 
						|
	}
 | 
						|
 | 
						|
	if !svcInfo.UsesClusterEndpoints() {
 | 
						|
		allReachableEndpoints = localEndpoints
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	if !useTopology && !useServingTerminatingEndpoints {
 | 
						|
		// !useServingTerminatingEndpoints means that localEndpoints contains only
 | 
						|
		// Ready endpoints. !useTopology means that clusterEndpoints contains *every*
 | 
						|
		// Ready endpoint. So clusterEndpoints must be a superset of localEndpoints.
 | 
						|
		allReachableEndpoints = clusterEndpoints
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	// clusterEndpoints may contain remote endpoints that aren't in localEndpoints, while
 | 
						|
	// localEndpoints may contain terminating or topologically-unavailable local endpoints
 | 
						|
	// that aren't in clusterEndpoints. So we have to merge the two lists.
 | 
						|
	endpointsMap := make(map[string]Endpoint, len(clusterEndpoints)+len(localEndpoints))
 | 
						|
	for _, ep := range clusterEndpoints {
 | 
						|
		endpointsMap[ep.String()] = ep
 | 
						|
	}
 | 
						|
	for _, ep := range localEndpoints {
 | 
						|
		endpointsMap[ep.String()] = ep
 | 
						|
	}
 | 
						|
	allReachableEndpoints = make([]Endpoint, 0, len(endpointsMap))
 | 
						|
	for _, ep := range endpointsMap {
 | 
						|
		allReachableEndpoints = append(allReachableEndpoints, ep)
 | 
						|
	}
 | 
						|
 | 
						|
	return
 | 
						|
}
 | 
						|
 | 
						|
// canUseTopology returns true if topology aware routing is enabled and properly configured
 | 
						|
// in this cluster. That is, it checks that:
 | 
						|
// * The TopologyAwareHints feature is enabled
 | 
						|
// * The "service.kubernetes.io/topology-aware-hints" annotation on this Service is set to "Auto"
 | 
						|
// * The node's labels include "topology.kubernetes.io/zone"
 | 
						|
// * All of the endpoints for this Service have a topology hint
 | 
						|
// * At least one endpoint for this Service is hinted for this node's zone.
 | 
						|
func canUseTopology(endpoints []Endpoint, svcInfo ServicePort, nodeLabels map[string]string) bool {
 | 
						|
	hintsAnnotation := svcInfo.HintsAnnotation()
 | 
						|
	if hintsAnnotation != "Auto" && hintsAnnotation != "auto" {
 | 
						|
		if hintsAnnotation != "" && hintsAnnotation != "Disabled" && hintsAnnotation != "disabled" {
 | 
						|
			klog.InfoS("Skipping topology aware endpoint filtering since Service has unexpected value", "annotationTopologyAwareHints", v1.AnnotationTopologyAwareHints, "hints", hintsAnnotation)
 | 
						|
		}
 | 
						|
		return false
 | 
						|
	}
 | 
						|
 | 
						|
	zone, ok := nodeLabels[v1.LabelTopologyZone]
 | 
						|
	if !ok || zone == "" {
 | 
						|
		klog.InfoS("Skipping topology aware endpoint filtering since node is missing label", "label", v1.LabelTopologyZone)
 | 
						|
		return false
 | 
						|
	}
 | 
						|
 | 
						|
	hasEndpointForZone := false
 | 
						|
	for _, endpoint := range endpoints {
 | 
						|
		if !endpoint.IsReady() {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		if endpoint.GetZoneHints().Len() == 0 {
 | 
						|
			klog.InfoS("Skipping topology aware endpoint filtering since one or more endpoints is missing a zone hint")
 | 
						|
			return false
 | 
						|
		}
 | 
						|
 | 
						|
		if endpoint.GetZoneHints().Has(zone) {
 | 
						|
			hasEndpointForZone = true
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	if !hasEndpointForZone {
 | 
						|
		klog.InfoS("Skipping topology aware endpoint filtering since no hints were provided for zone", "zone", zone)
 | 
						|
		return false
 | 
						|
	}
 | 
						|
 | 
						|
	return true
 | 
						|
}
 | 
						|
 | 
						|
// availableForTopology checks if this endpoint is available for use on this node, given
 | 
						|
// topology constraints. (It assumes that canUseTopology() returned true.)
 | 
						|
func availableForTopology(endpoint Endpoint, nodeLabels map[string]string) bool {
 | 
						|
	zone := nodeLabels[v1.LabelTopologyZone]
 | 
						|
	return endpoint.GetZoneHints().Has(zone)
 | 
						|
}
 | 
						|
 | 
						|
// filterEndpoints filters endpoints according to predicate
 | 
						|
func filterEndpoints(endpoints []Endpoint, predicate func(Endpoint) bool) []Endpoint {
 | 
						|
	filteredEndpoints := make([]Endpoint, 0, len(endpoints))
 | 
						|
 | 
						|
	for _, ep := range endpoints {
 | 
						|
		if predicate(ep) {
 | 
						|
			filteredEndpoints = append(filteredEndpoints, ep)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return filteredEndpoints
 | 
						|
}
 |