mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-03 19:58:17 +00:00 
			
		
		
		
	Merge pull request #44756 from wojtek-t/faster_kube_proxy_2
Automatic merge from submit-queue (batch tested with PRs 44625, 43594, 44756, 44730) Don't rebuild service map in iptables kube-proxy all the time A sibling PR to https://github.com/kubernetes/kubernetes/pull/44494 (doing pretty much the same for services that we did for endpoints).
This commit is contained in:
		@@ -24,7 +24,6 @@ go_library(
 | 
			
		||||
        "//pkg/util/iptables:go_default_library",
 | 
			
		||||
        "//pkg/util/sysctl:go_default_library",
 | 
			
		||||
        "//pkg/util/version:go_default_library",
 | 
			
		||||
        "//vendor/github.com/davecgh/go-spew/spew:go_default_library",
 | 
			
		||||
        "//vendor/github.com/golang/glog:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
 | 
			
		||||
 
 | 
			
		||||
@@ -32,7 +32,6 @@ import (
 | 
			
		||||
	"sync"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/davecgh/go-spew/spew"
 | 
			
		||||
	"github.com/golang/glog"
 | 
			
		||||
 | 
			
		||||
	"k8s.io/apimachinery/pkg/types"
 | 
			
		||||
@@ -199,8 +198,14 @@ type endpointsChange struct {
 | 
			
		||||
	previous *api.Endpoints
 | 
			
		||||
	current  *api.Endpoints
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type serviceChange struct {
 | 
			
		||||
	previous *api.Service
 | 
			
		||||
	current  *api.Service
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type endpointsChangeMap map[types.NamespacedName]*endpointsChange
 | 
			
		||||
type serviceMap map[types.NamespacedName]*api.Service
 | 
			
		||||
type serviceChangeMap map[types.NamespacedName]*serviceChange
 | 
			
		||||
type proxyServiceMap map[proxy.ServicePortName]*serviceInfo
 | 
			
		||||
type proxyEndpointsMap map[proxy.ServicePortName][]*endpointsInfo
 | 
			
		||||
 | 
			
		||||
@@ -219,20 +224,23 @@ func (em proxyEndpointsMap) unmerge(other proxyEndpointsMap) {
 | 
			
		||||
// Proxier is an iptables based proxy for connections between a localhost:lport
 | 
			
		||||
// and services that provide the actual backends.
 | 
			
		||||
type Proxier struct {
 | 
			
		||||
	mu           sync.Mutex // protects the following fields
 | 
			
		||||
	serviceMap   proxyServiceMap
 | 
			
		||||
	mu sync.Mutex // protects the following fields
 | 
			
		||||
 | 
			
		||||
	serviceMap proxyServiceMap
 | 
			
		||||
	// serviceChanges contains all changes to services that happened since
 | 
			
		||||
	// last syncProxyRules call. For a single object, changes are accumulated,
 | 
			
		||||
	// i.e. previous is state from before all of them, current is state after
 | 
			
		||||
	// applying all of those.
 | 
			
		||||
	serviceChanges serviceChangeMap
 | 
			
		||||
 | 
			
		||||
	endpointsMap proxyEndpointsMap
 | 
			
		||||
	// endpointsChanges contains all changes to endpoints that happened since
 | 
			
		||||
	// last syncProxyRules call. For a single object, changes are accumulated,
 | 
			
		||||
	// i.e. previous is state from before all of them, current is state after
 | 
			
		||||
	// applying all of those.
 | 
			
		||||
	endpointsChanges endpointsChangeMap
 | 
			
		||||
	portsMap         map[localPort]closeable
 | 
			
		||||
	// allServices should never be modified by proxier - the
 | 
			
		||||
	// pointers are shared with higher layers of kube-proxy. They are guaranteed
 | 
			
		||||
	// to not be modified in the meantime, but also require to be not modified
 | 
			
		||||
	// by Proxier.
 | 
			
		||||
	allServices serviceMap
 | 
			
		||||
 | 
			
		||||
	portsMap map[localPort]closeable
 | 
			
		||||
 | 
			
		||||
	// endpointsSynced and servicesSynced are set to true when corresponding
 | 
			
		||||
	// objects are synced after startup. This is used to avoid updating iptables
 | 
			
		||||
@@ -350,11 +358,11 @@ func NewProxier(ipt utiliptables.Interface,
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return &Proxier{
 | 
			
		||||
		portsMap:         make(map[localPort]closeable),
 | 
			
		||||
		serviceMap:       make(proxyServiceMap),
 | 
			
		||||
		serviceChanges:   make(serviceChangeMap),
 | 
			
		||||
		endpointsMap:     make(proxyEndpointsMap),
 | 
			
		||||
		endpointsChanges: make(endpointsChangeMap),
 | 
			
		||||
		portsMap:         make(map[localPort]closeable),
 | 
			
		||||
		allServices:      make(serviceMap),
 | 
			
		||||
		syncPeriod:       syncPeriod,
 | 
			
		||||
		minSyncPeriod:    minSyncPeriod,
 | 
			
		||||
		throttle:         throttle,
 | 
			
		||||
@@ -476,92 +484,37 @@ func (proxier *Proxier) SyncLoop() {
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Accepts a list of Services and the existing service map.  Returns the new
 | 
			
		||||
// service map, a map of healthcheck ports, and a set of stale UDP
 | 
			
		||||
// services.
 | 
			
		||||
func buildNewServiceMap(allServices serviceMap, oldServiceMap proxyServiceMap) (proxyServiceMap, map[types.NamespacedName]uint16, sets.String) {
 | 
			
		||||
	newServiceMap := make(proxyServiceMap)
 | 
			
		||||
	hcPorts := make(map[types.NamespacedName]uint16)
 | 
			
		||||
 | 
			
		||||
	for _, service := range allServices {
 | 
			
		||||
		svcName := types.NamespacedName{
 | 
			
		||||
			Namespace: service.Namespace,
 | 
			
		||||
			Name:      service.Name,
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// if ClusterIP is "None" or empty, skip proxying
 | 
			
		||||
		if !helper.IsServiceIPSet(service) {
 | 
			
		||||
			glog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP)
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		// Even if ClusterIP is set, ServiceTypeExternalName services don't get proxied
 | 
			
		||||
		if service.Spec.Type == api.ServiceTypeExternalName {
 | 
			
		||||
			glog.V(3).Infof("Skipping service %s due to Type=ExternalName", svcName)
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		for i := range service.Spec.Ports {
 | 
			
		||||
			servicePort := &service.Spec.Ports[i]
 | 
			
		||||
 | 
			
		||||
			serviceName := proxy.ServicePortName{
 | 
			
		||||
				NamespacedName: svcName,
 | 
			
		||||
				Port:           servicePort.Name,
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			info := newServiceInfo(serviceName, servicePort, service)
 | 
			
		||||
			oldInfo, exists := oldServiceMap[serviceName]
 | 
			
		||||
			equal := reflect.DeepEqual(info, oldInfo)
 | 
			
		||||
			if !exists {
 | 
			
		||||
				glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, info.clusterIP, servicePort.Port, servicePort.Protocol)
 | 
			
		||||
			} else if !equal {
 | 
			
		||||
				glog.V(1).Infof("Updating existing service %q at %s:%d/%s", serviceName, info.clusterIP, servicePort.Port, servicePort.Protocol)
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			if info.onlyNodeLocalEndpoints {
 | 
			
		||||
				hcPorts[svcName] = uint16(info.healthCheckNodePort)
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			newServiceMap[serviceName] = info
 | 
			
		||||
			glog.V(4).Infof("added serviceInfo(%s): %s", serviceName, spew.Sdump(info))
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for nsn, port := range hcPorts {
 | 
			
		||||
		if port == 0 {
 | 
			
		||||
			glog.Errorf("Service %q has no healthcheck nodeport", nsn)
 | 
			
		||||
			delete(hcPorts, nsn)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	staleUDPServices := sets.NewString()
 | 
			
		||||
	// Remove serviceports missing from the update.
 | 
			
		||||
	for name, info := range oldServiceMap {
 | 
			
		||||
		if _, exists := newServiceMap[name]; !exists {
 | 
			
		||||
			glog.V(1).Infof("Removing service %q", name)
 | 
			
		||||
			if info.protocol == api.ProtocolUDP {
 | 
			
		||||
				staleUDPServices.Insert(info.clusterIP.String())
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return newServiceMap, hcPorts, staleUDPServices
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (proxier *Proxier) OnServiceAdd(service *api.Service) {
 | 
			
		||||
	namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
 | 
			
		||||
 | 
			
		||||
	proxier.mu.Lock()
 | 
			
		||||
	defer proxier.mu.Unlock()
 | 
			
		||||
	proxier.allServices[namespacedName] = service
 | 
			
		||||
 | 
			
		||||
	change, exists := proxier.serviceChanges[namespacedName]
 | 
			
		||||
	if !exists {
 | 
			
		||||
		change = &serviceChange{}
 | 
			
		||||
		change.previous = nil
 | 
			
		||||
		proxier.serviceChanges[namespacedName] = change
 | 
			
		||||
	}
 | 
			
		||||
	change.current = service
 | 
			
		||||
 | 
			
		||||
	proxier.syncProxyRules(syncReasonServices)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (proxier *Proxier) OnServiceUpdate(_, service *api.Service) {
 | 
			
		||||
func (proxier *Proxier) OnServiceUpdate(oldService, service *api.Service) {
 | 
			
		||||
	namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
 | 
			
		||||
 | 
			
		||||
	proxier.mu.Lock()
 | 
			
		||||
	defer proxier.mu.Unlock()
 | 
			
		||||
	proxier.allServices[namespacedName] = service
 | 
			
		||||
 | 
			
		||||
	change, exists := proxier.serviceChanges[namespacedName]
 | 
			
		||||
	if !exists {
 | 
			
		||||
		change = &serviceChange{}
 | 
			
		||||
		change.previous = oldService
 | 
			
		||||
		proxier.serviceChanges[namespacedName] = change
 | 
			
		||||
	}
 | 
			
		||||
	change.current = service
 | 
			
		||||
 | 
			
		||||
	proxier.syncProxyRules(syncReasonServices)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -570,7 +523,15 @@ func (proxier *Proxier) OnServiceDelete(service *api.Service) {
 | 
			
		||||
 | 
			
		||||
	proxier.mu.Lock()
 | 
			
		||||
	defer proxier.mu.Unlock()
 | 
			
		||||
	delete(proxier.allServices, namespacedName)
 | 
			
		||||
 | 
			
		||||
	change, exists := proxier.serviceChanges[namespacedName]
 | 
			
		||||
	if !exists {
 | 
			
		||||
		change = &serviceChange{}
 | 
			
		||||
		change.previous = service
 | 
			
		||||
		proxier.serviceChanges[namespacedName] = change
 | 
			
		||||
	}
 | 
			
		||||
	change.current = nil
 | 
			
		||||
 | 
			
		||||
	proxier.syncProxyRules(syncReasonServices)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -581,6 +542,114 @@ func (proxier *Proxier) OnServiceSynced() {
 | 
			
		||||
	proxier.syncProxyRules(syncReasonServices)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func shouldSkipService(svcName types.NamespacedName, service *api.Service) bool {
 | 
			
		||||
	// if ClusterIP is "None" or empty, skip proxying
 | 
			
		||||
	if !helper.IsServiceIPSet(service) {
 | 
			
		||||
		glog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP)
 | 
			
		||||
		return true
 | 
			
		||||
	}
 | 
			
		||||
	// Even if ClusterIP is set, ServiceTypeExternalName services don't get proxied
 | 
			
		||||
	if service.Spec.Type == api.ServiceTypeExternalName {
 | 
			
		||||
		glog.V(3).Infof("Skipping service %s due to Type=ExternalName", svcName)
 | 
			
		||||
		return true
 | 
			
		||||
	}
 | 
			
		||||
	return false
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (sm *proxyServiceMap) mergeService(service *api.Service) (bool, sets.String) {
 | 
			
		||||
	if service == nil {
 | 
			
		||||
		return false, nil
 | 
			
		||||
	}
 | 
			
		||||
	svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
 | 
			
		||||
	if shouldSkipService(svcName, service) {
 | 
			
		||||
		return false, nil
 | 
			
		||||
	}
 | 
			
		||||
	syncRequired := false
 | 
			
		||||
	existingPorts := sets.NewString()
 | 
			
		||||
	for i := range service.Spec.Ports {
 | 
			
		||||
		servicePort := &service.Spec.Ports[i]
 | 
			
		||||
		serviceName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name}
 | 
			
		||||
		existingPorts.Insert(servicePort.Name)
 | 
			
		||||
		info := newServiceInfo(serviceName, servicePort, service)
 | 
			
		||||
		oldInfo, exists := (*sm)[serviceName]
 | 
			
		||||
		equal := reflect.DeepEqual(info, oldInfo)
 | 
			
		||||
		if exists {
 | 
			
		||||
			glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, info.clusterIP, servicePort.Port, servicePort.Protocol)
 | 
			
		||||
		} else if !equal {
 | 
			
		||||
			glog.V(1).Infof("Updating existing service %q at %s:%d/%s", serviceName, info.clusterIP, servicePort.Port, servicePort.Protocol)
 | 
			
		||||
		}
 | 
			
		||||
		if !equal {
 | 
			
		||||
			(*sm)[serviceName] = info
 | 
			
		||||
			syncRequired = true
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return syncRequired, existingPorts
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// <staleServices> are modified by this function with detected stale services.
 | 
			
		||||
func (sm *proxyServiceMap) unmergeService(service *api.Service, existingPorts, staleServices sets.String) bool {
 | 
			
		||||
	if service == nil {
 | 
			
		||||
		return false
 | 
			
		||||
	}
 | 
			
		||||
	svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
 | 
			
		||||
	if shouldSkipService(svcName, service) {
 | 
			
		||||
		return false
 | 
			
		||||
	}
 | 
			
		||||
	syncRequired := false
 | 
			
		||||
	for i := range service.Spec.Ports {
 | 
			
		||||
		servicePort := &service.Spec.Ports[i]
 | 
			
		||||
		if existingPorts.Has(servicePort.Name) {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		serviceName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name}
 | 
			
		||||
		info, exists := (*sm)[serviceName]
 | 
			
		||||
		if exists {
 | 
			
		||||
			glog.V(1).Infof("Removing service %q", serviceName)
 | 
			
		||||
			if info.protocol == api.ProtocolUDP {
 | 
			
		||||
				staleServices.Insert(info.clusterIP.String())
 | 
			
		||||
			}
 | 
			
		||||
			delete(*sm, serviceName)
 | 
			
		||||
			syncRequired = true
 | 
			
		||||
		} else {
 | 
			
		||||
			glog.Errorf("Service %q removed, but doesn't exists", serviceName)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return syncRequired
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// <serviceMap> is updated by this function (based on the given changes).
 | 
			
		||||
// <changes> map is cleared after applying them.
 | 
			
		||||
func updateServiceMap(
 | 
			
		||||
	serviceMap proxyServiceMap,
 | 
			
		||||
	changes *serviceChangeMap) (syncRequired bool, hcServices map[types.NamespacedName]uint16, staleServices sets.String) {
 | 
			
		||||
	syncRequired = false
 | 
			
		||||
	staleServices = sets.NewString()
 | 
			
		||||
 | 
			
		||||
	for _, change := range *changes {
 | 
			
		||||
		mergeSyncRequired, existingPorts := serviceMap.mergeService(change.current)
 | 
			
		||||
		unmergeSyncRequired := serviceMap.unmergeService(change.previous, existingPorts, staleServices)
 | 
			
		||||
		syncRequired = syncRequired || mergeSyncRequired || unmergeSyncRequired
 | 
			
		||||
	}
 | 
			
		||||
	*changes = make(serviceChangeMap)
 | 
			
		||||
 | 
			
		||||
	// TODO: If this will appear to be computationally expensive, consider
 | 
			
		||||
	// computing this incrementally similarly to serviceMap.
 | 
			
		||||
	hcServices = make(map[types.NamespacedName]uint16)
 | 
			
		||||
	for svcPort, info := range serviceMap {
 | 
			
		||||
		if info.onlyNodeLocalEndpoints {
 | 
			
		||||
			hcServices[svcPort.NamespacedName] = uint16(info.healthCheckNodePort)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	for nsn, port := range hcServices {
 | 
			
		||||
		if port == 0 {
 | 
			
		||||
			glog.Errorf("Service %q has no healthcheck nodeport", nsn)
 | 
			
		||||
			delete(hcServices, nsn)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return syncRequired, hcServices, staleServices
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) {
 | 
			
		||||
	namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
 | 
			
		||||
 | 
			
		||||
@@ -849,10 +918,11 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Figure out the new services we need to activate.
 | 
			
		||||
	newServices, hcServices, staleServices := buildNewServiceMap(proxier.allServices, proxier.serviceMap)
 | 
			
		||||
	serviceSyncRequired, hcServices, staleServices := updateServiceMap(
 | 
			
		||||
		proxier.serviceMap, &proxier.serviceChanges)
 | 
			
		||||
 | 
			
		||||
	// If this was called because of a services update, but nothing actionable has changed, skip it.
 | 
			
		||||
	if reason == syncReasonServices && reflect.DeepEqual(newServices, proxier.serviceMap) {
 | 
			
		||||
	if reason == syncReasonServices && !serviceSyncRequired {
 | 
			
		||||
		glog.V(3).Infof("Skipping iptables sync because nothing changed")
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
@@ -996,7 +1066,7 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
 | 
			
		||||
	replacementPortsMap := map[localPort]closeable{}
 | 
			
		||||
 | 
			
		||||
	// Build rules for each service.
 | 
			
		||||
	for svcName, svcInfo := range newServices {
 | 
			
		||||
	for svcName, svcInfo := range proxier.serviceMap {
 | 
			
		||||
		protocol := strings.ToLower(string(svcInfo.protocol))
 | 
			
		||||
		// Precompute svcNameString; with many services the many calls
 | 
			
		||||
		// to ServicePortName.String() show up in CPU profiles.
 | 
			
		||||
@@ -1437,8 +1507,6 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Finish housekeeping.
 | 
			
		||||
	proxier.serviceMap = newServices
 | 
			
		||||
 | 
			
		||||
	// TODO: these and clearUDPConntrackForPort() could be made more consistent.
 | 
			
		||||
	utilproxy.DeleteServiceConnections(proxier.exec, staleServices.List())
 | 
			
		||||
	proxier.deleteEndpointConnections(staleEndpoints)
 | 
			
		||||
 
 | 
			
		||||
@@ -386,12 +386,11 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier {
 | 
			
		||||
	return &Proxier{
 | 
			
		||||
		exec:             &exec.FakeExec{},
 | 
			
		||||
		serviceMap:       make(proxyServiceMap),
 | 
			
		||||
		serviceChanges:   make(serviceChangeMap),
 | 
			
		||||
		endpointsMap:     make(proxyEndpointsMap),
 | 
			
		||||
		endpointsChanges: make(endpointsChangeMap),
 | 
			
		||||
		iptables:         ipt,
 | 
			
		||||
		clusterCIDR:      "10.0.0.0/24",
 | 
			
		||||
		allServices:      make(serviceMap),
 | 
			
		||||
		servicesSynced:   true,
 | 
			
		||||
		hostname:         testHostname,
 | 
			
		||||
		portsMap:         make(map[localPort]closeable),
 | 
			
		||||
		portMapper:       &fakePortOpener{[]*localPort{}},
 | 
			
		||||
@@ -569,7 +568,7 @@ func TestClusterIPReject(t *testing.T) {
 | 
			
		||||
		Port:           "p80",
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	fp.allServices = makeServiceMap(
 | 
			
		||||
	makeServiceMap(fp,
 | 
			
		||||
		makeTestService(svcPortName.Namespace, svcPortName.Namespace, func(svc *api.Service) {
 | 
			
		||||
			svc.Spec.ClusterIP = svcIP
 | 
			
		||||
			svc.Spec.Ports = []api.ServicePort{{
 | 
			
		||||
@@ -603,7 +602,7 @@ func TestClusterIPEndpointsJump(t *testing.T) {
 | 
			
		||||
		Port:           "p80",
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	fp.allServices = makeServiceMap(
 | 
			
		||||
	makeServiceMap(fp,
 | 
			
		||||
		makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) {
 | 
			
		||||
			svc.Spec.ClusterIP = svcIP
 | 
			
		||||
			svc.Spec.Ports = []api.ServicePort{{
 | 
			
		||||
@@ -662,7 +661,7 @@ func TestLoadBalancer(t *testing.T) {
 | 
			
		||||
		Port:           "p80",
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	fp.allServices = makeServiceMap(
 | 
			
		||||
	makeServiceMap(fp,
 | 
			
		||||
		makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) {
 | 
			
		||||
			svc.Spec.Type = "LoadBalancer"
 | 
			
		||||
			svc.Spec.ClusterIP = svcIP
 | 
			
		||||
@@ -722,7 +721,7 @@ func TestNodePort(t *testing.T) {
 | 
			
		||||
		Port:           "p80",
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	fp.allServices = makeServiceMap(
 | 
			
		||||
	makeServiceMap(fp,
 | 
			
		||||
		makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) {
 | 
			
		||||
			svc.Spec.Type = "NodePort"
 | 
			
		||||
			svc.Spec.ClusterIP = svcIP
 | 
			
		||||
@@ -772,7 +771,7 @@ func TestNodePortReject(t *testing.T) {
 | 
			
		||||
		Port:           "p80",
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	fp.allServices = makeServiceMap(
 | 
			
		||||
	makeServiceMap(fp,
 | 
			
		||||
		makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) {
 | 
			
		||||
			svc.Spec.Type = "NodePort"
 | 
			
		||||
			svc.Spec.ClusterIP = svcIP
 | 
			
		||||
@@ -810,7 +809,7 @@ func TestOnlyLocalLoadBalancing(t *testing.T) {
 | 
			
		||||
		Port:           "p80",
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	fp.allServices = makeServiceMap(
 | 
			
		||||
	makeServiceMap(fp,
 | 
			
		||||
		makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) {
 | 
			
		||||
			svc.Spec.Type = "LoadBalancer"
 | 
			
		||||
			svc.Spec.ClusterIP = svcIP
 | 
			
		||||
@@ -904,7 +903,7 @@ func onlyLocalNodePorts(t *testing.T, fp *Proxier, ipt *iptablestest.FakeIPTable
 | 
			
		||||
		Port:           "p80",
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	fp.allServices = makeServiceMap(
 | 
			
		||||
	makeServiceMap(fp,
 | 
			
		||||
		makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) {
 | 
			
		||||
			svc.Spec.Type = "NodePort"
 | 
			
		||||
			svc.Spec.ClusterIP = svcIP
 | 
			
		||||
@@ -996,7 +995,10 @@ func addTestPort(array []api.ServicePort, name string, protocol api.Protocol, po
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestBuildServiceMapAddRemove(t *testing.T) {
 | 
			
		||||
	services := makeServiceMap(
 | 
			
		||||
	ipt := iptablestest.NewFake()
 | 
			
		||||
	fp := NewFakeProxier(ipt)
 | 
			
		||||
 | 
			
		||||
	services := []*api.Service{
 | 
			
		||||
		makeTestService("somewhere-else", "cluster-ip", func(svc *api.Service) {
 | 
			
		||||
			svc.Spec.Type = api.ServiceTypeClusterIP
 | 
			
		||||
			svc.Spec.ClusterIP = "172.16.55.4"
 | 
			
		||||
@@ -1037,11 +1039,14 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
 | 
			
		||||
				},
 | 
			
		||||
			}
 | 
			
		||||
		}),
 | 
			
		||||
	)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	serviceMap, hcPorts, staleUDPServices := buildNewServiceMap(services, make(proxyServiceMap))
 | 
			
		||||
	if len(serviceMap) != 8 {
 | 
			
		||||
		t.Errorf("expected service map length 8, got %v", serviceMap)
 | 
			
		||||
	for i := range services {
 | 
			
		||||
		fp.OnServiceAdd(services[i])
 | 
			
		||||
	}
 | 
			
		||||
	_, hcPorts, staleUDPServices := updateServiceMap(fp.serviceMap, &fp.serviceChanges)
 | 
			
		||||
	if len(fp.serviceMap) != 8 {
 | 
			
		||||
		t.Errorf("expected service map length 8, got %v", fp.serviceMap)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// The only-local-loadbalancer ones get added
 | 
			
		||||
@@ -1060,16 +1065,25 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Remove some stuff
 | 
			
		||||
	oneService := services[makeNSN("somewhere-else", "cluster-ip")]
 | 
			
		||||
	oneService.Spec.Ports = []api.ServicePort{oneService.Spec.Ports[1]}
 | 
			
		||||
	services = makeServiceMap(oneService)
 | 
			
		||||
	serviceMap, hcPorts, staleUDPServices = buildNewServiceMap(services, serviceMap)
 | 
			
		||||
	if len(serviceMap) != 1 {
 | 
			
		||||
		t.Errorf("expected service map length 1, got %v", serviceMap)
 | 
			
		||||
	// oneService is a modification of services[0] with removed first port.
 | 
			
		||||
	oneService := makeTestService("somewhere-else", "cluster-ip", func(svc *api.Service) {
 | 
			
		||||
		svc.Spec.Type = api.ServiceTypeClusterIP
 | 
			
		||||
		svc.Spec.ClusterIP = "172.16.55.4"
 | 
			
		||||
		svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "UDP", 1235, 5321, 0)
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	fp.OnServiceUpdate(services[0], oneService)
 | 
			
		||||
	fp.OnServiceDelete(services[1])
 | 
			
		||||
	fp.OnServiceDelete(services[2])
 | 
			
		||||
	fp.OnServiceDelete(services[3])
 | 
			
		||||
 | 
			
		||||
	_, hcPorts, staleUDPServices = updateServiceMap(fp.serviceMap, &fp.serviceChanges)
 | 
			
		||||
	if len(fp.serviceMap) != 1 {
 | 
			
		||||
		t.Errorf("expected service map length 1, got %v", fp.serviceMap)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if len(hcPorts) != 0 {
 | 
			
		||||
		t.Errorf("expected healthcheck ports length 1, got %v", hcPorts)
 | 
			
		||||
		t.Errorf("expected 0 healthcheck ports, got %v", hcPorts)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// All services but one were deleted. While you'd expect only the ClusterIPs
 | 
			
		||||
@@ -1087,7 +1101,10 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestBuildServiceMapServiceHeadless(t *testing.T) {
 | 
			
		||||
	services := makeServiceMap(
 | 
			
		||||
	ipt := iptablestest.NewFake()
 | 
			
		||||
	fp := NewFakeProxier(ipt)
 | 
			
		||||
 | 
			
		||||
	makeServiceMap(fp,
 | 
			
		||||
		makeTestService("somewhere-else", "headless", func(svc *api.Service) {
 | 
			
		||||
			svc.Spec.Type = api.ServiceTypeClusterIP
 | 
			
		||||
			svc.Spec.ClusterIP = api.ClusterIPNone
 | 
			
		||||
@@ -1096,9 +1113,9 @@ func TestBuildServiceMapServiceHeadless(t *testing.T) {
 | 
			
		||||
	)
 | 
			
		||||
 | 
			
		||||
	// Headless service should be ignored
 | 
			
		||||
	serviceMap, hcPorts, staleUDPServices := buildNewServiceMap(services, make(proxyServiceMap))
 | 
			
		||||
	if len(serviceMap) != 0 {
 | 
			
		||||
		t.Errorf("expected service map length 0, got %d", len(serviceMap))
 | 
			
		||||
	_, hcPorts, staleUDPServices := updateServiceMap(fp.serviceMap, &fp.serviceChanges)
 | 
			
		||||
	if len(fp.serviceMap) != 0 {
 | 
			
		||||
		t.Errorf("expected service map length 0, got %d", len(fp.serviceMap))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// No proxied services, so no healthchecks
 | 
			
		||||
@@ -1112,7 +1129,10 @@ func TestBuildServiceMapServiceHeadless(t *testing.T) {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestBuildServiceMapServiceTypeExternalName(t *testing.T) {
 | 
			
		||||
	services := makeServiceMap(
 | 
			
		||||
	ipt := iptablestest.NewFake()
 | 
			
		||||
	fp := NewFakeProxier(ipt)
 | 
			
		||||
 | 
			
		||||
	makeServiceMap(fp,
 | 
			
		||||
		makeTestService("somewhere-else", "external-name", func(svc *api.Service) {
 | 
			
		||||
			svc.Spec.Type = api.ServiceTypeExternalName
 | 
			
		||||
			svc.Spec.ClusterIP = "172.16.55.4" // Should be ignored
 | 
			
		||||
@@ -1121,9 +1141,9 @@ func TestBuildServiceMapServiceTypeExternalName(t *testing.T) {
 | 
			
		||||
		}),
 | 
			
		||||
	)
 | 
			
		||||
 | 
			
		||||
	serviceMap, hcPorts, staleUDPServices := buildNewServiceMap(services, make(proxyServiceMap))
 | 
			
		||||
	if len(serviceMap) != 0 {
 | 
			
		||||
		t.Errorf("expected service map length 0, got %v", serviceMap)
 | 
			
		||||
	_, hcPorts, staleUDPServices := updateServiceMap(fp.serviceMap, &fp.serviceChanges)
 | 
			
		||||
	if len(fp.serviceMap) != 0 {
 | 
			
		||||
		t.Errorf("expected service map length 0, got %v", fp.serviceMap)
 | 
			
		||||
	}
 | 
			
		||||
	// No proxied services, so no healthchecks
 | 
			
		||||
	if len(hcPorts) != 0 {
 | 
			
		||||
@@ -1135,37 +1155,40 @@ func TestBuildServiceMapServiceTypeExternalName(t *testing.T) {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestBuildServiceMapServiceUpdate(t *testing.T) {
 | 
			
		||||
	first := makeServiceMap(
 | 
			
		||||
		makeTestService("somewhere", "some-service", func(svc *api.Service) {
 | 
			
		||||
			svc.Spec.Type = api.ServiceTypeClusterIP
 | 
			
		||||
			svc.Spec.ClusterIP = "172.16.55.4"
 | 
			
		||||
			svc.Spec.Ports = addTestPort(svc.Spec.Ports, "something", "UDP", 1234, 4321, 0)
 | 
			
		||||
			svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "TCP", 1235, 5321, 0)
 | 
			
		||||
		}),
 | 
			
		||||
	)
 | 
			
		||||
	ipt := iptablestest.NewFake()
 | 
			
		||||
	fp := NewFakeProxier(ipt)
 | 
			
		||||
 | 
			
		||||
	second := makeServiceMap(
 | 
			
		||||
		makeTestService("somewhere", "some-service", func(svc *api.Service) {
 | 
			
		||||
			svc.ObjectMeta.Annotations = map[string]string{
 | 
			
		||||
				service.BetaAnnotationExternalTraffic:     service.AnnotationValueExternalTrafficLocal,
 | 
			
		||||
				service.BetaAnnotationHealthCheckNodePort: "345",
 | 
			
		||||
			}
 | 
			
		||||
			svc.Spec.Type = api.ServiceTypeLoadBalancer
 | 
			
		||||
			svc.Spec.ClusterIP = "172.16.55.4"
 | 
			
		||||
			svc.Spec.LoadBalancerIP = "5.6.7.8"
 | 
			
		||||
			svc.Spec.Ports = addTestPort(svc.Spec.Ports, "something", "UDP", 1234, 4321, 7002)
 | 
			
		||||
			svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "TCP", 1235, 5321, 7003)
 | 
			
		||||
			svc.Status.LoadBalancer = api.LoadBalancerStatus{
 | 
			
		||||
				Ingress: []api.LoadBalancerIngress{
 | 
			
		||||
					{IP: "10.1.2.3"},
 | 
			
		||||
				},
 | 
			
		||||
			}
 | 
			
		||||
		}),
 | 
			
		||||
	)
 | 
			
		||||
	servicev1 := makeTestService("somewhere", "some-service", func(svc *api.Service) {
 | 
			
		||||
		svc.Spec.Type = api.ServiceTypeClusterIP
 | 
			
		||||
		svc.Spec.ClusterIP = "172.16.55.4"
 | 
			
		||||
		svc.Spec.Ports = addTestPort(svc.Spec.Ports, "something", "UDP", 1234, 4321, 0)
 | 
			
		||||
		svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "TCP", 1235, 5321, 0)
 | 
			
		||||
	})
 | 
			
		||||
	servicev2 := makeTestService("somewhere", "some-service", func(svc *api.Service) {
 | 
			
		||||
		svc.ObjectMeta.Annotations = map[string]string{
 | 
			
		||||
			service.BetaAnnotationExternalTraffic:     service.AnnotationValueExternalTrafficLocal,
 | 
			
		||||
			service.BetaAnnotationHealthCheckNodePort: "345",
 | 
			
		||||
		}
 | 
			
		||||
		svc.Spec.Type = api.ServiceTypeLoadBalancer
 | 
			
		||||
		svc.Spec.ClusterIP = "172.16.55.4"
 | 
			
		||||
		svc.Spec.LoadBalancerIP = "5.6.7.8"
 | 
			
		||||
		svc.Spec.Ports = addTestPort(svc.Spec.Ports, "something", "UDP", 1234, 4321, 7002)
 | 
			
		||||
		svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "TCP", 1235, 5321, 7003)
 | 
			
		||||
		svc.Status.LoadBalancer = api.LoadBalancerStatus{
 | 
			
		||||
			Ingress: []api.LoadBalancerIngress{
 | 
			
		||||
				{IP: "10.1.2.3"},
 | 
			
		||||
			},
 | 
			
		||||
		}
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	serviceMap, hcPorts, staleUDPServices := buildNewServiceMap(first, make(proxyServiceMap))
 | 
			
		||||
	if len(serviceMap) != 2 {
 | 
			
		||||
		t.Errorf("expected service map length 2, got %v", serviceMap)
 | 
			
		||||
	fp.OnServiceAdd(servicev1)
 | 
			
		||||
 | 
			
		||||
	syncRequired, hcPorts, staleUDPServices := updateServiceMap(fp.serviceMap, &fp.serviceChanges)
 | 
			
		||||
	if !syncRequired {
 | 
			
		||||
		t.Errorf("expected sync required, got %t", syncRequired)
 | 
			
		||||
	}
 | 
			
		||||
	if len(fp.serviceMap) != 2 {
 | 
			
		||||
		t.Errorf("expected service map length 2, got %v", fp.serviceMap)
 | 
			
		||||
	}
 | 
			
		||||
	if len(hcPorts) != 0 {
 | 
			
		||||
		t.Errorf("expected healthcheck ports length 0, got %v", hcPorts)
 | 
			
		||||
@@ -1176,9 +1199,13 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Change service to load-balancer
 | 
			
		||||
	serviceMap, hcPorts, staleUDPServices = buildNewServiceMap(second, serviceMap)
 | 
			
		||||
	if len(serviceMap) != 2 {
 | 
			
		||||
		t.Errorf("expected service map length 2, got %v", serviceMap)
 | 
			
		||||
	fp.OnServiceUpdate(servicev1, servicev2)
 | 
			
		||||
	syncRequired, hcPorts, staleUDPServices = updateServiceMap(fp.serviceMap, &fp.serviceChanges)
 | 
			
		||||
	if !syncRequired {
 | 
			
		||||
		t.Errorf("expected sync required, got %t", syncRequired)
 | 
			
		||||
	}
 | 
			
		||||
	if len(fp.serviceMap) != 2 {
 | 
			
		||||
		t.Errorf("expected service map length 2, got %v", fp.serviceMap)
 | 
			
		||||
	}
 | 
			
		||||
	if len(hcPorts) != 1 {
 | 
			
		||||
		t.Errorf("expected healthcheck ports length 1, got %v", hcPorts)
 | 
			
		||||
@@ -1189,9 +1216,13 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
 | 
			
		||||
 | 
			
		||||
	// No change; make sure the service map stays the same and there are
 | 
			
		||||
	// no health-check changes
 | 
			
		||||
	serviceMap, hcPorts, staleUDPServices = buildNewServiceMap(second, serviceMap)
 | 
			
		||||
	if len(serviceMap) != 2 {
 | 
			
		||||
		t.Errorf("expected service map length 2, got %v", serviceMap)
 | 
			
		||||
	fp.OnServiceUpdate(servicev2, servicev2)
 | 
			
		||||
	syncRequired, hcPorts, staleUDPServices = updateServiceMap(fp.serviceMap, &fp.serviceChanges)
 | 
			
		||||
	if syncRequired {
 | 
			
		||||
		t.Errorf("not expected sync required, got %t", syncRequired)
 | 
			
		||||
	}
 | 
			
		||||
	if len(fp.serviceMap) != 2 {
 | 
			
		||||
		t.Errorf("expected service map length 2, got %v", fp.serviceMap)
 | 
			
		||||
	}
 | 
			
		||||
	if len(hcPorts) != 1 {
 | 
			
		||||
		t.Errorf("expected healthcheck ports length 1, got %v", hcPorts)
 | 
			
		||||
@@ -1201,9 +1232,13 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// And back to ClusterIP
 | 
			
		||||
	serviceMap, hcPorts, staleUDPServices = buildNewServiceMap(first, serviceMap)
 | 
			
		||||
	if len(serviceMap) != 2 {
 | 
			
		||||
		t.Errorf("expected service map length 2, got %v", serviceMap)
 | 
			
		||||
	fp.OnServiceUpdate(servicev2, servicev1)
 | 
			
		||||
	syncRequired, hcPorts, staleUDPServices = updateServiceMap(fp.serviceMap, &fp.serviceChanges)
 | 
			
		||||
	if !syncRequired {
 | 
			
		||||
		t.Errorf("expected sync required, got %t", syncRequired)
 | 
			
		||||
	}
 | 
			
		||||
	if len(fp.serviceMap) != 2 {
 | 
			
		||||
		t.Errorf("expected service map length 2, got %v", fp.serviceMap)
 | 
			
		||||
	}
 | 
			
		||||
	if len(hcPorts) != 0 {
 | 
			
		||||
		t.Errorf("expected healthcheck ports length 0, got %v", hcPorts)
 | 
			
		||||
@@ -1509,13 +1544,14 @@ func makeServicePortName(ns, name, port string) proxy.ServicePortName {
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func makeServiceMap(allServices ...*api.Service) serviceMap {
 | 
			
		||||
	result := make(serviceMap)
 | 
			
		||||
	for _, service := range allServices {
 | 
			
		||||
		namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
 | 
			
		||||
		result[namespacedName] = service
 | 
			
		||||
func makeServiceMap(proxier *Proxier, allServices ...*api.Service) {
 | 
			
		||||
	for i := range allServices {
 | 
			
		||||
		proxier.OnServiceAdd(allServices[i])
 | 
			
		||||
	}
 | 
			
		||||
	return result
 | 
			
		||||
 | 
			
		||||
	proxier.mu.Lock()
 | 
			
		||||
	defer proxier.mu.Unlock()
 | 
			
		||||
	proxier.servicesSynced = true
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func compareEndpointsMaps(t *testing.T, tci int, newMap, expected map[proxy.ServicePortName][]*endpointsInfo) {
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user