/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package proxy

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/klog/v2"
)

// CategorizeEndpoints returns:
//
//   - The service's usable Cluster-traffic-policy endpoints (taking topology into account, if
//     relevant). This will be nil if the service does not ever use Cluster traffic policy.
//
//   - The service's usable Local-traffic-policy endpoints (including terminating endpoints, if
//     relevant). This will be nil if the service does not ever use Local traffic policy.
//
//   - The combined list of all endpoints reachable from this node (which is the union of the
//     previous two lists, but in the case where it is identical to one or the other, we avoid
//     allocating a separate list).
//
//   - An indication of whether the service has any endpoints reachable from anywhere in the
//     cluster. (This may be true even if allReachableEndpoints is empty.)
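//
// A rough usage sketch (illustrative only; the variable names are hypothetical
// and not part of this package's API):
//
//	clusterEPs, localEPs, allEPs, hasAnyEPs :=
//		CategorizeEndpoints(endpoints, svcInfo, nodeLabels)
//	// clusterEPs backs Cluster-traffic-policy rules, localEPs backs
//	// Local-traffic-policy rules, allEPs is the set this node must program
//	// rules for, and hasAnyEPs distinguishes "no endpoints anywhere in the
//	// cluster" from "endpoints exist, but none are usable from this node".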
func CategorizeEndpoints(endpoints []Endpoint, svcInfo ServicePort, nodeLabels map[string]string) (clusterEndpoints, localEndpoints, allReachableEndpoints []Endpoint, hasAnyEndpoints bool) {
	var useTopology, useServingTerminatingEndpoints bool

	if svcInfo.UsesClusterEndpoints() {
		useTopology = canUseTopology(endpoints, nodeLabels)
		clusterEndpoints = filterEndpoints(endpoints, func(ep Endpoint) bool {
			if !ep.IsReady() {
				return false
			}
			if useTopology && !availableForTopology(ep, nodeLabels) {
				return false
			}
			return true
		})

		// If there are zero cluster-wide endpoints, we can try to fall back to any
		// terminating endpoints that are still serving. When falling back to terminating
		// endpoints, we do NOT consider topology-aware routing, since this is a
		// best-effort attempt to avoid dropping connections.
		if len(clusterEndpoints) == 0 {
			clusterEndpoints = filterEndpoints(endpoints, func(ep Endpoint) bool {
				return ep.IsServing() && ep.IsTerminating()
			})
		}

		// If there are any Ready endpoints anywhere in the cluster, we are
		// guaranteed to get one in clusterEndpoints.
		if len(clusterEndpoints) > 0 {
			hasAnyEndpoints = true
		}
	}

	if !svcInfo.UsesLocalEndpoints() {
		allReachableEndpoints = clusterEndpoints
		return
	}

	// Pre-scan the endpoints to figure out which type of endpoint Local
	// traffic policy will use, and also to see if there are any usable
	// endpoints anywhere in the cluster.
	var hasLocalReadyEndpoints, hasLocalServingTerminatingEndpoints bool
	for _, ep := range endpoints {
		if ep.IsReady() {
			hasAnyEndpoints = true
			if ep.IsLocal() {
				hasLocalReadyEndpoints = true
			}
		} else if ep.IsServing() && ep.IsTerminating() {
			hasAnyEndpoints = true
			if ep.IsLocal() {
				hasLocalServingTerminatingEndpoints = true
			}
		}
	}

	if hasLocalReadyEndpoints {
		localEndpoints = filterEndpoints(endpoints, func(ep Endpoint) bool {
			return ep.IsLocal() && ep.IsReady()
		})
	} else if hasLocalServingTerminatingEndpoints {
		useServingTerminatingEndpoints = true
		localEndpoints = filterEndpoints(endpoints, func(ep Endpoint) bool {
			return ep.IsLocal() && ep.IsServing() && ep.IsTerminating()
		})
	}

	if !svcInfo.UsesClusterEndpoints() {
		allReachableEndpoints = localEndpoints
		return
	}

	if !useTopology && !useServingTerminatingEndpoints {
		// !useServingTerminatingEndpoints means that localEndpoints contains only
		// Ready endpoints. !useTopology means that clusterEndpoints contains *every*
		// Ready endpoint. So clusterEndpoints must be a superset of localEndpoints.
		allReachableEndpoints = clusterEndpoints
		return
	}

	// clusterEndpoints may contain remote endpoints that aren't in localEndpoints, while
	// localEndpoints may contain terminating or topologically-unavailable local endpoints
	// that aren't in clusterEndpoints. So we have to merge the two lists.
	endpointsMap := make(map[string]Endpoint, len(clusterEndpoints)+len(localEndpoints))
	for _, ep := range clusterEndpoints {
		endpointsMap[ep.String()] = ep
	}
	for _, ep := range localEndpoints {
		endpointsMap[ep.String()] = ep
	}
	allReachableEndpoints = make([]Endpoint, 0, len(endpointsMap))
	for _, ep := range endpointsMap {
		allReachableEndpoints = append(allReachableEndpoints, ep)
	}

	return
}
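
// The distinction between an empty allReachableEndpoints and a false
// hasAnyEndpoints matters to callers. A hedged sketch of how a proxy backend
// might react (illustrative only; this is not a call sequence taken from this
// repository):
//
//	_, localEPs, allEPs, hasAnyEPs := CategorizeEndpoints(eps, svcInfo, nodeLabels)
//	switch {
//	case !hasAnyEPs:
//		// No endpoints anywhere in the cluster: typically reject traffic outright.
//	case svcInfo.UsesLocalEndpoints() && len(localEPs) == 0:
//		// Endpoints exist, but none are usable for Local-traffic-policy
//		// traffic from this node: typically drop rather than reject.
//	default:
//		// Program rules for the endpoints in allEPs.
//	}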

// canUseTopology returns true if all of the following are true:
//   - The node's labels include "topology.kubernetes.io/zone".
//   - All of the endpoints for this Service have a topology hint.
//   - At least one endpoint for this Service is hinted for this node's zone.
func canUseTopology(endpoints []Endpoint, nodeLabels map[string]string) bool {
	zone, foundZone := nodeLabels[v1.LabelTopologyZone]
	hasEndpointForZone := false
	for _, endpoint := range endpoints {
		if !endpoint.IsReady() {
			continue
		}

		// If any of the endpoints do not have zone hints, we bail out.
		if endpoint.ZoneHints().Len() == 0 {
			klog.V(7).InfoS("Skipping topology aware endpoint filtering since one or more endpoints is missing a zone hint", "endpoint", endpoint)
			return false
		}

		// If we've made it this far, we have endpoints with hints set. Now we check
		// whether the node has a zone label; if it doesn't, we log and bail out.
		if !foundZone || zone == "" {
			klog.V(2).InfoS("Skipping topology aware endpoint filtering since node is missing label", "label", v1.LabelTopologyZone)
			return false
		}

		if endpoint.ZoneHints().Has(zone) {
			hasEndpointForZone = true
		}
	}

	if !hasEndpointForZone {
		klog.V(7).InfoS("Skipping topology aware endpoint filtering since no hints were provided for zone", "zone", zone)
		return false
	}
	return true
}
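
// An illustrative walk-through of the rules above (hypothetical values, not
// taken from this repository): on a node labeled
// topology.kubernetes.io/zone=zone-a, given these ready endpoints,
//
//	endpoint   zone hints   effect
//	ep-1       {zone-a}     kept by availableForTopology (matches node zone)
//	ep-2       {zone-b}     hinted, but filtered out on this node
//	ep-3       (none)       missing hint; canUseTopology returns false and
//	                        topology filtering is skipped for the whole Service
//
// topology-aware filtering is only used if ep-3 is absent and at least one
// endpoint (here ep-1) is hinted for zone-a.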

// availableForTopology checks if this endpoint is available for use on this node, given
// topology constraints. (It assumes that canUseTopology() returned true.)
func availableForTopology(endpoint Endpoint, nodeLabels map[string]string) bool {
	zone := nodeLabels[v1.LabelTopologyZone]
	return endpoint.ZoneHints().Has(zone)
}

// filterEndpoints filters endpoints according to the given predicate.
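// For example (an illustrative sketch, using a Go method expression on the
// Endpoint interface rather than a call that appears in this file):
//
//	readyEndpoints := filterEndpoints(endpoints, Endpoint.IsReady)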
func filterEndpoints(endpoints []Endpoint, predicate func(Endpoint) bool) []Endpoint {
	filteredEndpoints := make([]Endpoint, 0, len(endpoints))

	for _, ep := range endpoints {
		if predicate(ep) {
			filteredEndpoints = append(filteredEndpoints, ep)
		}
	}

	return filteredEndpoints
}