mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-03 19:58:17 +00:00 
			
		
		
		
	Merge pull request #43937 from thockin/proxy-defer-on-update-events
Automatic merge from submit-queue (batch tested with PRs 42674, 43937) kube-proxy: OnServiceUpdate []*api.Service This signature is more consistent with OnEndpointsUpdate and removes a copy loop. This is part on ongoing cleanup to rate-limit iptables calls.
This commit is contained in:
		@@ -218,6 +218,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
 | 
				
			|||||||
	recorder := eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "kube-proxy", Host: hostname})
 | 
						recorder := eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "kube-proxy", Host: hostname})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	var proxier proxy.ProxyProvider
 | 
						var proxier proxy.ProxyProvider
 | 
				
			||||||
 | 
						var servicesHandler proxyconfig.ServiceConfigHandler
 | 
				
			||||||
	var endpointsHandler proxyconfig.EndpointsConfigHandler
 | 
						var endpointsHandler proxyconfig.EndpointsConfigHandler
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	proxyMode := getProxyMode(string(config.Mode), client.Core().Nodes(), hostname, iptInterface, iptables.LinuxKernelCompatTester{})
 | 
						proxyMode := getProxyMode(string(config.Mode), client.Core().Nodes(), hostname, iptInterface, iptables.LinuxKernelCompatTester{})
 | 
				
			||||||
