mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-04 12:18:16 +00:00 
			
		
		
		
	Merge pull request #44318 from wojtek-t/edge_based_proxy_2
Automatic merge from submit-queue (batch tested with PRs 44414, 44318) Finish migration to edge-based for endpoints in KubeProxy Ref #43702
This commit is contained in:
		@@ -220,9 +220,6 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
 | 
			
		||||
 | 
			
		||||
	var proxier proxy.ProxyProvider
 | 
			
		||||
	var servicesHandler proxyconfig.ServiceConfigHandler
 | 
			
		||||
	// TODO: Migrate all handlers to EndpointsHandler type and
 | 
			
		||||
	// get rid of this one.
 | 
			
		||||
	var endpointsHandler proxyconfig.EndpointsConfigHandler
 | 
			
		||||
	var endpointsEventHandler proxyconfig.EndpointsHandler
 | 
			
		||||
 | 
			
		||||
	proxyMode := getProxyMode(string(config.Mode), client.Core().Nodes(), hostname, iptInterface, iptables.LinuxKernelCompatTester{})
 | 
			
		||||
@@ -261,7 +258,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
 | 
			
		||||
			// our config.EndpointsConfigHandler.
 | 
			
		||||
			loadBalancer := winuserspace.NewLoadBalancerRR()
 | 
			
		||||
			// set EndpointsHandler to our loadBalancer
 | 
			
		||||
			endpointsHandler = loadBalancer
 | 
			
		||||
			endpointsEventHandler = loadBalancer
 | 
			
		||||
			proxierUserspace, err := winuserspace.NewProxier(
 | 
			
		||||
				loadBalancer,
 | 
			
		||||
				net.ParseIP(config.BindAddress),
 | 
			
		||||
@@ -321,12 +318,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
 | 
			
		||||
	go serviceConfig.Run(wait.NeverStop)
 | 
			
		||||
 | 
			
		||||
	endpointsConfig := proxyconfig.NewEndpointsConfig(informerFactory.Core().InternalVersion().Endpoints(), config.ConfigSyncPeriod)
 | 
			
		||||
	if endpointsHandler != nil {
 | 
			
		||||
		endpointsConfig.RegisterHandler(endpointsHandler)
 | 
			
		||||
	}
 | 
			
		||||
	if endpointsEventHandler != nil {
 | 
			
		||||
	endpointsConfig.RegisterEventHandler(endpointsEventHandler)
 | 
			
		||||
	}
 | 
			
		||||
	go endpointsConfig.Run(wait.NeverStop)
 | 
			
		||||
 | 
			
		||||
	// This has to start after the calls to NewServiceConfig and NewEndpointsConfig because those
 | 
			
		||||
 
 | 
			
		||||
@@ -143,7 +143,7 @@ func main() {
 | 
			
		||||
		serviceConfig.RegisterHandler(&kubemark.FakeProxyHandler{})
 | 
			
		||||
 | 
			
		||||
		endpointsConfig := proxyconfig.NewEndpointsConfig(informerFactory.Core().InternalVersion().Endpoints(), configResyncPeriod)
 | 
			
		||||
		endpointsConfig.RegisterHandler(&kubemark.FakeProxyHandler{})
 | 
			
		||||
		endpointsConfig.RegisterEventHandler(&kubemark.FakeProxyHandler{})
 | 
			
		||||
 | 
			
		||||
		eventClient, err := clientgoclientset.NewForConfig(clientConfig)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
 
 | 
			
		||||
@@ -41,7 +41,10 @@ type HollowProxy struct {
 | 
			
		||||
type FakeProxyHandler struct{}
 | 
			
		||||
 | 
			
		||||
func (*FakeProxyHandler) OnServiceUpdate(services []*api.Service)                  {}
 | 
			
		||||
func (*FakeProxyHandler) OnEndpointsUpdate(endpoints []*api.Endpoints) {}
 | 
			
		||||
func (*FakeProxyHandler) OnEndpointsAdd(endpoints *api.Endpoints)                  {}
 | 
			
		||||
func (*FakeProxyHandler) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) {}
 | 
			
		||||
func (*FakeProxyHandler) OnEndpointsDelete(endpoints *api.Endpoints)               {}
 | 
			
		||||
func (*FakeProxyHandler) OnEndpointsSynced()                                       {}
 | 
			
		||||
 | 
			
		||||
type FakeProxier struct{}
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -40,6 +40,7 @@ go_test(
 | 
			
		||||
        "//pkg/client/clientset_generated/internalclientset/fake:go_default_library",
 | 
			
		||||
        "//pkg/client/informers/informers_generated/internalversion:go_default_library",
 | 
			
		||||
        "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
 | 
			
		||||
        "//vendor:k8s.io/apimachinery/pkg/types",
 | 
			
		||||
        "//vendor:k8s.io/apimachinery/pkg/util/wait",
 | 
			
		||||
        "//vendor:k8s.io/apimachinery/pkg/watch",
 | 
			
		||||
        "//vendor:k8s.io/client-go/testing",
 | 
			
		||||
 
 | 
			
		||||
@@ -24,6 +24,7 @@ import (
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/types"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/watch"
 | 
			
		||||
	ktesting "k8s.io/client-go/testing"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/api"
 | 
			
		||||
@@ -124,40 +125,34 @@ func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) {
 | 
			
		||||
	stopCh := make(chan struct{})
 | 
			
		||||
	defer close(stopCh)
 | 
			
		||||
 | 
			
		||||
	ch := make(chan struct{})
 | 
			
		||||
	handler := newEpsHandler(t, nil, func() { ch <- struct{}{} })
 | 
			
		||||
	handler := NewEndpointsHandlerMock()
 | 
			
		||||
 | 
			
		||||
	sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)
 | 
			
		||||
 | 
			
		||||
	endpointsConfig := NewEndpointsConfig(sharedInformers.Core().InternalVersion().Endpoints(), time.Minute)
 | 
			
		||||
	endpointsConfig.RegisterHandler(handler)
 | 
			
		||||
	endpointsConfig.RegisterEventHandler(handler)
 | 
			
		||||
	go sharedInformers.Start(stopCh)
 | 
			
		||||
	go endpointsConfig.Run(stopCh)
 | 
			
		||||
 | 
			
		||||
	// Add the first endpoints
 | 
			
		||||
	handler.expected = []*api.Endpoints{endpoints1v1}
 | 
			
		||||
	fakeWatch.Add(endpoints1v1)
 | 
			
		||||
	<-ch
 | 
			
		||||
	handler.ValidateEndpoints(t, []*api.Endpoints{endpoints1v1})
 | 
			
		||||
 | 
			
		||||
	// Add another endpoints
 | 
			
		||||
	handler.expected = []*api.Endpoints{endpoints1v1, endpoints2}
 | 
			
		||||
	fakeWatch.Add(endpoints2)
 | 
			
		||||
	<-ch
 | 
			
		||||
	handler.ValidateEndpoints(t, []*api.Endpoints{endpoints1v1, endpoints2})
 | 
			
		||||
 | 
			
		||||
	// Modify endpoints1
 | 
			
		||||
	handler.expected = []*api.Endpoints{endpoints1v2, endpoints2}
 | 
			
		||||
	fakeWatch.Modify(endpoints1v2)
 | 
			
		||||
	<-ch
 | 
			
		||||
	handler.ValidateEndpoints(t, []*api.Endpoints{endpoints1v2, endpoints2})
 | 
			
		||||
 | 
			
		||||
	// Delete endpoints1
 | 
			
		||||
	handler.expected = []*api.Endpoints{endpoints2}
 | 
			
		||||
	fakeWatch.Delete(endpoints1v2)
 | 
			
		||||
	<-ch
 | 
			
		||||
	handler.ValidateEndpoints(t, []*api.Endpoints{endpoints2})
 | 
			
		||||
 | 
			
		||||
	// Delete endpoints2
 | 
			
		||||
	handler.expected = []*api.Endpoints{}
 | 
			
		||||
	fakeWatch.Delete(endpoints2)
 | 
			
		||||
	<-ch
 | 
			
		||||
	handler.ValidateEndpoints(t, []*api.Endpoints{})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type svcHandler struct {
 | 
			
		||||
@@ -178,22 +173,17 @@ func (s *svcHandler) OnServiceUpdate(services []*api.Service) {
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type epsHandler struct {
 | 
			
		||||
	t        *testing.T
 | 
			
		||||
	expected []*api.Endpoints
 | 
			
		||||
	done     func()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func newEpsHandler(t *testing.T, eps []*api.Endpoints, done func()) *epsHandler {
 | 
			
		||||
	return &epsHandler{t: t, expected: eps, done: done}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (e *epsHandler) OnEndpointsUpdate(endpoints []*api.Endpoints) {
 | 
			
		||||
	defer e.done()
 | 
			
		||||
	sort.Sort(sortedEndpoints(endpoints))
 | 
			
		||||
	if !reflect.DeepEqual(e.expected, endpoints) {
 | 
			
		||||
		e.t.Errorf("Unexpected endpoints: %#v, expected: %#v", endpoints, e.expected)
 | 
			
		||||
func newEpsHandler(t *testing.T, eps []*api.Endpoints, done func()) EndpointsHandler {
 | 
			
		||||
	ehm := &EndpointsHandlerMock{
 | 
			
		||||
		state: make(map[types.NamespacedName]*api.Endpoints),
 | 
			
		||||
	}
 | 
			
		||||
	ehm.process = func(endpoints []*api.Endpoints) {
 | 
			
		||||
		defer done()
 | 
			
		||||
		if !reflect.DeepEqual(eps, endpoints) {
 | 
			
		||||
			t.Errorf("Unexpected endpoints: %#v, expected: %#v", endpoints, eps)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return ehm
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestInitialSync(t *testing.T) {
 | 
			
		||||
@@ -225,7 +215,7 @@ func TestInitialSync(t *testing.T) {
 | 
			
		||||
	svcHandler := newSvcHandler(t, []*api.Service{svc2, svc1}, wg.Done)
 | 
			
		||||
	svcConfig.RegisterHandler(svcHandler)
 | 
			
		||||
	epsHandler := newEpsHandler(t, []*api.Endpoints{eps2, eps1}, wg.Done)
 | 
			
		||||
	epsConfig.RegisterHandler(epsHandler)
 | 
			
		||||
	epsConfig.RegisterEventHandler(epsHandler)
 | 
			
		||||
 | 
			
		||||
	stopCh := make(chan struct{})
 | 
			
		||||
	defer close(stopCh)
 | 
			
		||||
 
 | 
			
		||||
@@ -45,21 +45,6 @@ type ServiceConfigHandler interface {
 | 
			
		||||
	OnServiceUpdate(services []*api.Service)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// EndpointsConfigHandler is an abstract interface of objects which receive update notifications for the set of endpoints.
 | 
			
		||||
type EndpointsConfigHandler interface {
 | 
			
		||||
	// OnEndpointsUpdate gets called when endpoints configuration is changed for a given
 | 
			
		||||
	// service on any of the configuration sources. An example is when a new
 | 
			
		||||
	// service comes up, or when containers come up or down for an existing service.
 | 
			
		||||
	//
 | 
			
		||||
	// NOTE: For efficiency, endpoints are being passed by reference, thus,
 | 
			
		||||
	// OnEndpointsUpdate should NOT modify pointers of a given slice.
 | 
			
		||||
	// Those endpoints objects are shared with other layers of the system and
 | 
			
		||||
	// are guaranteed to be immutable with the assumption that are also
 | 
			
		||||
	// not mutated by those handlers. Make a deep copy if you need to modify
 | 
			
		||||
	// them in your code.
 | 
			
		||||
	OnEndpointsUpdate(endpoints []*api.Endpoints)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// EndpointsHandler is an abstract interface o objects which receive
 | 
			
		||||
// notifications about endpoints object changes.
 | 
			
		||||
type EndpointsHandler interface {
 | 
			
		||||
@@ -83,11 +68,6 @@ type EndpointsConfig struct {
 | 
			
		||||
	lister        listers.EndpointsLister
 | 
			
		||||
	listerSynced  cache.InformerSynced
 | 
			
		||||
	eventHandlers []EndpointsHandler
 | 
			
		||||
	// TODO: Remove handlers by switching them to eventHandlers.
 | 
			
		||||
	handlers []EndpointsConfigHandler
 | 
			
		||||
	// updates channel is used to trigger registered handlers.
 | 
			
		||||
	updates chan struct{}
 | 
			
		||||
	stop    chan struct{}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewEndpointsConfig creates a new EndpointsConfig.
 | 
			
		||||
@@ -95,12 +75,6 @@ func NewEndpointsConfig(endpointsInformer coreinformers.EndpointsInformer, resyn
 | 
			
		||||
	result := &EndpointsConfig{
 | 
			
		||||
		lister:       endpointsInformer.Lister(),
 | 
			
		||||
		listerSynced: endpointsInformer.Informer().HasSynced,
 | 
			
		||||
		// The updates channel is used to send interrupts to the Endpoints handler.
 | 
			
		||||
		// It's buffered because we never want to block for as long as there is a
 | 
			
		||||
		// pending interrupt, but don't want to drop them if the handler is doing
 | 
			
		||||
		// work.
 | 
			
		||||
		updates: make(chan struct{}, 1),
 | 
			
		||||
		stop:    make(chan struct{}),
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	endpointsInformer.Informer().AddEventHandlerWithResyncPeriod(
 | 
			
		||||
@@ -115,11 +89,6 @@ func NewEndpointsConfig(endpointsInformer coreinformers.EndpointsInformer, resyn
 | 
			
		||||
	return result
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// RegisterHandler registers a handler which is called on every endpoints change.
 | 
			
		||||
func (c *EndpointsConfig) RegisterHandler(handler EndpointsConfigHandler) {
 | 
			
		||||
	c.handlers = append(c.handlers, handler)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// RegisterEventHandler registers a handler which is called on every endpoints change.
 | 
			
		||||
func (c *EndpointsConfig) RegisterEventHandler(handler EndpointsHandler) {
 | 
			
		||||
	c.eventHandlers = append(c.eventHandlers, handler)
 | 
			
		||||
@@ -132,40 +101,12 @@ func (c *EndpointsConfig) Run(stopCh <-chan struct{}) {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// We have synced informers. Now we can start delivering updates
 | 
			
		||||
	// to the registered handler.
 | 
			
		||||
	go func() {
 | 
			
		||||
	for i := range c.eventHandlers {
 | 
			
		||||
		glog.V(3).Infof("Calling handler.OnEndpointsSynced()")
 | 
			
		||||
		c.eventHandlers[i].OnEndpointsSynced()
 | 
			
		||||
	}
 | 
			
		||||
		for {
 | 
			
		||||
			select {
 | 
			
		||||
			case <-c.updates:
 | 
			
		||||
				endpoints, err := c.lister.List(labels.Everything())
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					glog.Errorf("Error while listing endpoints from cache: %v", err)
 | 
			
		||||
					// This will cause a retry (if there isn't any other trigger in-flight).
 | 
			
		||||
					c.dispatchUpdate()
 | 
			
		||||
					continue
 | 
			
		||||
				}
 | 
			
		||||
				if endpoints == nil {
 | 
			
		||||
					endpoints = []*api.Endpoints{}
 | 
			
		||||
				}
 | 
			
		||||
				for i := range c.handlers {
 | 
			
		||||
					glog.V(3).Infof("Calling handler.OnEndpointsUpdate()")
 | 
			
		||||
					c.handlers[i].OnEndpointsUpdate(endpoints)
 | 
			
		||||
				}
 | 
			
		||||
			case <-c.stop:
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
	// Close updates channel when stopCh is closed.
 | 
			
		||||
	go func() {
 | 
			
		||||
 | 
			
		||||
	<-stopCh
 | 
			
		||||
		close(c.stop)
 | 
			
		||||
	}()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *EndpointsConfig) handleAddEndpoints(obj interface{}) {
 | 
			
		||||
@@ -178,7 +119,6 @@ func (c *EndpointsConfig) handleAddEndpoints(obj interface{}) {
 | 
			
		||||
		glog.V(4).Infof("Calling handler.OnEndpointsAdd")
 | 
			
		||||
		c.eventHandlers[i].OnEndpointsAdd(endpoints)
 | 
			
		||||
	}
 | 
			
		||||
	c.dispatchUpdate()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *EndpointsConfig) handleUpdateEndpoints(oldObj, newObj interface{}) {
 | 
			
		||||
@@ -196,7 +136,6 @@ func (c *EndpointsConfig) handleUpdateEndpoints(oldObj, newObj interface{}) {
 | 
			
		||||
		glog.V(4).Infof("Calling handler.OnEndpointsUpdate")
 | 
			
		||||
		c.eventHandlers[i].OnEndpointsUpdate(oldEndpoints, endpoints)
 | 
			
		||||
	}
 | 
			
		||||
	c.dispatchUpdate()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *EndpointsConfig) handleDeleteEndpoints(obj interface{}) {
 | 
			
		||||
@@ -216,18 +155,6 @@ func (c *EndpointsConfig) handleDeleteEndpoints(obj interface{}) {
 | 
			
		||||
		glog.V(4).Infof("Calling handler.OnEndpointsUpdate")
 | 
			
		||||
		c.eventHandlers[i].OnEndpointsDelete(endpoints)
 | 
			
		||||
	}
 | 
			
		||||
	c.dispatchUpdate()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *EndpointsConfig) dispatchUpdate() {
 | 
			
		||||
	select {
 | 
			
		||||
	case c.updates <- struct{}{}:
 | 
			
		||||
		// Work enqueued successfully
 | 
			
		||||
	case <-c.stop:
 | 
			
		||||
		// We're shut down / avoid logging the message below
 | 
			
		||||
	default:
 | 
			
		||||
		glog.V(4).Infof("Endpoints handler already has a pending interrupt.")
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ServiceConfig tracks a set of service configurations.
 | 
			
		||||
 
 | 
			
		||||
@@ -19,10 +19,12 @@ package config
 | 
			
		||||
import (
 | 
			
		||||
	"reflect"
 | 
			
		||||
	"sort"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"testing"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/types"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/wait"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/watch"
 | 
			
		||||
	ktesting "k8s.io/client-go/testing"
 | 
			
		||||
@@ -45,7 +47,6 @@ func (s sortedServices) Less(i, j int) bool {
 | 
			
		||||
 | 
			
		||||
type ServiceHandlerMock struct {
 | 
			
		||||
	updated chan []*api.Service
 | 
			
		||||
	waits   int
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewServiceHandlerMock() *ServiceHandlerMock {
 | 
			
		||||
@@ -90,17 +91,66 @@ func (s sortedEndpoints) Less(i, j int) bool {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type EndpointsHandlerMock struct {
 | 
			
		||||
	lock sync.Mutex
 | 
			
		||||
 | 
			
		||||
	state   map[types.NamespacedName]*api.Endpoints
 | 
			
		||||
	synced  bool
 | 
			
		||||
	updated chan []*api.Endpoints
 | 
			
		||||
	waits   int
 | 
			
		||||
	process func([]*api.Endpoints)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewEndpointsHandlerMock() *EndpointsHandlerMock {
 | 
			
		||||
	return &EndpointsHandlerMock{updated: make(chan []*api.Endpoints, 5)}
 | 
			
		||||
	ehm := &EndpointsHandlerMock{
 | 
			
		||||
		state:   make(map[types.NamespacedName]*api.Endpoints),
 | 
			
		||||
		updated: make(chan []*api.Endpoints, 5),
 | 
			
		||||
	}
 | 
			
		||||
	ehm.process = func(endpoints []*api.Endpoints) {
 | 
			
		||||
		ehm.updated <- endpoints
 | 
			
		||||
	}
 | 
			
		||||
	return ehm
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (h *EndpointsHandlerMock) OnEndpointsUpdate(endpoints []*api.Endpoints) {
 | 
			
		||||
func (h *EndpointsHandlerMock) OnEndpointsAdd(endpoints *api.Endpoints) {
 | 
			
		||||
	h.lock.Lock()
 | 
			
		||||
	defer h.lock.Unlock()
 | 
			
		||||
	namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
 | 
			
		||||
	h.state[namespacedName] = endpoints
 | 
			
		||||
	h.sendEndpoints()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (h *EndpointsHandlerMock) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) {
 | 
			
		||||
	h.lock.Lock()
 | 
			
		||||
	defer h.lock.Unlock()
 | 
			
		||||
	namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
 | 
			
		||||
	h.state[namespacedName] = endpoints
 | 
			
		||||
	h.sendEndpoints()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (h *EndpointsHandlerMock) OnEndpointsDelete(endpoints *api.Endpoints) {
 | 
			
		||||
	h.lock.Lock()
 | 
			
		||||
	defer h.lock.Unlock()
 | 
			
		||||
	namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
 | 
			
		||||
	delete(h.state, namespacedName)
 | 
			
		||||
	h.sendEndpoints()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (h *EndpointsHandlerMock) OnEndpointsSynced() {
 | 
			
		||||
	h.lock.Lock()
 | 
			
		||||
	defer h.lock.Unlock()
 | 
			
		||||
	h.synced = true
 | 
			
		||||
	h.sendEndpoints()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (h *EndpointsHandlerMock) sendEndpoints() {
 | 
			
		||||
	if !h.synced {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	endpoints := make([]*api.Endpoints, 0, len(h.state))
 | 
			
		||||
	for _, eps := range h.state {
 | 
			
		||||
		endpoints = append(endpoints, eps)
 | 
			
		||||
	}
 | 
			
		||||
	sort.Sort(sortedEndpoints(endpoints))
 | 
			
		||||
	h.updated <- endpoints
 | 
			
		||||
	h.process(endpoints)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (h *EndpointsHandlerMock) ValidateEndpoints(t *testing.T, expectedEndpoints []*api.Endpoints) {
 | 
			
		||||
@@ -230,8 +280,8 @@ func TestNewEndpointsMultipleHandlersAddedAndNotified(t *testing.T) {
 | 
			
		||||
	config := NewEndpointsConfig(sharedInformers.Core().InternalVersion().Endpoints(), time.Minute)
 | 
			
		||||
	handler := NewEndpointsHandlerMock()
 | 
			
		||||
	handler2 := NewEndpointsHandlerMock()
 | 
			
		||||
	config.RegisterHandler(handler)
 | 
			
		||||
	config.RegisterHandler(handler2)
 | 
			
		||||
	config.RegisterEventHandler(handler)
 | 
			
		||||
	config.RegisterEventHandler(handler2)
 | 
			
		||||
	go sharedInformers.Start(stopCh)
 | 
			
		||||
	go config.Run(stopCh)
 | 
			
		||||
 | 
			
		||||
@@ -270,8 +320,8 @@ func TestNewEndpointsMultipleHandlersAddRemoveSetAndNotified(t *testing.T) {
 | 
			
		||||
	config := NewEndpointsConfig(sharedInformers.Core().InternalVersion().Endpoints(), time.Minute)
 | 
			
		||||
	handler := NewEndpointsHandlerMock()
 | 
			
		||||
	handler2 := NewEndpointsHandlerMock()
 | 
			
		||||
	config.RegisterHandler(handler)
 | 
			
		||||
	config.RegisterHandler(handler2)
 | 
			
		||||
	config.RegisterEventHandler(handler)
 | 
			
		||||
	config.RegisterEventHandler(handler2)
 | 
			
		||||
	go sharedInformers.Start(stopCh)
 | 
			
		||||
	go config.Run(stopCh)
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -216,14 +216,12 @@ func getPortNum(t *testing.T, addr string) int {
 | 
			
		||||
func TestTCPProxy(t *testing.T) {
 | 
			
		||||
	lb := NewLoadBalancerRR()
 | 
			
		||||
	service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
 | 
			
		||||
	lb.OnEndpointsUpdate([]*api.Endpoints{
 | 
			
		||||
		{
 | 
			
		||||
	lb.OnEndpointsAdd(&api.Endpoints{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
 | 
			
		||||
		Subsets: []api.EndpointSubset{{
 | 
			
		||||
			Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
 | 
			
		||||
			Ports:     []api.EndpointPort{{Name: "p", Port: tcpServerPort}},
 | 
			
		||||
		}},
 | 
			
		||||
		},
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	listenIP := "0.0.0.0"
 | 
			
		||||
@@ -245,14 +243,12 @@ func TestTCPProxy(t *testing.T) {
 | 
			
		||||
func TestUDPProxy(t *testing.T) {
 | 
			
		||||
	lb := NewLoadBalancerRR()
 | 
			
		||||
	service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
 | 
			
		||||
	lb.OnEndpointsUpdate([]*api.Endpoints{
 | 
			
		||||
		{
 | 
			
		||||
	lb.OnEndpointsAdd(&api.Endpoints{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
 | 
			
		||||
		Subsets: []api.EndpointSubset{{
 | 
			
		||||
			Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
 | 
			
		||||
			Ports:     []api.EndpointPort{{Name: "p", Port: udpServerPort}},
 | 
			
		||||
		}},
 | 
			
		||||
		},
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	listenIP := "0.0.0.0"
 | 
			
		||||
@@ -274,14 +270,12 @@ func TestUDPProxy(t *testing.T) {
 | 
			
		||||
func TestUDPProxyTimeout(t *testing.T) {
 | 
			
		||||
	lb := NewLoadBalancerRR()
 | 
			
		||||
	service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
 | 
			
		||||
	lb.OnEndpointsUpdate([]*api.Endpoints{
 | 
			
		||||
		{
 | 
			
		||||
	lb.OnEndpointsAdd(&api.Endpoints{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
 | 
			
		||||
		Subsets: []api.EndpointSubset{{
 | 
			
		||||
			Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
 | 
			
		||||
			Ports:     []api.EndpointPort{{Name: "p", Port: udpServerPort}},
 | 
			
		||||
		}},
 | 
			
		||||
		},
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	listenIP := "0.0.0.0"
 | 
			
		||||
@@ -308,19 +302,20 @@ func TestMultiPortProxy(t *testing.T) {
 | 
			
		||||
	lb := NewLoadBalancerRR()
 | 
			
		||||
	serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo-p"}, Port: "p"}
 | 
			
		||||
	serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo-q"}, Port: "q"}
 | 
			
		||||
	lb.OnEndpointsUpdate([]*api.Endpoints{{
 | 
			
		||||
	lb.OnEndpointsAdd(&api.Endpoints{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
 | 
			
		||||
		Subsets: []api.EndpointSubset{{
 | 
			
		||||
			Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
 | 
			
		||||
			Ports:     []api.EndpointPort{{Name: "p", Protocol: "TCP", Port: tcpServerPort}},
 | 
			
		||||
		}},
 | 
			
		||||
	}, {
 | 
			
		||||
	})
 | 
			
		||||
	lb.OnEndpointsAdd(&api.Endpoints{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{Name: serviceQ.Name, Namespace: serviceQ.Namespace},
 | 
			
		||||
		Subsets: []api.EndpointSubset{{
 | 
			
		||||
			Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
 | 
			
		||||
			Ports:     []api.EndpointPort{{Name: "q", Protocol: "UDP", Port: udpServerPort}},
 | 
			
		||||
		}},
 | 
			
		||||
	}})
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	listenIP := "0.0.0.0"
 | 
			
		||||
	p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest)
 | 
			
		||||
@@ -410,14 +405,12 @@ func stopProxyByName(proxier *Proxier, service ServicePortPortalName) error {
 | 
			
		||||
func TestTCPProxyStop(t *testing.T) {
 | 
			
		||||
	lb := NewLoadBalancerRR()
 | 
			
		||||
	service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
 | 
			
		||||
	lb.OnEndpointsUpdate([]*api.Endpoints{
 | 
			
		||||
		{
 | 
			
		||||
	lb.OnEndpointsAdd(&api.Endpoints{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
 | 
			
		||||
		Subsets: []api.EndpointSubset{{
 | 
			
		||||
			Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
 | 
			
		||||
			Ports:     []api.EndpointPort{{Name: "p", Port: tcpServerPort}},
 | 
			
		||||
		}},
 | 
			
		||||
		},
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	listenIP := "0.0.0.0"
 | 
			
		||||
@@ -456,14 +449,12 @@ func TestTCPProxyStop(t *testing.T) {
 | 
			
		||||
func TestUDPProxyStop(t *testing.T) {
 | 
			
		||||
	lb := NewLoadBalancerRR()
 | 
			
		||||
	service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
 | 
			
		||||
	lb.OnEndpointsUpdate([]*api.Endpoints{
 | 
			
		||||
		{
 | 
			
		||||
	lb.OnEndpointsAdd(&api.Endpoints{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
 | 
			
		||||
		Subsets: []api.EndpointSubset{{
 | 
			
		||||
			Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
 | 
			
		||||
			Ports:     []api.EndpointPort{{Name: "p", Port: udpServerPort}},
 | 
			
		||||
		}},
 | 
			
		||||
		},
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	listenIP := "0.0.0.0"
 | 
			
		||||
@@ -496,14 +487,12 @@ func TestUDPProxyStop(t *testing.T) {
 | 
			
		||||
func TestTCPProxyUpdateDelete(t *testing.T) {
 | 
			
		||||
	lb := NewLoadBalancerRR()
 | 
			
		||||
	service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
 | 
			
		||||
	lb.OnEndpointsUpdate([]*api.Endpoints{
 | 
			
		||||
		{
 | 
			
		||||
	lb.OnEndpointsAdd(&api.Endpoints{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
 | 
			
		||||
		Subsets: []api.EndpointSubset{{
 | 
			
		||||
			Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
 | 
			
		||||
			Ports:     []api.EndpointPort{{Name: "p", Port: tcpServerPort}},
 | 
			
		||||
		}},
 | 
			
		||||
		},
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	listenIP := "0.0.0.0"
 | 
			
		||||
@@ -536,14 +525,12 @@ func TestTCPProxyUpdateDelete(t *testing.T) {
 | 
			
		||||
func TestUDPProxyUpdateDelete(t *testing.T) {
 | 
			
		||||
	lb := NewLoadBalancerRR()
 | 
			
		||||
	service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
 | 
			
		||||
	lb.OnEndpointsUpdate([]*api.Endpoints{
 | 
			
		||||
		{
 | 
			
		||||
	lb.OnEndpointsAdd(&api.Endpoints{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
 | 
			
		||||
		Subsets: []api.EndpointSubset{{
 | 
			
		||||
			Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
 | 
			
		||||
			Ports:     []api.EndpointPort{{Name: "p", Port: udpServerPort}},
 | 
			
		||||
		}},
 | 
			
		||||
		},
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	listenIP := "0.0.0.0"
 | 
			
		||||
@@ -582,7 +569,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
 | 
			
		||||
			Ports:     []api.EndpointPort{{Name: "p", Port: tcpServerPort}},
 | 
			
		||||
		}},
 | 
			
		||||
	}
 | 
			
		||||
	lb.OnEndpointsUpdate([]*api.Endpoints{endpoint})
 | 
			
		||||
	lb.OnEndpointsAdd(endpoint)
 | 
			
		||||
 | 
			
		||||
	listenIP := "0.0.0.0"
 | 
			
		||||
	p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest)
 | 
			
		||||
@@ -610,7 +597,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
 | 
			
		||||
	waitForNumProxyLoops(t, p, 0)
 | 
			
		||||
 | 
			
		||||
	// need to add endpoint here because it got clean up during service delete
 | 
			
		||||
	lb.OnEndpointsUpdate([]*api.Endpoints{endpoint})
 | 
			
		||||
	lb.OnEndpointsAdd(endpoint)
 | 
			
		||||
	p.OnServiceUpdate([]*api.Service{{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
 | 
			
		||||
		Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{
 | 
			
		||||
@@ -637,7 +624,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
 | 
			
		||||
			Ports:     []api.EndpointPort{{Name: "p", Port: udpServerPort}},
 | 
			
		||||
		}},
 | 
			
		||||
	}
 | 
			
		||||
	lb.OnEndpointsUpdate([]*api.Endpoints{endpoint})
 | 
			
		||||
	lb.OnEndpointsAdd(endpoint)
 | 
			
		||||
 | 
			
		||||
	listenIP := "0.0.0.0"
 | 
			
		||||
	p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest)
 | 
			
		||||
@@ -665,7 +652,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
 | 
			
		||||
	waitForNumProxyLoops(t, p, 0)
 | 
			
		||||
 | 
			
		||||
	// need to add endpoint here because it got clean up during service delete
 | 
			
		||||
	lb.OnEndpointsUpdate([]*api.Endpoints{endpoint})
 | 
			
		||||
	lb.OnEndpointsAdd(endpoint)
 | 
			
		||||
	p.OnServiceUpdate([]*api.Service{{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
 | 
			
		||||
		Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{
 | 
			
		||||
@@ -685,14 +672,12 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
 | 
			
		||||
func TestTCPProxyUpdatePort(t *testing.T) {
 | 
			
		||||
	lb := NewLoadBalancerRR()
 | 
			
		||||
	service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
 | 
			
		||||
	lb.OnEndpointsUpdate([]*api.Endpoints{
 | 
			
		||||
		{
 | 
			
		||||
	lb.OnEndpointsAdd(&api.Endpoints{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
 | 
			
		||||
		Subsets: []api.EndpointSubset{{
 | 
			
		||||
			Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
 | 
			
		||||
			Ports:     []api.EndpointPort{{Name: "p", Port: tcpServerPort}},
 | 
			
		||||
		}},
 | 
			
		||||
		},
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	listenIP := "0.0.0.0"
 | 
			
		||||
@@ -735,14 +720,12 @@ func TestTCPProxyUpdatePort(t *testing.T) {
 | 
			
		||||
func TestUDPProxyUpdatePort(t *testing.T) {
 | 
			
		||||
	lb := NewLoadBalancerRR()
 | 
			
		||||
	service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
 | 
			
		||||
	lb.OnEndpointsUpdate([]*api.Endpoints{
 | 
			
		||||
		{
 | 
			
		||||
	lb.OnEndpointsAdd(&api.Endpoints{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
 | 
			
		||||
		Subsets: []api.EndpointSubset{{
 | 
			
		||||
			Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
 | 
			
		||||
			Ports:     []api.EndpointPort{{Name: "p", Port: udpServerPort}},
 | 
			
		||||
		}},
 | 
			
		||||
		},
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	listenIP := "0.0.0.0"
 | 
			
		||||
@@ -782,14 +765,12 @@ func TestUDPProxyUpdatePort(t *testing.T) {
 | 
			
		||||
func TestProxyUpdatePublicIPs(t *testing.T) {
 | 
			
		||||
	lb := NewLoadBalancerRR()
 | 
			
		||||
	service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
 | 
			
		||||
	lb.OnEndpointsUpdate([]*api.Endpoints{
 | 
			
		||||
		{
 | 
			
		||||
	lb.OnEndpointsAdd(&api.Endpoints{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
 | 
			
		||||
		Subsets: []api.EndpointSubset{{
 | 
			
		||||
			Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
 | 
			
		||||
			Ports:     []api.EndpointPort{{Name: "p", Port: tcpServerPort}},
 | 
			
		||||
		}},
 | 
			
		||||
		},
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	listenIP := "0.0.0.0"
 | 
			
		||||
@@ -843,7 +824,7 @@ func TestProxyUpdatePortal(t *testing.T) {
 | 
			
		||||
			Ports:     []api.EndpointPort{{Name: "p", Port: tcpServerPort}},
 | 
			
		||||
		}},
 | 
			
		||||
	}
 | 
			
		||||
	lb.OnEndpointsUpdate([]*api.Endpoints{endpoint})
 | 
			
		||||
	lb.OnEndpointsAdd(endpoint)
 | 
			
		||||
 | 
			
		||||
	listenIP := "0.0.0.0"
 | 
			
		||||
	p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest)
 | 
			
		||||
@@ -894,7 +875,7 @@ func TestProxyUpdatePortal(t *testing.T) {
 | 
			
		||||
			Protocol: "TCP",
 | 
			
		||||
		}}},
 | 
			
		||||
	}})
 | 
			
		||||
	lb.OnEndpointsUpdate([]*api.Endpoints{endpoint})
 | 
			
		||||
	lb.OnEndpointsAdd(endpoint)
 | 
			
		||||
	svcInfo, exists = p.getServiceInfo(servicePortPortalName)
 | 
			
		||||
	if !exists {
 | 
			
		||||
		t.Fatalf("service with ClusterIP set not found in the proxy")
 | 
			
		||||
 
 | 
			
		||||
@@ -233,24 +233,12 @@ func (lb *LoadBalancerRR) updateAffinityMap(svcPort proxy.ServicePortName, newEn
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// OnEndpointsUpdate manages the registered service endpoints.
 | 
			
		||||
// Registered endpoints are updated if found in the update set or
 | 
			
		||||
// unregistered if missing from the update set.
 | 
			
		||||
func (lb *LoadBalancerRR) OnEndpointsUpdate(allEndpoints []*api.Endpoints) {
 | 
			
		||||
	registeredEndpoints := make(map[proxy.ServicePortName]bool)
 | 
			
		||||
	lb.lock.Lock()
 | 
			
		||||
	defer lb.lock.Unlock()
 | 
			
		||||
 | 
			
		||||
	// Update endpoints for services.
 | 
			
		||||
	for i := range allEndpoints {
 | 
			
		||||
		// svcEndpoints should NOT be modified.
 | 
			
		||||
		svcEndpoints := allEndpoints[i]
 | 
			
		||||
 | 
			
		||||
		// We need to build a map of portname -> all ip:ports for that
 | 
			
		||||
		// portname.  Explode Endpoints.Subsets[*] into this structure.
 | 
			
		||||
// buildPortsToEndpointsMap builds a map of portname -> all ip:ports for that
 | 
			
		||||
// portname. Explode Endpoints.Subsets[*] into this structure.
 | 
			
		||||
func buildPortsToEndpointsMap(endpoints *api.Endpoints) map[string][]hostPortPair {
 | 
			
		||||
	portsToEndpoints := map[string][]hostPortPair{}
 | 
			
		||||
		for i := range svcEndpoints.Subsets {
 | 
			
		||||
			ss := &svcEndpoints.Subsets[i]
 | 
			
		||||
	for i := range endpoints.Subsets {
 | 
			
		||||
		ss := &endpoints.Subsets[i]
 | 
			
		||||
		for i := range ss.Ports {
 | 
			
		||||
			port := &ss.Ports[i]
 | 
			
		||||
			for i := range ss.Addresses {
 | 
			
		||||
@@ -260,15 +248,53 @@ func (lb *LoadBalancerRR) OnEndpointsUpdate(allEndpoints []*api.Endpoints) {
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return portsToEndpoints
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (lb *LoadBalancerRR) OnEndpointsAdd(endpoints *api.Endpoints) {
 | 
			
		||||
	portsToEndpoints := buildPortsToEndpointsMap(endpoints)
 | 
			
		||||
 | 
			
		||||
	lb.lock.Lock()
 | 
			
		||||
	defer lb.lock.Unlock()
 | 
			
		||||
 | 
			
		||||
	for portname := range portsToEndpoints {
 | 
			
		||||
			svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: svcEndpoints.Namespace, Name: svcEndpoints.Name}, Port: portname}
 | 
			
		||||
		svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, Port: portname}
 | 
			
		||||
		newEndpoints := flattenValidEndpoints(portsToEndpoints[portname])
 | 
			
		||||
		state, exists := lb.services[svcPort]
 | 
			
		||||
 | 
			
		||||
		if !exists || state == nil || len(newEndpoints) > 0 {
 | 
			
		||||
			glog.V(1).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", svcPort, newEndpoints)
 | 
			
		||||
			lb.updateAffinityMap(svcPort, newEndpoints)
 | 
			
		||||
			// OnEndpointsAdd can be called without NewService being called externally.
 | 
			
		||||
			// To be safe we will call it here.  A new service will only be created
 | 
			
		||||
			// if one does not already exist.  The affinity will be updated
 | 
			
		||||
			// later, once NewService is called.
 | 
			
		||||
			state = lb.newServiceInternal(svcPort, api.ServiceAffinity(""), 0)
 | 
			
		||||
			state.endpoints = slice.ShuffleStrings(newEndpoints)
 | 
			
		||||
 | 
			
		||||
			// Reset the round-robin index.
 | 
			
		||||
			state.index = 0
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (lb *LoadBalancerRR) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) {
 | 
			
		||||
	portsToEndpoints := buildPortsToEndpointsMap(endpoints)
 | 
			
		||||
	oldPortsToEndpoints := buildPortsToEndpointsMap(oldEndpoints)
 | 
			
		||||
	registeredEndpoints := make(map[proxy.ServicePortName]bool)
 | 
			
		||||
 | 
			
		||||
	lb.lock.Lock()
 | 
			
		||||
	defer lb.lock.Unlock()
 | 
			
		||||
 | 
			
		||||
	for portname := range portsToEndpoints {
 | 
			
		||||
		svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, Port: portname}
 | 
			
		||||
		newEndpoints := flattenValidEndpoints(portsToEndpoints[portname])
 | 
			
		||||
		state, exists := lb.services[svcPort]
 | 
			
		||||
 | 
			
		||||
		curEndpoints := []string{}
 | 
			
		||||
		if state != nil {
 | 
			
		||||
			curEndpoints = state.endpoints
 | 
			
		||||
		}
 | 
			
		||||
			newEndpoints := flattenValidEndpoints(portsToEndpoints[portname])
 | 
			
		||||
 | 
			
		||||
		if !exists || state == nil || len(curEndpoints) != len(newEndpoints) || !slicesEquiv(slice.CopyStrings(curEndpoints), newEndpoints) {
 | 
			
		||||
			glog.V(1).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", svcPort, newEndpoints)
 | 
			
		||||
@@ -285,13 +311,13 @@ func (lb *LoadBalancerRR) OnEndpointsUpdate(allEndpoints []*api.Endpoints) {
 | 
			
		||||
		}
 | 
			
		||||
		registeredEndpoints[svcPort] = true
 | 
			
		||||
	}
 | 
			
		||||
	}
 | 
			
		||||
	// Remove endpoints missing from the update.
 | 
			
		||||
	for k := range lb.services {
 | 
			
		||||
		if _, exists := registeredEndpoints[k]; !exists {
 | 
			
		||||
			glog.V(2).Infof("LoadBalancerRR: Removing endpoints for %s", k)
 | 
			
		||||
 | 
			
		||||
	for portname := range oldPortsToEndpoints {
 | 
			
		||||
		svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, Port: portname}
 | 
			
		||||
		if _, exists := registeredEndpoints[svcPort]; !exists {
 | 
			
		||||
			glog.V(2).Infof("LoadBalancerRR: Removing endpoints for %s", svcPort)
 | 
			
		||||
			// Reset but don't delete.
 | 
			
		||||
			state := lb.services[k]
 | 
			
		||||
			state := lb.services[svcPort]
 | 
			
		||||
			state.endpoints = []string{}
 | 
			
		||||
			state.index = 0
 | 
			
		||||
			state.affinity.affinityMap = map[string]*affinityState{}
 | 
			
		||||
@@ -299,6 +325,27 @@ func (lb *LoadBalancerRR) OnEndpointsUpdate(allEndpoints []*api.Endpoints) {
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (lb *LoadBalancerRR) OnEndpointsDelete(endpoints *api.Endpoints) {
 | 
			
		||||
	portsToEndpoints := buildPortsToEndpointsMap(endpoints)
 | 
			
		||||
 | 
			
		||||
	lb.lock.Lock()
 | 
			
		||||
	defer lb.lock.Unlock()
 | 
			
		||||
 | 
			
		||||
	for portname := range portsToEndpoints {
 | 
			
		||||
		svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, Port: portname}
 | 
			
		||||
		glog.V(2).Infof("LoadBalancerRR: Removing endpoints for %s", svcPort)
 | 
			
		||||
		// If the service is still around, reset but don't delete.
 | 
			
		||||
		if state, ok := lb.services[svcPort]; ok {
 | 
			
		||||
			state.endpoints = []string{}
 | 
			
		||||
			state.index = 0
 | 
			
		||||
			state.affinity.affinityMap = map[string]*affinityState{}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (lb *LoadBalancerRR) OnEndpointsSynced() {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Tests whether two slices are equivalent.  This sorts both slices in-place.
 | 
			
		||||
func slicesEquiv(lhs, rhs []string) bool {
 | 
			
		||||
	if len(lhs) != len(rhs) {
 | 
			
		||||
 
 | 
			
		||||
@@ -67,8 +67,6 @@ func TestFilterWorks(t *testing.T) {
 | 
			
		||||
 | 
			
		||||
func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) {
 | 
			
		||||
	loadBalancer := NewLoadBalancerRR()
 | 
			
		||||
	var endpoints []*api.Endpoints
 | 
			
		||||
	loadBalancer.OnEndpointsUpdate(endpoints)
 | 
			
		||||
	service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "does-not-exist"}
 | 
			
		||||
	endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
 | 
			
		||||
	if err == nil {
 | 
			
		||||
@@ -106,15 +104,14 @@ func TestLoadBalanceWorksWithSingleEndpoint(t *testing.T) {
 | 
			
		||||
	if err == nil || len(endpoint) != 0 {
 | 
			
		||||
		t.Errorf("Didn't fail with non-existent service")
 | 
			
		||||
	}
 | 
			
		||||
	endpoints := make([]*api.Endpoints, 1)
 | 
			
		||||
	endpoints[0] = &api.Endpoints{
 | 
			
		||||
	endpoints := &api.Endpoints{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
 | 
			
		||||
		Subsets: []api.EndpointSubset{{
 | 
			
		||||
			Addresses: []api.EndpointAddress{{IP: "endpoint1"}},
 | 
			
		||||
			Ports:     []api.EndpointPort{{Name: "p", Port: 40}},
 | 
			
		||||
		}},
 | 
			
		||||
	}
 | 
			
		||||
	loadBalancer.OnEndpointsUpdate(endpoints)
 | 
			
		||||
	loadBalancer.OnEndpointsAdd(endpoints)
 | 
			
		||||
	expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil)
 | 
			
		||||
	expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil)
 | 
			
		||||
	expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil)
 | 
			
		||||
@@ -144,15 +141,14 @@ func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) {
 | 
			
		||||
	if err == nil || len(endpoint) != 0 {
 | 
			
		||||
		t.Errorf("Didn't fail with non-existent service")
 | 
			
		||||
	}
 | 
			
		||||
	endpoints := make([]*api.Endpoints, 1)
 | 
			
		||||
	endpoints[0] = &api.Endpoints{
 | 
			
		||||
	endpoints := &api.Endpoints{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
 | 
			
		||||
		Subsets: []api.EndpointSubset{{
 | 
			
		||||
			Addresses: []api.EndpointAddress{{IP: "endpoint"}},
 | 
			
		||||
			Ports:     []api.EndpointPort{{Name: "p", Port: 1}, {Name: "p", Port: 2}, {Name: "p", Port: 3}},
 | 
			
		||||
		}},
 | 
			
		||||
	}
 | 
			
		||||
	loadBalancer.OnEndpointsUpdate(endpoints)
 | 
			
		||||
	loadBalancer.OnEndpointsAdd(endpoints)
 | 
			
		||||
 | 
			
		||||
	shuffledEndpoints := loadBalancer.services[service].endpoints
 | 
			
		||||
	if !stringsInSlice(shuffledEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") {
 | 
			
		||||
@@ -172,8 +168,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsMultiplePorts(t *testing.T) {
 | 
			
		||||
	if err == nil || len(endpoint) != 0 {
 | 
			
		||||
		t.Errorf("Didn't fail with non-existent service")
 | 
			
		||||
	}
 | 
			
		||||
	endpoints := make([]*api.Endpoints, 1)
 | 
			
		||||
	endpoints[0] = &api.Endpoints{
 | 
			
		||||
	endpoints := &api.Endpoints{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
 | 
			
		||||
		Subsets: []api.EndpointSubset{
 | 
			
		||||
			{
 | 
			
		||||
@@ -186,7 +181,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsMultiplePorts(t *testing.T) {
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	loadBalancer.OnEndpointsUpdate(endpoints)
 | 
			
		||||
	loadBalancer.OnEndpointsAdd(endpoints)
 | 
			
		||||
 | 
			
		||||
	shuffledEndpoints := loadBalancer.services[serviceP].endpoints
 | 
			
		||||
	if !stringsInSlice(shuffledEndpoints, "endpoint1:1", "endpoint2:1", "endpoint3:3") {
 | 
			
		||||
@@ -215,8 +210,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
 | 
			
		||||
	if err == nil || len(endpoint) != 0 {
 | 
			
		||||
		t.Errorf("Didn't fail with non-existent service")
 | 
			
		||||
	}
 | 
			
		||||
	endpoints := make([]*api.Endpoints, 1)
 | 
			
		||||
	endpoints[0] = &api.Endpoints{
 | 
			
		||||
	endpointsv1 := &api.Endpoints{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
 | 
			
		||||
		Subsets: []api.EndpointSubset{
 | 
			
		||||
			{
 | 
			
		||||
@@ -233,7 +227,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	loadBalancer.OnEndpointsUpdate(endpoints)
 | 
			
		||||
	loadBalancer.OnEndpointsAdd(endpointsv1)
 | 
			
		||||
 | 
			
		||||
	shuffledEndpoints := loadBalancer.services[serviceP].endpoints
 | 
			
		||||
	if !stringsInSlice(shuffledEndpoints, "endpoint1:1", "endpoint2:2", "endpoint3:3") {
 | 
			
		||||
@@ -255,7 +249,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
 | 
			
		||||
 | 
			
		||||
	// Then update the configuration with one fewer endpoints, make sure
 | 
			
		||||
	// we start in the beginning again
 | 
			
		||||
	endpoints[0] = &api.Endpoints{
 | 
			
		||||
	endpointsv2 := &api.Endpoints{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
 | 
			
		||||
		Subsets: []api.EndpointSubset{
 | 
			
		||||
			{
 | 
			
		||||
@@ -268,7 +262,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	loadBalancer.OnEndpointsUpdate(endpoints)
 | 
			
		||||
	loadBalancer.OnEndpointsUpdate(endpointsv1, endpointsv2)
 | 
			
		||||
 | 
			
		||||
	shuffledEndpoints = loadBalancer.services[serviceP].endpoints
 | 
			
		||||
	if !stringsInSlice(shuffledEndpoints, "endpoint4:4", "endpoint5:5") {
 | 
			
		||||
@@ -289,8 +283,8 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
 | 
			
		||||
	expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[1], nil)
 | 
			
		||||
 | 
			
		||||
	// Clear endpoints
 | 
			
		||||
	endpoints[0] = &api.Endpoints{ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Subsets: nil}
 | 
			
		||||
	loadBalancer.OnEndpointsUpdate(endpoints)
 | 
			
		||||
	endpointsv3 := &api.Endpoints{ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Subsets: nil}
 | 
			
		||||
	loadBalancer.OnEndpointsUpdate(endpointsv2, endpointsv3)
 | 
			
		||||
 | 
			
		||||
	endpoint, err = loadBalancer.NextEndpoint(serviceP, nil, false)
 | 
			
		||||
	if err == nil || len(endpoint) != 0 {
 | 
			
		||||
@@ -306,8 +300,7 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) {
 | 
			
		||||
	if err == nil || len(endpoint) != 0 {
 | 
			
		||||
		t.Errorf("Didn't fail with non-existent service")
 | 
			
		||||
	}
 | 
			
		||||
	endpoints := make([]*api.Endpoints, 2)
 | 
			
		||||
	endpoints[0] = &api.Endpoints{
 | 
			
		||||
	endpoints1 := &api.Endpoints{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{Name: fooServiceP.Name, Namespace: fooServiceP.Namespace},
 | 
			
		||||
		Subsets: []api.EndpointSubset{
 | 
			
		||||
			{
 | 
			
		||||
@@ -316,7 +309,7 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) {
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	endpoints[1] = &api.Endpoints{
 | 
			
		||||
	endpoints2 := &api.Endpoints{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{Name: barServiceP.Name, Namespace: barServiceP.Namespace},
 | 
			
		||||
		Subsets: []api.EndpointSubset{
 | 
			
		||||
			{
 | 
			
		||||
@@ -325,7 +318,8 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) {
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	loadBalancer.OnEndpointsUpdate(endpoints)
 | 
			
		||||
	loadBalancer.OnEndpointsAdd(endpoints1)
 | 
			
		||||
	loadBalancer.OnEndpointsAdd(endpoints2)
 | 
			
		||||
	shuffledFooEndpoints := loadBalancer.services[fooServiceP].endpoints
 | 
			
		||||
	expectEndpoint(t, loadBalancer, fooServiceP, shuffledFooEndpoints[0], nil)
 | 
			
		||||
	expectEndpoint(t, loadBalancer, fooServiceP, shuffledFooEndpoints[1], nil)
 | 
			
		||||
@@ -341,7 +335,7 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) {
 | 
			
		||||
	expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[1], nil)
 | 
			
		||||
 | 
			
		||||
	// Then update the configuration by removing foo
 | 
			
		||||
	loadBalancer.OnEndpointsUpdate(endpoints[1:])
 | 
			
		||||
	loadBalancer.OnEndpointsDelete(endpoints1)
 | 
			
		||||
	endpoint, err = loadBalancer.NextEndpoint(fooServiceP, nil, false)
 | 
			
		||||
	if err == nil || len(endpoint) != 0 {
 | 
			
		||||
		t.Errorf("Didn't fail with non-existent service")
 | 
			
		||||
@@ -364,8 +358,7 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledFirst(t *testing.T) {
 | 
			
		||||
 | 
			
		||||
	// Call NewService() before OnEndpointsUpdate()
 | 
			
		||||
	loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0)
 | 
			
		||||
	endpoints := make([]*api.Endpoints, 1)
 | 
			
		||||
	endpoints[0] = &api.Endpoints{
 | 
			
		||||
	endpoints := &api.Endpoints{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
 | 
			
		||||
		Subsets: []api.EndpointSubset{
 | 
			
		||||
			{Addresses: []api.EndpointAddress{{IP: "endpoint1"}}, Ports: []api.EndpointPort{{Port: 1}}},
 | 
			
		||||
@@ -373,7 +366,7 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledFirst(t *testing.T) {
 | 
			
		||||
			{Addresses: []api.EndpointAddress{{IP: "endpoint3"}}, Ports: []api.EndpointPort{{Port: 3}}},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	loadBalancer.OnEndpointsUpdate(endpoints)
 | 
			
		||||
	loadBalancer.OnEndpointsAdd(endpoints)
 | 
			
		||||
 | 
			
		||||
	client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
 | 
			
		||||
	client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
 | 
			
		||||
@@ -420,15 +413,14 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledSecond(t *testing.T) {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Call OnEndpointsUpdate() before NewService()
 | 
			
		||||
	endpoints := make([]*api.Endpoints, 1)
 | 
			
		||||
	endpoints[0] = &api.Endpoints{
 | 
			
		||||
	endpoints := &api.Endpoints{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
 | 
			
		||||
		Subsets: []api.EndpointSubset{
 | 
			
		||||
			{Addresses: []api.EndpointAddress{{IP: "endpoint1"}}, Ports: []api.EndpointPort{{Port: 1}}},
 | 
			
		||||
			{Addresses: []api.EndpointAddress{{IP: "endpoint2"}}, Ports: []api.EndpointPort{{Port: 2}}},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	loadBalancer.OnEndpointsUpdate(endpoints)
 | 
			
		||||
	loadBalancer.OnEndpointsAdd(endpoints)
 | 
			
		||||
	loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0)
 | 
			
		||||
 | 
			
		||||
	client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
 | 
			
		||||
@@ -482,8 +474,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0)
 | 
			
		||||
	endpoints := make([]*api.Endpoints, 1)
 | 
			
		||||
	endpoints[0] = &api.Endpoints{
 | 
			
		||||
	endpointsv1 := &api.Endpoints{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
 | 
			
		||||
		Subsets: []api.EndpointSubset{
 | 
			
		||||
			{
 | 
			
		||||
@@ -492,7 +483,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	loadBalancer.OnEndpointsUpdate(endpoints)
 | 
			
		||||
	loadBalancer.OnEndpointsAdd(endpointsv1)
 | 
			
		||||
	shuffledEndpoints := loadBalancer.services[service].endpoints
 | 
			
		||||
	expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
 | 
			
		||||
	client1Endpoint := shuffledEndpoints[0]
 | 
			
		||||
@@ -503,7 +494,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
 | 
			
		||||
	expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client3)
 | 
			
		||||
	client3Endpoint := shuffledEndpoints[2]
 | 
			
		||||
 | 
			
		||||
	endpoints[0] = &api.Endpoints{
 | 
			
		||||
	endpointsv2 := &api.Endpoints{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
 | 
			
		||||
		Subsets: []api.EndpointSubset{
 | 
			
		||||
			{
 | 
			
		||||
@@ -512,7 +503,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	loadBalancer.OnEndpointsUpdate(endpoints)
 | 
			
		||||
	loadBalancer.OnEndpointsUpdate(endpointsv1, endpointsv2)
 | 
			
		||||
	shuffledEndpoints = loadBalancer.services[service].endpoints
 | 
			
		||||
	if client1Endpoint == "endpoint:3" {
 | 
			
		||||
		client1Endpoint = shuffledEndpoints[0]
 | 
			
		||||
@@ -525,7 +516,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
 | 
			
		||||
	expectEndpoint(t, loadBalancer, service, client2Endpoint, client2)
 | 
			
		||||
	expectEndpoint(t, loadBalancer, service, client3Endpoint, client3)
 | 
			
		||||
 | 
			
		||||
	endpoints[0] = &api.Endpoints{
 | 
			
		||||
	endpointsv3 := &api.Endpoints{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
 | 
			
		||||
		Subsets: []api.EndpointSubset{
 | 
			
		||||
			{
 | 
			
		||||
@@ -534,7 +525,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	loadBalancer.OnEndpointsUpdate(endpoints)
 | 
			
		||||
	loadBalancer.OnEndpointsUpdate(endpointsv2, endpointsv3)
 | 
			
		||||
	shuffledEndpoints = loadBalancer.services[service].endpoints
 | 
			
		||||
	expectEndpoint(t, loadBalancer, service, client1Endpoint, client1)
 | 
			
		||||
	expectEndpoint(t, loadBalancer, service, client2Endpoint, client2)
 | 
			
		||||
@@ -556,8 +547,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0)
 | 
			
		||||
	endpoints := make([]*api.Endpoints, 1)
 | 
			
		||||
	endpoints[0] = &api.Endpoints{
 | 
			
		||||
	endpointsv1 := &api.Endpoints{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
 | 
			
		||||
		Subsets: []api.EndpointSubset{
 | 
			
		||||
			{
 | 
			
		||||
@@ -566,7 +556,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	loadBalancer.OnEndpointsUpdate(endpoints)
 | 
			
		||||
	loadBalancer.OnEndpointsAdd(endpointsv1)
 | 
			
		||||
	shuffledEndpoints := loadBalancer.services[service].endpoints
 | 
			
		||||
	expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
 | 
			
		||||
	expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
 | 
			
		||||
@@ -577,7 +567,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
 | 
			
		||||
	expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
 | 
			
		||||
	// Then update the configuration with one fewer endpoints, make sure
 | 
			
		||||
	// we start in the beginning again
 | 
			
		||||
	endpoints[0] = &api.Endpoints{
 | 
			
		||||
	endpointsv2 := &api.Endpoints{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
 | 
			
		||||
		Subsets: []api.EndpointSubset{
 | 
			
		||||
			{
 | 
			
		||||
@@ -586,7 +576,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	loadBalancer.OnEndpointsUpdate(endpoints)
 | 
			
		||||
	loadBalancer.OnEndpointsUpdate(endpointsv1, endpointsv2)
 | 
			
		||||
	shuffledEndpoints = loadBalancer.services[service].endpoints
 | 
			
		||||
	expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
 | 
			
		||||
	expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
 | 
			
		||||
@@ -596,8 +586,8 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
 | 
			
		||||
	expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
 | 
			
		||||
 | 
			
		||||
	// Clear endpoints
 | 
			
		||||
	endpoints[0] = &api.Endpoints{ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: nil}
 | 
			
		||||
	loadBalancer.OnEndpointsUpdate(endpoints)
 | 
			
		||||
	endpointsv3 := &api.Endpoints{ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: nil}
 | 
			
		||||
	loadBalancer.OnEndpointsUpdate(endpointsv2, endpointsv3)
 | 
			
		||||
 | 
			
		||||
	endpoint, err = loadBalancer.NextEndpoint(service, nil, false)
 | 
			
		||||
	if err == nil || len(endpoint) != 0 {
 | 
			
		||||
@@ -616,8 +606,7 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) {
 | 
			
		||||
		t.Errorf("Didn't fail with non-existent service")
 | 
			
		||||
	}
 | 
			
		||||
	loadBalancer.NewService(fooService, api.ServiceAffinityClientIP, 0)
 | 
			
		||||
	endpoints := make([]*api.Endpoints, 2)
 | 
			
		||||
	endpoints[0] = &api.Endpoints{
 | 
			
		||||
	endpoints1 := &api.Endpoints{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{Name: fooService.Name, Namespace: fooService.Namespace},
 | 
			
		||||
		Subsets: []api.EndpointSubset{
 | 
			
		||||
			{
 | 
			
		||||
@@ -628,7 +617,7 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) {
 | 
			
		||||
	}
 | 
			
		||||
	barService := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "bar"}, Port: ""}
 | 
			
		||||
	loadBalancer.NewService(barService, api.ServiceAffinityClientIP, 0)
 | 
			
		||||
	endpoints[1] = &api.Endpoints{
 | 
			
		||||
	endpoints2 := &api.Endpoints{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{Name: barService.Name, Namespace: barService.Namespace},
 | 
			
		||||
		Subsets: []api.EndpointSubset{
 | 
			
		||||
			{
 | 
			
		||||
@@ -637,7 +626,8 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) {
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	loadBalancer.OnEndpointsUpdate(endpoints)
 | 
			
		||||
	loadBalancer.OnEndpointsAdd(endpoints1)
 | 
			
		||||
	loadBalancer.OnEndpointsAdd(endpoints2)
 | 
			
		||||
 | 
			
		||||
	shuffledFooEndpoints := loadBalancer.services[fooService].endpoints
 | 
			
		||||
	expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], client1)
 | 
			
		||||
@@ -659,7 +649,7 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) {
 | 
			
		||||
	expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], client2)
 | 
			
		||||
 | 
			
		||||
	// Then update the configuration by removing foo
 | 
			
		||||
	loadBalancer.OnEndpointsUpdate(endpoints[1:])
 | 
			
		||||
	loadBalancer.OnEndpointsDelete(endpoints1)
 | 
			
		||||
	endpoint, err = loadBalancer.NextEndpoint(fooService, nil, false)
 | 
			
		||||
	if err == nil || len(endpoint) != 0 {
 | 
			
		||||
		t.Errorf("Didn't fail with non-existent service")
 | 
			
		||||
@@ -685,8 +675,7 @@ func TestStickyLoadBalanceWorksWithEndpointFails(t *testing.T) {
 | 
			
		||||
 | 
			
		||||
	// Call NewService() before OnEndpointsUpdate()
 | 
			
		||||
	loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0)
 | 
			
		||||
	endpoints := make([]*api.Endpoints, 1)
 | 
			
		||||
	endpoints[0] = &api.Endpoints{
 | 
			
		||||
	endpoints := &api.Endpoints{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
 | 
			
		||||
		Subsets: []api.EndpointSubset{
 | 
			
		||||
			{Addresses: []api.EndpointAddress{{IP: "endpoint1"}}, Ports: []api.EndpointPort{{Port: 1}}},
 | 
			
		||||
@@ -694,7 +683,7 @@ func TestStickyLoadBalanceWorksWithEndpointFails(t *testing.T) {
 | 
			
		||||
			{Addresses: []api.EndpointAddress{{IP: "endpoint3"}}, Ports: []api.EndpointPort{{Port: 3}}},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	loadBalancer.OnEndpointsUpdate(endpoints)
 | 
			
		||||
	loadBalancer.OnEndpointsAdd(endpoints)
 | 
			
		||||
 | 
			
		||||
	client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
 | 
			
		||||
	client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user