From 9052eddaf602fbbcfaaab226e290d593b5aaa225 Mon Sep 17 00:00:00 2001 From: Tim Hockin Date: Sat, 7 May 2016 21:23:50 -0700 Subject: [PATCH 1/2] Don't delete affinity when endpoints are empty This only affects the userspace kube-proxy. --- pkg/proxy/userspace/loadbalancer.go | 1 + pkg/proxy/userspace/proxier.go | 1 + pkg/proxy/userspace/roundrobin.go | 14 +++++++++++++- 3 files changed, 15 insertions(+), 1 deletion(-) diff --git a/pkg/proxy/userspace/loadbalancer.go b/pkg/proxy/userspace/loadbalancer.go index f32f05fc8a1..711c9f1d5a5 100644 --- a/pkg/proxy/userspace/loadbalancer.go +++ b/pkg/proxy/userspace/loadbalancer.go @@ -29,5 +29,6 @@ type LoadBalancer interface { // service-port and source address. NextEndpoint(service proxy.ServicePortName, srcAddr net.Addr) (string, error) NewService(service proxy.ServicePortName, sessionAffinityType api.ServiceAffinity, stickyMaxAgeMinutes int) error + DeleteService(service proxy.ServicePortName) CleanupStaleStickySessions(service proxy.ServicePortName) } diff --git a/pkg/proxy/userspace/proxier.go b/pkg/proxy/userspace/proxier.go index 0ee5185943c..fb5d00e1eb6 100644 --- a/pkg/proxy/userspace/proxier.go +++ b/pkg/proxy/userspace/proxier.go @@ -447,6 +447,7 @@ func (proxier *Proxier) OnServiceUpdate(services []api.Service) { if err != nil { glog.Errorf("Failed to stop service %q: %v", name, err) } + proxier.loadBalancer.DeleteService(name) } } } diff --git a/pkg/proxy/userspace/roundrobin.go b/pkg/proxy/userspace/roundrobin.go index 86a93e25618..fa4735c5a7c 100644 --- a/pkg/proxy/userspace/roundrobin.go +++ b/pkg/proxy/userspace/roundrobin.go @@ -82,6 +82,7 @@ func NewLoadBalancerRR() *LoadBalancerRR { } func (lb *LoadBalancerRR) NewService(svcPort proxy.ServicePortName, affinityType api.ServiceAffinity, ttlMinutes int) error { + glog.V(4).Infof("LoadBalancerRR NewService %q", svcPort) lb.lock.Lock() defer lb.lock.Unlock() lb.newServiceInternal(svcPort, affinityType, ttlMinutes) @@ -103,6 +104,13 @@ func (lb *LoadBalancerRR) newServiceInternal(svcPort proxy.ServicePortName, affi return lb.services[svcPort] } +func (lb *LoadBalancerRR) DeleteService(svcPort proxy.ServicePortName) { + glog.V(4).Infof("LoadBalancerRR DeleteService %q", svcPort) + lb.lock.Lock() + defer lb.lock.Unlock() + delete(lb.services, svcPort) +} + // return true if this service is using some form of session affinity. func isSessionAffinity(affinity *affinityPolicy) bool { // Should never be empty string, but checking for it to be safe. @@ -279,7 +287,11 @@ func (lb *LoadBalancerRR) OnEndpointsUpdate(allEndpoints []api.Endpoints) { for k := range lb.services { if _, exists := registeredEndpoints[k]; !exists { glog.V(2).Infof("LoadBalancerRR: Removing endpoints for %s", k) - delete(lb.services, k) + // Reset but don't delete. + state := lb.services[k] + state.endpoints = []string{} + state.index = 0 + state.affinity.affinityMap = map[string]*affinityState{} } } } From e1df5c8b30aef55ecbcacc4409b23f096525ae24 Mon Sep 17 00:00:00 2001 From: Minhan Xia Date: Thu, 7 Jul 2016 17:43:22 -0700 Subject: [PATCH 2/2] fix proxy unit tests --- pkg/proxy/userspace/proxier_test.go | 56 +++++++++++++++-------------- 1 file changed, 29 insertions(+), 27 deletions(-) diff --git a/pkg/proxy/userspace/proxier_test.go b/pkg/proxy/userspace/proxier_test.go index 17e230727d5..294fc9b86e0 100644 --- a/pkg/proxy/userspace/proxier_test.go +++ b/pkg/proxy/userspace/proxier_test.go @@ -536,15 +536,14 @@ func TestUDPProxyUpdateDelete(t *testing.T) { func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsUpdate([]api.Endpoints{ - { - ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}}, - }}, - }, - }) + endpoint := api.Endpoints{ + ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, + Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}}, + }}, + } + lb.OnEndpointsUpdate([]api.Endpoints{endpoint}) p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest) if err != nil { @@ -569,6 +568,8 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { } waitForNumProxyLoops(t, p, 0) + // need to add endpoint here because it got clean up during service delete + lb.OnEndpointsUpdate([]api.Endpoints{endpoint}) p.OnServiceUpdate([]api.Service{{ ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ @@ -588,15 +589,14 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsUpdate([]api.Endpoints{ - { - ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}}, - }}, - }, - }) + endpoint := api.Endpoints{ + ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, + Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}}, + }}, + } + lb.OnEndpointsUpdate([]api.Endpoints{endpoint}) p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest) if err != nil { @@ -621,6 +621,8 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { } waitForNumProxyLoops(t, p, 0) + // need to add endpoint here because it got clean up during service delete + lb.OnEndpointsUpdate([]api.Endpoints{endpoint}) p.OnServiceUpdate([]api.Service{{ ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ @@ -785,15 +787,14 @@ func TestProxyUpdatePublicIPs(t *testing.T) { func TestProxyUpdatePortal(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsUpdate([]api.Endpoints{ - { - ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}}, - }}, - }, - }) + endpoint := api.Endpoints{ + ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, + Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}}, + }}, + } + lb.OnEndpointsUpdate([]api.Endpoints{endpoint}) p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest) if err != nil { @@ -842,6 +843,7 @@ func TestProxyUpdatePortal(t *testing.T) { Protocol: "TCP", }}}, }}) + lb.OnEndpointsUpdate([]api.Endpoints{endpoint}) svcInfo, exists = p.getServiceInfo(service) if !exists { t.Fatalf("service with ClusterIP set not found in the proxy")