mirror of https://github.com/optim-enterprises-bv/kubernetes.git (synced 2025-10-31 10:18:13 +00:00)
| //go:build windows
 | |
| // +build windows
 | |
| 
 | |
| /*
 | |
| Copyright 2017 The Kubernetes Authors.
 | |
| 
 | |
| Licensed under the Apache License, Version 2.0 (the "License");
 | |
| you may not use this file except in compliance with the License.
 | |
| You may obtain a copy of the License at
 | |
| 
 | |
|     http://www.apache.org/licenses/LICENSE-2.0
 | |
| 
 | |
| Unless required by applicable law or agreed to in writing, software
 | |
| distributed under the License is distributed on an "AS IS" BASIS,
 | |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| See the License for the specific language governing permissions and
 | |
| limitations under the License.
 | |
| */
 | |
| 
 | |
| package winkernel
 | |
| 
 | |
| import (
 | |
| 	"fmt"
 | |
| 	"net"
 | |
| 	"os"
 | |
| 	"strconv"
 | |
| 	"strings"
 | |
| 	"sync"
 | |
| 	"sync/atomic"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/Microsoft/hcsshim"
 | |
| 	"github.com/Microsoft/hcsshim/hcn"
 | |
| 	v1 "k8s.io/api/core/v1"
 | |
| 	discovery "k8s.io/api/discovery/v1"
 | |
| 	"k8s.io/apimachinery/pkg/util/intstr"
 | |
| 	apiutil "k8s.io/apimachinery/pkg/util/net"
 | |
| 	"k8s.io/apimachinery/pkg/util/sets"
 | |
| 	"k8s.io/apimachinery/pkg/util/wait"
 | |
| 	utilfeature "k8s.io/apiserver/pkg/util/feature"
 | |
| 	"k8s.io/client-go/tools/events"
 | |
| 	"k8s.io/klog/v2"
 | |
| 	kubefeatures "k8s.io/kubernetes/pkg/features"
 | |
| 	"k8s.io/kubernetes/pkg/proxy"
 | |
| 	"k8s.io/kubernetes/pkg/proxy/apis/config"
 | |
| 	proxyconfig "k8s.io/kubernetes/pkg/proxy/config"
 | |
| 	"k8s.io/kubernetes/pkg/proxy/healthcheck"
 | |
| 	"k8s.io/kubernetes/pkg/proxy/metaproxier"
 | |
| 	"k8s.io/kubernetes/pkg/proxy/metrics"
 | |
| 	proxyutil "k8s.io/kubernetes/pkg/proxy/util"
 | |
| 	"k8s.io/kubernetes/pkg/util/async"
 | |
| 	netutils "k8s.io/utils/net"
 | |
| )
 | |
| 
 | |
| // KernelCompatTester tests whether the required kernel capabilities are
 | |
| // present to run the windows kernel proxier.
 | |
| type KernelCompatTester interface {
 | |
| 	IsCompatible() error
 | |
| }
 | |
| 
 | |
| // CanUseWinKernelProxier returns true if we should use the Kernel Proxier
 | |
| // instead of the "classic" userspace Proxier.  This is determined by checking
 | |
| // the windows kernel version and the existence of required kernel features.
 | |
| func CanUseWinKernelProxier(kcompat KernelCompatTester) (bool, error) {
 | |
| 	// Check that the kernel supports what we need.
 | |
| 	if err := kcompat.IsCompatible(); err != nil {
 | |
| 		return false, err
 | |
| 	}
 | |
| 	return true, nil
 | |
| }
 | |
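| // The following is an illustrative sketch and not part of the upstream file: any type with an
| // IsCompatible() error method satisfies KernelCompatTester, so a hypothetical stub like this
| // could be injected into CanUseWinKernelProxier in tests to simulate compatible and
| // incompatible kernels.
| type fakeKernelCompat struct{ err error }
| 
| // IsCompatible returns the injected error: nil simulates a compatible kernel.
| func (f fakeKernelCompat) IsCompatible() error { return f.err }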
| 
 | |
| type WindowsKernelCompatTester struct{}
 | |
| 
 | |
| // IsCompatible returns nil if winkernel can support this mode of proxy
 | |
| func (lkct WindowsKernelCompatTester) IsCompatible() error {
 | |
| 	_, err := hcsshim.HNSListPolicyListRequest()
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("Windows kernel is not compatible for Kernel mode")
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| type externalIPInfo struct {
 | |
| 	ip    string
 | |
| 	hnsID string
 | |
| }
 | |
| 
 | |
| type loadBalancerIngressInfo struct {
 | |
| 	ip               string
 | |
| 	hnsID            string
 | |
| 	healthCheckHnsID string
 | |
| }
 | |
| 
 | |
| type loadBalancerInfo struct {
 | |
| 	hnsID string
 | |
| }
 | |
| 
 | |
| type loadBalancerIdentifier struct {
 | |
| 	protocol      uint16
 | |
| 	internalPort  uint16
 | |
| 	externalPort  uint16
 | |
| 	vip           string
 | |
| 	endpointsHash [20]byte
 | |
| }
 | |
| 
 | |
| type loadBalancerFlags struct {
 | |
| 	isILB           bool
 | |
| 	isDSR           bool
 | |
| 	isVipExternalIP bool
 | |
| 	localRoutedVIP  bool
 | |
| 	useMUX          bool
 | |
| 	preserveDIP     bool
 | |
| 	sessionAffinity bool
 | |
| 	isIPv6          bool
 | |
| }
 | |
| 
 | |
| // internal struct for storing service information
 | |
| type serviceInfo struct {
 | |
| 	*proxy.BaseServicePortInfo
 | |
| 	targetPort             int
 | |
| 	externalIPs            []*externalIPInfo
 | |
| 	loadBalancerIngressIPs []*loadBalancerIngressInfo
 | |
| 	hnsID                  string
 | |
| 	nodePorthnsID          string
 | |
| 	policyApplied          bool
 | |
| 	remoteEndpoint         *endpointInfo
 | |
| 	hns                    HostNetworkService
 | |
| 	preserveDIP            bool
 | |
| 	localTrafficDSR        bool
 | |
| 	internalTrafficLocal   bool
 | |
| 	winProxyOptimization   bool
 | |
| }
 | |
| 
 | |
| type hnsNetworkInfo struct {
 | |
| 	name          string
 | |
| 	id            string
 | |
| 	networkType   string
 | |
| 	remoteSubnets []*remoteSubnetInfo
 | |
| }
 | |
| 
 | |
| type remoteSubnetInfo struct {
 | |
| 	destinationPrefix string
 | |
| 	isolationID       uint16
 | |
| 	providerAddress   string
 | |
| 	drMacAddress      string
 | |
| }
 | |
| 
 | |
| const (
 | |
| 	NETWORK_TYPE_OVERLAY = "overlay"
 | |
| 	// MAX_COUNT_STALE_LOADBALANCERS is the maximum number of stale loadbalancers that are cleaned up in a single syncProxyRules run.
 | |
| 	// If there are more stale loadbalancers to clean, they are handled in the next syncProxyRules iteration.
 | |
| 	MAX_COUNT_STALE_LOADBALANCERS = 20
 | |
| )
 | |
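| // Illustrative note (not from the upstream file): with, say, 45 stale load balancers pending
| // delete, cleanupStaleLoadbalancers below removes at most MAX_COUNT_STALE_LOADBALANCERS (20)
| // per syncProxyRules run, so the backlog drains over three iterations (20 + 20 + 5).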
| 
 | |
| func newHostNetworkService(hcnImpl HcnService) (HostNetworkService, hcn.SupportedFeatures) {
 | |
| 	var h HostNetworkService
 | |
| 	supportedFeatures := hcnImpl.GetSupportedFeatures()
 | |
| 	if supportedFeatures.Api.V2 {
 | |
| 		h = hns{
 | |
| 			hcn: hcnImpl,
 | |
| 		}
 | |
| 	} else {
 | |
| 		panic("Windows HNS Api V2 required. This version of windows does not support API V2")
 | |
| 	}
 | |
| 	return h, supportedFeatures
 | |
| }
 | |
| 
 | |
| // logFormattedEndpoints logs all endpoints and their states that are taking part in the endpoints map change.
 | |
| // This is mostly for debugging purposes; verbosity is set to 5.
 | |
| func logFormattedEndpoints(logMsg string, logLevel klog.Level, svcPortName proxy.ServicePortName, eps []proxy.Endpoint) {
 | |
| 	if klog.V(logLevel).Enabled() {
 | |
| 		var epInfo string
 | |
| 		for _, v := range eps {
 | |
| 			epInfo = epInfo + fmt.Sprintf("\n  %s={Ready:%v,Serving:%v,Terminating:%v,IsRemote:%v}", v.String(), v.IsReady(), v.IsServing(), v.IsTerminating(), !v.IsLocal())
 | |
| 		}
 | |
| 		klog.V(logLevel).InfoS(logMsg, "svcPortName", svcPortName, "endpoints", epInfo)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // cleanupStaleLoadbalancers cleans up stale load balancers that were left pending delete
 | |
| // in the last iteration. This function acts as self-healing for stale
 | |
| // loadbalancer entries.
 | |
| func (proxier *Proxier) cleanupStaleLoadbalancers() {
 | |
| 	i := 0
 | |
| 	countStaleLB := len(proxier.mapStaleLoadbalancers)
 | |
| 	if countStaleLB == 0 {
 | |
| 		return
 | |
| 	}
 | |
| 	klog.V(3).InfoS("Cleanup of stale loadbalancers triggered", "LB Count", countStaleLB)
 | |
| 	for lbID := range proxier.mapStaleLoadbalancers {
 | |
| 		i++
 | |
| 		if err := proxier.hns.deleteLoadBalancer(lbID); err == nil {
 | |
| 			delete(proxier.mapStaleLoadbalancers, lbID)
 | |
| 		}
 | |
| 		if i == MAX_COUNT_STALE_LOADBALANCERS {
 | |
| 			// The remaining stale loadbalancers will be cleaned up in next iteration
 | |
| 			break
 | |
| 		}
 | |
| 	}
 | |
| 	countStaleLB = len(proxier.mapStaleLoadbalancers)
 | |
| 	if countStaleLB > 0 {
 | |
| 		klog.V(3).InfoS("Stale loadbalancers still remaining", "LB Count", countStaleLB, "stale_lb_ids", proxier.mapStaleLoadbalancers)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func getNetworkName(hnsNetworkName string) (string, error) {
 | |
| 	if len(hnsNetworkName) == 0 {
 | |
| 		klog.V(3).InfoS("Flag --network-name not set, checking environment variable")
 | |
| 		hnsNetworkName = os.Getenv("KUBE_NETWORK")
 | |
| 		if len(hnsNetworkName) == 0 {
 | |
| 			return "", fmt.Errorf("Environment variable KUBE_NETWORK and network-flag not initialized")
 | |
| 		}
 | |
| 	}
 | |
| 	return hnsNetworkName, nil
 | |
| }
 | |
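| // Illustrative example (assumed values, not from the upstream file): with --network-name unset
| // and the environment variable KUBE_NETWORK=azure, getNetworkName("") returns ("azure", nil);
| // with both unset it returns an error.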
| 
 | |
| func getNetworkInfo(hns HostNetworkService, hnsNetworkName string) (*hnsNetworkInfo, error) {
 | |
| 	hnsNetworkInfo, err := hns.getNetworkByName(hnsNetworkName)
 | |
| 	for err != nil {
 | |
| 		klog.ErrorS(err, "Unable to find HNS Network specified, please check network name and CNI deployment", "hnsNetworkName", hnsNetworkName)
 | |
| 		time.Sleep(1 * time.Second)
 | |
| 		hnsNetworkInfo, err = hns.getNetworkByName(hnsNetworkName)
 | |
| 	}
 | |
| 	return hnsNetworkInfo, err
 | |
| }
 | |
| 
 | |
| func isOverlay(hnsNetworkInfo *hnsNetworkInfo) bool {
 | |
| 	return strings.EqualFold(hnsNetworkInfo.networkType, NETWORK_TYPE_OVERLAY)
 | |
| }
 | |
| 
 | |
| // StackCompatTester tests whether the required kernel and network are dualstack capable
 | |
| type StackCompatTester interface {
 | |
| 	DualStackCompatible(networkName string) bool
 | |
| }
 | |
| 
 | |
| type DualStackCompatTester struct{}
 | |
| 
 | |
| func (t DualStackCompatTester) DualStackCompatible(networkName string) bool {
 | |
| 	hcnImpl := newHcnImpl()
 | |
| 	// Due to a bug in earlier versions, the first hcsshim tag with a proper check for dual stack support is v0.8.22.
 | |
| 	if err := hcnImpl.Ipv6DualStackSupported(); err != nil {
 | |
| 		// Hcn can fail the query for its own version (which this call performs internally before checking
| 		// whether dual stack is supported), but as far as we can tell the only time that happens is when
| 		// the host is pre-1803 and hcn did not exist. hcsshim should return a known error in that case that
| 		// we could check against; any other error would mean "this feature isn't supported", which is how it
| 		// is used here. Since nothing before ws2019 (1809) is listed as supported for Kubernetes, we can
| 		// assume that any error here means dual stack simply isn't supported on the host, not that the query
| 		// failed. With that in mind, log as info rather than error to let the user know we're falling back.
 | |
| 		klog.InfoS("This version of Windows does not support dual-stack, falling back to single-stack", "err", err.Error())
 | |
| 		return false
 | |
| 	}
 | |
| 
 | |
| 	// check if network is using overlay
 | |
| 	hns, _ := newHostNetworkService(hcnImpl)
 | |
| 	networkName, err := getNetworkName(networkName)
 | |
| 	if err != nil {
 | |
| 		klog.ErrorS(err, "Unable to determine dual-stack status, falling back to single-stack")
 | |
| 		return false
 | |
| 	}
 | |
| 	networkInfo, err := getNetworkInfo(hns, networkName)
 | |
| 	if err != nil {
 | |
| 		klog.ErrorS(err, "Unable to determine dual-stack status, falling back to single-stack")
 | |
| 		return false
 | |
| 	}
 | |
| 
 | |
| 	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WinOverlay) && isOverlay(networkInfo) {
 | |
| 		// Overlay (VXLAN) networks on Windows do not support dual-stack networking today
 | |
| 		klog.InfoS("Winoverlay does not support dual-stack, falling back to single-stack")
 | |
| 		return false
 | |
| 	}
 | |
| 
 | |
| 	return true
 | |
| }
 | |
| 
 | |
| // internal struct for endpoints information
 | |
| type endpointInfo struct {
 | |
| 	ip              string
 | |
| 	port            uint16
 | |
| 	isLocal         bool
 | |
| 	macAddress      string
 | |
| 	hnsID           string
 | |
| 	refCount        *uint16
 | |
| 	providerAddress string
 | |
| 	hns             HostNetworkService
 | |
| 
 | |
| 	// conditions
 | |
| 	ready       bool
 | |
| 	serving     bool
 | |
| 	terminating bool
 | |
| }
 | |
| 
 | |
| // String is part of proxy.Endpoint interface.
 | |
| func (info *endpointInfo) String() string {
 | |
| 	return net.JoinHostPort(info.ip, strconv.Itoa(int(info.port)))
 | |
| }
 | |
| 
 | |
| // IsLocal is part of proxy.Endpoint interface.
 | |
| func (info *endpointInfo) IsLocal() bool {
 | |
| 	return info.isLocal
 | |
| }
 | |
| 
 | |
| // IsReady returns true if an endpoint is ready and not terminating.
 | |
| func (info *endpointInfo) IsReady() bool {
 | |
| 	return info.ready
 | |
| }
 | |
| 
 | |
| // IsServing returns true if an endpoint is ready, regardless of its terminating state.
 | |
| func (info *endpointInfo) IsServing() bool {
 | |
| 	return info.serving
 | |
| }
 | |
| 
 | |
| // IsTerminating returns true if an endpoint is terminating.
 | |
| func (info *endpointInfo) IsTerminating() bool {
 | |
| 	return info.terminating
 | |
| }
 | |
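| // Illustrative summary (not from the upstream file) of how the condition accessors above combine:
| //
| //	Ready   Serving  Terminating  meaning
| //	true    true     false        healthy backend
| //	false   true     true         terminating but still passing its readiness probe
| //	false   false    false        running but failing its readiness probe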
| 
 | |
| // ZoneHints returns the zone hints for the endpoint.
 | |
| func (info *endpointInfo) ZoneHints() sets.Set[string] {
 | |
| 	return sets.Set[string]{}
 | |
| }
 | |
| 
 | |
| // IP returns just the IP part of the endpoint; it is part of the proxy.Endpoint interface.
 | |
| func (info *endpointInfo) IP() string {
 | |
| 	return info.ip
 | |
| }
 | |
| 
 | |
| // Port returns just the Port part of the endpoint.
 | |
| func (info *endpointInfo) Port() int {
 | |
| 	return int(info.port)
 | |
| }
 | |
| 
 | |
| // conjureMac uses the MAC prefix and the last four bytes of the IP address to return a MAC address.
| // This ensures MAC addresses are unique for proper load balancing.
| // There is a possibility of MAC collisions, but this MAC address is used for remote endpoints only
| // and not sent on the wire.
 | |
| func conjureMac(macPrefix string, ip net.IP) string {
 | |
| 	if ip4 := ip.To4(); ip4 != nil {
 | |
| 		a, b, c, d := ip4[0], ip4[1], ip4[2], ip4[3]
 | |
| 		return fmt.Sprintf("%v-%02x-%02x-%02x-%02x", macPrefix, a, b, c, d)
 | |
| 	} else if ip6 := ip.To16(); ip6 != nil {
 | |
| 		a, b, c, d := ip6[15], ip6[14], ip6[13], ip6[12]
 | |
| 		return fmt.Sprintf("%v-%02x-%02x-%02x-%02x", macPrefix, a, b, c, d)
 | |
| 	}
 | |
| 	return "02-11-22-33-44-55"
 | |
| }
 | |
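| // Illustrative examples (assumed inputs, not part of the upstream file) of the mapping above:
| //
| //	conjureMac("02-11", net.ParseIP("10.244.1.5")) // "02-11-0a-f4-01-05"
| //	conjureMac("02-11", net.ParseIP("fd00::1"))    // "02-11-01-00-00-00" (last four bytes, in reverse order)
| //	conjureMac("02-11", nil)                       // "02-11-22-33-44-55" (fallback)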
| 
 | |
| func (proxier *Proxier) endpointsMapChange(oldEndpointsMap, newEndpointsMap proxy.EndpointsMap) {
 | |
| 	// This optimizes remote endpoint and loadbalancer deletion based on the winProxyOptimization annotation
 | |
| 	var svcPortMap = make(map[proxy.ServicePortName]bool)
 | |
| 	var logLevel klog.Level = 5
 | |
| 	for svcPortName, eps := range oldEndpointsMap {
 | |
| 		logFormattedEndpoints("endpointsMapChange oldEndpointsMap", logLevel, svcPortName, eps)
 | |
| 		svcPortMap[svcPortName] = true
 | |
| 		proxier.onEndpointsMapChange(&svcPortName, false)
 | |
| 	}
 | |
| 
 | |
| 	for svcPortName, eps := range newEndpointsMap {
 | |
| 		logFormattedEndpoints("endpointsMapChange newEndpointsMap", logLevel, svcPortName, eps)
 | |
| 		// redundantCleanup being true means cleanup is called a second time on the same svcPort
 | |
| 		redundantCleanup := svcPortMap[svcPortName]
 | |
| 		proxier.onEndpointsMapChange(&svcPortName, redundantCleanup)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (proxier *Proxier) onEndpointsMapChange(svcPortName *proxy.ServicePortName, redundantCleanup bool) {
 | |
| 
 | |
| 	svc, exists := proxier.svcPortMap[*svcPortName]
 | |
| 
 | |
| 	if exists {
 | |
| 		svcInfo, ok := svc.(*serviceInfo)
 | |
| 
 | |
| 		if !ok {
 | |
| 			klog.ErrorS(nil, "Failed to cast serviceInfo", "servicePortName", svcPortName)
 | |
| 			return
 | |
| 		}
 | |
| 
 | |
| 		if svcInfo.winProxyOptimization && redundantCleanup {
 | |
| 			// This is a second cleanup call.
 | |
| 			// Second cleanup on the same svcPort will be ignored if the
 | |
| 			// winProxyOptimization is Enabled
 | |
| 			return
 | |
| 		}
 | |
| 
 | |
| 		klog.V(3).InfoS("Endpoints are modified. Service is stale", "servicePortName", svcPortName)
 | |
| 		svcInfo.cleanupAllPolicies(proxier.endpointsMap[*svcPortName], proxier.mapStaleLoadbalancers, true)
 | |
| 	} else {
 | |
| 		// If no service exists, just cleanup the remote endpoints
 | |
| 		klog.V(3).InfoS("Endpoints are orphaned, cleaning up")
 | |
| 		// Cleanup Endpoints references
 | |
| 		epInfos, exists := proxier.endpointsMap[*svcPortName]
 | |
| 
 | |
| 		if exists {
 | |
| 			// Cleanup Endpoints references
 | |
| 			for _, ep := range epInfos {
 | |
| 				epInfo, ok := ep.(*endpointInfo)
 | |
| 
 | |
| 				if ok {
 | |
| 					epInfo.Cleanup()
 | |
| 				}
 | |
| 
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (proxier *Proxier) serviceMapChange(previous, current proxy.ServicePortMap) {
 | |
| 	for svcPortName := range current {
 | |
| 		proxier.onServiceMapChange(&svcPortName)
 | |
| 	}
 | |
| 
 | |
| 	for svcPortName := range previous {
 | |
| 		if _, ok := current[svcPortName]; ok {
 | |
| 			continue
 | |
| 		}
 | |
| 		proxier.onServiceMapChange(&svcPortName)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (proxier *Proxier) onServiceMapChange(svcPortName *proxy.ServicePortName) {
 | |
| 
 | |
| 	svc, exists := proxier.svcPortMap[*svcPortName]
 | |
| 
 | |
| 	if exists {
 | |
| 		svcInfo, ok := svc.(*serviceInfo)
 | |
| 
 | |
| 		if !ok {
 | |
| 			klog.ErrorS(nil, "Failed to cast serviceInfo", "servicePortName", svcPortName)
 | |
| 			return
 | |
| 		}
 | |
| 
 | |
| 		klog.V(3).InfoS("Updating existing service port", "servicePortName", svcPortName, "clusterIP", svcInfo.ClusterIP(), "port", svcInfo.Port(), "protocol", svcInfo.Protocol())
 | |
| 		svcInfo.cleanupAllPolicies(proxier.endpointsMap[*svcPortName], proxier.mapStaleLoadbalancers, false)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // returns a new proxy.Endpoint which abstracts an endpointInfo
 | |
| func (proxier *Proxier) newEndpointInfo(baseInfo *proxy.BaseEndpointInfo, _ *proxy.ServicePortName) proxy.Endpoint {
 | |
| 
 | |
| 	info := &endpointInfo{
 | |
| 		ip:         baseInfo.IP(),
 | |
| 		port:       uint16(baseInfo.Port()),
 | |
| 		isLocal:    baseInfo.IsLocal(),
 | |
| 		macAddress: conjureMac("02-11", netutils.ParseIPSloppy(baseInfo.IP())),
 | |
| 		refCount:   new(uint16),
 | |
| 		hnsID:      "",
 | |
| 		hns:        proxier.hns,
 | |
| 
 | |
| 		ready:       baseInfo.IsReady(),
 | |
| 		serving:     baseInfo.IsServing(),
 | |
| 		terminating: baseInfo.IsTerminating(),
 | |
| 	}
 | |
| 
 | |
| 	return info
 | |
| }
 | |
| 
 | |
| func newSourceVIP(hns HostNetworkService, network string, ip string, mac string, providerAddress string) (*endpointInfo, error) {
 | |
| 	hnsEndpoint := &endpointInfo{
 | |
| 		ip:              ip,
 | |
| 		isLocal:         true,
 | |
| 		macAddress:      mac,
 | |
| 		providerAddress: providerAddress,
 | |
| 
 | |
| 		ready:       true,
 | |
| 		serving:     true,
 | |
| 		terminating: false,
 | |
| 	}
 | |
| 	ep, err := hns.createEndpoint(hnsEndpoint, network)
 | |
| 	return ep, err
 | |
| }
 | |
| 
 | |
| func (ep *endpointInfo) DecrementRefCount() {
 | |
| 	klog.V(3).InfoS("Decrementing Endpoint RefCount", "endpointInfo", ep)
 | |
| 	if !ep.IsLocal() && ep.refCount != nil && *ep.refCount > 0 {
 | |
| 		*ep.refCount--
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (ep *endpointInfo) Cleanup() {
 | |
| 	klog.V(3).InfoS("Endpoint cleanup", "endpointInfo", ep)
 | |
| 	if !ep.IsLocal() && ep.refCount != nil {
 | |
| 		*ep.refCount--
 | |
| 
 | |
| 		// Remove the remote hns endpoint, if no service is referring to it
 | |
| 		// Never delete a Local Endpoint. Local Endpoints are already created by other entities.
 | |
| 		// Remove only remote endpoints created by this service
 | |
| 		if *ep.refCount <= 0 && !ep.IsLocal() {
 | |
| 			klog.V(4).InfoS("Removing endpoints, since no one is referencing it", "endpoint", ep)
 | |
| 			err := ep.hns.deleteEndpoint(ep.hnsID)
 | |
| 			if err == nil {
 | |
| 				ep.hnsID = ""
 | |
| 			} else {
 | |
| 				klog.ErrorS(err, "Endpoint deletion failed", "ip", ep.IP())
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		ep.refCount = nil
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (refCountMap endPointsReferenceCountMap) getRefCount(hnsID string) *uint16 {
 | |
| 	refCount, exists := refCountMap[hnsID]
 | |
| 	if !exists {
 | |
| 		refCountMap[hnsID] = new(uint16)
 | |
| 		refCount = refCountMap[hnsID]
 | |
| 	}
 | |
| 	return refCount
 | |
| }
 | |
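| // Illustrative sketch (assumed usage, not part of the upstream file): getRefCount hands every
| // caller the same *uint16 for a given HNS endpoint ID, so increments through one reference are
| // visible through the other:
| //
| //	refCounts := make(endPointsReferenceCountMap)
| //	rc1 := refCounts.getRefCount("ep-1")
| //	rc2 := refCounts.getRefCount("ep-1") // same pointer as rc1
| //	*rc1++
| //	*rc2++
| //	// *rc1 == *rc2 == 2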
| 
 | |
| // returns a new proxy.ServicePort which abstracts a serviceInfo
 | |
| func (proxier *Proxier) newServiceInfo(port *v1.ServicePort, service *v1.Service, bsvcPortInfo *proxy.BaseServicePortInfo) proxy.ServicePort {
 | |
| 	info := &serviceInfo{BaseServicePortInfo: bsvcPortInfo}
 | |
| 	preserveDIP := service.Annotations["preserve-destination"] == "true"
 | |
| 	// Annotation introduced to enable optimized loadbalancing
 | |
| 	winProxyOptimization := !(strings.ToUpper(service.Annotations["winProxyOptimization"]) == "DISABLED")
 | |
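| 	// Illustrative example (assumed manifest, not part of the upstream file): these annotations are
| 	// read from the Service object, e.g.
| 	//
| 	//	metadata:
| 	//	  annotations:
| 	//	    preserve-destination: "true"     # preserve the destination IP (DSR) for this service
| 	//	    winProxyOptimization: "DISABLED" # opt out of the optimized endpoint/loadbalancer cleanup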
| 	localTrafficDSR := service.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyLocal
 | |
| 	var internalTrafficLocal bool
 | |
| 	if service.Spec.InternalTrafficPolicy != nil {
 | |
| 		internalTrafficLocal = *service.Spec.InternalTrafficPolicy == v1.ServiceInternalTrafficPolicyLocal
 | |
| 	}
 | |
| 	hcnImpl := proxier.hcn
 | |
| 	err := hcnImpl.DsrSupported()
 | |
| 	if err != nil {
 | |
| 		preserveDIP = false
 | |
| 		localTrafficDSR = false
 | |
| 	}
 | |
| 	// targetPort is zero if it is specified as a name in port.TargetPort.
 | |
| 	// Its real value will be obtained later from the endpoints.
 | |
| 	targetPort := 0
 | |
| 	if port.TargetPort.Type == intstr.Int {
 | |
| 		targetPort = port.TargetPort.IntValue()
 | |
| 	}
 | |
| 
 | |
| 	info.preserveDIP = preserveDIP
 | |
| 	info.targetPort = targetPort
 | |
| 	info.hns = proxier.hns
 | |
| 	info.localTrafficDSR = localTrafficDSR
 | |
| 	info.internalTrafficLocal = internalTrafficLocal
 | |
| 	info.winProxyOptimization = winProxyOptimization
 | |
| 	klog.V(3).InfoS("Flags enabled for service", "service", service.Name, "localTrafficDSR", localTrafficDSR, "internalTrafficLocal", internalTrafficLocal, "preserveDIP", preserveDIP, "winProxyOptimization", winProxyOptimization)
 | |
| 
 | |
| 	for _, eip := range service.Spec.ExternalIPs {
 | |
| 		info.externalIPs = append(info.externalIPs, &externalIPInfo{ip: eip})
 | |
| 	}
 | |
| 
 | |
| 	for _, ingress := range service.Status.LoadBalancer.Ingress {
 | |
| 		if netutils.ParseIPSloppy(ingress.IP) != nil {
 | |
| 			info.loadBalancerIngressIPs = append(info.loadBalancerIngressIPs, &loadBalancerIngressInfo{ip: ingress.IP})
 | |
| 		}
 | |
| 	}
 | |
| 	return info
 | |
| }
 | |
| 
 | |
| func (network hnsNetworkInfo) findRemoteSubnetProviderAddress(ip string) string {
 | |
| 	var providerAddress string
 | |
| 	for _, rs := range network.remoteSubnets {
 | |
| 		_, ipNet, err := netutils.ParseCIDRSloppy(rs.destinationPrefix)
 | |
| 		if err != nil {
 | |
| 			klog.ErrorS(err, "Failed to parse CIDR")
 | |
| 		}
 | |
| 		if ipNet.Contains(netutils.ParseIPSloppy(ip)) {
 | |
| 			providerAddress = rs.providerAddress
 | |
| 		}
 | |
| 		if ip == rs.providerAddress {
 | |
| 			providerAddress = rs.providerAddress
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return providerAddress
 | |
| }
 | |
| 
 | |
| type endPointsReferenceCountMap map[string]*uint16
 | |
| 
 | |
| // Proxier is an hns based proxy for connections between a localhost:lport
 | |
| // and services that provide the actual backends.
 | |
| type Proxier struct {
 | |
| 	// ipFamily defines the IP family which this proxier is tracking.
 | |
| 	ipFamily v1.IPFamily
 | |
| 	// TODO(imroc): implement node handler for winkernel proxier.
 | |
| 	proxyconfig.NoopNodeHandler
 | |
| 
 | |
| 	// endpointsChanges and serviceChanges contains all changes to endpoints and
 | |
| 	// services that happened since policies were synced. For a single object,
 | |
| 	// changes are accumulated, i.e. previous is state from before all of them,
 | |
| 	// current is state after applying all of those.
 | |
| 	endpointsChanges  *proxy.EndpointsChangeTracker
 | |
| 	serviceChanges    *proxy.ServiceChangeTracker
 | |
| 	endPointsRefCount endPointsReferenceCountMap
 | |
| 	mu                sync.Mutex // protects the following fields
 | |
| 	svcPortMap        proxy.ServicePortMap
 | |
| 	endpointsMap      proxy.EndpointsMap
 | |
| 	// endpointSlicesSynced and servicesSynced are set to true when corresponding
 | |
| 	// objects are synced after startup. This is used to avoid updating hns policies
 | |
| 	// with some partial data after kube-proxy restart.
 | |
| 	endpointSlicesSynced bool
 | |
| 	servicesSynced       bool
 | |
| 	initialized          int32
 | |
| 	syncRunner           *async.BoundedFrequencyRunner // governs calls to syncProxyRules
 | |
| 	// These are effectively const and do not need the mutex to be held.
 | |
| 	clusterCIDR string
 | |
| 	hostname    string
 | |
| 	nodeIP      net.IP
 | |
| 	recorder    events.EventRecorder
 | |
| 
 | |
| 	serviceHealthServer healthcheck.ServiceHealthServer
 | |
| 	healthzServer       *healthcheck.ProxierHealthServer
 | |
| 
 | |
| 	hns               HostNetworkService
 | |
| 	hcn               HcnService
 | |
| 	network           hnsNetworkInfo
 | |
| 	sourceVip         string
 | |
| 	hostMac           string
 | |
| 	isDSR             bool
 | |
| 	supportedFeatures hcn.SupportedFeatures
 | |
| 	healthzPort       int
 | |
| 
 | |
| 	forwardHealthCheckVip bool
 | |
| 	rootHnsEndpointName   string
 | |
| 	mapStaleLoadbalancers map[string]bool // This maintains entries of stale load balancers which are pending delete in last iteration
 | |
| }
 | |
| 
 | |
| type localPort struct {
 | |
| 	desc     string
 | |
| 	ip       string
 | |
| 	port     int
 | |
| 	protocol string
 | |
| }
 | |
| 
 | |
| func (lp *localPort) String() string {
 | |
| 	return fmt.Sprintf("%q (%s:%d/%s)", lp.desc, lp.ip, lp.port, lp.protocol)
 | |
| }
 | |
| 
 | |
| func Enum(p v1.Protocol) uint16 {
 | |
| 	if p == v1.ProtocolTCP {
 | |
| 		return 6
 | |
| 	}
 | |
| 	if p == v1.ProtocolUDP {
 | |
| 		return 17
 | |
| 	}
 | |
| 	if p == v1.ProtocolSCTP {
 | |
| 		return 132
 | |
| 	}
 | |
| 	return 0
 | |
| }
 | |
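| // Illustrative note (not from the upstream file): Enum maps Kubernetes protocol names to the
| // IANA protocol numbers used when programming HNS load balancer policies, e.g.:
| //
| //	Enum(v1.ProtocolTCP)  // 6
| //	Enum(v1.ProtocolUDP)  // 17
| //	Enum(v1.ProtocolSCTP) // 132
| //	Enum(v1.Protocol("")) // 0 (unknown protocol)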
| 
 | |
| type closeable interface {
 | |
| 	Close() error
 | |
| }
 | |
| 
 | |
| // Proxier implements proxy.Provider
 | |
| var _ proxy.Provider = &Proxier{}
 | |
| 
 | |
| // NewProxier returns a new Proxier
 | |
| func NewProxier(
 | |
| 	ipFamily v1.IPFamily,
 | |
| 	syncPeriod time.Duration,
 | |
| 	minSyncPeriod time.Duration,
 | |
| 	clusterCIDR string,
 | |
| 	hostname string,
 | |
| 	nodeIP net.IP,
 | |
| 	recorder events.EventRecorder,
 | |
| 	healthzServer *healthcheck.ProxierHealthServer,
 | |
| 	config config.KubeProxyWinkernelConfiguration,
 | |
| 	healthzPort int,
 | |
| ) (*Proxier, error) {
 | |
| 	if nodeIP == nil {
 | |
| 		klog.InfoS("Invalid nodeIP, initializing kube-proxy with 127.0.0.1 as nodeIP")
 | |
| 		nodeIP = netutils.ParseIPSloppy("127.0.0.1")
 | |
| 	}
 | |
| 
 | |
| 	if len(clusterCIDR) == 0 {
 | |
| 		klog.InfoS("ClusterCIDR not specified, unable to distinguish between internal and external traffic")
 | |
| 	}
 | |
| 
 | |
| 	// windows listens to all node addresses
 | |
| 	nodePortAddresses := proxyutil.NewNodePortAddresses(ipFamily, nil, nil)
 | |
| 	serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses, healthzServer)
 | |
| 
 | |
| 	hcnImpl := newHcnImpl()
 | |
| 	hns, supportedFeatures := newHostNetworkService(hcnImpl)
 | |
| 	hnsNetworkName, err := getNetworkName(config.NetworkName)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	klog.V(3).InfoS("Cleaning up old HNS policy lists")
 | |
| 	hcnImpl.DeleteAllHnsLoadBalancerPolicy()
 | |
| 
 | |
| 	// Get HNS network information
 | |
| 	hnsNetworkInfo, err := getNetworkInfo(hns, hnsNetworkName)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	// Network could have been detected before Remote Subnet Routes are applied or ManagementIP is updated
 | |
| 	// Sleep and update the network to include new information
 | |
| 	if isOverlay(hnsNetworkInfo) {
 | |
| 		time.Sleep(10 * time.Second)
 | |
| 		hnsNetworkInfo, err = hns.getNetworkByName(hnsNetworkName)
 | |
| 		if err != nil {
 | |
| 			return nil, fmt.Errorf("could not find HNS network %s", hnsNetworkName)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	klog.V(1).InfoS("Hns Network loaded", "hnsNetworkInfo", hnsNetworkInfo)
 | |
| 	isDSR := config.EnableDSR
 | |
| 	if isDSR && !utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WinDSR) {
 | |
| 		return nil, fmt.Errorf("WinDSR feature gate not enabled")
 | |
| 	}
 | |
| 
 | |
| 	err = hcnImpl.DsrSupported()
 | |
| 	if isDSR && err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	var sourceVip string
 | |
| 	var hostMac string
 | |
| 	if isOverlay(hnsNetworkInfo) {
 | |
| 		if !utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WinOverlay) {
 | |
| 			return nil, fmt.Errorf("WinOverlay feature gate not enabled")
 | |
| 		}
 | |
| 		err = hcn.RemoteSubnetSupported()
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		sourceVip = config.SourceVip
 | |
| 		if len(sourceVip) == 0 {
 | |
| 			return nil, fmt.Errorf("source-vip flag not set")
 | |
| 		}
 | |
| 
 | |
| 		if nodeIP.IsUnspecified() {
 | |
| 			// attempt to get the correct ip address
 | |
| 			klog.V(2).InfoS("Node ip was unspecified, attempting to find node ip")
 | |
| 			nodeIP, err = apiutil.ResolveBindAddress(nodeIP)
 | |
| 			if err != nil {
 | |
| 				klog.InfoS("Failed to find an ip. You may need to set the --bind-address flag", "err", err)
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		interfaces, _ := net.Interfaces() //TODO create interfaces
 | |
| 		for _, inter := range interfaces {
 | |
| 			addresses, _ := inter.Addrs()
 | |
| 			for _, addr := range addresses {
 | |
| 				addrIP, _, _ := netutils.ParseCIDRSloppy(addr.String())
 | |
| 				if addrIP.String() == nodeIP.String() {
 | |
| 					klog.V(2).InfoS("Record Host MAC address", "addr", inter.HardwareAddr)
 | |
| 					hostMac = inter.HardwareAddr.String()
 | |
| 				}
 | |
| 			}
 | |
| 		}
 | |
| 		if len(hostMac) == 0 {
 | |
| 			return nil, fmt.Errorf("could not find host mac address for %s", nodeIP)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	proxier := &Proxier{
 | |
| 		ipFamily:              ipFamily,
 | |
| 		endPointsRefCount:     make(endPointsReferenceCountMap),
 | |
| 		svcPortMap:            make(proxy.ServicePortMap),
 | |
| 		endpointsMap:          make(proxy.EndpointsMap),
 | |
| 		clusterCIDR:           clusterCIDR,
 | |
| 		hostname:              hostname,
 | |
| 		nodeIP:                nodeIP,
 | |
| 		recorder:              recorder,
 | |
| 		serviceHealthServer:   serviceHealthServer,
 | |
| 		healthzServer:         healthzServer,
 | |
| 		hns:                   hns,
 | |
| 		hcn:                   hcnImpl,
 | |
| 		network:               *hnsNetworkInfo,
 | |
| 		sourceVip:             sourceVip,
 | |
| 		hostMac:               hostMac,
 | |
| 		isDSR:                 isDSR,
 | |
| 		supportedFeatures:     supportedFeatures,
 | |
| 		healthzPort:           healthzPort,
 | |
| 		rootHnsEndpointName:   config.RootHnsEndpointName,
 | |
| 		forwardHealthCheckVip: config.ForwardHealthCheckVip,
 | |
| 		mapStaleLoadbalancers: make(map[string]bool),
 | |
| 	}
 | |
| 
 | |
| 	serviceChanges := proxy.NewServiceChangeTracker(proxier.newServiceInfo, ipFamily, recorder, proxier.serviceMapChange)
 | |
| 	endPointChangeTracker := proxy.NewEndpointsChangeTracker(hostname, proxier.newEndpointInfo, ipFamily, recorder, proxier.endpointsMapChange)
 | |
| 	proxier.endpointsChanges = endPointChangeTracker
 | |
| 	proxier.serviceChanges = serviceChanges
 | |
| 
 | |
| 	burstSyncs := 2
 | |
| 	klog.V(3).InfoS("Record sync param", "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod, "burstSyncs", burstSyncs)
 | |
| 	proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
 | |
| 	return proxier, nil
 | |
| }
 | |
| 
 | |
| func NewDualStackProxier(
 | |
| 	syncPeriod time.Duration,
 | |
| 	minSyncPeriod time.Duration,
 | |
| 	clusterCIDR string,
 | |
| 	hostname string,
 | |
| 	nodeIPs map[v1.IPFamily]net.IP,
 | |
| 	recorder events.EventRecorder,
 | |
| 	healthzServer *healthcheck.ProxierHealthServer,
 | |
| 	config config.KubeProxyWinkernelConfiguration,
 | |
| 	healthzPort int,
 | |
| ) (proxy.Provider, error) {
 | |
| 
 | |
| 	// Create an ipv4 instance of the single-stack proxier
 | |
| 	ipv4Proxier, err := NewProxier(v1.IPv4Protocol, syncPeriod, minSyncPeriod,
 | |
| 		clusterCIDR, hostname, nodeIPs[v1.IPv4Protocol], recorder, healthzServer,
 | |
| 		config, healthzPort)
 | |
| 
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("unable to create ipv4 proxier: %v, hostname: %s, clusterCIDR : %s, nodeIP:%v", err, hostname, clusterCIDR, nodeIPs[v1.IPv4Protocol])
 | |
| 	}
 | |
| 
 | |
| 	ipv6Proxier, err := NewProxier(v1.IPv6Protocol, syncPeriod, minSyncPeriod,
 | |
| 		clusterCIDR, hostname, nodeIPs[v1.IPv6Protocol], recorder, healthzServer,
 | |
| 		config, healthzPort)
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("unable to create ipv6 proxier: %v, hostname: %s, clusterCIDR : %s, nodeIP:%v", err, hostname, clusterCIDR, nodeIPs[v1.IPv6Protocol])
 | |
| 	}
 | |
| 
 | |
| 	// Return a meta-proxier that dispatches calls between the two
 | |
| 	// single-stack proxier instances
 | |
| 	return metaproxier.NewMetaProxier(ipv4Proxier, ipv6Proxier), nil
 | |
| }
 | |
| 
 | |
| // CleanupLeftovers removes all hns rules created by the Proxier
 | |
| // It returns true if an error was encountered. Errors are logged.
 | |
| func CleanupLeftovers() (encounteredError bool) {
 | |
| 	// Delete all Hns Load Balancer Policies
 | |
| 	newHcnImpl().DeleteAllHnsLoadBalancerPolicy()
 | |
| 	// TODO
 | |
| 	// Delete all Hns Remote endpoints
 | |
| 
 | |
| 	return encounteredError
 | |
| }
 | |
| 
 | |
| func (svcInfo *serviceInfo) cleanupAllPolicies(endpoints []proxy.Endpoint, mapStaleLoadbalancers map[string]bool, isEndpointChange bool) {
 | |
| 	klog.V(3).InfoS("Service cleanup", "serviceInfo", svcInfo)
 | |
| 	// if it's an endpoint change and the winProxyOptimization annotation is enabled, skip lb deletion and remoteEndpoint deletion
 | |
| 	winProxyOptimization := isEndpointChange && svcInfo.winProxyOptimization
 | |
| 	if winProxyOptimization {
 | |
| 		klog.V(3).InfoS("Skipped loadbalancer deletion.", "hnsID", svcInfo.hnsID, "nodePorthnsID", svcInfo.nodePorthnsID, "winProxyOptimization", svcInfo.winProxyOptimization, "isEndpointChange", isEndpointChange)
 | |
| 	} else {
 | |
| 		// Skip the svcInfo.policyApplied check to remove all the policies
 | |
| 		svcInfo.deleteLoadBalancerPolicy(mapStaleLoadbalancers)
 | |
| 	}
 | |
| 	// Cleanup Endpoints references
 | |
| 	for _, ep := range endpoints {
 | |
| 		epInfo, ok := ep.(*endpointInfo)
 | |
| 		if ok {
 | |
| 			if winProxyOptimization {
 | |
| 				epInfo.DecrementRefCount()
 | |
| 			} else {
 | |
| 				epInfo.Cleanup()
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 	if svcInfo.remoteEndpoint != nil {
 | |
| 		svcInfo.remoteEndpoint.Cleanup()
 | |
| 	}
 | |
| 
 | |
| 	svcInfo.policyApplied = false
 | |
| }
 | |
| 
 | |
| func (svcInfo *serviceInfo) deleteLoadBalancerPolicy(mapStaleLoadbalancer map[string]bool) {
 | |
| 	// Remove the Hns Policy corresponding to this service
 | |
| 	hns := svcInfo.hns
 | |
| 	if err := hns.deleteLoadBalancer(svcInfo.hnsID); err != nil {
 | |
| 		mapStaleLoadbalancer[svcInfo.hnsID] = true
 | |
| 		klog.V(1).ErrorS(err, "Error deleting Hns loadbalancer policy resource.", "hnsID", svcInfo.hnsID, "ClusterIP", svcInfo.ClusterIP())
 | |
| 	} else {
 | |
| 		// On successful delete, remove hnsId
 | |
| 		svcInfo.hnsID = ""
 | |
| 	}
 | |
| 
 | |
| 	if err := hns.deleteLoadBalancer(svcInfo.nodePorthnsID); err != nil {
 | |
| 		mapStaleLoadbalancer[svcInfo.nodePorthnsID] = true
 | |
| 		klog.V(1).ErrorS(err, "Error deleting Hns NodePort policy resource.", "hnsID", svcInfo.nodePorthnsID, "NodePort", svcInfo.NodePort())
 | |
| 	} else {
 | |
| 		// On successful delete, remove hnsId
 | |
| 		svcInfo.nodePorthnsID = ""
 | |
| 	}
 | |
| 
 | |
| 	for _, externalIP := range svcInfo.externalIPs {
 | |
| 		mapStaleLoadbalancer[externalIP.hnsID] = true
 | |
| 		if err := hns.deleteLoadBalancer(externalIP.hnsID); err != nil {
 | |
| 			klog.V(1).ErrorS(err, "Error deleting Hns ExternalIP policy resource.", "hnsID", externalIP.hnsID, "IP", externalIP.ip)
 | |
| 		} else {
 | |
| 			// On successful delete, remove hnsId
 | |
| 			externalIP.hnsID = ""
 | |
| 		}
 | |
| 	}
 | |
| 	for _, lbIngressIP := range svcInfo.loadBalancerIngressIPs {
 | |
| 		klog.V(3).InfoS("Loadbalancer Hns LoadBalancer delete triggered for loadBalancer Ingress resources in cleanup", "lbIngressIP", lbIngressIP)
 | |
| 		if err := hns.deleteLoadBalancer(lbIngressIP.hnsID); err != nil {
 | |
| 			mapStaleLoadbalancer[lbIngressIP.hnsID] = true
 | |
| 			klog.V(1).ErrorS(err, "Error deleting Hns IngressIP policy resource.", "hnsID", lbIngressIP.hnsID, "IP", lbIngressIP.ip)
 | |
| 		} else {
 | |
| 			// On successful delete, remove hnsId
 | |
| 			lbIngressIP.hnsID = ""
 | |
| 		}
 | |
| 
 | |
| 		if lbIngressIP.healthCheckHnsID != "" {
 | |
| 			if err := hns.deleteLoadBalancer(lbIngressIP.healthCheckHnsID); err != nil {
 | |
| 				mapStaleLoadbalancer[lbIngressIP.healthCheckHnsID] = true
 | |
| 				klog.V(1).ErrorS(err, "Error deleting Hns IngressIP HealthCheck policy resource.", "hnsID", lbIngressIP.healthCheckHnsID, "IP", lbIngressIP.ip)
 | |
| 			} else {
 | |
| 				// On successful delete, remove hnsId
 | |
| 				lbIngressIP.healthCheckHnsID = ""
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Sync is called to synchronize the proxier state to hns as soon as possible.
 | |
| func (proxier *Proxier) Sync() {
 | |
| 	if proxier.healthzServer != nil {
 | |
| 		proxier.healthzServer.QueuedUpdate(proxier.ipFamily)
 | |
| 	}
 | |
| 	metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime()
 | |
| 	proxier.syncRunner.Run()
 | |
| }
 | |
| 
 | |
| // SyncLoop runs periodic work.  This is expected to run as a goroutine or as the main loop of the app.  It does not return.
 | |
| func (proxier *Proxier) SyncLoop() {
 | |
| 	// Update healthz timestamp at beginning in case Sync() never succeeds.
 | |
| 	if proxier.healthzServer != nil {
 | |
| 		proxier.healthzServer.Updated(proxier.ipFamily)
 | |
| 	}
 | |
| 	// synthesize "last change queued" time as the informers are syncing.
 | |
| 	metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime()
 | |
| 	proxier.syncRunner.Loop(wait.NeverStop)
 | |
| }
 | |
| 
 | |
| func (proxier *Proxier) setInitialized(value bool) {
 | |
| 	var initialized int32
 | |
| 	if value {
 | |
| 		initialized = 1
 | |
| 	}
 | |
| 	atomic.StoreInt32(&proxier.initialized, initialized)
 | |
| }
 | |
| 
 | |
| func (proxier *Proxier) isInitialized() bool {
 | |
| 	return atomic.LoadInt32(&proxier.initialized) > 0
 | |
| }
 | |
| 
 | |
| // OnServiceAdd is called whenever creation of new service object
 | |
| // is observed.
 | |
| func (proxier *Proxier) OnServiceAdd(service *v1.Service) {
 | |
| 	proxier.OnServiceUpdate(nil, service)
 | |
| }
 | |
| 
 | |
| // OnServiceUpdate is called whenever modification of an existing
 | |
| // service object is observed.
 | |
| func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
 | |
| 	if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() {
 | |
| 		proxier.Sync()
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // OnServiceDelete is called whenever deletion of an existing service
 | |
| // object is observed.
 | |
| func (proxier *Proxier) OnServiceDelete(service *v1.Service) {
 | |
| 	proxier.OnServiceUpdate(service, nil)
 | |
| }
 | |
| 
 | |
| // OnServiceSynced is called once all the initial event handlers were
 | |
| // called and the state is fully propagated to local cache.
 | |
| func (proxier *Proxier) OnServiceSynced() {
 | |
| 	proxier.mu.Lock()
 | |
| 	proxier.servicesSynced = true
 | |
| 	proxier.setInitialized(proxier.endpointSlicesSynced)
 | |
| 	proxier.mu.Unlock()
 | |
| 
 | |
| 	// Sync unconditionally - this is called once per lifetime.
 | |
| 	proxier.syncProxyRules()
 | |
| }
 | |
| 
 | |
| // OnEndpointSliceAdd is called whenever creation of a new endpoint slice object
 | |
| // is observed.
 | |
| func (proxier *Proxier) OnEndpointSliceAdd(endpointSlice *discovery.EndpointSlice) {
 | |
| 	if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, false) && proxier.isInitialized() {
 | |
| 		proxier.Sync()
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // OnEndpointSliceUpdate is called whenever modification of an existing endpoint
 | |
| // slice object is observed.
 | |
| func (proxier *Proxier) OnEndpointSliceUpdate(_, endpointSlice *discovery.EndpointSlice) {
 | |
| 	if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, false) && proxier.isInitialized() {
 | |
| 		proxier.Sync()
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // OnEndpointSliceDelete is called whenever deletion of an existing endpoint slice
 | |
| // object is observed.
 | |
| func (proxier *Proxier) OnEndpointSliceDelete(endpointSlice *discovery.EndpointSlice) {
 | |
| 	if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, true) && proxier.isInitialized() {
 | |
| 		proxier.Sync()
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // OnEndpointSlicesSynced is called once all the initial event handlers were
 | |
| // called and the state is fully propagated to local cache.
 | |
| func (proxier *Proxier) OnEndpointSlicesSynced() {
 | |
| 	proxier.mu.Lock()
 | |
| 	proxier.endpointSlicesSynced = true
 | |
| 	proxier.setInitialized(proxier.servicesSynced)
 | |
| 	proxier.mu.Unlock()
 | |
| 
 | |
| 	// Sync unconditionally - this is called once per lifetime.
 | |
| 	proxier.syncProxyRules()
 | |
| }
 | |
| 
 | |
| // OnServiceCIDRsChanged is called whenever a change is observed
 | |
| // in any of the ServiceCIDRs, and provides complete list of service cidrs.
 | |
| func (proxier *Proxier) OnServiceCIDRsChanged(_ []string) {}
 | |
| 
 | |
| func (proxier *Proxier) cleanupAllPolicies() {
 | |
| 	for svcName, svc := range proxier.svcPortMap {
 | |
| 		svcInfo, ok := svc.(*serviceInfo)
 | |
| 		if !ok {
 | |
| 			klog.ErrorS(nil, "Failed to cast serviceInfo", "serviceName", svcName)
 | |
| 			continue
 | |
| 		}
 | |
| 		svcInfo.cleanupAllPolicies(proxier.endpointsMap[svcName], proxier.mapStaleLoadbalancers, false)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func isNetworkNotFoundError(err error) bool {
 | |
| 	if err == nil {
 | |
| 		return false
 | |
| 	}
 | |
| 	if _, ok := err.(hcn.NetworkNotFoundError); ok {
 | |
| 		return true
 | |
| 	}
 | |
| 	if _, ok := err.(hcsshim.NetworkNotFoundError); ok {
 | |
| 		return true
 | |
| 	}
 | |
| 	return false
 | |
| }
 | |
| 
 | |
| // isAllEndpointsTerminating returns true if all the endpoints are terminating.
| // If at least one endpoint is not terminating, it returns false.
 | |
| func (proxier *Proxier) isAllEndpointsTerminating(svcName proxy.ServicePortName, isLocalTrafficDSR bool) bool {
 | |
| 	for _, epInfo := range proxier.endpointsMap[svcName] {
 | |
| 		ep, ok := epInfo.(*endpointInfo)
 | |
| 		if !ok {
 | |
| 			continue
 | |
| 		}
 | |
| 		if isLocalTrafficDSR && !ep.IsLocal() {
 | |
| 			// KEP-1669: Ignore remote endpoints when the ExternalTrafficPolicy is Local (DSR Mode)
 | |
| 			continue
 | |
| 		}
 | |
| 		// If the readiness probe fails and the pod is not being deleted, then
 | |
| 		// the state of the endpoint will be - Ready:False, Serving:False, Terminating:False
 | |
| 		if !ep.IsReady() && !ep.IsTerminating() {
 | |
| 			// Ready:false, Terminating:False, ignore
 | |
| 			continue
 | |
| 		}
 | |
| 		if !ep.IsTerminating() {
 | |
| 			return false
 | |
| 		}
 | |
| 	}
 | |
| 	return true
 | |
| }
 | |
| 
 | |
| // isAllEndpointsNonServing returns true if all the endpoints are non-serving.
| // If at least one endpoint is serving, it returns false.
 | |
| func (proxier *Proxier) isAllEndpointsNonServing(svcName proxy.ServicePortName, isLocalTrafficDSR bool) bool {
 | |
| 	for _, epInfo := range proxier.endpointsMap[svcName] {
 | |
| 		ep, ok := epInfo.(*endpointInfo)
 | |
| 		if !ok {
 | |
| 			continue
 | |
| 		}
 | |
| 		if isLocalTrafficDSR && !ep.IsLocal() {
 | |
| 			continue
 | |
| 		}
 | |
| 		if ep.IsServing() {
 | |
| 			return false
 | |
| 		}
 | |
| 	}
 | |
| 	return true
 | |
| }
 | |
| 
 | |
| // updateQueriedEndpoints updates the queriedEndpoints map with newly created endpoint details
 | |
| func updateQueriedEndpoints(newHnsEndpoint *endpointInfo, queriedEndpoints map[string]*endpointInfo) {
 | |
| 	// store newly created endpoints in queriedEndpoints
 | |
| 	queriedEndpoints[newHnsEndpoint.hnsID] = newHnsEndpoint
 | |
| 	queriedEndpoints[newHnsEndpoint.ip] = newHnsEndpoint
 | |
| }
 | |
| 
 | |
| // This is where all of the hns save/restore calls happen.
 | |
| // assumes proxier.mu is held
 | |
| func (proxier *Proxier) syncProxyRules() {
 | |
| 	proxier.mu.Lock()
 | |
| 	defer proxier.mu.Unlock()
 | |
| 
 | |
| 	// don't sync rules till we've received services and endpoints
 | |
| 	if !proxier.isInitialized() {
 | |
| 		klog.V(2).InfoS("Not syncing hns until Services and Endpoints have been received from master")
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	// Keep track of how long syncs take.
 | |
| 	start := time.Now()
 | |
| 	defer func() {
 | |
| 		metrics.SyncProxyRulesLatency.Observe(metrics.SinceInSeconds(start))
 | |
| 		klog.V(4).InfoS("Syncing proxy rules complete", "elapsed", time.Since(start))
 | |
| 	}()
 | |
| 
 | |
| 	hnsNetworkName := proxier.network.name
 | |
| 	hns := proxier.hns
 | |
| 
 | |
| 	var gatewayHnsendpoint *endpointInfo
 | |
| 	if proxier.forwardHealthCheckVip {
 | |
| 		gatewayHnsendpoint, _ = hns.getEndpointByName(proxier.rootHnsEndpointName)
 | |
| 	}
 | |
| 
 | |
| 	prevNetworkID := proxier.network.id
 | |
| 	updatedNetwork, err := hns.getNetworkByName(hnsNetworkName)
 | |
| 	if updatedNetwork == nil || updatedNetwork.id != prevNetworkID || isNetworkNotFoundError(err) {
 | |
| 		klog.InfoS("The HNS network is not present or has changed since the last sync, please check the CNI deployment", "hnsNetworkName", hnsNetworkName)
 | |
| 		proxier.cleanupAllPolicies()
 | |
| 		if updatedNetwork != nil {
 | |
| 			proxier.network = *updatedNetwork
 | |
| 		}
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	// We assume that if this was called, we really want to sync them,
 | |
| 	// even if nothing changed in the meantime. In other words, callers are
 | |
| 	// responsible for detecting no-op changes and not calling this function.
 | |
| 	serviceUpdateResult := proxier.svcPortMap.Update(proxier.serviceChanges)
 | |
| 	endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)
 | |
| 
 | |
| 	deletedUDPClusterIPs := serviceUpdateResult.DeletedUDPClusterIPs
 | |
| 	// merge stale services gathered from EndpointsMap.Update
 | |
| 	for _, svcPortName := range endpointUpdateResult.NewlyActiveUDPServices {
 | |
| 		if svcInfo, ok := proxier.svcPortMap[svcPortName]; ok && svcInfo != nil && svcInfo.Protocol() == v1.ProtocolUDP {
 | |
| 			klog.V(2).InfoS("Newly-active UDP service may have stale conntrack entries", "servicePortName", svcPortName)
 | |
| 			deletedUDPClusterIPs.Insert(svcInfo.ClusterIP().String())
 | |
| 		}
 | |
| 	}
 | |
| 	// Query HNS for endpoints and load balancers
 | |
| 	queriedEndpoints, err := hns.getAllEndpointsByNetwork(hnsNetworkName)
 | |
| 	if err != nil {
 | |
| 		klog.ErrorS(err, "Querying HNS for endpoints failed")
 | |
| 		return
 | |
| 	}
 | |
| 	if queriedEndpoints == nil {
 | |
| 		klog.V(4).InfoS("No existing endpoints found in HNS")
 | |
| 		queriedEndpoints = make(map[string]*(endpointInfo))
 | |
| 	}
 | |
| 	queriedLoadBalancers, err := hns.getAllLoadBalancers()
 | |
| 	if queriedLoadBalancers == nil {
 | |
| 		klog.V(4).InfoS("No existing load balancers found in HNS")
 | |
| 		queriedLoadBalancers = make(map[loadBalancerIdentifier]*(loadBalancerInfo))
 | |
| 	}
 | |
| 	if err != nil {
 | |
| 		klog.ErrorS(err, "Querying HNS for load balancers failed")
 | |
| 		return
 | |
| 	}
 | |
| 	if strings.EqualFold(proxier.network.networkType, NETWORK_TYPE_OVERLAY) {
 | |
| 		if _, ok := queriedEndpoints[proxier.sourceVip]; !ok {
 | |
| 			_, err = newSourceVIP(hns, hnsNetworkName, proxier.sourceVip, proxier.hostMac, proxier.nodeIP.String())
 | |
| 			if err != nil {
 | |
| 				klog.ErrorS(err, "Source Vip endpoint creation failed")
 | |
| 				return
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	klog.V(3).InfoS("Syncing Policies")
 | |
| 
 | |
| 	// Program HNS by adding corresponding policies for each service.
 | |
| 	for svcName, svc := range proxier.svcPortMap {
 | |
| 		svcInfo, ok := svc.(*serviceInfo)
 | |
| 		if !ok {
 | |
| 			klog.ErrorS(nil, "Failed to cast serviceInfo", "serviceName", svcName)
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		if svcInfo.policyApplied {
 | |
| 			klog.V(4).InfoS("Policy already applied", "serviceInfo", svcInfo)
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		if strings.EqualFold(proxier.network.networkType, NETWORK_TYPE_OVERLAY) {
 | |
| 			serviceVipEndpoint := queriedEndpoints[svcInfo.ClusterIP().String()]
 | |
| 			if serviceVipEndpoint == nil {
 | |
| 				klog.V(4).InfoS("No existing remote endpoint", "IP", svcInfo.ClusterIP())
 | |
| 				hnsEndpoint := &endpointInfo{
 | |
| 					ip:              svcInfo.ClusterIP().String(),
 | |
| 					isLocal:         false,
 | |
| 					macAddress:      proxier.hostMac,
 | |
| 					providerAddress: proxier.nodeIP.String(),
 | |
| 				}
 | |
| 
 | |
| 				newHnsEndpoint, err := hns.createEndpoint(hnsEndpoint, hnsNetworkName)
 | |
| 				if err != nil {
 | |
| 					klog.ErrorS(err, "Remote endpoint creation failed for service VIP")
 | |
| 					continue
 | |
| 				}
 | |
| 
 | |
| 				newHnsEndpoint.refCount = proxier.endPointsRefCount.getRefCount(newHnsEndpoint.hnsID)
 | |
| 				*newHnsEndpoint.refCount++
 | |
| 				svcInfo.remoteEndpoint = newHnsEndpoint
 | |
| 				updateQueriedEndpoints(newHnsEndpoint, queriedEndpoints)
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		var hnsEndpoints []endpointInfo
 | |
| 		var hnsLocalEndpoints []endpointInfo
 | |
| 		klog.V(4).InfoS("Applying Policy", "serviceInfo", svcName)
 | |
| 		// Create Remote endpoints for every endpoint, corresponding to the service
 | |
| 		containsPublicIP := false
 | |
| 		containsNodeIP := false
 | |
| 		var allEndpointsTerminating, allEndpointsNonServing bool
 | |
| 		someEndpointsServing := true
 | |
| 
 | |
| 		if len(svcInfo.loadBalancerIngressIPs) > 0 {
 | |
| 			// The check should be done only if the relevant feature gate is enabled,
| 			// and only if Spec.Type == LoadBalancer.
 | |
| 			allEndpointsTerminating = proxier.isAllEndpointsTerminating(svcName, svcInfo.localTrafficDSR)
 | |
| 			allEndpointsNonServing = proxier.isAllEndpointsNonServing(svcName, svcInfo.localTrafficDSR)
 | |
| 			someEndpointsServing = !allEndpointsNonServing
 | |
| 			klog.V(4).InfoS("Terminating status checked for all endpoints", "svcClusterIP", svcInfo.ClusterIP(), "allEndpointsTerminating", allEndpointsTerminating, "allEndpointsNonServing", allEndpointsNonServing, "localTrafficDSR", svcInfo.localTrafficDSR)
 | |
| 		} else {
 | |
| 			klog.V(4).InfoS("Skipped terminating status check for all endpoints", "svcClusterIP", svcInfo.ClusterIP(), "ingressLBCount", len(svcInfo.loadBalancerIngressIPs))
 | |
| 		}
 | |
| 
 | |
| 		for _, epInfo := range proxier.endpointsMap[svcName] {
 | |
| 			ep, ok := epInfo.(*endpointInfo)
 | |
| 			if !ok {
 | |
| 				klog.ErrorS(nil, "Failed to cast endpointInfo", "serviceName", svcName)
 | |
| 				continue
 | |
| 			}
 | |
| 
 | |
| 			if svcInfo.internalTrafficLocal && svcInfo.localTrafficDSR && !ep.IsLocal() {
 | |
| 				// No need to use or create a remote endpoint when both the internal and external traffic policies are local and the endpoint is remote
 | |
| 				klog.V(3).InfoS("Skipping the endpoint. Both internalTraffic and external traffic policies are local", "EpIP", ep.ip, " EpPort", ep.port)
 | |
| 				continue
 | |
| 			}
 | |
| 
 | |
| 			if someEndpointsServing {
 | |
| 
 | |
| 				if !allEndpointsTerminating && !ep.IsReady() {
 | |
| 					klog.V(3).InfoS("Skipping the endpoint for LB creation. Endpoint is either not ready or not all endpoints are terminating", "EpIP", ep.ip, " EpPort", ep.port, "allEndpointsTerminating", allEndpointsTerminating, "IsEpReady", ep.IsReady())
 | |
| 					continue
 | |
| 				}
 | |
| 				if !ep.IsServing() {
 | |
| 					klog.V(3).InfoS("Skipping the endpoint for LB creation. Endpoint is not serving", "EpIP", ep.ip, " EpPort", ep.port, "IsEpServing", ep.IsServing())
 | |
| 					continue
 | |
| 				}
 | |
| 
 | |
| 			}
 | |
| 
 | |
| 			var newHnsEndpoint *endpointInfo
 | |
| 			hnsNetworkName := proxier.network.name
 | |
| 			var err error
 | |
| 
 | |
| 			// targetPort is zero if it is specified as a name in port.TargetPort, so the real port must be obtained from the endpoints.
 | |
| 			// Note that hcsshim.AddLoadBalancer() doesn't support endpoints with different ports, so only port from first endpoint is used.
 | |
| 			// TODO(feiskyer): add support of different endpoint ports after hcsshim.AddLoadBalancer() add that.
 | |
| 			if svcInfo.targetPort == 0 {
 | |
| 				svcInfo.targetPort = int(ep.port)
 | |
| 			}
 | |
| 			// There is a bug in Windows Server 2019 that can cause two endpoints to be created with the same IP address, so we need to check using endpoint ID first.
 | |
| 			// TODO: Remove lookup by endpoint ID, and use the IP address only, so we don't need to maintain multiple keys for lookup.
 | |
| 			if len(ep.hnsID) > 0 {
 | |
| 				newHnsEndpoint = queriedEndpoints[ep.hnsID]
 | |
| 			}
 | |
| 
 | |
| 			if newHnsEndpoint == nil {
 | |
| 				// First check if an endpoint resource exists for this IP, on the current host
 | |
| 				// A Local endpoint could exist here already
 | |
| 				// A remote endpoint was already created and proxy was restarted
 | |
| 				newHnsEndpoint = queriedEndpoints[ep.IP()]
 | |
| 			}
 | |
| 
 | |
| 			if newHnsEndpoint == nil {
 | |
| 				if ep.IsLocal() {
 | |
| 					klog.ErrorS(err, "Local endpoint not found on network", "ip", ep.IP(), "hnsNetworkName", hnsNetworkName)
 | |
| 					continue
 | |
| 				}
 | |
| 
 | |
| 				if strings.EqualFold(proxier.network.networkType, NETWORK_TYPE_OVERLAY) {
 | |
| 					klog.InfoS("Updating network to check for new remote subnet policies", "networkName", proxier.network.name)
 | |
| 					networkName := proxier.network.name
 | |
| 					updatedNetwork, err := hns.getNetworkByName(networkName)
 | |
| 					if err != nil {
 | |
| 						klog.ErrorS(err, "Unable to find HNS Network specified, please check network name and CNI deployment", "hnsNetworkName", hnsNetworkName)
 | |
| 						proxier.cleanupAllPolicies()
 | |
| 						return
 | |
| 					}
 | |
| 					proxier.network = *updatedNetwork
 | |
| 					providerAddress := proxier.network.findRemoteSubnetProviderAddress(ep.IP())
 | |
| 					if len(providerAddress) == 0 {
 | |
| 						klog.InfoS("Could not find provider address, assuming it is a public IP", "IP", ep.IP())
 | |
| 						providerAddress = proxier.nodeIP.String()
 | |
| 					}
 | |
| 
 | |
| 					hnsEndpoint := &endpointInfo{
 | |
| 						ip:              ep.ip,
 | |
| 						isLocal:         false,
 | |
| 						macAddress:      conjureMac("02-11", netutils.ParseIPSloppy(ep.ip)),
 | |
| 						providerAddress: providerAddress,
 | |
| 					}
 | |
| 
 | |
| 					newHnsEndpoint, err = hns.createEndpoint(hnsEndpoint, hnsNetworkName)
 | |
| 					if err != nil {
 | |
| 						klog.ErrorS(err, "Remote endpoint creation failed", "endpointInfo", hnsEndpoint)
 | |
| 						continue
 | |
| 					}
 | |
| 					updateQueriedEndpoints(newHnsEndpoint, queriedEndpoints)
 | |
| 				} else {
 | |
| 
 | |
| 					hnsEndpoint := &endpointInfo{
 | |
| 						ip:         ep.ip,
 | |
| 						isLocal:    false,
 | |
| 						macAddress: ep.macAddress,
 | |
| 					}
 | |
| 
 | |
| 					newHnsEndpoint, err = hns.createEndpoint(hnsEndpoint, hnsNetworkName)
 | |
| 					if err != nil {
 | |
| 						klog.ErrorS(err, "Remote endpoint creation failed")
 | |
| 						continue
 | |
| 					}
 | |
| 					updateQueriedEndpoints(newHnsEndpoint, queriedEndpoints)
 | |
| 				}
 | |
| 			}
 | |
| 			// For Overlay networks, 'SourceVIP' on a Load Balancer Policy can either be chosen as
 | |
| 			// a) Source VIP configured on kube-proxy (or)
 | |
| 			// b) Node IP of the current node
 | |
| 			//
 | |
| 			// For L2Bridge network the Source VIP is always the NodeIP of the current node and the same
 | |
| 			// would be configured on kube-proxy as SourceVIP
 | |
| 			//
 | |
| 			// The logic for choosing the SourceVIP in Overlay networks is based on the backend endpoints:
 | |
| 			// a) Endpoints are any IPs outside the cluster ==> Choose NodeIP as the SourceVIP
 | |
| 			// b) Endpoints are IP addresses of a remote node => Choose NodeIP as the SourceVIP
 | |
| 			// c) Everything else (Local POD's, Remote POD's, Node IP of current node) ==> Choose the configured SourceVIP
 | |
			if strings.EqualFold(proxier.network.networkType, NETWORK_TYPE_OVERLAY) && !ep.IsLocal() {
				providerAddress := proxier.network.findRemoteSubnetProviderAddress(ep.IP())

				isNodeIP := (ep.IP() == providerAddress)
				isPublicIP := (len(providerAddress) == 0)
				klog.InfoS("Endpoint on overlay network", "ip", ep.IP(), "hnsNetworkName", hnsNetworkName, "isNodeIP", isNodeIP, "isPublicIP", isPublicIP)

				containsNodeIP = containsNodeIP || isNodeIP
				containsPublicIP = containsPublicIP || isPublicIP
			}

			// Save the hnsID for reference
			klog.V(1).InfoS("Hns endpoint resource", "endpointInfo", newHnsEndpoint)

			hnsEndpoints = append(hnsEndpoints, *newHnsEndpoint)
			if newHnsEndpoint.IsLocal() {
				hnsLocalEndpoints = append(hnsLocalEndpoints, *newHnsEndpoint)
			} else {
				// We only share the refCounts for remote endpoints
				ep.refCount = proxier.endPointsRefCount.getRefCount(newHnsEndpoint.hnsID)
				*ep.refCount++
			}

			ep.hnsID = newHnsEndpoint.hnsID

			klog.V(3).InfoS("Endpoint resource found", "endpointInfo", ep)
		}

		klog.V(3).InfoS("Associated endpoints for service", "endpointInfo", hnsEndpoints, "serviceName", svcName)

		if len(svcInfo.hnsID) > 0 {
			// This should not happen
			klog.InfoS("Load Balancer already exists -- Debug", "hnsID", svcInfo.hnsID)
		}

		// With ETP:Cluster, if all endpoints are under termination, hnsEndpoints will hold the
		// serving-and-terminating endpoints; otherwise it holds only the ready-and-serving ones.
		if len(hnsEndpoints) == 0 {
			if svcInfo.winProxyOptimization {
				// Deleting loadbalancers when there are no endpoints to serve.
				klog.V(3).InfoS("Cleanup existing", "endpointInfo", hnsEndpoints, "serviceName", svcName)
				svcInfo.deleteLoadBalancerPolicy(proxier.mapStaleLoadbalancers)
			}
			klog.ErrorS(nil, "Endpoint information not available for service, not applying any policy", "serviceName", svcName)
			continue
		}

		klog.V(4).InfoS("Trying to apply Policies for service", "serviceInfo", svcInfo)
		var hnsLoadBalancer *loadBalancerInfo
		var sourceVip = proxier.sourceVip
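		// Per the SourceVIP rules above, fall back to the node IP whenever any backend
		// endpoint is a public IP or the IP of a remote node.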
		if containsPublicIP || containsNodeIP {
			sourceVip = proxier.nodeIP.String()
		}

		sessionAffinityClientIP := svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP
		if sessionAffinityClientIP && !proxier.supportedFeatures.SessionAffinity {
			klog.InfoS("Session Affinity is not supported on this version of Windows")
		}

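		// endpointsAvailableForLB is false only when every endpoint is terminating or not
		// serving; in that case the NodePort, external IP and health check load balancers
		// below are skipped. With winProxyOptimization enabled, an existing load balancer is
		// deleted up front only when its endpoint set no longer matches
		// (see deleteExistingLoadBalancer).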
		endpointsAvailableForLB := !allEndpointsTerminating && !allEndpointsNonServing
		proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &svcInfo.hnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), hnsEndpoints, queriedLoadBalancers)

		// clusterIPEndpoints is the endpoint list used for creating the ClusterIP loadbalancer.
		clusterIPEndpoints := hnsEndpoints
		if svcInfo.internalTrafficLocal {
			// Take only local endpoints for the ClusterIP loadbalancer when the internal traffic policy is Local.
			clusterIPEndpoints = hnsLocalEndpoints
		}

		if len(clusterIPEndpoints) > 0 {

			// If all endpoints are terminating, then there is no need to create a Cluster IP LoadBalancer.
			// Cluster IP LoadBalancer creation
			hnsLoadBalancer, err := hns.getLoadBalancer(
				clusterIPEndpoints,
				loadBalancerFlags{isDSR: proxier.isDSR, isIPv6: proxier.ipFamily == v1.IPv6Protocol, sessionAffinity: sessionAffinityClientIP},
				sourceVip,
				svcInfo.ClusterIP().String(),
				Enum(svcInfo.Protocol()),
				uint16(svcInfo.targetPort),
				uint16(svcInfo.Port()),
				queriedLoadBalancers,
			)
			if err != nil {
				klog.ErrorS(err, "Policy creation failed")
				continue
			}

			svcInfo.hnsID = hnsLoadBalancer.hnsID
			klog.V(3).InfoS("Hns LoadBalancer resource created for cluster ip resources", "clusterIP", svcInfo.ClusterIP(), "hnsID", hnsLoadBalancer.hnsID)

		} else {
			klog.V(3).InfoS("Skipped creating Hns LoadBalancer for cluster ip resources. Reason : all endpoints are terminating", "clusterIP", svcInfo.ClusterIP(), "nodeport", svcInfo.NodePort(), "allEndpointsTerminating", allEndpointsTerminating)
		}

		// If a nodePort is specified, the user should be able to use nodeIP:nodePort to reach the backend endpoints.
		if svcInfo.NodePort() > 0 {
			// If the preserve-destination service annotation is present, we will disable routing mesh for NodePort.
			// This means that health services can use NodePort without falsely getting results from a different node.
			nodePortEndpoints := hnsEndpoints
			if svcInfo.preserveDIP || svcInfo.localTrafficDSR {
				nodePortEndpoints = hnsLocalEndpoints
			}

			proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &svcInfo.nodePorthnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), nodePortEndpoints, queriedLoadBalancers)

			if len(nodePortEndpoints) > 0 && endpointsAvailableForLB {
				// If all endpoints are in the terminating stage, then there is no need to create a NodePort LoadBalancer.
				hnsLoadBalancer, err := hns.getLoadBalancer(
					nodePortEndpoints,
					loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.localTrafficDSR, localRoutedVIP: true, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.ipFamily == v1.IPv6Protocol},
					sourceVip,
					"",
					Enum(svcInfo.Protocol()),
					uint16(svcInfo.targetPort),
					uint16(svcInfo.NodePort()),
					queriedLoadBalancers,
				)
				if err != nil {
					klog.ErrorS(err, "Policy creation failed")
					continue
				}

				svcInfo.nodePorthnsID = hnsLoadBalancer.hnsID
				klog.V(3).InfoS("Hns LoadBalancer resource created for nodePort resources", "clusterIP", svcInfo.ClusterIP(), "nodeport", svcInfo.NodePort(), "hnsID", hnsLoadBalancer.hnsID)
			} else {
				klog.V(3).InfoS("Skipped creating Hns LoadBalancer for nodePort resources", "clusterIP", svcInfo.ClusterIP(), "nodeport", svcInfo.NodePort(), "allEndpointsTerminating", allEndpointsTerminating)
			}
		}

		// Create a Load Balancer Policy for each external IP
		for _, externalIP := range svcInfo.externalIPs {
			// Disable routing mesh if ExternalTrafficPolicy is set to Local
			externalIPEndpoints := hnsEndpoints
			if svcInfo.localTrafficDSR {
				externalIPEndpoints = hnsLocalEndpoints
			}

			proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &externalIP.hnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), externalIPEndpoints, queriedLoadBalancers)

			if len(externalIPEndpoints) > 0 && endpointsAvailableForLB {
				// If all endpoints are in the terminating stage, then there is no need to create an External IP LoadBalancer.
				// Try loading existing policies, if already available
				hnsLoadBalancer, err = hns.getLoadBalancer(
					externalIPEndpoints,
					loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.localTrafficDSR, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.ipFamily == v1.IPv6Protocol},
					sourceVip,
					externalIP.ip,
					Enum(svcInfo.Protocol()),
					uint16(svcInfo.targetPort),
					uint16(svcInfo.Port()),
					queriedLoadBalancers,
				)
				if err != nil {
					klog.ErrorS(err, "Policy creation failed")
					continue
				}
				externalIP.hnsID = hnsLoadBalancer.hnsID
				klog.V(3).InfoS("Hns LoadBalancer resource created for externalIP resources", "externalIP", externalIP, "hnsID", hnsLoadBalancer.hnsID)
			} else {
				klog.V(3).InfoS("Skipped creating Hns LoadBalancer for externalIP resources", "externalIP", externalIP, "allEndpointsTerminating", allEndpointsTerminating)
			}
		}
		// Create a Load Balancer Policy for each loadbalancer ingress
		for _, lbIngressIP := range svcInfo.loadBalancerIngressIPs {
			// Try loading existing policies, if already available
			lbIngressEndpoints := hnsEndpoints
			if svcInfo.preserveDIP || svcInfo.localTrafficDSR {
				lbIngressEndpoints = hnsLocalEndpoints
			}

			proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &lbIngressIP.hnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), lbIngressEndpoints, queriedLoadBalancers)

			if len(lbIngressEndpoints) > 0 {
				hnsLoadBalancer, err := hns.getLoadBalancer(
					lbIngressEndpoints,
					loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.preserveDIP || svcInfo.localTrafficDSR, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.ipFamily == v1.IPv6Protocol},
					sourceVip,
					lbIngressIP.ip,
					Enum(svcInfo.Protocol()),
					uint16(svcInfo.targetPort),
					uint16(svcInfo.Port()),
					queriedLoadBalancers,
				)
				if err != nil {
					klog.ErrorS(err, "Policy creation failed")
					continue
				}
				lbIngressIP.hnsID = hnsLoadBalancer.hnsID
				klog.V(3).InfoS("Hns LoadBalancer resource created for loadBalancer Ingress resources", "lbIngressIP", lbIngressIP)
			} else {
				klog.V(3).InfoS("Skipped creating Hns LoadBalancer for loadBalancer Ingress resources", "lbIngressIP", lbIngressIP)
			}

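			// When health check VIP forwarding is enabled, create an additional load balancer
			// on the ingress IP that forwards the service's health check node port (or the
			// proxy's healthz port when none is set) to this node's gateway endpoint.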
			if proxier.forwardHealthCheckVip && gatewayHnsendpoint != nil && endpointsAvailableForLB {
				// Avoid creating health check loadbalancer if all the endpoints are terminating
				nodeport := proxier.healthzPort
				if svcInfo.HealthCheckNodePort() != 0 {
					nodeport = svcInfo.HealthCheckNodePort()
				}

				proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &lbIngressIP.healthCheckHnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), []endpointInfo{*gatewayHnsendpoint}, queriedLoadBalancers)

				hnsHealthCheckLoadBalancer, err := hns.getLoadBalancer(
					[]endpointInfo{*gatewayHnsendpoint},
					loadBalancerFlags{isDSR: false, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP},
					sourceVip,
					lbIngressIP.ip,
					Enum(svcInfo.Protocol()),
					uint16(nodeport),
					uint16(nodeport),
					queriedLoadBalancers,
				)
				if err != nil {
					klog.ErrorS(err, "Policy creation failed")
					continue
				}
				lbIngressIP.healthCheckHnsID = hnsHealthCheckLoadBalancer.hnsID
				klog.V(3).InfoS("Hns Health Check LoadBalancer resource created for loadBalancer Ingress resources", "ip", lbIngressIP)
			} else {
				klog.V(3).InfoS("Skipped creating Hns Health Check LoadBalancer for loadBalancer Ingress resources", "ip", lbIngressIP, "allEndpointsTerminating", allEndpointsTerminating)
			}
		}
		svcInfo.policyApplied = true
		klog.V(2).InfoS("Policy successfully applied for service", "serviceInfo", svcInfo)
	}
| 
 | |
| 	if proxier.healthzServer != nil {
 | |
| 		proxier.healthzServer.Updated(proxier.ipFamily)
 | |
| 	}
 | |
| 	metrics.SyncProxyRulesLastTimestamp.SetToCurrentTime()
 | |
| 
 | |
| 	// Update service healthchecks.  The endpoints list might include services that are
 | |
| 	// not "OnlyLocal", but the services list will not, and the serviceHealthServer
 | |
| 	// will just drop those endpoints.
 | |
| 	if err := proxier.serviceHealthServer.SyncServices(proxier.svcPortMap.HealthCheckNodePorts()); err != nil {
 | |
| 		klog.ErrorS(err, "Error syncing healthcheck services")
 | |
| 	}
 | |
| 	if err := proxier.serviceHealthServer.SyncEndpoints(proxier.endpointsMap.LocalReadyEndpoints()); err != nil {
 | |
| 		klog.ErrorS(err, "Error syncing healthcheck endpoints")
 | |
| 	}
 | |
| 
 | |
| 	// Finish housekeeping.
 | |
| 	// TODO: these could be made more consistent.
 | |
	for _, svcIP := range deletedUDPClusterIPs.UnsortedList() {
		// TODO: Check whether stale service IP connections need to be cleaned up here.
		klog.V(5).InfoS("Pending delete stale service IP connections", "IP", svcIP)
	}

	// remove stale endpoint refcount entries
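	// Remote endpoints are shared across services via endPointsRefCount, so an HNS endpoint
	// is only deleted once no load balancer policy references it any more.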
	for hnsID, referenceCount := range proxier.endPointsRefCount {
		if *referenceCount <= 0 {
			klog.V(3).InfoS("Deleting unreferenced remote endpoint", "hnsID", hnsID)
			proxier.hns.deleteEndpoint(hnsID)
			delete(proxier.endPointsRefCount, hnsID)
		}
	}
	// This will clean up stale load balancers that were left pending delete
	// in the last iteration.
	proxier.cleanupStaleLoadbalancers()
}

// deleteExistingLoadBalancer checks whether an existing loadbalancer needs to be deleted.
// If so, it deletes the loadbalancer and returns true; otherwise it returns false.
func (proxier *Proxier) deleteExistingLoadBalancer(hns HostNetworkService, winProxyOptimization bool, lbHnsID *string, sourceVip string, protocol, intPort, extPort uint16, endpoints []endpointInfo, queriedLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) bool {

	if !winProxyOptimization || *lbHnsID == "" {
		// Loadbalancer delete not needed
		return false
	}

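	// Compute the identifier of the load balancer that would be (re)created for this
	// endpoint set; if an identical one already exists in HNS, deleting it is unnecessary.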
	lbID, lbIdErr := findLoadBalancerID(
		endpoints,
		sourceVip,
		protocol,
		intPort,
		extPort,
	)

	if lbIdErr != nil {
		return proxier.deleteLoadBalancer(hns, lbHnsID)
	}

	if _, ok := queriedLoadBalancers[lbID]; ok {
		// The existing loadbalancer in the system is the same as the one we would delete and recreate, so skip deleting it.
		return false
	}

	return proxier.deleteLoadBalancer(hns, lbHnsID)
}

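// deleteLoadBalancer deletes the given HNS load balancer and clears its ID. If the HNS
// delete fails, the ID is recorded in mapStaleLoadbalancers so that cleanupStaleLoadbalancers
// can retry the deletion on a later sync.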
func (proxier *Proxier) deleteLoadBalancer(hns HostNetworkService, lbHnsID *string) bool {
	klog.V(3).InfoS("Hns LoadBalancer delete triggered for loadBalancer resources", "lbHnsID", *lbHnsID)
	if err := hns.deleteLoadBalancer(*lbHnsID); err != nil {
		// This will be cleaned up by the cleanupStaleLoadbalancers function.
		proxier.mapStaleLoadbalancers[*lbHnsID] = true
	}
	*lbHnsID = ""
	return true
}