@@ -244,22 +245,20 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
 | 
				
			|||||||
			glog.Fatalf("Unable to create proxier: %v", err)
 | 
								glog.Fatalf("Unable to create proxier: %v", err)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		proxier = proxierIPTables
 | 
							proxier = proxierIPTables
 | 
				
			||||||
 | 
							servicesHandler = proxierIPTables
 | 
				
			||||||
		endpointsHandler = proxierIPTables
 | 
							endpointsHandler = proxierIPTables
 | 
				
			||||||
		// No turning back. Remove artifacts that might still exist from the userspace Proxier.
 | 
							// No turning back. Remove artifacts that might still exist from the userspace Proxier.
 | 
				
			||||||
		glog.V(0).Info("Tearing down userspace rules.")
 | 
							glog.V(0).Info("Tearing down userspace rules.")
 | 
				
			||||||
		userspace.CleanupLeftovers(iptInterface)
 | 
							userspace.CleanupLeftovers(iptInterface)
 | 
				
			||||||
	} else {
 | 
						} else {
 | 
				
			||||||
		glog.V(0).Info("Using userspace Proxier.")
 | 
							glog.V(0).Info("Using userspace Proxier.")
 | 
				
			||||||
 | 
					 | 
				
			||||||
		var proxierUserspace proxy.ProxyProvider
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		if runtime.GOOS == "windows" {
 | 
							if runtime.GOOS == "windows" {
 | 
				
			||||||
			// This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for
 | 
								// This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for
 | 
				
			||||||
			// our config.EndpointsConfigHandler.
 | 
								// our config.EndpointsConfigHandler.
 | 
				
			||||||
			loadBalancer := winuserspace.NewLoadBalancerRR()
 | 
								loadBalancer := winuserspace.NewLoadBalancerRR()
 | 
				
			||||||
			// set EndpointsConfigHandler to our loadBalancer
 | 
								// set EndpointsConfigHandler to our loadBalancer
 | 
				
			||||||
			endpointsHandler = loadBalancer
 | 
								endpointsHandler = loadBalancer
 | 
				
			||||||
			proxierUserspace, err = winuserspace.NewProxier(
 | 
								proxierUserspace, err := winuserspace.NewProxier(
 | 
				
			||||||
				loadBalancer,
 | 
									loadBalancer,
 | 
				
			||||||
				net.ParseIP(config.BindAddress),
 | 
									net.ParseIP(config.BindAddress),
 | 
				
			||||||
				netshInterface,
 | 
									netshInterface,
 | 
				
			||||||
@@ -268,13 +267,18 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
 | 
				
			|||||||
				config.IPTablesSyncPeriod.Duration,
 | 
									config.IPTablesSyncPeriod.Duration,
 | 
				
			||||||
				config.UDPIdleTimeout.Duration,
 | 
									config.UDPIdleTimeout.Duration,
 | 
				
			||||||
			)
 | 
								)
 | 
				
			||||||
 | 
								if err != nil {
 | 
				
			||||||
 | 
									glog.Fatalf("Unable to create proxier: %v", err)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								servicesHandler = proxierUserspace
 | 
				
			||||||
 | 
								proxier = proxierUserspace
 | 
				
			||||||
		} else {
 | 
							} else {
 | 
				
			||||||
			// This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for
 | 
								// This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for
 | 
				
			||||||
			// our config.EndpointsConfigHandler.
 | 
								// our config.EndpointsConfigHandler.
 | 
				
			||||||
			loadBalancer := userspace.NewLoadBalancerRR()
 | 
								loadBalancer := userspace.NewLoadBalancerRR()
 | 
				
			||||||
			// set EndpointsConfigHandler to our loadBalancer
 | 
								// set EndpointsConfigHandler to our loadBalancer
 | 
				
			||||||
			endpointsHandler = loadBalancer
 | 
								endpointsHandler = loadBalancer
 | 
				
			||||||
			proxierUserspace, err = userspace.NewProxier(
 | 
								proxierUserspace, err := userspace.NewProxier(
 | 
				
			||||||
				loadBalancer,
 | 
									loadBalancer,
 | 
				
			||||||
				net.ParseIP(config.BindAddress),
 | 
									net.ParseIP(config.BindAddress),
 | 
				
			||||||
				iptInterface,
 | 
									iptInterface,
 | 
				
			||||||
@@ -284,11 +288,12 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
 | 
				
			|||||||
				config.IPTablesMinSyncPeriod.Duration,
 | 
									config.IPTablesMinSyncPeriod.Duration,
 | 
				
			||||||
				config.UDPIdleTimeout.Duration,
 | 
									config.UDPIdleTimeout.Duration,
 | 
				
			||||||
			)
 | 
								)
 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
			if err != nil {
 | 
								if err != nil {
 | 
				
			||||||
				glog.Fatalf("Unable to create proxier: %v", err)
 | 
									glog.Fatalf("Unable to create proxier: %v", err)
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
 | 
								servicesHandler = proxierUserspace
 | 
				
			||||||
			proxier = proxierUserspace
 | 
								proxier = proxierUserspace
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
		// Remove artifacts from the pure-iptables Proxier, if not on Windows.
 | 
							// Remove artifacts from the pure-iptables Proxier, if not on Windows.
 | 
				
			||||||
		if runtime.GOOS != "windows" {
 | 
							if runtime.GOOS != "windows" {
 | 
				
			||||||
			glog.V(0).Info("Tearing down pure-iptables proxy rules.")
 | 
								glog.V(0).Info("Tearing down pure-iptables proxy rules.")
 | 
				
			||||||
@@ -306,7 +311,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
 | 
				
			|||||||
	// only notify on changes, and the initial update (on process start) may be lost if no handlers
 | 
						// only notify on changes, and the initial update (on process start) may be lost if no handlers
 | 
				
			||||||
	// are registered yet.
 | 
						// are registered yet.
 | 
				
			||||||
	serviceConfig := proxyconfig.NewServiceConfig(client.Core().RESTClient(), config.ConfigSyncPeriod)
 | 
						serviceConfig := proxyconfig.NewServiceConfig(client.Core().RESTClient(), config.ConfigSyncPeriod)
 | 
				
			||||||
	serviceConfig.RegisterHandler(proxier)
 | 
						serviceConfig.RegisterHandler(servicesHandler)
 | 
				
			||||||
	go serviceConfig.Run(wait.NeverStop)
 | 
						go serviceConfig.Run(wait.NeverStop)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	endpointsConfig := proxyconfig.NewEndpointsConfig(client.Core().RESTClient(), config.ConfigSyncPeriod)
 | 
						endpointsConfig := proxyconfig.NewEndpointsConfig(client.Core().RESTClient(), config.ConfigSyncPeriod)
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -39,12 +39,12 @@ type HollowProxy struct {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
type FakeProxyHandler struct{}
 | 
					type FakeProxyHandler struct{}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (*FakeProxyHandler) OnServiceUpdate(services []api.Service)       {}
 | 
					func (*FakeProxyHandler) OnServiceUpdate(services []*api.Service)      {}
 | 
				
			||||||
func (*FakeProxyHandler) OnEndpointsUpdate(endpoints []*api.Endpoints) {}
 | 
					func (*FakeProxyHandler) OnEndpointsUpdate(endpoints []*api.Endpoints) {}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type FakeProxier struct{}
 | 
					type FakeProxier struct{}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (*FakeProxier) OnServiceUpdate(services []api.Service) {}
 | 
					func (*FakeProxier) OnServiceUpdate(services []*api.Service) {}
 | 
				
			||||||
func (*FakeProxier) Sync()                                   {}
 | 
					func (*FakeProxier) Sync()                                   {}
 | 
				
			||||||
func (*FakeProxier) SyncLoop() {
 | 
					func (*FakeProxier) SyncLoop() {
 | 
				
			||||||
	select {}
 | 
						select {}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -14,10 +14,7 @@ go_library(
 | 
				
			|||||||
        "types.go",
 | 
					        "types.go",
 | 
				
			||||||
    ],
 | 
					    ],
 | 
				
			||||||
    tags = ["automanaged"],
 | 
					    tags = ["automanaged"],
 | 
				
			||||||
    deps = [
 | 
					    deps = ["//vendor:k8s.io/apimachinery/pkg/types"],
 | 
				
			||||||
        "//pkg/api:go_default_library",
 | 
					 | 
				
			||||||
        "//vendor:k8s.io/apimachinery/pkg/types",
 | 
					 | 
				
			||||||
    ],
 | 
					 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
filegroup(
 | 
					filegroup(
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -74,27 +74,27 @@ func TestNewServicesSourceApi_UpdatesAndMultipleServices(t *testing.T) {
 | 
				
			|||||||
	go serviceConfig.Run(stopCh)
 | 
						go serviceConfig.Run(stopCh)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Add the first service
 | 
						// Add the first service
 | 
				
			||||||
	handler.expected = []api.Service{*service1v1}
 | 
						handler.expected = []*api.Service{service1v1}
 | 
				
			||||||
	fakeWatch.Add(service1v1)
 | 
						fakeWatch.Add(service1v1)
 | 
				
			||||||
	<-ch
 | 
						<-ch
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Add another service
 | 
						// Add another service
 | 
				
			||||||
	handler.expected = []api.Service{*service1v1, *service2}
 | 
						handler.expected = []*api.Service{service1v1, service2}
 | 
				
			||||||
	fakeWatch.Add(service2)
 | 
						fakeWatch.Add(service2)
 | 
				
			||||||
	<-ch
 | 
						<-ch
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Modify service1
 | 
						// Modify service1
 | 
				
			||||||
	handler.expected = []api.Service{*service1v2, *service2}
 | 
						handler.expected = []*api.Service{service1v2, service2}
 | 
				
			||||||
	fakeWatch.Modify(service1v2)
 | 
						fakeWatch.Modify(service1v2)
 | 
				
			||||||
	<-ch
 | 
						<-ch
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Delete service1
 | 
						// Delete service1
 | 
				
			||||||
	handler.expected = []api.Service{*service2}
 | 
						handler.expected = []*api.Service{service2}
 | 
				
			||||||
	fakeWatch.Delete(service1v2)
 | 
						fakeWatch.Delete(service1v2)
 | 
				
			||||||
	<-ch
 | 
						<-ch
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Delete service2
 | 
						// Delete service2
 | 
				
			||||||
	handler.expected = []api.Service{}
 | 
						handler.expected = []*api.Service{}
 | 
				
			||||||
	fakeWatch.Delete(service2)
 | 
						fakeWatch.Delete(service2)
 | 
				
			||||||
	<-ch
 | 
						<-ch
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
@@ -174,15 +174,15 @@ func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
type svcHandler struct {
 | 
					type svcHandler struct {
 | 
				
			||||||
	t        *testing.T
 | 
						t        *testing.T
 | 
				
			||||||
	expected []api.Service
 | 
						expected []*api.Service
 | 
				
			||||||
	done     func()
 | 
						done     func()
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func newSvcHandler(t *testing.T, svcs []api.Service, done func()) *svcHandler {
 | 
					func newSvcHandler(t *testing.T, svcs []*api.Service, done func()) *svcHandler {
 | 
				
			||||||
	return &svcHandler{t: t, expected: svcs, done: done}
 | 
						return &svcHandler{t: t, expected: svcs, done: done}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (s *svcHandler) OnServiceUpdate(services []api.Service) {
 | 
					func (s *svcHandler) OnServiceUpdate(services []*api.Service) {
 | 
				
			||||||
	defer s.done()
 | 
						defer s.done()
 | 
				
			||||||
	sort.Sort(sortedServices(services))
 | 
						sort.Sort(sortedServices(services))
 | 
				
			||||||
	if !reflect.DeepEqual(s.expected, services) {
 | 
						if !reflect.DeepEqual(s.expected, services) {
 | 
				
			||||||
@@ -242,7 +242,7 @@ func TestInitialSync(t *testing.T) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	svcConfig := newServiceConfig(svcLW, time.Minute)
 | 
						svcConfig := newServiceConfig(svcLW, time.Minute)
 | 
				
			||||||
	epsConfig := newEndpointsConfig(epsLW, time.Minute)
 | 
						epsConfig := newEndpointsConfig(epsLW, time.Minute)
 | 
				
			||||||
	svcHandler := newSvcHandler(t, []api.Service{*svc2, *svc1}, wg.Done)
 | 
						svcHandler := newSvcHandler(t, []*api.Service{svc2, svc1}, wg.Done)
 | 
				
			||||||
	svcConfig.RegisterHandler(svcHandler)
 | 
						svcConfig.RegisterHandler(svcHandler)
 | 
				
			||||||
	epsHandler := newEpsHandler(t, []*api.Endpoints{eps2, eps1}, wg.Done)
 | 
						epsHandler := newEpsHandler(t, []*api.Endpoints{eps2, eps1}, wg.Done)
 | 
				
			||||||
	epsConfig.RegisterHandler(epsHandler)
 | 
						epsConfig.RegisterHandler(epsHandler)
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -33,9 +33,17 @@ import (
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
// ServiceConfigHandler is an abstract interface of objects which receive update notifications for the set of services.
 | 
					// ServiceConfigHandler is an abstract interface of objects which receive update notifications for the set of services.
 | 
				
			||||||
type ServiceConfigHandler interface {
 | 
					type ServiceConfigHandler interface {
 | 
				
			||||||
	// OnServiceUpdate gets called when a configuration has been changed by one of the sources.
 | 
						// OnServiceUpdate gets called when a service is created, removed or changed
 | 
				
			||||||
	// This is the union of all the configuration sources.
 | 
						// on any of the configuration sources. An example is when a new service
 | 
				
			||||||
	OnServiceUpdate(services []api.Service)
 | 
						// comes up.
 | 
				
			||||||
 | 
						//
 | 
				
			||||||
 | 
						// NOTE: For efficiency, services are being passed by reference, thus,
 | 
				
			||||||
 | 
						// OnServiceUpdate should NOT modify pointers of a given slice.
 | 
				
			||||||
 | 
						// Those service objects are shared with other layers of the system and
 | 
				
			||||||
 | 
						// are guaranteed to be immutable with the assumption that are also
 | 
				
			||||||
 | 
						// not mutated by those handlers. Make a deep copy if you need to modify
 | 
				
			||||||
 | 
						// them in your code.
 | 
				
			||||||
 | 
						OnServiceUpdate(services []*api.Service)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// EndpointsConfigHandler is an abstract interface of objects which receive update notifications for the set of endpoints.
 | 
					// EndpointsConfigHandler is an abstract interface of objects which receive update notifications for the set of endpoints.
 | 
				
			||||||
@@ -208,24 +216,23 @@ func (c *ServiceConfig) Run(stopCh <-chan struct{}) {
 | 
				
			|||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// We hanve synced informers. Now we can start delivering updates
 | 
						// We have synced informers. Now we can start delivering updates
 | 
				
			||||||
	// to the registered handler.
 | 
						// to the registered handler.
 | 
				
			||||||
	go func() {
 | 
						go func() {
 | 
				
			||||||
		for range c.updates {
 | 
							for range c.updates {
 | 
				
			||||||
			services, err := c.lister.List(labels.Everything())
 | 
								services, err := c.lister.List(labels.Everything())
 | 
				
			||||||
			if err != nil {
 | 
								if err != nil {
 | 
				
			||||||
				glog.Errorf("Error while listing services from cache: %v", err)
 | 
									glog.Errorf("Error while listing services from cache: %v", err)
 | 
				
			||||||
				// This will cause a retry (if there isnt' any other trigger in-flight).
 | 
									// This will cause a retry (if there isn't any other trigger in-flight).
 | 
				
			||||||
				c.dispatchUpdate()
 | 
									c.dispatchUpdate()
 | 
				
			||||||
				continue
 | 
									continue
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
			svcs := make([]api.Service, 0, len(services))
 | 
								if services == nil {
 | 
				
			||||||
			for i := range services {
 | 
									services = []*api.Service{}
 | 
				
			||||||
				svcs = append(svcs, *services[i])
 | 
					 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
			for i := range c.handlers {
 | 
								for i := range c.handlers {
 | 
				
			||||||
				glog.V(3).Infof("Calling handler.OnServiceUpdate()")
 | 
									glog.V(3).Infof("Calling handler.OnServiceUpdate()")
 | 
				
			||||||
				c.handlers[i].OnServiceUpdate(svcs)
 | 
									c.handlers[i].OnServiceUpdate(services)
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}()
 | 
						}()
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -28,7 +28,7 @@ import (
 | 
				
			|||||||
	"k8s.io/kubernetes/pkg/api"
 | 
						"k8s.io/kubernetes/pkg/api"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type sortedServices []api.Service
 | 
					type sortedServices []*api.Service
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (s sortedServices) Len() int {
 | 
					func (s sortedServices) Len() int {
 | 
				
			||||||
	return len(s)
 | 
						return len(s)
 | 
				
			||||||
@@ -41,24 +41,24 @@ func (s sortedServices) Less(i, j int) bool {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type ServiceHandlerMock struct {
 | 
					type ServiceHandlerMock struct {
 | 
				
			||||||
	updated chan []api.Service
 | 
						updated chan []*api.Service
 | 
				
			||||||
	waits   int
 | 
						waits   int
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func NewServiceHandlerMock() *ServiceHandlerMock {
 | 
					func NewServiceHandlerMock() *ServiceHandlerMock {
 | 
				
			||||||
	return &ServiceHandlerMock{updated: make(chan []api.Service, 5)}
 | 
						return &ServiceHandlerMock{updated: make(chan []*api.Service, 5)}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (h *ServiceHandlerMock) OnServiceUpdate(services []api.Service) {
 | 
					func (h *ServiceHandlerMock) OnServiceUpdate(services []*api.Service) {
 | 
				
			||||||
	sort.Sort(sortedServices(services))
 | 
						sort.Sort(sortedServices(services))
 | 
				
			||||||
	h.updated <- services
 | 
						h.updated <- services
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (h *ServiceHandlerMock) ValidateServices(t *testing.T, expectedServices []api.Service) {
 | 
					func (h *ServiceHandlerMock) ValidateServices(t *testing.T, expectedServices []*api.Service) {
 | 
				
			||||||
	// We might get 1 or more updates for N service updates, because we
 | 
						// We might get 1 or more updates for N service updates, because we
 | 
				
			||||||
	// over write older snapshots of services from the producer go-routine
 | 
						// over write older snapshots of services from the producer go-routine
 | 
				
			||||||
	// if the consumer falls behind.
 | 
						// if the consumer falls behind.
 | 
				
			||||||
	var services []api.Service
 | 
						var services []*api.Service
 | 
				
			||||||
	for {
 | 
						for {
 | 
				
			||||||
		select {
 | 
							select {
 | 
				
			||||||
		case services = <-h.updated:
 | 
							case services = <-h.updated:
 | 
				
			||||||
@@ -139,7 +139,7 @@ func TestNewServiceAddedAndNotified(t *testing.T) {
 | 
				
			|||||||
		Spec:       api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 10}}},
 | 
							Spec:       api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 10}}},
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	fakeWatch.Add(service)
 | 
						fakeWatch.Add(service)
 | 
				
			||||||
	handler.ValidateServices(t, []api.Service{*service})
 | 
						handler.ValidateServices(t, []*api.Service{service})
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestServiceAddedRemovedSetAndNotified(t *testing.T) {
 | 
					func TestServiceAddedRemovedSetAndNotified(t *testing.T) {
 | 
				
			||||||
@@ -161,18 +161,18 @@ func TestServiceAddedRemovedSetAndNotified(t *testing.T) {
 | 
				
			|||||||
		Spec:       api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 10}}},
 | 
							Spec:       api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 10}}},
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	fakeWatch.Add(service1)
 | 
						fakeWatch.Add(service1)
 | 
				
			||||||
	handler.ValidateServices(t, []api.Service{*service1})
 | 
						handler.ValidateServices(t, []*api.Service{service1})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	service2 := &api.Service{
 | 
						service2 := &api.Service{
 | 
				
			||||||
		ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
 | 
							ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
 | 
				
			||||||
		Spec:       api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 20}}},
 | 
							Spec:       api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 20}}},
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	fakeWatch.Add(service2)
 | 
						fakeWatch.Add(service2)
 | 
				
			||||||
	services := []api.Service{*service2, *service1}
 | 
						services := []*api.Service{service2, service1}
 | 
				
			||||||
	handler.ValidateServices(t, services)
 | 
						handler.ValidateServices(t, services)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	fakeWatch.Delete(service1)
 | 
						fakeWatch.Delete(service1)
 | 
				
			||||||
	services = []api.Service{*service2}
 | 
						services = []*api.Service{service2}
 | 
				
			||||||
	handler.ValidateServices(t, services)
 | 
						handler.ValidateServices(t, services)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -203,7 +203,7 @@ func TestNewServicesMultipleHandlersAddedAndNotified(t *testing.T) {
 | 
				
			|||||||
	fakeWatch.Add(service1)
 | 
						fakeWatch.Add(service1)
 | 
				
			||||||
	fakeWatch.Add(service2)
 | 
						fakeWatch.Add(service2)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	services := []api.Service{*service2, *service1}
 | 
						services := []*api.Service{service2, service1}
 | 
				
			||||||
	handler.ValidateServices(t, services)
 | 
						handler.ValidateServices(t, services)
 | 
				
			||||||
	handler2.ValidateServices(t, services)
 | 
						handler2.ValidateServices(t, services)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -457,13 +457,12 @@ type healthCheckPort struct {
 | 
				
			|||||||
// Accepts a list of Services and the existing service map.  Returns the new
 | 
					// Accepts a list of Services and the existing service map.  Returns the new
 | 
				
			||||||
// service map, a list of healthcheck ports to add to or remove from the health
 | 
					// service map, a list of healthcheck ports to add to or remove from the health
 | 
				
			||||||
// checking listener service, and a set of stale UDP services.
 | 
					// checking listener service, and a set of stale UDP services.
 | 
				
			||||||
func buildServiceMap(allServices []api.Service, oldServiceMap proxyServiceMap) (proxyServiceMap, []healthCheckPort, []healthCheckPort, sets.String) {
 | 
					func buildServiceMap(allServices []*api.Service, oldServiceMap proxyServiceMap) (proxyServiceMap, []healthCheckPort, []healthCheckPort, sets.String) {
 | 
				
			||||||
	newServiceMap := make(proxyServiceMap)
 | 
						newServiceMap := make(proxyServiceMap)
 | 
				
			||||||
	healthCheckAdd := make([]healthCheckPort, 0)
 | 
						healthCheckAdd := make([]healthCheckPort, 0)
 | 
				
			||||||
	healthCheckDel := make([]healthCheckPort, 0)
 | 
						healthCheckDel := make([]healthCheckPort, 0)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	for i := range allServices {
 | 
						for _, service := range allServices {
 | 
				
			||||||
		service := &allServices[i]
 | 
					 | 
				
			||||||
		svcName := types.NamespacedName{
 | 
							svcName := types.NamespacedName{
 | 
				
			||||||
			Namespace: service.Namespace,
 | 
								Namespace: service.Namespace,
 | 
				
			||||||
			Name:      service.Name,
 | 
								Name:      service.Name,
 | 
				
			||||||
@@ -529,7 +528,7 @@ func buildServiceMap(allServices []api.Service, oldServiceMap proxyServiceMap) (
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
// OnServiceUpdate tracks the active set of service proxies.
 | 
					// OnServiceUpdate tracks the active set of service proxies.
 | 
				
			||||||
// They will be synchronized using syncProxyRules()
 | 
					// They will be synchronized using syncProxyRules()
 | 
				
			||||||
func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
 | 
					func (proxier *Proxier) OnServiceUpdate(allServices []*api.Service) {
 | 
				
			||||||
	start := time.Now()
 | 
						start := time.Now()
 | 
				
			||||||
	defer func() {
 | 
						defer func() {
 | 
				
			||||||
		glog.V(4).Infof("OnServiceUpdate took %v for %d services", time.Since(start), len(allServices))
 | 
							glog.V(4).Infof("OnServiceUpdate took %v for %d services", time.Since(start), len(allServices))
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -858,8 +858,8 @@ func onlyLocalNodePorts(t *testing.T, fp *Proxier, ipt *iptablestest.FakeIPTable
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func makeTestService(namespace, name string, svcFunc func(*api.Service)) api.Service {
 | 
					func makeTestService(namespace, name string, svcFunc func(*api.Service)) *api.Service {
 | 
				
			||||||
	svc := api.Service{
 | 
						svc := &api.Service{
 | 
				
			||||||
		ObjectMeta: metav1.ObjectMeta{
 | 
							ObjectMeta: metav1.ObjectMeta{
 | 
				
			||||||
			Name:      name,
 | 
								Name:      name,
 | 
				
			||||||
			Namespace: namespace,
 | 
								Namespace: namespace,
 | 
				
			||||||
@@ -867,7 +867,7 @@ func makeTestService(namespace, name string, svcFunc func(*api.Service)) api.Ser
 | 
				
			|||||||
		Spec:   api.ServiceSpec{},
 | 
							Spec:   api.ServiceSpec{},
 | 
				
			||||||
		Status: api.ServiceStatus{},
 | 
							Status: api.ServiceStatus{},
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	svcFunc(&svc)
 | 
						svcFunc(svc)
 | 
				
			||||||
	return svc
 | 
						return svc
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -883,7 +883,7 @@ func addTestPort(array []api.ServicePort, name string, protocol api.Protocol, po
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestBuildServiceMapAddRemove(t *testing.T) {
 | 
					func TestBuildServiceMapAddRemove(t *testing.T) {
 | 
				
			||||||
	services := []api.Service{
 | 
						services := []*api.Service{
 | 
				
			||||||
		makeTestService("somewhere-else", "cluster-ip", func(svc *api.Service) {
 | 
							makeTestService("somewhere-else", "cluster-ip", func(svc *api.Service) {
 | 
				
			||||||
			svc.Spec.Type = api.ServiceTypeClusterIP
 | 
								svc.Spec.Type = api.ServiceTypeClusterIP
 | 
				
			||||||
			svc.Spec.ClusterIP = "172.16.55.4"
 | 
								svc.Spec.ClusterIP = "172.16.55.4"
 | 
				
			||||||
@@ -959,7 +959,7 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Remove some stuff
 | 
						// Remove some stuff
 | 
				
			||||||
	services = []api.Service{services[0]}
 | 
						services = []*api.Service{services[0]}
 | 
				
			||||||
	services[0].Spec.Ports = []api.ServicePort{services[0].Spec.Ports[1]}
 | 
						services[0].Spec.Ports = []api.ServicePort{services[0].Spec.Ports[1]}
 | 
				
			||||||
	serviceMap, hcAdd, hcDel, staleUDPServices = buildServiceMap(services, serviceMap)
 | 
						serviceMap, hcAdd, hcDel, staleUDPServices = buildServiceMap(services, serviceMap)
 | 
				
			||||||
	if len(serviceMap) != 1 {
 | 
						if len(serviceMap) != 1 {
 | 
				
			||||||
@@ -999,7 +999,7 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestBuildServiceMapServiceHeadless(t *testing.T) {
 | 
					func TestBuildServiceMapServiceHeadless(t *testing.T) {
 | 
				
			||||||
	services := []api.Service{
 | 
						services := []*api.Service{
 | 
				
			||||||
		makeTestService("somewhere-else", "headless", func(svc *api.Service) {
 | 
							makeTestService("somewhere-else", "headless", func(svc *api.Service) {
 | 
				
			||||||
			svc.Spec.Type = api.ServiceTypeClusterIP
 | 
								svc.Spec.Type = api.ServiceTypeClusterIP
 | 
				
			||||||
			svc.Spec.ClusterIP = api.ClusterIPNone
 | 
								svc.Spec.ClusterIP = api.ClusterIPNone
 | 
				
			||||||
@@ -1027,7 +1027,7 @@ func TestBuildServiceMapServiceHeadless(t *testing.T) {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestBuildServiceMapServiceTypeExternalName(t *testing.T) {
 | 
					func TestBuildServiceMapServiceTypeExternalName(t *testing.T) {
 | 
				
			||||||
	services := []api.Service{
 | 
						services := []*api.Service{
 | 
				
			||||||
		makeTestService("somewhere-else", "external-name", func(svc *api.Service) {
 | 
							makeTestService("somewhere-else", "external-name", func(svc *api.Service) {
 | 
				
			||||||
			svc.Spec.Type = api.ServiceTypeExternalName
 | 
								svc.Spec.Type = api.ServiceTypeExternalName
 | 
				
			||||||
			svc.Spec.ClusterIP = "172.16.55.4" // Should be ignored
 | 
								svc.Spec.ClusterIP = "172.16.55.4" // Should be ignored
 | 
				
			||||||
@@ -1053,7 +1053,7 @@ func TestBuildServiceMapServiceTypeExternalName(t *testing.T) {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestBuildServiceMapServiceUpdate(t *testing.T) {
 | 
					func TestBuildServiceMapServiceUpdate(t *testing.T) {
 | 
				
			||||||
	first := []api.Service{
 | 
						first := []*api.Service{
 | 
				
			||||||
		makeTestService("somewhere", "some-service", func(svc *api.Service) {
 | 
							makeTestService("somewhere", "some-service", func(svc *api.Service) {
 | 
				
			||||||
			svc.Spec.Type = api.ServiceTypeClusterIP
 | 
								svc.Spec.Type = api.ServiceTypeClusterIP
 | 
				
			||||||
			svc.Spec.ClusterIP = "172.16.55.4"
 | 
								svc.Spec.ClusterIP = "172.16.55.4"
 | 
				
			||||||
@@ -1062,7 +1062,7 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
 | 
				
			|||||||
		}),
 | 
							}),
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	second := []api.Service{
 | 
						second := []*api.Service{
 | 
				
			||||||
		makeTestService("somewhere", "some-service", func(svc *api.Service) {
 | 
							makeTestService("somewhere", "some-service", func(svc *api.Service) {
 | 
				
			||||||
			svc.ObjectMeta.Annotations = map[string]string{
 | 
								svc.ObjectMeta.Annotations = map[string]string{
 | 
				
			||||||
				service.BetaAnnotationExternalTraffic:     service.AnnotationValueExternalTrafficLocal,
 | 
									service.BetaAnnotationExternalTraffic:     service.AnnotationValueExternalTrafficLocal,
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -20,15 +20,10 @@ import (
 | 
				
			|||||||
	"fmt"
 | 
						"fmt"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/types"
 | 
						"k8s.io/apimachinery/pkg/types"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/api"
 | 
					 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// ProxyProvider is the interface provided by proxier implementations.
 | 
					// ProxyProvider is the interface provided by proxier implementations.
 | 
				
			||||||
type ProxyProvider interface {
 | 
					type ProxyProvider interface {
 | 
				
			||||||
	// OnServiceUpdate manages the active set of service proxies.
 | 
					 | 
				
			||||||
	// Active service proxies are reinitialized if found in the update set or
 | 
					 | 
				
			||||||
	// removed if missing from the update set.
 | 
					 | 
				
			||||||
	OnServiceUpdate(services []api.Service)
 | 
					 | 
				
			||||||
	// Sync immediately synchronizes the ProxyProvider's current state to iptables.
 | 
						// Sync immediately synchronizes the ProxyProvider's current state to iptables.
 | 
				
			||||||
	Sync()
 | 
						Sync()
 | 
				
			||||||
	// SyncLoop runs periodic work.
 | 
						// SyncLoop runs periodic work.
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -400,12 +400,10 @@ func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, serviceR
 | 
				
			|||||||
// OnServiceUpdate manages the active set of service proxies.
 | 
					// OnServiceUpdate manages the active set of service proxies.
 | 
				
			||||||
// Active service proxies are reinitialized if found in the update set or
 | 
					// Active service proxies are reinitialized if found in the update set or
 | 
				
			||||||
// shutdown if missing from the update set.
 | 
					// shutdown if missing from the update set.
 | 
				
			||||||
func (proxier *Proxier) OnServiceUpdate(services []api.Service) {
 | 
					func (proxier *Proxier) OnServiceUpdate(services []*api.Service) {
 | 
				
			||||||
	glog.V(4).Infof("Received update notice: %+v", services)
 | 
						glog.V(4).Infof("Received update notice: %+v", services)
 | 
				
			||||||
	activeServices := make(map[proxy.ServicePortName]bool) // use a map as a set
 | 
						activeServices := make(map[proxy.ServicePortName]bool) // use a map as a set
 | 
				
			||||||
	for i := range services {
 | 
						for _, service := range services {
 | 
				
			||||||
		service := &services[i]
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		// if ClusterIP is "None" or empty, skip proxying
 | 
							// if ClusterIP is "None" or empty, skip proxying
 | 
				
			||||||
		if !api.IsServiceIPSet(service) {
 | 
							if !api.IsServiceIPSet(service) {
 | 
				
			||||||
			glog.V(3).Infof("Skipping service %s due to clusterIP = %q", types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, service.Spec.ClusterIP)
 | 
								glog.V(3).Infof("Skipping service %s due to clusterIP = %q", types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, service.Spec.ClusterIP)
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -350,7 +350,7 @@ func TestMultiPortOnServiceUpdate(t *testing.T) {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
	waitForNumProxyLoops(t, p, 0)
 | 
						waitForNumProxyLoops(t, p, 0)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	p.OnServiceUpdate([]api.Service{{
 | 
						p.OnServiceUpdate([]*api.Service{{
 | 
				
			||||||
		ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
 | 
							ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
 | 
				
			||||||
		Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{
 | 
							Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{
 | 
				
			||||||
			Name:     "p",
 | 
								Name:     "p",
 | 
				
			||||||
@@ -515,7 +515,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) {
 | 
				
			|||||||
	conn.Close()
 | 
						conn.Close()
 | 
				
			||||||
	waitForNumProxyLoops(t, p, 1)
 | 
						waitForNumProxyLoops(t, p, 1)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	p.OnServiceUpdate([]api.Service{})
 | 
						p.OnServiceUpdate([]*api.Service{})
 | 
				
			||||||
	if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
 | 
						if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
 | 
				
			||||||
		t.Fatalf(err.Error())
 | 
							t.Fatalf(err.Error())
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
@@ -555,7 +555,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) {
 | 
				
			|||||||
	conn.Close()
 | 
						conn.Close()
 | 
				
			||||||
	waitForNumProxyLoops(t, p, 1)
 | 
						waitForNumProxyLoops(t, p, 1)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	p.OnServiceUpdate([]api.Service{})
 | 
						p.OnServiceUpdate([]*api.Service{})
 | 
				
			||||||
	if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil {
 | 
						if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil {
 | 
				
			||||||
		t.Fatalf(err.Error())
 | 
							t.Fatalf(err.Error())
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
@@ -594,7 +594,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
 | 
				
			|||||||
	conn.Close()
 | 
						conn.Close()
 | 
				
			||||||
	waitForNumProxyLoops(t, p, 1)
 | 
						waitForNumProxyLoops(t, p, 1)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	p.OnServiceUpdate([]api.Service{})
 | 
						p.OnServiceUpdate([]*api.Service{})
 | 
				
			||||||
	if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
 | 
						if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
 | 
				
			||||||
		t.Fatalf(err.Error())
 | 
							t.Fatalf(err.Error())
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
@@ -602,7 +602,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	// need to add endpoint here because it got clean up during service delete
 | 
						// need to add endpoint here because it got clean up during service delete
 | 
				
			||||||
	lb.OnEndpointsUpdate([]*api.Endpoints{endpoint})
 | 
						lb.OnEndpointsUpdate([]*api.Endpoints{endpoint})
 | 
				
			||||||
	p.OnServiceUpdate([]api.Service{{
 | 
						p.OnServiceUpdate([]*api.Service{{
 | 
				
			||||||
		ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
 | 
							ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
 | 
				
			||||||
		Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{
 | 
							Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{
 | 
				
			||||||
			Name:     "p",
 | 
								Name:     "p",
 | 
				
			||||||
@@ -650,7 +650,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
 | 
				
			|||||||
	conn.Close()
 | 
						conn.Close()
 | 
				
			||||||
	waitForNumProxyLoops(t, p, 1)
 | 
						waitForNumProxyLoops(t, p, 1)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	p.OnServiceUpdate([]api.Service{})
 | 
						p.OnServiceUpdate([]*api.Service{})
 | 
				
			||||||
	if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil {
 | 
						if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil {
 | 
				
			||||||
		t.Fatalf(err.Error())
 | 
							t.Fatalf(err.Error())
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
@@ -658,7 +658,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	// need to add endpoint here because it got clean up during service delete
 | 
						// need to add endpoint here because it got clean up during service delete
 | 
				
			||||||
	lb.OnEndpointsUpdate([]*api.Endpoints{endpoint})
 | 
						lb.OnEndpointsUpdate([]*api.Endpoints{endpoint})
 | 
				
			||||||
	p.OnServiceUpdate([]api.Service{{
 | 
						p.OnServiceUpdate([]*api.Service{{
 | 
				
			||||||
		ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
 | 
							ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
 | 
				
			||||||
		Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{
 | 
							Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{
 | 
				
			||||||
			Name:     "p",
 | 
								Name:     "p",
 | 
				
			||||||
@@ -703,7 +703,7 @@ func TestTCPProxyUpdatePort(t *testing.T) {
 | 
				
			|||||||
	testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
 | 
						testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
 | 
				
			||||||
	waitForNumProxyLoops(t, p, 1)
 | 
						waitForNumProxyLoops(t, p, 1)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	p.OnServiceUpdate([]api.Service{{
 | 
						p.OnServiceUpdate([]*api.Service{{
 | 
				
			||||||
		ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
 | 
							ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
 | 
				
			||||||
		Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{
 | 
							Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{
 | 
				
			||||||
			Name:     "p",
 | 
								Name:     "p",
 | 
				
			||||||
@@ -753,7 +753,7 @@ func TestUDPProxyUpdatePort(t *testing.T) {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
	waitForNumProxyLoops(t, p, 1)
 | 
						waitForNumProxyLoops(t, p, 1)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	p.OnServiceUpdate([]api.Service{{
 | 
						p.OnServiceUpdate([]*api.Service{{
 | 
				
			||||||
		ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
 | 
							ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
 | 
				
			||||||
		Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{
 | 
							Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{
 | 
				
			||||||
			Name:     "p",
 | 
								Name:     "p",
 | 
				
			||||||
@@ -802,7 +802,7 @@ func TestProxyUpdatePublicIPs(t *testing.T) {
 | 
				
			|||||||
	testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
 | 
						testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
 | 
				
			||||||
	waitForNumProxyLoops(t, p, 1)
 | 
						waitForNumProxyLoops(t, p, 1)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	p.OnServiceUpdate([]api.Service{{
 | 
						p.OnServiceUpdate([]*api.Service{{
 | 
				
			||||||
		ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
 | 
							ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
 | 
				
			||||||
		Spec: api.ServiceSpec{
 | 
							Spec: api.ServiceSpec{
 | 
				
			||||||
			Ports: []api.ServicePort{{
 | 
								Ports: []api.ServicePort{{
 | 
				
			||||||
@@ -856,7 +856,7 @@ func TestProxyUpdatePortal(t *testing.T) {
 | 
				
			|||||||
	testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
 | 
						testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
 | 
				
			||||||
	waitForNumProxyLoops(t, p, 1)
 | 
						waitForNumProxyLoops(t, p, 1)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	p.OnServiceUpdate([]api.Service{{
 | 
						p.OnServiceUpdate([]*api.Service{{
 | 
				
			||||||
		ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
 | 
							ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
 | 
				
			||||||
		Spec: api.ServiceSpec{ClusterIP: "", Ports: []api.ServicePort{{
 | 
							Spec: api.ServiceSpec{ClusterIP: "", Ports: []api.ServicePort{{
 | 
				
			||||||
			Name:     "p",
 | 
								Name:     "p",
 | 
				
			||||||
@@ -869,7 +869,7 @@ func TestProxyUpdatePortal(t *testing.T) {
 | 
				
			|||||||
		t.Fatalf("service with empty ClusterIP should not be included in the proxy")
 | 
							t.Fatalf("service with empty ClusterIP should not be included in the proxy")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	p.OnServiceUpdate([]api.Service{{
 | 
						p.OnServiceUpdate([]*api.Service{{
 | 
				
			||||||
		ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
 | 
							ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
 | 
				
			||||||
		Spec: api.ServiceSpec{ClusterIP: "None", Ports: []api.ServicePort{{
 | 
							Spec: api.ServiceSpec{ClusterIP: "None", Ports: []api.ServicePort{{
 | 
				
			||||||
			Name:     "p",
 | 
								Name:     "p",
 | 
				
			||||||
@@ -882,7 +882,7 @@ func TestProxyUpdatePortal(t *testing.T) {
 | 
				
			|||||||
		t.Fatalf("service with 'None' as ClusterIP should not be included in the proxy")
 | 
							t.Fatalf("service with 'None' as ClusterIP should not be included in the proxy")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	p.OnServiceUpdate([]api.Service{{
 | 
						p.OnServiceUpdate([]*api.Service{{
 | 
				
			||||||
		ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
 | 
							ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
 | 
				
			||||||
		Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{
 | 
							Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{
 | 
				
			||||||
			Name:     "p",
 | 
								Name:     "p",
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -317,12 +317,10 @@ func getListenIPPortMap(service *api.Service, listenPort int, nodePort int) map[
 | 
				
			|||||||
// OnServiceUpdate manages the active set of service proxies.
 | 
					// OnServiceUpdate manages the active set of service proxies.
 | 
				
			||||||
// Active service proxies are reinitialized if found in the update set or
 | 
					// Active service proxies are reinitialized if found in the update set or
 | 
				
			||||||
// shutdown if missing from the update set.
 | 
					// shutdown if missing from the update set.
 | 
				
			||||||
func (proxier *Proxier) OnServiceUpdate(services []api.Service) {
 | 
					func (proxier *Proxier) OnServiceUpdate(services []*api.Service) {
 | 
				
			||||||
	glog.V(4).Infof("Received update notice: %+v", services)
 | 
						glog.V(4).Infof("Received update notice: %+v", services)
 | 
				
			||||||
	activeServicePortPortals := make(map[ServicePortPortalName]bool) // use a map as a set
 | 
						activeServicePortPortals := make(map[ServicePortPortalName]bool) // use a map as a set
 | 
				
			||||||
	for i := range services {
 | 
						for _, service := range services {
 | 
				
			||||||
		service := &services[i]
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		// if ClusterIP is "None" or empty, skip proxying
 | 
							// if ClusterIP is "None" or empty, skip proxying
 | 
				
			||||||
		if !api.IsServiceIPSet(service) {
 | 
							if !api.IsServiceIPSet(service) {
 | 
				
			||||||
			glog.V(3).Infof("Skipping service %s due to clusterIP = %q", types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, service.Spec.ClusterIP)
 | 
								glog.V(3).Infof("Skipping service %s due to clusterIP = %q", types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, service.Spec.ClusterIP)
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -359,7 +359,7 @@ func TestMultiPortOnServiceUpdate(t *testing.T) {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
	waitForNumProxyLoops(t, p, 0)
 | 
						waitForNumProxyLoops(t, p, 0)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	p.OnServiceUpdate([]api.Service{{
 | 
						p.OnServiceUpdate([]*api.Service{{
 | 
				
			||||||
		ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
 | 
							ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
 | 
				
			||||||
		Spec: api.ServiceSpec{ClusterIP: "0.0.0.0", Ports: []api.ServicePort{{
 | 
							Spec: api.ServiceSpec{ClusterIP: "0.0.0.0", Ports: []api.ServicePort{{
 | 
				
			||||||
			Name:     "p",
 | 
								Name:     "p",
 | 
				
			||||||
@@ -526,7 +526,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) {
 | 
				
			|||||||
	conn.Close()
 | 
						conn.Close()
 | 
				
			||||||
	waitForNumProxyLoops(t, p, 1)
 | 
						waitForNumProxyLoops(t, p, 1)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	p.OnServiceUpdate([]api.Service{})
 | 
						p.OnServiceUpdate([]*api.Service{})
 | 
				
			||||||
	if err := waitForClosedPortTCP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil {
 | 
						if err := waitForClosedPortTCP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil {
 | 
				
			||||||
		t.Fatalf(err.Error())
 | 
							t.Fatalf(err.Error())
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
@@ -565,7 +565,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) {
 | 
				
			|||||||
	conn.Close()
 | 
						conn.Close()
 | 
				
			||||||
	waitForNumProxyLoops(t, p, 1)
 | 
						waitForNumProxyLoops(t, p, 1)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	p.OnServiceUpdate([]api.Service{})
 | 
						p.OnServiceUpdate([]*api.Service{})
 | 
				
			||||||
	if err := waitForClosedPortUDP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil {
 | 
						if err := waitForClosedPortUDP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil {
 | 
				
			||||||
		t.Fatalf(err.Error())
 | 
							t.Fatalf(err.Error())
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
@@ -603,7 +603,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
 | 
				
			|||||||
	conn.Close()
 | 
						conn.Close()
 | 
				
			||||||
	waitForNumProxyLoops(t, p, 1)
 | 
						waitForNumProxyLoops(t, p, 1)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	p.OnServiceUpdate([]api.Service{})
 | 
						p.OnServiceUpdate([]*api.Service{})
 | 
				
			||||||
	if err := waitForClosedPortTCP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil {
 | 
						if err := waitForClosedPortTCP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil {
 | 
				
			||||||
		t.Fatalf(err.Error())
 | 
							t.Fatalf(err.Error())
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
@@ -611,7 +611,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	// need to add endpoint here because it got clean up during service delete
 | 
						// need to add endpoint here because it got clean up during service delete
 | 
				
			||||||
	lb.OnEndpointsUpdate([]*api.Endpoints{endpoint})
 | 
						lb.OnEndpointsUpdate([]*api.Endpoints{endpoint})
 | 
				
			||||||
	p.OnServiceUpdate([]api.Service{{
 | 
						p.OnServiceUpdate([]*api.Service{{
 | 
				
			||||||
		ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
 | 
							ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
 | 
				
			||||||
		Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{
 | 
							Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{
 | 
				
			||||||
			Name:     "p",
 | 
								Name:     "p",
 | 
				
			||||||
@@ -658,7 +658,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
 | 
				
			|||||||
	conn.Close()
 | 
						conn.Close()
 | 
				
			||||||
	waitForNumProxyLoops(t, p, 1)
 | 
						waitForNumProxyLoops(t, p, 1)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	p.OnServiceUpdate([]api.Service{})
 | 
						p.OnServiceUpdate([]*api.Service{})
 | 
				
			||||||
	if err := waitForClosedPortUDP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil {
 | 
						if err := waitForClosedPortUDP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil {
 | 
				
			||||||
		t.Fatalf(err.Error())
 | 
							t.Fatalf(err.Error())
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
@@ -666,7 +666,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	// need to add endpoint here because it got clean up during service delete
 | 
						// need to add endpoint here because it got clean up during service delete
 | 
				
			||||||
	lb.OnEndpointsUpdate([]*api.Endpoints{endpoint})
 | 
						lb.OnEndpointsUpdate([]*api.Endpoints{endpoint})
 | 
				
			||||||
	p.OnServiceUpdate([]api.Service{{
 | 
						p.OnServiceUpdate([]*api.Service{{
 | 
				
			||||||
		ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
 | 
							ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
 | 
				
			||||||
		Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{
 | 
							Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{
 | 
				
			||||||
			Name:     "p",
 | 
								Name:     "p",
 | 
				
			||||||
@@ -710,7 +710,7 @@ func TestTCPProxyUpdatePort(t *testing.T) {
 | 
				
			|||||||
	testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String()))
 | 
						testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String()))
 | 
				
			||||||
	waitForNumProxyLoops(t, p, 1)
 | 
						waitForNumProxyLoops(t, p, 1)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	p.OnServiceUpdate([]api.Service{{
 | 
						p.OnServiceUpdate([]*api.Service{{
 | 
				
			||||||
		ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
 | 
							ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
 | 
				
			||||||
		Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{
 | 
							Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{
 | 
				
			||||||
			Name:     "p",
 | 
								Name:     "p",
 | 
				
			||||||
@@ -759,7 +759,7 @@ func TestUDPProxyUpdatePort(t *testing.T) {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
	waitForNumProxyLoops(t, p, 1)
 | 
						waitForNumProxyLoops(t, p, 1)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	p.OnServiceUpdate([]api.Service{{
 | 
						p.OnServiceUpdate([]*api.Service{{
 | 
				
			||||||
		ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
 | 
							ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
 | 
				
			||||||
		Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{
 | 
							Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{
 | 
				
			||||||
			Name:     "p",
 | 
								Name:     "p",
 | 
				
			||||||
@@ -807,7 +807,7 @@ func TestProxyUpdatePublicIPs(t *testing.T) {
 | 
				
			|||||||
	testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String()))
 | 
						testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String()))
 | 
				
			||||||
	waitForNumProxyLoops(t, p, 1)
 | 
						waitForNumProxyLoops(t, p, 1)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	p.OnServiceUpdate([]api.Service{{
 | 
						p.OnServiceUpdate([]*api.Service{{
 | 
				
			||||||
		ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
 | 
							ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
 | 
				
			||||||
		Spec: api.ServiceSpec{
 | 
							Spec: api.ServiceSpec{
 | 
				
			||||||
			Ports: []api.ServicePort{{
 | 
								Ports: []api.ServicePort{{
 | 
				
			||||||
@@ -860,7 +860,7 @@ func TestProxyUpdatePortal(t *testing.T) {
 | 
				
			|||||||
	testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String()))
 | 
						testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String()))
 | 
				
			||||||
	waitForNumProxyLoops(t, p, 1)
 | 
						waitForNumProxyLoops(t, p, 1)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	p.OnServiceUpdate([]api.Service{{
 | 
						p.OnServiceUpdate([]*api.Service{{
 | 
				
			||||||
		ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
 | 
							ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
 | 
				
			||||||
		Spec: api.ServiceSpec{ClusterIP: "", Ports: []api.ServicePort{{
 | 
							Spec: api.ServiceSpec{ClusterIP: "", Ports: []api.ServicePort{{
 | 
				
			||||||
			Name:     "p",
 | 
								Name:     "p",
 | 
				
			||||||
@@ -873,7 +873,7 @@ func TestProxyUpdatePortal(t *testing.T) {
 | 
				
			|||||||
		t.Fatalf("service with empty ClusterIP should not be included in the proxy")
 | 
							t.Fatalf("service with empty ClusterIP should not be included in the proxy")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	p.OnServiceUpdate([]api.Service{{
 | 
						p.OnServiceUpdate([]*api.Service{{
 | 
				
			||||||
		ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
 | 
							ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
 | 
				
			||||||
		Spec: api.ServiceSpec{ClusterIP: "None", Ports: []api.ServicePort{{
 | 
							Spec: api.ServiceSpec{ClusterIP: "None", Ports: []api.ServicePort{{
 | 
				
			||||||
			Name:     "p",
 | 
								Name:     "p",
 | 
				
			||||||
@@ -886,7 +886,7 @@ func TestProxyUpdatePortal(t *testing.T) {
 | 
				
			|||||||
		t.Fatalf("service with 'None' as ClusterIP should not be included in the proxy")
 | 
							t.Fatalf("service with 'None' as ClusterIP should not be included in the proxy")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	p.OnServiceUpdate([]api.Service{{
 | 
						p.OnServiceUpdate([]*api.Service{{
 | 
				
			||||||
		ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
 | 
							ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
 | 
				
			||||||
		Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{
 | 
							Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{
 | 
				
			||||||
			Name:     "p",
 | 
								Name:     "p",
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user