mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-03 19:58:17 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			241 lines
		
	
	
		
			8.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			241 lines
		
	
	
		
			8.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
/*
 | 
						|
Copyright 2019 The Kubernetes Authors.
 | 
						|
 | 
						|
Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
you may not use this file except in compliance with the License.
 | 
						|
You may obtain a copy of the License at
 | 
						|
 | 
						|
    http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
 | 
						|
Unless required by applicable law or agreed to in writing, software
 | 
						|
distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
See the License for the specific language governing permissions and
 | 
						|
limitations under the License.
 | 
						|
*/
 | 
						|
 | 
						|
package proxy
 | 
						|
 | 
						|
import (
 | 
						|
	"sort"
 | 
						|
 | 
						|
	"k8s.io/api/core/v1"
 | 
						|
	discovery "k8s.io/api/discovery/v1alpha1"
 | 
						|
	"k8s.io/apimachinery/pkg/types"
 | 
						|
	"k8s.io/client-go/tools/record"
 | 
						|
	"k8s.io/klog"
 | 
						|
	utilproxy "k8s.io/kubernetes/pkg/proxy/util"
 | 
						|
	utilnet "k8s.io/utils/net"
 | 
						|
)
 | 
						|
 | 
						|
// EndpointSliceCache is used as a cache of EndpointSlice information.
 | 
						|
type EndpointSliceCache struct {
 | 
						|
	// sliceByServiceMap is the basis of this cache. It contains endpoint slice
 | 
						|
	// info grouped by service name and endpoint slice name. The first key
 | 
						|
	// represents a namespaced service name while the second key represents
 | 
						|
	// an endpoint slice name. Since endpoints can move between slices, we
 | 
						|
	// require slice specific caching to prevent endpoints being removed from
 | 
						|
	// the cache when they may have just moved to a different slice.
 | 
						|
	sliceByServiceMap map[types.NamespacedName]map[string]*endpointSliceInfo
 | 
						|
	makeEndpointInfo  makeEndpointFunc
 | 
						|
	hostname          string
 | 
						|
	isIPv6Mode        *bool
 | 
						|
	recorder          record.EventRecorder
 | 
						|
}
 | 
						|
 | 
						|
// endpointSliceInfo contains just the attributes kube-proxy cares about.
 | 
						|
// Used for caching. Intentionally small to limit memory util.
 | 
						|
type endpointSliceInfo struct {
 | 
						|
	Ports     []discovery.EndpointPort
 | 
						|
	Endpoints []*endpointInfo
 | 
						|
}
 | 
						|
 | 
						|
// endpointInfo contains just the attributes kube-proxy cares about.
 | 
						|
// Used for caching. Intentionally small to limit memory util.
 | 
						|
// Addresses and Topology are copied from EndpointSlice Endpoints.
 | 
						|
type endpointInfo struct {
 | 
						|
	Addresses []string
 | 
						|
	Topology  map[string]string
 | 
						|
}
 | 
						|
 | 
						|
// NewEndpointSliceCache initializes an EndpointSliceCache.
 | 
						|
func NewEndpointSliceCache(hostname string, isIPv6Mode *bool, recorder record.EventRecorder, makeEndpointInfo makeEndpointFunc) *EndpointSliceCache {
 | 
						|
	if makeEndpointInfo == nil {
 | 
						|
		makeEndpointInfo = standardEndpointInfo
 | 
						|
	}
 | 
						|
	return &EndpointSliceCache{
 | 
						|
		sliceByServiceMap: map[types.NamespacedName]map[string]*endpointSliceInfo{},
 | 
						|
		hostname:          hostname,
 | 
						|
		isIPv6Mode:        isIPv6Mode,
 | 
						|
		makeEndpointInfo:  makeEndpointInfo,
 | 
						|
		recorder:          recorder,
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// standardEndpointInfo is the default makeEndpointFunc.
 | 
						|
func standardEndpointInfo(ep *BaseEndpointInfo) Endpoint {
 | 
						|
	return ep
 | 
						|
}
 | 
						|
 | 
						|
// Update a slice in the cache.
 | 
						|
func (cache *EndpointSliceCache) Update(endpointSlice *discovery.EndpointSlice) {
 | 
						|
	serviceKey, sliceKey := endpointSliceCacheKeys(endpointSlice)
 | 
						|
	// This should never actually happen
 | 
						|
	if serviceKey.Name == "" || serviceKey.Namespace == "" || sliceKey == "" {
 | 
						|
		klog.Errorf("Invalid endpoint slice, name and owner reference required %v", endpointSlice)
 | 
						|
		return
 | 
						|
	}
 | 
						|
	esInfo := &endpointSliceInfo{
 | 
						|
		Ports:     endpointSlice.Ports,
 | 
						|
		Endpoints: []*endpointInfo{},
 | 
						|
	}
 | 
						|
	for _, endpoint := range endpointSlice.Endpoints {
 | 
						|
		if endpoint.Conditions.Ready == nil || *endpoint.Conditions.Ready == true {
 | 
						|
			esInfo.Endpoints = append(esInfo.Endpoints, &endpointInfo{
 | 
						|
				Addresses: endpoint.Addresses,
 | 
						|
				Topology:  endpoint.Topology,
 | 
						|
			})
 | 
						|
		}
 | 
						|
	}
 | 
						|
	if _, exists := cache.sliceByServiceMap[serviceKey]; !exists {
 | 
						|
		cache.sliceByServiceMap[serviceKey] = map[string]*endpointSliceInfo{}
 | 
						|
	}
 | 
						|
	cache.sliceByServiceMap[serviceKey][sliceKey] = esInfo
 | 
						|
}
 | 
						|
 | 
						|
// Delete a slice from the cache.
 | 
						|
func (cache *EndpointSliceCache) Delete(endpointSlice *discovery.EndpointSlice) {
 | 
						|
	serviceKey, sliceKey := endpointSliceCacheKeys(endpointSlice)
 | 
						|
	delete(cache.sliceByServiceMap[serviceKey], sliceKey)
 | 
						|
}
 | 
						|
 | 
						|
// EndpointsMap computes an EndpointsMap for a given service.
 | 
						|
func (cache *EndpointSliceCache) EndpointsMap(serviceNN types.NamespacedName) EndpointsMap {
 | 
						|
	endpointInfoBySP := cache.endpointInfoByServicePort(serviceNN)
 | 
						|
	return endpointsMapFromEndpointInfo(endpointInfoBySP)
 | 
						|
}
 | 
						|
 | 
						|
// endpointInfoByServicePort groups endpoint info by service port name and address.
 | 
						|
func (cache *EndpointSliceCache) endpointInfoByServicePort(serviceNN types.NamespacedName) map[ServicePortName]map[string]Endpoint {
 | 
						|
	endpointInfoBySP := map[ServicePortName]map[string]Endpoint{}
 | 
						|
 | 
						|
	for _, sliceInfo := range cache.sliceByServiceMap[serviceNN] {
 | 
						|
		for _, port := range sliceInfo.Ports {
 | 
						|
			if port.Name == nil {
 | 
						|
				klog.Warningf("ignoring port with nil name %v", port)
 | 
						|
				continue
 | 
						|
			}
 | 
						|
			// TODO: handle nil ports to mean "all"
 | 
						|
			if port.Port == nil || *port.Port == int32(0) {
 | 
						|
				klog.Warningf("ignoring invalid endpoint port %s", *port.Name)
 | 
						|
				continue
 | 
						|
			}
 | 
						|
 | 
						|
			svcPortName := ServicePortName{NamespacedName: serviceNN}
 | 
						|
			svcPortName.Port = *port.Name
 | 
						|
 | 
						|
			endpointInfoBySP[svcPortName] = cache.addEndpointsByIP(serviceNN, int(*port.Port), endpointInfoBySP[svcPortName], sliceInfo.Endpoints)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return endpointInfoBySP
 | 
						|
}
 | 
						|
 | 
						|
// addEndpointsByIP adds endpointInfo for each IP.
 | 
						|
func (cache *EndpointSliceCache) addEndpointsByIP(serviceNN types.NamespacedName, portNum int, endpointsByIP map[string]Endpoint, endpoints []*endpointInfo) map[string]Endpoint {
 | 
						|
	if endpointsByIP == nil {
 | 
						|
		endpointsByIP = map[string]Endpoint{}
 | 
						|
	}
 | 
						|
 | 
						|
	// iterate through endpoints to add them to endpointsByIP.
 | 
						|
	for _, endpoint := range endpoints {
 | 
						|
		if len(endpoint.Addresses) == 0 {
 | 
						|
			klog.Warningf("ignoring invalid endpoint port %s with empty addresses", endpoint)
 | 
						|
			continue
 | 
						|
		}
 | 
						|
 | 
						|
		// Filter out the incorrect IP version case. Any endpoint port that
 | 
						|
		// contains incorrect IP version will be ignored.
 | 
						|
		if cache.isIPv6Mode != nil && utilnet.IsIPv6String(endpoint.Addresses[0]) != *cache.isIPv6Mode {
 | 
						|
			// Emit event on the corresponding service which had a different IP
 | 
						|
			// version than the endpoint.
 | 
						|
			utilproxy.LogAndEmitIncorrectIPVersionEvent(cache.recorder, "endpointslice", endpoint.Addresses[0], serviceNN.Name, serviceNN.Namespace, "")
 | 
						|
			continue
 | 
						|
		}
 | 
						|
 | 
						|
		isLocal := cache.isLocal(endpoint.Topology[v1.LabelHostname])
 | 
						|
		endpointInfo := newBaseEndpointInfo(endpoint.Addresses[0], portNum, isLocal)
 | 
						|
 | 
						|
		// This logic ensures we're deduping potential overlapping endpoints
 | 
						|
		// isLocal should not vary between matching IPs, but if it does, we
 | 
						|
		// favor a true value here if it exists.
 | 
						|
		if _, exists := endpointsByIP[endpointInfo.IP()]; !exists || isLocal {
 | 
						|
			endpointsByIP[endpointInfo.IP()] = cache.makeEndpointInfo(endpointInfo)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return endpointsByIP
 | 
						|
}
 | 
						|
 | 
						|
func (cache *EndpointSliceCache) isLocal(hostname string) bool {
 | 
						|
	return len(cache.hostname) > 0 && hostname == cache.hostname
 | 
						|
}
 | 
						|
 | 
						|
// endpointsMapFromEndpointInfo computes an endpointsMap from endpointInfo that
 | 
						|
// has been grouped by service port and IP.
 | 
						|
func endpointsMapFromEndpointInfo(endpointInfoBySP map[ServicePortName]map[string]Endpoint) EndpointsMap {
 | 
						|
	endpointsMap := EndpointsMap{}
 | 
						|
 | 
						|
	// transform endpointInfoByServicePort into an endpointsMap with sorted IPs.
 | 
						|
	for svcPortName, endpointInfoByIP := range endpointInfoBySP {
 | 
						|
		if len(endpointInfoByIP) > 0 {
 | 
						|
			endpointsMap[svcPortName] = []Endpoint{}
 | 
						|
			for _, endpointInfo := range endpointInfoByIP {
 | 
						|
				endpointsMap[svcPortName] = append(endpointsMap[svcPortName], endpointInfo)
 | 
						|
 | 
						|
			}
 | 
						|
			// Ensure IPs are always returned in the same order to simplify diffing.
 | 
						|
			sort.Sort(byIP(endpointsMap[svcPortName]))
 | 
						|
 | 
						|
			klog.V(3).Infof("Setting endpoints for %q to %+v", svcPortName, formatEndpointsList(endpointsMap[svcPortName]))
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return endpointsMap
 | 
						|
}
 | 
						|
 | 
						|
// formatEndpointsList returns a string list converted from an endpoints list.
 | 
						|
func formatEndpointsList(endpoints []Endpoint) []string {
 | 
						|
	var formattedList []string
 | 
						|
	for _, ep := range endpoints {
 | 
						|
		formattedList = append(formattedList, ep.String())
 | 
						|
	}
 | 
						|
	return formattedList
 | 
						|
}
 | 
						|
 | 
						|
// endpointSliceCacheKeys returns cache keys used for a given EndpointSlice.
 | 
						|
func endpointSliceCacheKeys(endpointSlice *discovery.EndpointSlice) (types.NamespacedName, string) {
 | 
						|
	if len(endpointSlice.OwnerReferences) == 0 {
 | 
						|
		klog.Errorf("No owner reference set on endpoint slice: %s", endpointSlice.Name)
 | 
						|
		return types.NamespacedName{}, endpointSlice.Name
 | 
						|
	}
 | 
						|
	if len(endpointSlice.OwnerReferences) > 1 {
 | 
						|
		klog.Errorf("More than 1 owner reference set on endpoint slice: %s", endpointSlice.Name)
 | 
						|
	}
 | 
						|
	ownerRef := endpointSlice.OwnerReferences[0]
 | 
						|
	return types.NamespacedName{Namespace: endpointSlice.Namespace, Name: ownerRef.Name}, endpointSlice.Name
 | 
						|
}
 | 
						|
 | 
						|
// byIP helps sort endpoints by IP
 | 
						|
type byIP []Endpoint
 | 
						|
 | 
						|
func (e byIP) Len() int {
 | 
						|
	return len(e)
 | 
						|
}
 | 
						|
func (e byIP) Swap(i, j int) {
 | 
						|
	e[i], e[j] = e[j], e[i]
 | 
						|
}
 | 
						|
func (e byIP) Less(i, j int) bool {
 | 
						|
	return e[i].IP() < e[j].IP()
 | 
						|
}
 |