mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-03 19:58:17 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			1943 lines
		
	
	
		
			60 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			1943 lines
		
	
	
		
			60 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
/*
 | 
						|
Copyright 2014 The Kubernetes Authors.
 | 
						|
 | 
						|
Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
you may not use this file except in compliance with the License.
 | 
						|
You may obtain a copy of the License at
 | 
						|
 | 
						|
    http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
 | 
						|
Unless required by applicable law or agreed to in writing, software
 | 
						|
distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
See the License for the specific language governing permissions and
 | 
						|
limitations under the License.
 | 
						|
*/
 | 
						|
 | 
						|
package endpoint
 | 
						|
 | 
						|
import (
 | 
						|
	"fmt"
 | 
						|
	"net/http"
 | 
						|
	"net/http/httptest"
 | 
						|
	"strconv"
 | 
						|
	"testing"
 | 
						|
	"time"
 | 
						|
 | 
						|
	v1 "k8s.io/api/core/v1"
 | 
						|
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
						|
	"k8s.io/apimachinery/pkg/runtime"
 | 
						|
	"k8s.io/apimachinery/pkg/runtime/schema"
 | 
						|
	"k8s.io/apimachinery/pkg/util/intstr"
 | 
						|
	"k8s.io/apimachinery/pkg/util/wait"
 | 
						|
	utilfeature "k8s.io/apiserver/pkg/util/feature"
 | 
						|
	"k8s.io/client-go/informers"
 | 
						|
	clientset "k8s.io/client-go/kubernetes"
 | 
						|
	restclient "k8s.io/client-go/rest"
 | 
						|
	"k8s.io/client-go/tools/cache"
 | 
						|
	utiltesting "k8s.io/client-go/util/testing"
 | 
						|
	featuregatetesting "k8s.io/component-base/featuregate/testing"
 | 
						|
	"k8s.io/kubernetes/pkg/api/testapi"
 | 
						|
	endptspkg "k8s.io/kubernetes/pkg/api/v1/endpoints"
 | 
						|
	api "k8s.io/kubernetes/pkg/apis/core"
 | 
						|
	"k8s.io/kubernetes/pkg/controller"
 | 
						|
	endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint"
 | 
						|
	"k8s.io/kubernetes/pkg/features"
 | 
						|
)
 | 
						|
 | 
						|
var alwaysReady = func() bool { return true }
 | 
						|
var neverReady = func() bool { return false }
 | 
						|
var emptyNodeName string
 | 
						|
var triggerTime = time.Date(2018, 01, 01, 0, 0, 0, 0, time.UTC)
 | 
						|
var triggerTimeString = triggerTime.Format(time.RFC3339Nano)
 | 
						|
var oldTriggerTimeString = triggerTime.Add(-time.Hour).Format(time.RFC3339Nano)
 | 
						|
 | 
						|
func testPod(namespace string, id int, nPorts int, isReady bool, makeDualstack bool) *v1.Pod {
 | 
						|
	p := &v1.Pod{
 | 
						|
		TypeMeta: metav1.TypeMeta{APIVersion: "v1"},
 | 
						|
		ObjectMeta: metav1.ObjectMeta{
 | 
						|
			Namespace: namespace,
 | 
						|
			Name:      fmt.Sprintf("pod%d", id),
 | 
						|
			Labels:    map[string]string{"foo": "bar"},
 | 
						|
		},
 | 
						|
		Spec: v1.PodSpec{
 | 
						|
			Containers: []v1.Container{{Ports: []v1.ContainerPort{}}},
 | 
						|
		},
 | 
						|
		Status: v1.PodStatus{
 | 
						|
			PodIP: fmt.Sprintf("1.2.3.%d", 4+id),
 | 
						|
			Conditions: []v1.PodCondition{
 | 
						|
				{
 | 
						|
					Type:   v1.PodReady,
 | 
						|
					Status: v1.ConditionTrue,
 | 
						|
				},
 | 
						|
			},
 | 
						|
		},
 | 
						|
	}
 | 
						|
	if !isReady {
 | 
						|
		p.Status.Conditions[0].Status = v1.ConditionFalse
 | 
						|
	}
 | 
						|
	for j := 0; j < nPorts; j++ {
 | 
						|
		p.Spec.Containers[0].Ports = append(p.Spec.Containers[0].Ports,
 | 
						|
			v1.ContainerPort{Name: fmt.Sprintf("port%d", j), ContainerPort: int32(8080 + j)})
 | 
						|
	}
 | 
						|
	if makeDualstack {
 | 
						|
		p.Status.PodIPs = []v1.PodIP{
 | 
						|
			{
 | 
						|
				IP: p.Status.PodIP,
 | 
						|
			},
 | 
						|
			{
 | 
						|
				IP: fmt.Sprintf("2000::%d", id),
 | 
						|
			},
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return p
 | 
						|
}
 | 
						|
 | 
						|
func addPods(store cache.Store, namespace string, nPods int, nPorts int, nNotReady int, makeDualstack bool) {
 | 
						|
	for i := 0; i < nPods+nNotReady; i++ {
 | 
						|
		isReady := i < nPods
 | 
						|
		pod := testPod(namespace, i, nPorts, isReady, makeDualstack)
 | 
						|
		store.Add(pod)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func addNotReadyPodsWithSpecifiedRestartPolicyAndPhase(store cache.Store, namespace string, nPods int, nPorts int, restartPolicy v1.RestartPolicy, podPhase v1.PodPhase) {
 | 
						|
	for i := 0; i < nPods; i++ {
 | 
						|
		p := &v1.Pod{
 | 
						|
			TypeMeta: metav1.TypeMeta{APIVersion: "v1"},
 | 
						|
			ObjectMeta: metav1.ObjectMeta{
 | 
						|
				Namespace: namespace,
 | 
						|
				Name:      fmt.Sprintf("pod%d", i),
 | 
						|
				Labels:    map[string]string{"foo": "bar"},
 | 
						|
			},
 | 
						|
			Spec: v1.PodSpec{
 | 
						|
				RestartPolicy: restartPolicy,
 | 
						|
				Containers:    []v1.Container{{Ports: []v1.ContainerPort{}}},
 | 
						|
			},
 | 
						|
			Status: v1.PodStatus{
 | 
						|
				PodIP: fmt.Sprintf("1.2.3.%d", 4+i),
 | 
						|
				Phase: podPhase,
 | 
						|
				Conditions: []v1.PodCondition{
 | 
						|
					{
 | 
						|
						Type:   v1.PodReady,
 | 
						|
						Status: v1.ConditionFalse,
 | 
						|
					},
 | 
						|
				},
 | 
						|
			},
 | 
						|
		}
 | 
						|
		for j := 0; j < nPorts; j++ {
 | 
						|
			p.Spec.Containers[0].Ports = append(p.Spec.Containers[0].Ports,
 | 
						|
				v1.ContainerPort{Name: fmt.Sprintf("port%d", j), ContainerPort: int32(8080 + j)})
 | 
						|
		}
 | 
						|
		store.Add(p)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func makeTestServer(t *testing.T, namespace string) (*httptest.Server, *utiltesting.FakeHandler) {
 | 
						|
	fakeEndpointsHandler := utiltesting.FakeHandler{
 | 
						|
		StatusCode:   http.StatusOK,
 | 
						|
		ResponseBody: runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{}),
 | 
						|
	}
 | 
						|
	mux := http.NewServeMux()
 | 
						|
	mux.Handle(testapi.Default.ResourcePath("endpoints", namespace, ""), &fakeEndpointsHandler)
 | 
						|
	mux.Handle(testapi.Default.ResourcePath("endpoints/", namespace, ""), &fakeEndpointsHandler)
 | 
						|
	mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) {
 | 
						|
		t.Errorf("unexpected request: %v", req.RequestURI)
 | 
						|
		http.Error(res, "", http.StatusNotFound)
 | 
						|
	})
 | 
						|
	return httptest.NewServer(mux), &fakeEndpointsHandler
 | 
						|
}
 | 
						|
 | 
						|
type endpointController struct {
 | 
						|
	*EndpointController
 | 
						|
	podStore       cache.Store
 | 
						|
	serviceStore   cache.Store
 | 
						|
	endpointsStore cache.Store
 | 
						|
}
 | 
						|
 | 
						|
func newController(url string, batchPeriod time.Duration) *endpointController {
 | 
						|
	client := clientset.NewForConfigOrDie(&restclient.Config{Host: url, ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
 | 
						|
	informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
 | 
						|
	endpoints := NewEndpointController(informerFactory.Core().V1().Pods(), informerFactory.Core().V1().Services(),
 | 
						|
		informerFactory.Core().V1().Endpoints(), client, batchPeriod)
 | 
						|
	endpoints.podsSynced = alwaysReady
 | 
						|
	endpoints.servicesSynced = alwaysReady
 | 
						|
	endpoints.endpointsSynced = alwaysReady
 | 
						|
	return &endpointController{
 | 
						|
		endpoints,
 | 
						|
		informerFactory.Core().V1().Pods().Informer().GetStore(),
 | 
						|
		informerFactory.Core().V1().Services().Informer().GetStore(),
 | 
						|
		informerFactory.Core().V1().Endpoints().Informer().GetStore(),
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) {
 | 
						|
	ns := metav1.NamespaceDefault
 | 
						|
	testServer, endpointsHandler := makeTestServer(t, ns)
 | 
						|
	defer testServer.Close()
 | 
						|
	endpoints := newController(testServer.URL, 0*time.Second)
 | 
						|
	endpoints.endpointsStore.Add(&v1.Endpoints{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{
 | 
						|
			Name:            "foo",
 | 
						|
			Namespace:       ns,
 | 
						|
			ResourceVersion: "1",
 | 
						|
		},
 | 
						|
		Subsets: []v1.EndpointSubset{{
 | 
						|
			Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
 | 
						|
			Ports:     []v1.EndpointPort{{Port: 1000}},
 | 
						|
		}},
 | 
						|
	})
 | 
						|
	endpoints.serviceStore.Add(&v1.Service{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
 | 
						|
		Spec:       v1.ServiceSpec{Ports: []v1.ServicePort{{Port: 80}}},
 | 
						|
	})
 | 
						|
	endpoints.syncService(ns + "/foo")
 | 
						|
	endpointsHandler.ValidateRequestCount(t, 0)
 | 
						|
}
 | 
						|
 | 
						|
func TestSyncEndpointsExistingNilSubsets(t *testing.T) {
 | 
						|
	ns := metav1.NamespaceDefault
 | 
						|
	testServer, endpointsHandler := makeTestServer(t, ns)
 | 
						|
	defer testServer.Close()
 | 
						|
	endpoints := newController(testServer.URL, 0*time.Second)
 | 
						|
	endpoints.endpointsStore.Add(&v1.Endpoints{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{
 | 
						|
			Name:            "foo",
 | 
						|
			Namespace:       ns,
 | 
						|
			ResourceVersion: "1",
 | 
						|
		},
 | 
						|
		Subsets: nil,
 | 
						|
	})
 | 
						|
	endpoints.serviceStore.Add(&v1.Service{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
 | 
						|
		Spec: v1.ServiceSpec{
 | 
						|
			Selector: map[string]string{"foo": "bar"},
 | 
						|
			Ports:    []v1.ServicePort{{Port: 80}},
 | 
						|
		},
 | 
						|
	})
 | 
						|
	endpoints.syncService(ns + "/foo")
 | 
						|
	endpointsHandler.ValidateRequestCount(t, 0)
 | 
						|
}
 | 
						|
 | 
						|
func TestSyncEndpointsExistingEmptySubsets(t *testing.T) {
 | 
						|
	ns := metav1.NamespaceDefault
 | 
						|
	testServer, endpointsHandler := makeTestServer(t, ns)
 | 
						|
	defer testServer.Close()
 | 
						|
	endpoints := newController(testServer.URL, 0*time.Second)
 | 
						|
	endpoints.endpointsStore.Add(&v1.Endpoints{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{
 | 
						|
			Name:            "foo",
 | 
						|
			Namespace:       ns,
 | 
						|
			ResourceVersion: "1",
 | 
						|
		},
 | 
						|
		Subsets: []v1.EndpointSubset{},
 | 
						|
	})
 | 
						|
	endpoints.serviceStore.Add(&v1.Service{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
 | 
						|
		Spec: v1.ServiceSpec{
 | 
						|
			Selector: map[string]string{"foo": "bar"},
 | 
						|
			Ports:    []v1.ServicePort{{Port: 80}},
 | 
						|
		},
 | 
						|
	})
 | 
						|
	endpoints.syncService(ns + "/foo")
 | 
						|
	endpointsHandler.ValidateRequestCount(t, 0)
 | 
						|
}
 | 
						|
 | 
						|
func TestSyncEndpointsNewNoSubsets(t *testing.T) {
 | 
						|
	ns := metav1.NamespaceDefault
 | 
						|
	testServer, endpointsHandler := makeTestServer(t, ns)
 | 
						|
	defer testServer.Close()
 | 
						|
	endpoints := newController(testServer.URL, 0*time.Second)
 | 
						|
	endpoints.serviceStore.Add(&v1.Service{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
 | 
						|
		Spec: v1.ServiceSpec{
 | 
						|
			Selector: map[string]string{"foo": "bar"},
 | 
						|
			Ports:    []v1.ServicePort{{Port: 80}},
 | 
						|
		},
 | 
						|
	})
 | 
						|
	endpoints.syncService(ns + "/foo")
 | 
						|
	endpointsHandler.ValidateRequestCount(t, 1)
 | 
						|
}
 | 
						|
 | 
						|
func TestCheckLeftoverEndpoints(t *testing.T) {
 | 
						|
	ns := metav1.NamespaceDefault
 | 
						|
	testServer, _ := makeTestServer(t, ns)
 | 
						|
	defer testServer.Close()
 | 
						|
	endpoints := newController(testServer.URL, 0*time.Second)
 | 
						|
	endpoints.endpointsStore.Add(&v1.Endpoints{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{
 | 
						|
			Name:            "foo",
 | 
						|
			Namespace:       ns,
 | 
						|
			ResourceVersion: "1",
 | 
						|
		},
 | 
						|
		Subsets: []v1.EndpointSubset{{
 | 
						|
			Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
 | 
						|
			Ports:     []v1.EndpointPort{{Port: 1000}},
 | 
						|
		}},
 | 
						|
	})
 | 
						|
	endpoints.checkLeftoverEndpoints()
 | 
						|
	if e, a := 1, endpoints.queue.Len(); e != a {
 | 
						|
		t.Fatalf("Expected %v, got %v", e, a)
 | 
						|
	}
 | 
						|
	got, _ := endpoints.queue.Get()
 | 
						|
	if e, a := ns+"/foo", got; e != a {
 | 
						|
		t.Errorf("Expected %v, got %v", e, a)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func TestSyncEndpointsProtocolTCP(t *testing.T) {
 | 
						|
	ns := "other"
 | 
						|
	testServer, endpointsHandler := makeTestServer(t, ns)
 | 
						|
	defer testServer.Close()
 | 
						|
	endpoints := newController(testServer.URL, 0*time.Second)
 | 
						|
	endpoints.endpointsStore.Add(&v1.Endpoints{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{
 | 
						|
			Name:            "foo",
 | 
						|
			Namespace:       ns,
 | 
						|
			ResourceVersion: "1",
 | 
						|
		},
 | 
						|
		Subsets: []v1.EndpointSubset{{
 | 
						|
			Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
 | 
						|
			Ports:     []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
 | 
						|
		}},
 | 
						|
	})
 | 
						|
	addPods(endpoints.podStore, ns, 1, 1, 0, false)
 | 
						|
	endpoints.serviceStore.Add(&v1.Service{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
 | 
						|
		Spec: v1.ServiceSpec{
 | 
						|
			Selector: map[string]string{},
 | 
						|
			Ports:    []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "TCP"}},
 | 
						|
		},
 | 
						|
	})
 | 
						|
	endpoints.syncService(ns + "/foo")
 | 
						|
 | 
						|
	endpointsHandler.ValidateRequestCount(t, 1)
 | 
						|
	data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{
 | 
						|
			Name:            "foo",
 | 
						|
			Namespace:       ns,
 | 
						|
			ResourceVersion: "1",
 | 
						|
			Labels: map[string]string{
 | 
						|
				v1.IsHeadlessService: "",
 | 
						|
			},
 | 
						|
		},
 | 
						|
		Subsets: []v1.EndpointSubset{{
 | 
						|
			Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
 | 
						|
			Ports:     []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
 | 
						|
		}},
 | 
						|
	})
 | 
						|
	endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
 | 
						|
}
 | 
						|
 | 
						|
func TestSyncEndpointsProtocolUDP(t *testing.T) {
 | 
						|
	ns := "other"
 | 
						|
	testServer, endpointsHandler := makeTestServer(t, ns)
 | 
						|
	defer testServer.Close()
 | 
						|
	endpoints := newController(testServer.URL, 0*time.Second)
 | 
						|
	endpoints.endpointsStore.Add(&v1.Endpoints{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{
 | 
						|
			Name:            "foo",
 | 
						|
			Namespace:       ns,
 | 
						|
			ResourceVersion: "1",
 | 
						|
		},
 | 
						|
		Subsets: []v1.EndpointSubset{{
 | 
						|
			Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
 | 
						|
			Ports:     []v1.EndpointPort{{Port: 1000, Protocol: "UDP"}},
 | 
						|
		}},
 | 
						|
	})
 | 
						|
	addPods(endpoints.podStore, ns, 1, 1, 0, false)
 | 
						|
	endpoints.serviceStore.Add(&v1.Service{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
 | 
						|
		Spec: v1.ServiceSpec{
 | 
						|
			Selector: map[string]string{},
 | 
						|
			Ports:    []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "UDP"}},
 | 
						|
		},
 | 
						|
	})
 | 
						|
	endpoints.syncService(ns + "/foo")
 | 
						|
 | 
						|
	endpointsHandler.ValidateRequestCount(t, 1)
 | 
						|
	data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{
 | 
						|
			Name:            "foo",
 | 
						|
			Namespace:       ns,
 | 
						|
			ResourceVersion: "1",
 | 
						|
			Labels: map[string]string{
 | 
						|
				v1.IsHeadlessService: "",
 | 
						|
			},
 | 
						|
		},
 | 
						|
		Subsets: []v1.EndpointSubset{{
 | 
						|
			Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
 | 
						|
			Ports:     []v1.EndpointPort{{Port: 8080, Protocol: "UDP"}},
 | 
						|
		}},
 | 
						|
	})
 | 
						|
	endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
 | 
						|
}
 | 
						|
 | 
						|
func TestSyncEndpointsProtocolSCTP(t *testing.T) {
 | 
						|
	ns := "other"
 | 
						|
	testServer, endpointsHandler := makeTestServer(t, ns)
 | 
						|
	defer testServer.Close()
 | 
						|
	endpoints := newController(testServer.URL, 0*time.Second)
 | 
						|
	endpoints.endpointsStore.Add(&v1.Endpoints{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{
 | 
						|
			Name:            "foo",
 | 
						|
			Namespace:       ns,
 | 
						|
			ResourceVersion: "1",
 | 
						|
		},
 | 
						|
		Subsets: []v1.EndpointSubset{{
 | 
						|
			Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
 | 
						|
			Ports:     []v1.EndpointPort{{Port: 1000, Protocol: "SCTP"}},
 | 
						|
		}},
 | 
						|
	})
 | 
						|
	addPods(endpoints.podStore, ns, 1, 1, 0, false)
 | 
						|
	endpoints.serviceStore.Add(&v1.Service{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
 | 
						|
		Spec: v1.ServiceSpec{
 | 
						|
			Selector: map[string]string{},
 | 
						|
			Ports:    []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "SCTP"}},
 | 
						|
		},
 | 
						|
	})
 | 
						|
	endpoints.syncService(ns + "/foo")
 | 
						|
 | 
						|
	endpointsHandler.ValidateRequestCount(t, 1)
 | 
						|
	data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{
 | 
						|
			Name:            "foo",
 | 
						|
			Namespace:       ns,
 | 
						|
			ResourceVersion: "1",
 | 
						|
			Labels: map[string]string{
 | 
						|
				v1.IsHeadlessService: "",
 | 
						|
			},
 | 
						|
		},
 | 
						|
		Subsets: []v1.EndpointSubset{{
 | 
						|
			Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
 | 
						|
			Ports:     []v1.EndpointPort{{Port: 8080, Protocol: "SCTP"}},
 | 
						|
		}},
 | 
						|
	})
 | 
						|
	endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
 | 
						|
}
 | 
						|
 | 
						|
func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) {
 | 
						|
	ns := "other"
 | 
						|
	testServer, endpointsHandler := makeTestServer(t, ns)
 | 
						|
	defer testServer.Close()
 | 
						|
	endpoints := newController(testServer.URL, 0*time.Second)
 | 
						|
	endpoints.endpointsStore.Add(&v1.Endpoints{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{
 | 
						|
			Name:            "foo",
 | 
						|
			Namespace:       ns,
 | 
						|
			ResourceVersion: "1",
 | 
						|
		},
 | 
						|
		Subsets: []v1.EndpointSubset{},
 | 
						|
	})
 | 
						|
	addPods(endpoints.podStore, ns, 1, 1, 0, false)
 | 
						|
	endpoints.serviceStore.Add(&v1.Service{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
 | 
						|
		Spec: v1.ServiceSpec{
 | 
						|
			Selector: map[string]string{},
 | 
						|
			Ports:    []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
 | 
						|
		},
 | 
						|
	})
 | 
						|
	endpoints.syncService(ns + "/foo")
 | 
						|
 | 
						|
	data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{
 | 
						|
			Name:            "foo",
 | 
						|
			Namespace:       ns,
 | 
						|
			ResourceVersion: "1",
 | 
						|
			Labels: map[string]string{
 | 
						|
				v1.IsHeadlessService: "",
 | 
						|
			},
 | 
						|
		},
 | 
						|
		Subsets: []v1.EndpointSubset{{
 | 
						|
			Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
 | 
						|
			Ports:     []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
 | 
						|
		}},
 | 
						|
	})
 | 
						|
	endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
 | 
						|
}
 | 
						|
 | 
						|
func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) {
 | 
						|
	ns := "other"
 | 
						|
	testServer, endpointsHandler := makeTestServer(t, ns)
 | 
						|
	defer testServer.Close()
 | 
						|
	endpoints := newController(testServer.URL, 0*time.Second)
 | 
						|
	endpoints.endpointsStore.Add(&v1.Endpoints{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{
 | 
						|
			Name:            "foo",
 | 
						|
			Namespace:       ns,
 | 
						|
			ResourceVersion: "1",
 | 
						|
		},
 | 
						|
		Subsets: []v1.EndpointSubset{},
 | 
						|
	})
 | 
						|
	addPods(endpoints.podStore, ns, 0, 1, 1, false)
 | 
						|
	endpoints.serviceStore.Add(&v1.Service{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
 | 
						|
		Spec: v1.ServiceSpec{
 | 
						|
			Selector: map[string]string{},
 | 
						|
			Ports:    []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
 | 
						|
		},
 | 
						|
	})
 | 
						|
	endpoints.syncService(ns + "/foo")
 | 
						|
 | 
						|
	data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{
 | 
						|
			Name:            "foo",
 | 
						|
			Namespace:       ns,
 | 
						|
			ResourceVersion: "1",
 | 
						|
			Labels: map[string]string{
 | 
						|
				v1.IsHeadlessService: "",
 | 
						|
			},
 | 
						|
		},
 | 
						|
		Subsets: []v1.EndpointSubset{{
 | 
						|
			NotReadyAddresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
 | 
						|
			Ports:             []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
 | 
						|
		}},
 | 
						|
	})
 | 
						|
	endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
 | 
						|
}
 | 
						|
 | 
						|
func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) {
 | 
						|
	ns := "other"
 | 
						|
	testServer, endpointsHandler := makeTestServer(t, ns)
 | 
						|
	defer testServer.Close()
 | 
						|
	endpoints := newController(testServer.URL, 0*time.Second)
 | 
						|
	endpoints.endpointsStore.Add(&v1.Endpoints{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{
 | 
						|
			Name:            "foo",
 | 
						|
			Namespace:       ns,
 | 
						|
			ResourceVersion: "1",
 | 
						|
		},
 | 
						|
		Subsets: []v1.EndpointSubset{},
 | 
						|
	})
 | 
						|
	addPods(endpoints.podStore, ns, 1, 1, 1, false)
 | 
						|
	endpoints.serviceStore.Add(&v1.Service{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
 | 
						|
		Spec: v1.ServiceSpec{
 | 
						|
			Selector: map[string]string{},
 | 
						|
			Ports:    []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
 | 
						|
		},
 | 
						|
	})
 | 
						|
	endpoints.syncService(ns + "/foo")
 | 
						|
 | 
						|
	data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{
 | 
						|
			Name:            "foo",
 | 
						|
			Namespace:       ns,
 | 
						|
			ResourceVersion: "1",
 | 
						|
			Labels: map[string]string{
 | 
						|
				v1.IsHeadlessService: "",
 | 
						|
			},
 | 
						|
		},
 | 
						|
		Subsets: []v1.EndpointSubset{{
 | 
						|
			Addresses:         []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
 | 
						|
			NotReadyAddresses: []v1.EndpointAddress{{IP: "1.2.3.5", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod1", Namespace: ns}}},
 | 
						|
			Ports:             []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
 | 
						|
		}},
 | 
						|
	})
 | 
						|
	endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
 | 
						|
}
 | 
						|
 | 
						|
func TestSyncEndpointsItemsPreexisting(t *testing.T) {
 | 
						|
	ns := "bar"
 | 
						|
	testServer, endpointsHandler := makeTestServer(t, ns)
 | 
						|
	defer testServer.Close()
 | 
						|
	endpoints := newController(testServer.URL, 0*time.Second)
 | 
						|
	endpoints.endpointsStore.Add(&v1.Endpoints{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{
 | 
						|
			Name:            "foo",
 | 
						|
			Namespace:       ns,
 | 
						|
			ResourceVersion: "1",
 | 
						|
		},
 | 
						|
		Subsets: []v1.EndpointSubset{{
 | 
						|
			Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
 | 
						|
			Ports:     []v1.EndpointPort{{Port: 1000}},
 | 
						|
		}},
 | 
						|
	})
 | 
						|
	addPods(endpoints.podStore, ns, 1, 1, 0, false)
 | 
						|
	endpoints.serviceStore.Add(&v1.Service{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
 | 
						|
		Spec: v1.ServiceSpec{
 | 
						|
			Selector: map[string]string{"foo": "bar"},
 | 
						|
			Ports:    []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
 | 
						|
		},
 | 
						|
	})
 | 
						|
	endpoints.syncService(ns + "/foo")
 | 
						|
 | 
						|
	data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{
 | 
						|
			Name:            "foo",
 | 
						|
			Namespace:       ns,
 | 
						|
			ResourceVersion: "1",
 | 
						|
			Labels: map[string]string{
 | 
						|
				v1.IsHeadlessService: "",
 | 
						|
			},
 | 
						|
		},
 | 
						|
		Subsets: []v1.EndpointSubset{{
 | 
						|
			Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
 | 
						|
			Ports:     []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
 | 
						|
		}},
 | 
						|
	})
 | 
						|
	endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
 | 
						|
}
 | 
						|
 | 
						|
func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) {
 | 
						|
	ns := metav1.NamespaceDefault
 | 
						|
	testServer, endpointsHandler := makeTestServer(t, ns)
 | 
						|
	defer testServer.Close()
 | 
						|
	endpoints := newController(testServer.URL, 0*time.Second)
 | 
						|
	endpoints.endpointsStore.Add(&v1.Endpoints{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{
 | 
						|
			ResourceVersion: "1",
 | 
						|
			Name:            "foo",
 | 
						|
			Namespace:       ns,
 | 
						|
		},
 | 
						|
		Subsets: []v1.EndpointSubset{{
 | 
						|
			Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
 | 
						|
			Ports:     []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
 | 
						|
		}},
 | 
						|
	})
 | 
						|
	addPods(endpoints.podStore, metav1.NamespaceDefault, 1, 1, 0, false)
 | 
						|
	endpoints.serviceStore.Add(&v1.Service{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: metav1.NamespaceDefault},
 | 
						|
		Spec: v1.ServiceSpec{
 | 
						|
			Selector: map[string]string{"foo": "bar"},
 | 
						|
			Ports:    []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
 | 
						|
		},
 | 
						|
	})
 | 
						|
	endpoints.syncService(ns + "/foo")
 | 
						|
	endpointsHandler.ValidateRequestCount(t, 0)
 | 
						|
}
 | 
						|
 | 
						|
func TestSyncEndpointsItems(t *testing.T) {
 | 
						|
	ns := "other"
 | 
						|
	testServer, endpointsHandler := makeTestServer(t, ns)
 | 
						|
	defer testServer.Close()
 | 
						|
	endpoints := newController(testServer.URL, 0*time.Second)
 | 
						|
	addPods(endpoints.podStore, ns, 3, 2, 0, false)
 | 
						|
	addPods(endpoints.podStore, "blah", 5, 2, 0, false) // make sure these aren't found!
 | 
						|
 | 
						|
	endpoints.serviceStore.Add(&v1.Service{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
 | 
						|
		Spec: v1.ServiceSpec{
 | 
						|
			Selector: map[string]string{"foo": "bar"},
 | 
						|
			Ports: []v1.ServicePort{
 | 
						|
				{Name: "port0", Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)},
 | 
						|
				{Name: "port1", Port: 88, Protocol: "TCP", TargetPort: intstr.FromInt(8088)},
 | 
						|
			},
 | 
						|
		},
 | 
						|
	})
 | 
						|
	endpoints.syncService("other/foo")
 | 
						|
 | 
						|
	expectedSubsets := []v1.EndpointSubset{{
 | 
						|
		Addresses: []v1.EndpointAddress{
 | 
						|
			{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}},
 | 
						|
			{IP: "1.2.3.5", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod1", Namespace: ns}},
 | 
						|
			{IP: "1.2.3.6", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod2", Namespace: ns}},
 | 
						|
		},
 | 
						|
		Ports: []v1.EndpointPort{
 | 
						|
			{Name: "port0", Port: 8080, Protocol: "TCP"},
 | 
						|
			{Name: "port1", Port: 8088, Protocol: "TCP"},
 | 
						|
		},
 | 
						|
	}}
 | 
						|
	data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{
 | 
						|
			ResourceVersion: "",
 | 
						|
			Name:            "foo",
 | 
						|
			Labels: map[string]string{
 | 
						|
				v1.IsHeadlessService: "",
 | 
						|
			},
 | 
						|
		},
 | 
						|
		Subsets: endptspkg.SortSubsets(expectedSubsets),
 | 
						|
	})
 | 
						|
	endpointsHandler.ValidateRequestCount(t, 1)
 | 
						|
	endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, ""), "POST", &data)
 | 
						|
}
 | 
						|
 | 
						|
func TestSyncEndpointsItemsWithLabels(t *testing.T) {
 | 
						|
	ns := "other"
 | 
						|
	testServer, endpointsHandler := makeTestServer(t, ns)
 | 
						|
	defer testServer.Close()
 | 
						|
	endpoints := newController(testServer.URL, 0*time.Second)
 | 
						|
	addPods(endpoints.podStore, ns, 3, 2, 0, false)
 | 
						|
	serviceLabels := map[string]string{"foo": "bar"}
 | 
						|
	endpoints.serviceStore.Add(&v1.Service{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{
 | 
						|
			Name:      "foo",
 | 
						|
			Namespace: ns,
 | 
						|
			Labels:    serviceLabels,
 | 
						|
		},
 | 
						|
		Spec: v1.ServiceSpec{
 | 
						|
			Selector: map[string]string{"foo": "bar"},
 | 
						|
			Ports: []v1.ServicePort{
 | 
						|
				{Name: "port0", Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)},
 | 
						|
				{Name: "port1", Port: 88, Protocol: "TCP", TargetPort: intstr.FromInt(8088)},
 | 
						|
			},
 | 
						|
		},
 | 
						|
	})
 | 
						|
	endpoints.syncService(ns + "/foo")
 | 
						|
 | 
						|
	expectedSubsets := []v1.EndpointSubset{{
 | 
						|
		Addresses: []v1.EndpointAddress{
 | 
						|
			{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}},
 | 
						|
			{IP: "1.2.3.5", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod1", Namespace: ns}},
 | 
						|
			{IP: "1.2.3.6", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod2", Namespace: ns}},
 | 
						|
		},
 | 
						|
		Ports: []v1.EndpointPort{
 | 
						|
			{Name: "port0", Port: 8080, Protocol: "TCP"},
 | 
						|
			{Name: "port1", Port: 8088, Protocol: "TCP"},
 | 
						|
		},
 | 
						|
	}}
 | 
						|
 | 
						|
	serviceLabels[v1.IsHeadlessService] = ""
 | 
						|
	data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{
 | 
						|
			ResourceVersion: "",
 | 
						|
			Name:            "foo",
 | 
						|
			Labels:          serviceLabels,
 | 
						|
		},
 | 
						|
		Subsets: endptspkg.SortSubsets(expectedSubsets),
 | 
						|
	})
 | 
						|
	endpointsHandler.ValidateRequestCount(t, 1)
 | 
						|
	endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, ""), "POST", &data)
 | 
						|
}
 | 
						|
 | 
						|
func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) {
 | 
						|
	ns := "bar"
 | 
						|
	testServer, endpointsHandler := makeTestServer(t, ns)
 | 
						|
	defer testServer.Close()
 | 
						|
	endpoints := newController(testServer.URL, 0*time.Second)
 | 
						|
	endpoints.endpointsStore.Add(&v1.Endpoints{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{
 | 
						|
			Name:            "foo",
 | 
						|
			Namespace:       ns,
 | 
						|
			ResourceVersion: "1",
 | 
						|
			Labels: map[string]string{
 | 
						|
				"foo": "bar",
 | 
						|
			},
 | 
						|
		},
 | 
						|
		Subsets: []v1.EndpointSubset{{
 | 
						|
			Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
 | 
						|
			Ports:     []v1.EndpointPort{{Port: 1000}},
 | 
						|
		}},
 | 
						|
	})
 | 
						|
	addPods(endpoints.podStore, ns, 1, 1, 0, false)
 | 
						|
	serviceLabels := map[string]string{"baz": "blah"}
 | 
						|
	endpoints.serviceStore.Add(&v1.Service{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{
 | 
						|
			Name:      "foo",
 | 
						|
			Namespace: ns,
 | 
						|
			Labels:    serviceLabels,
 | 
						|
		},
 | 
						|
		Spec: v1.ServiceSpec{
 | 
						|
			Selector: map[string]string{"foo": "bar"},
 | 
						|
			Ports:    []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
 | 
						|
		},
 | 
						|
	})
 | 
						|
	endpoints.syncService(ns + "/foo")
 | 
						|
 | 
						|
	serviceLabels[v1.IsHeadlessService] = ""
 | 
						|
	data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{
 | 
						|
			Name:            "foo",
 | 
						|
			Namespace:       ns,
 | 
						|
			ResourceVersion: "1",
 | 
						|
			Labels:          serviceLabels,
 | 
						|
		},
 | 
						|
		Subsets: []v1.EndpointSubset{{
 | 
						|
			Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
 | 
						|
			Ports:     []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
 | 
						|
		}},
 | 
						|
	})
 | 
						|
	endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
 | 
						|
}
 | 
						|
 | 
						|
func TestWaitsForAllInformersToBeSynced2(t *testing.T) {
 | 
						|
	var tests = []struct {
 | 
						|
		podsSynced            func() bool
 | 
						|
		servicesSynced        func() bool
 | 
						|
		endpointsSynced       func() bool
 | 
						|
		shouldUpdateEndpoints bool
 | 
						|
	}{
 | 
						|
		{neverReady, alwaysReady, alwaysReady, false},
 | 
						|
		{alwaysReady, neverReady, alwaysReady, false},
 | 
						|
		{alwaysReady, alwaysReady, neverReady, false},
 | 
						|
		{alwaysReady, alwaysReady, alwaysReady, true},
 | 
						|
	}
 | 
						|
 | 
						|
	for _, test := range tests {
 | 
						|
		func() {
 | 
						|
			ns := "other"
 | 
						|
			testServer, endpointsHandler := makeTestServer(t, ns)
 | 
						|
			defer testServer.Close()
 | 
						|
			endpoints := newController(testServer.URL, 0*time.Second)
 | 
						|
			addPods(endpoints.podStore, ns, 1, 1, 0, false)
 | 
						|
 | 
						|
			service := &v1.Service{
 | 
						|
				ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
 | 
						|
				Spec: v1.ServiceSpec{
 | 
						|
					Selector: map[string]string{},
 | 
						|
					Ports:    []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "TCP"}},
 | 
						|
				},
 | 
						|
			}
 | 
						|
			endpoints.serviceStore.Add(service)
 | 
						|
			endpoints.enqueueService(service)
 | 
						|
			endpoints.podsSynced = test.podsSynced
 | 
						|
			endpoints.servicesSynced = test.servicesSynced
 | 
						|
			endpoints.endpointsSynced = test.endpointsSynced
 | 
						|
			endpoints.workerLoopPeriod = 10 * time.Millisecond
 | 
						|
			stopCh := make(chan struct{})
 | 
						|
			defer close(stopCh)
 | 
						|
			go endpoints.Run(1, stopCh)
 | 
						|
 | 
						|
			// cache.WaitForNamedCacheSync has a 100ms poll period, and the endpoints worker has a 10ms period.
 | 
						|
			// To ensure we get all updates, including unexpected ones, we need to wait at least as long as
 | 
						|
			// a single cache sync period and worker period, with some fudge room.
 | 
						|
			time.Sleep(150 * time.Millisecond)
 | 
						|
			if test.shouldUpdateEndpoints {
 | 
						|
				// Ensure the work queue has been processed by looping for up to a second to prevent flakes.
 | 
						|
				wait.PollImmediate(50*time.Millisecond, 1*time.Second, func() (bool, error) {
 | 
						|
					return endpoints.queue.Len() == 0, nil
 | 
						|
				})
 | 
						|
				endpointsHandler.ValidateRequestCount(t, 1)
 | 
						|
			} else {
 | 
						|
				endpointsHandler.ValidateRequestCount(t, 0)
 | 
						|
			}
 | 
						|
		}()
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func TestSyncEndpointsHeadlessService(t *testing.T) {
 | 
						|
	ns := "headless"
 | 
						|
	testServer, endpointsHandler := makeTestServer(t, ns)
 | 
						|
	defer testServer.Close()
 | 
						|
	endpoints := newController(testServer.URL, 0*time.Second)
 | 
						|
	endpoints.endpointsStore.Add(&v1.Endpoints{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{
 | 
						|
			Name:            "foo",
 | 
						|
			Namespace:       ns,
 | 
						|
			ResourceVersion: "1",
 | 
						|
		},
 | 
						|
		Subsets: []v1.EndpointSubset{{
 | 
						|
			Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
 | 
						|
			Ports:     []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
 | 
						|
		}},
 | 
						|
	})
 | 
						|
	addPods(endpoints.podStore, ns, 1, 1, 0, false)
 | 
						|
	endpoints.serviceStore.Add(&v1.Service{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
 | 
						|
		Spec: v1.ServiceSpec{
 | 
						|
			Selector:  map[string]string{},
 | 
						|
			ClusterIP: api.ClusterIPNone,
 | 
						|
			Ports:     []v1.ServicePort{},
 | 
						|
		},
 | 
						|
	})
 | 
						|
	endpoints.syncService(ns + "/foo")
 | 
						|
	data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{
 | 
						|
			Name:            "foo",
 | 
						|
			Namespace:       ns,
 | 
						|
			ResourceVersion: "1",
 | 
						|
			Labels: map[string]string{
 | 
						|
				v1.IsHeadlessService: "",
 | 
						|
			},
 | 
						|
		},
 | 
						|
		Subsets: []v1.EndpointSubset{{
 | 
						|
			Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
 | 
						|
			Ports:     []v1.EndpointPort{},
 | 
						|
		}},
 | 
						|
	})
 | 
						|
	endpointsHandler.ValidateRequestCount(t, 1)
 | 
						|
	endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
 | 
						|
}
 | 
						|
 | 
						|
func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyNeverAndPhaseFailed(t *testing.T) {
 | 
						|
	ns := "other"
 | 
						|
	testServer, endpointsHandler := makeTestServer(t, ns)
 | 
						|
	defer testServer.Close()
 | 
						|
	endpoints := newController(testServer.URL, 0*time.Second)
 | 
						|
	endpoints.endpointsStore.Add(&v1.Endpoints{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{
 | 
						|
			Name:            "foo",
 | 
						|
			Namespace:       ns,
 | 
						|
			ResourceVersion: "1",
 | 
						|
			Labels: map[string]string{
 | 
						|
				"foo": "bar",
 | 
						|
			},
 | 
						|
		},
 | 
						|
		Subsets: []v1.EndpointSubset{},
 | 
						|
	})
 | 
						|
	addNotReadyPodsWithSpecifiedRestartPolicyAndPhase(endpoints.podStore, ns, 1, 1, v1.RestartPolicyNever, v1.PodFailed)
 | 
						|
	endpoints.serviceStore.Add(&v1.Service{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
 | 
						|
		Spec: v1.ServiceSpec{
 | 
						|
			Selector: map[string]string{"foo": "bar"},
 | 
						|
			Ports:    []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
 | 
						|
		},
 | 
						|
	})
 | 
						|
	endpoints.syncService(ns + "/foo")
 | 
						|
	data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{
 | 
						|
			Name:            "foo",
 | 
						|
			Namespace:       ns,
 | 
						|
			ResourceVersion: "1",
 | 
						|
			Labels: map[string]string{
 | 
						|
				v1.IsHeadlessService: "",
 | 
						|
			},
 | 
						|
		},
 | 
						|
		Subsets: []v1.EndpointSubset{},
 | 
						|
	})
 | 
						|
	endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
 | 
						|
}
 | 
						|
 | 
						|
func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyNeverAndPhaseSucceeded(t *testing.T) {
 | 
						|
	ns := "other"
 | 
						|
	testServer, endpointsHandler := makeTestServer(t, ns)
 | 
						|
	defer testServer.Close()
 | 
						|
	endpoints := newController(testServer.URL, 0*time.Second)
 | 
						|
	endpoints.endpointsStore.Add(&v1.Endpoints{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{
 | 
						|
			Name:            "foo",
 | 
						|
			Namespace:       ns,
 | 
						|
			ResourceVersion: "1",
 | 
						|
			Labels: map[string]string{
 | 
						|
				"foo": "bar",
 | 
						|
			},
 | 
						|
		},
 | 
						|
		Subsets: []v1.EndpointSubset{},
 | 
						|
	})
 | 
						|
	addNotReadyPodsWithSpecifiedRestartPolicyAndPhase(endpoints.podStore, ns, 1, 1, v1.RestartPolicyNever, v1.PodSucceeded)
 | 
						|
	endpoints.serviceStore.Add(&v1.Service{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
 | 
						|
		Spec: v1.ServiceSpec{
 | 
						|
			Selector: map[string]string{"foo": "bar"},
 | 
						|
			Ports:    []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
 | 
						|
		},
 | 
						|
	})
 | 
						|
	endpoints.syncService(ns + "/foo")
 | 
						|
	data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{
 | 
						|
			Name:            "foo",
 | 
						|
			Namespace:       ns,
 | 
						|
			ResourceVersion: "1",
 | 
						|
			Labels: map[string]string{
 | 
						|
				v1.IsHeadlessService: "",
 | 
						|
			},
 | 
						|
		},
 | 
						|
		Subsets: []v1.EndpointSubset{},
 | 
						|
	})
 | 
						|
	endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
 | 
						|
}
 | 
						|
 | 
						|
func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyOnFailureAndPhaseSucceeded(t *testing.T) {
 | 
						|
	ns := "other"
 | 
						|
	testServer, endpointsHandler := makeTestServer(t, ns)
 | 
						|
	defer testServer.Close()
 | 
						|
	endpoints := newController(testServer.URL, 0*time.Second)
 | 
						|
	endpoints.endpointsStore.Add(&v1.Endpoints{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{
 | 
						|
			Name:            "foo",
 | 
						|
			Namespace:       ns,
 | 
						|
			ResourceVersion: "1",
 | 
						|
			Labels: map[string]string{
 | 
						|
				"foo": "bar",
 | 
						|
			},
 | 
						|
		},
 | 
						|
		Subsets: []v1.EndpointSubset{},
 | 
						|
	})
 | 
						|
	addNotReadyPodsWithSpecifiedRestartPolicyAndPhase(endpoints.podStore, ns, 1, 1, v1.RestartPolicyOnFailure, v1.PodSucceeded)
 | 
						|
	endpoints.serviceStore.Add(&v1.Service{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
 | 
						|
		Spec: v1.ServiceSpec{
 | 
						|
			Selector: map[string]string{"foo": "bar"},
 | 
						|
			Ports:    []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
 | 
						|
		},
 | 
						|
	})
 | 
						|
	endpoints.syncService(ns + "/foo")
 | 
						|
	data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{
 | 
						|
			Name:            "foo",
 | 
						|
			Namespace:       ns,
 | 
						|
			ResourceVersion: "1",
 | 
						|
			Labels: map[string]string{
 | 
						|
				v1.IsHeadlessService: "",
 | 
						|
			},
 | 
						|
		},
 | 
						|
		Subsets: []v1.EndpointSubset{},
 | 
						|
	})
 | 
						|
	endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
 | 
						|
}
 | 
						|
 | 
						|
func TestSyncEndpointsHeadlessWithoutPort(t *testing.T) {
 | 
						|
	ns := metav1.NamespaceDefault
 | 
						|
	testServer, endpointsHandler := makeTestServer(t, ns)
 | 
						|
	defer testServer.Close()
 | 
						|
	endpoints := newController(testServer.URL, 0*time.Second)
 | 
						|
	endpoints.serviceStore.Add(&v1.Service{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
 | 
						|
		Spec: v1.ServiceSpec{
 | 
						|
			Selector:  map[string]string{"foo": "bar"},
 | 
						|
			ClusterIP: "None",
 | 
						|
			Ports:     nil,
 | 
						|
		},
 | 
						|
	})
 | 
						|
	addPods(endpoints.podStore, ns, 1, 1, 0, false)
 | 
						|
	endpoints.syncService(ns + "/foo")
 | 
						|
	endpointsHandler.ValidateRequestCount(t, 1)
 | 
						|
	data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{
 | 
						|
			Name: "foo",
 | 
						|
			Labels: map[string]string{
 | 
						|
				v1.IsHeadlessService: "",
 | 
						|
			},
 | 
						|
		},
 | 
						|
		Subsets: []v1.EndpointSubset{{
 | 
						|
			Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
 | 
						|
			Ports:     nil,
 | 
						|
		}},
 | 
						|
	})
 | 
						|
	endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, ""), "POST", &data)
 | 
						|
}
 | 
						|
 | 
						|
// There are 3*5 possibilities(3 types of RestartPolicy by 5 types of PodPhase). Not list them all here.
 | 
						|
// Just list all of the 3 false cases and 3 of the 12 true cases.
 | 
						|
func TestShouldPodBeInEndpoints(t *testing.T) {
 | 
						|
	testCases := []struct {
 | 
						|
		name     string
 | 
						|
		pod      *v1.Pod
 | 
						|
		expected bool
 | 
						|
	}{
 | 
						|
		// Pod should not be in endpoints cases:
 | 
						|
		{
 | 
						|
			name: "Failed pod with Never RestartPolicy",
 | 
						|
			pod: &v1.Pod{
 | 
						|
				Spec: v1.PodSpec{
 | 
						|
					RestartPolicy: v1.RestartPolicyNever,
 | 
						|
				},
 | 
						|
				Status: v1.PodStatus{
 | 
						|
					Phase: v1.PodFailed,
 | 
						|
				},
 | 
						|
			},
 | 
						|
			expected: false,
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name: "Succeeded pod with Never RestartPolicy",
 | 
						|
			pod: &v1.Pod{
 | 
						|
				Spec: v1.PodSpec{
 | 
						|
					RestartPolicy: v1.RestartPolicyNever,
 | 
						|
				},
 | 
						|
				Status: v1.PodStatus{
 | 
						|
					Phase: v1.PodSucceeded,
 | 
						|
				},
 | 
						|
			},
 | 
						|
			expected: false,
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name: "Succeeded pod with OnFailure RestartPolicy",
 | 
						|
			pod: &v1.Pod{
 | 
						|
				Spec: v1.PodSpec{
 | 
						|
					RestartPolicy: v1.RestartPolicyOnFailure,
 | 
						|
				},
 | 
						|
				Status: v1.PodStatus{
 | 
						|
					Phase: v1.PodSucceeded,
 | 
						|
				},
 | 
						|
			},
 | 
						|
			expected: false,
 | 
						|
		},
 | 
						|
		// Pod should be in endpoints cases:
 | 
						|
		{
 | 
						|
			name: "Failed pod with Always RestartPolicy",
 | 
						|
			pod: &v1.Pod{
 | 
						|
				Spec: v1.PodSpec{
 | 
						|
					RestartPolicy: v1.RestartPolicyAlways,
 | 
						|
				},
 | 
						|
				Status: v1.PodStatus{
 | 
						|
					Phase: v1.PodFailed,
 | 
						|
				},
 | 
						|
			},
 | 
						|
			expected: true,
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name: "Pending pod with Never RestartPolicy",
 | 
						|
			pod: &v1.Pod{
 | 
						|
				Spec: v1.PodSpec{
 | 
						|
					RestartPolicy: v1.RestartPolicyNever,
 | 
						|
				},
 | 
						|
				Status: v1.PodStatus{
 | 
						|
					Phase: v1.PodPending,
 | 
						|
				},
 | 
						|
			},
 | 
						|
			expected: true,
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name: "Unknown pod with OnFailure RestartPolicy",
 | 
						|
			pod: &v1.Pod{
 | 
						|
				Spec: v1.PodSpec{
 | 
						|
					RestartPolicy: v1.RestartPolicyOnFailure,
 | 
						|
				},
 | 
						|
				Status: v1.PodStatus{
 | 
						|
					Phase: v1.PodUnknown,
 | 
						|
				},
 | 
						|
			},
 | 
						|
			expected: true,
 | 
						|
		},
 | 
						|
	}
 | 
						|
	for _, test := range testCases {
 | 
						|
		result := shouldPodBeInEndpoints(test.pod)
 | 
						|
		if result != test.expected {
 | 
						|
			t.Errorf("%s: expected : %t, got: %t", test.name, test.expected, result)
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
func TestPodToEndpointAddressForService(t *testing.T) {
 | 
						|
	testCases := []struct {
 | 
						|
		name               string
 | 
						|
		expectedEndPointIP string
 | 
						|
		enableDualStack    bool
 | 
						|
		expectError        bool
 | 
						|
		enableDualStackPod bool
 | 
						|
 | 
						|
		service v1.Service
 | 
						|
	}{
 | 
						|
		{
 | 
						|
			name:               "v4 service, in a single stack cluster",
 | 
						|
			expectedEndPointIP: "1.2.3.4",
 | 
						|
 | 
						|
			enableDualStack:    false,
 | 
						|
			expectError:        false,
 | 
						|
			enableDualStackPod: false,
 | 
						|
 | 
						|
			service: v1.Service{
 | 
						|
				Spec: v1.ServiceSpec{
 | 
						|
					ClusterIP: "10.0.0.1",
 | 
						|
				},
 | 
						|
			},
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name: "v4 service, in a dual stack cluster",
 | 
						|
 | 
						|
			expectedEndPointIP: "1.2.3.4",
 | 
						|
			enableDualStack:    true,
 | 
						|
			expectError:        false,
 | 
						|
			enableDualStackPod: true,
 | 
						|
 | 
						|
			service: v1.Service{
 | 
						|
				Spec: v1.ServiceSpec{
 | 
						|
					ClusterIP: "10.0.0.1",
 | 
						|
				},
 | 
						|
			},
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name:               "v6 service, in a dual stack cluster. dual stack enabled",
 | 
						|
			expectedEndPointIP: "2000::0",
 | 
						|
 | 
						|
			enableDualStack:    true,
 | 
						|
			expectError:        false,
 | 
						|
			enableDualStackPod: true,
 | 
						|
 | 
						|
			service: v1.Service{
 | 
						|
				Spec: v1.ServiceSpec{
 | 
						|
					ClusterIP: "3000::1",
 | 
						|
				},
 | 
						|
			},
 | 
						|
		},
 | 
						|
 | 
						|
		// in reality this is a misconfigured cluster
 | 
						|
		// i.e user is not using dual stack and have PodIP == v4 and ServiceIP==v6
 | 
						|
		// we are testing that we will keep producing the expected behavior
 | 
						|
		{
 | 
						|
			name:               "v6 service, in a v4 only cluster. dual stack disabled",
 | 
						|
			expectedEndPointIP: "1.2.3.4",
 | 
						|
 | 
						|
			enableDualStack:    false,
 | 
						|
			expectError:        false,
 | 
						|
			enableDualStackPod: false,
 | 
						|
 | 
						|
			service: v1.Service{
 | 
						|
				Spec: v1.ServiceSpec{
 | 
						|
					ClusterIP: "3000::1",
 | 
						|
				},
 | 
						|
			},
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name:               "v6 service, in a v4 only cluster - dual stack enabled",
 | 
						|
			expectedEndPointIP: "1.2.3.4",
 | 
						|
 | 
						|
			enableDualStack:    true,
 | 
						|
			expectError:        true,
 | 
						|
			enableDualStackPod: false,
 | 
						|
 | 
						|
			service: v1.Service{
 | 
						|
				Spec: v1.ServiceSpec{
 | 
						|
					ClusterIP: "3000::1",
 | 
						|
				},
 | 
						|
			},
 | 
						|
		},
 | 
						|
	}
 | 
						|
	for _, tc := range testCases {
 | 
						|
		t.Run(tc.name, func(t *testing.T) {
 | 
						|
			defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.IPv6DualStack, tc.enableDualStack)()
 | 
						|
			podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
 | 
						|
			ns := "test"
 | 
						|
			addPods(podStore, ns, 1, 1, 0, tc.enableDualStackPod)
 | 
						|
			pods := podStore.List()
 | 
						|
			if len(pods) != 1 {
 | 
						|
				t.Fatalf("podStore size: expected: %d, got: %d", 1, len(pods))
 | 
						|
			}
 | 
						|
			pod := pods[0].(*v1.Pod)
 | 
						|
			epa, err := podToEndpointAddressForService(&tc.service, pod)
 | 
						|
 | 
						|
			if err != nil && !tc.expectError {
 | 
						|
				t.Fatalf("podToEndpointAddressForService returned unexpected error %v", err)
 | 
						|
			}
 | 
						|
 | 
						|
			if err == nil && tc.expectError {
 | 
						|
				t.Fatalf("podToEndpointAddressForService should have returned error but it did not")
 | 
						|
			}
 | 
						|
 | 
						|
			if err != nil && tc.expectError {
 | 
						|
				return
 | 
						|
			}
 | 
						|
 | 
						|
			if epa.IP != tc.expectedEndPointIP {
 | 
						|
				t.Fatalf("IP: expected: %s, got: %s", pod.Status.PodIP, epa.IP)
 | 
						|
			}
 | 
						|
			if *(epa.NodeName) != pod.Spec.NodeName {
 | 
						|
				t.Fatalf("NodeName: expected: %s, got: %s", pod.Spec.NodeName, *(epa.NodeName))
 | 
						|
			}
 | 
						|
			if epa.TargetRef.Kind != "Pod" {
 | 
						|
				t.Fatalf("TargetRef.Kind: expected: %s, got: %s", "Pod", epa.TargetRef.Kind)
 | 
						|
			}
 | 
						|
			if epa.TargetRef.Namespace != pod.ObjectMeta.Namespace {
 | 
						|
				t.Fatalf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.Namespace, epa.TargetRef.Namespace)
 | 
						|
			}
 | 
						|
			if epa.TargetRef.Name != pod.ObjectMeta.Name {
 | 
						|
				t.Fatalf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.Name, epa.TargetRef.Name)
 | 
						|
			}
 | 
						|
			if epa.TargetRef.UID != pod.ObjectMeta.UID {
 | 
						|
				t.Fatalf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.UID, epa.TargetRef.UID)
 | 
						|
			}
 | 
						|
			if epa.TargetRef.ResourceVersion != pod.ObjectMeta.ResourceVersion {
 | 
						|
				t.Fatalf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.ResourceVersion, epa.TargetRef.ResourceVersion)
 | 
						|
			}
 | 
						|
		})
 | 
						|
	}
 | 
						|
 | 
						|
}
 | 
						|
 | 
						|
func TestPodToEndpointAddress(t *testing.T) {
 | 
						|
	podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
 | 
						|
	ns := "test"
 | 
						|
	addPods(podStore, ns, 1, 1, 0, false)
 | 
						|
	pods := podStore.List()
 | 
						|
	if len(pods) != 1 {
 | 
						|
		t.Errorf("podStore size: expected: %d, got: %d", 1, len(pods))
 | 
						|
		return
 | 
						|
	}
 | 
						|
	pod := pods[0].(*v1.Pod)
 | 
						|
	epa := podToEndpointAddress(pod)
 | 
						|
	if epa.IP != pod.Status.PodIP {
 | 
						|
		t.Errorf("IP: expected: %s, got: %s", pod.Status.PodIP, epa.IP)
 | 
						|
	}
 | 
						|
	if *(epa.NodeName) != pod.Spec.NodeName {
 | 
						|
		t.Errorf("NodeName: expected: %s, got: %s", pod.Spec.NodeName, *(epa.NodeName))
 | 
						|
	}
 | 
						|
	if epa.TargetRef.Kind != "Pod" {
 | 
						|
		t.Errorf("TargetRef.Kind: expected: %s, got: %s", "Pod", epa.TargetRef.Kind)
 | 
						|
	}
 | 
						|
	if epa.TargetRef.Namespace != pod.ObjectMeta.Namespace {
 | 
						|
		t.Errorf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.Namespace, epa.TargetRef.Namespace)
 | 
						|
	}
 | 
						|
	if epa.TargetRef.Name != pod.ObjectMeta.Name {
 | 
						|
		t.Errorf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.Name, epa.TargetRef.Name)
 | 
						|
	}
 | 
						|
	if epa.TargetRef.UID != pod.ObjectMeta.UID {
 | 
						|
		t.Errorf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.UID, epa.TargetRef.UID)
 | 
						|
	}
 | 
						|
	if epa.TargetRef.ResourceVersion != pod.ObjectMeta.ResourceVersion {
 | 
						|
		t.Errorf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.ResourceVersion, epa.TargetRef.ResourceVersion)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func TestPodChanged(t *testing.T) {
 | 
						|
	podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
 | 
						|
	ns := "test"
 | 
						|
	addPods(podStore, ns, 1, 1, 0, false)
 | 
						|
	pods := podStore.List()
 | 
						|
	if len(pods) != 1 {
 | 
						|
		t.Errorf("podStore size: expected: %d, got: %d", 1, len(pods))
 | 
						|
		return
 | 
						|
	}
 | 
						|
	oldPod := pods[0].(*v1.Pod)
 | 
						|
	newPod := oldPod.DeepCopy()
 | 
						|
 | 
						|
	if podChangedHelper(oldPod, newPod, endpointChanged) {
 | 
						|
		t.Errorf("Expected pod to be unchanged for copied pod")
 | 
						|
	}
 | 
						|
 | 
						|
	newPod.Spec.NodeName = "changed"
 | 
						|
	if !podChangedHelper(oldPod, newPod, endpointChanged) {
 | 
						|
		t.Errorf("Expected pod to be changed for pod with NodeName changed")
 | 
						|
	}
 | 
						|
	newPod.Spec.NodeName = oldPod.Spec.NodeName
 | 
						|
 | 
						|
	newPod.ObjectMeta.ResourceVersion = "changed"
 | 
						|
	if podChangedHelper(oldPod, newPod, endpointChanged) {
 | 
						|
		t.Errorf("Expected pod to be unchanged for pod with only ResourceVersion changed")
 | 
						|
	}
 | 
						|
	newPod.ObjectMeta.ResourceVersion = oldPod.ObjectMeta.ResourceVersion
 | 
						|
 | 
						|
	newPod.Status.PodIP = "1.2.3.1"
 | 
						|
	if !podChangedHelper(oldPod, newPod, endpointChanged) {
 | 
						|
		t.Errorf("Expected pod to be changed with pod IP address change")
 | 
						|
	}
 | 
						|
	newPod.Status.PodIP = oldPod.Status.PodIP
 | 
						|
 | 
						|
	/* dual stack tests */
 | 
						|
	// primary changes, because changing IPs is done by changing sandbox
 | 
						|
	// case 1: add new secondrary IP
 | 
						|
	newPod.Status.PodIP = "1.1.3.1"
 | 
						|
	newPod.Status.PodIPs = []v1.PodIP{
 | 
						|
		{
 | 
						|
			IP: "1.1.3.1",
 | 
						|
		},
 | 
						|
		{
 | 
						|
			IP: "2000::1",
 | 
						|
		},
 | 
						|
	}
 | 
						|
	if !podChangedHelper(oldPod, newPod, endpointChanged) {
 | 
						|
		t.Errorf("Expected pod to be changed with adding secondary IP")
 | 
						|
	}
 | 
						|
	// reset
 | 
						|
	newPod.Status.PodIPs = nil
 | 
						|
	newPod.Status.PodIP = oldPod.Status.PodIP
 | 
						|
 | 
						|
	// case 2: removing a secondary IP
 | 
						|
	saved := oldPod.Status.PodIP
 | 
						|
	oldPod.Status.PodIP = "1.1.3.1"
 | 
						|
	oldPod.Status.PodIPs = []v1.PodIP{
 | 
						|
		{
 | 
						|
			IP: "1.1.3.1",
 | 
						|
		},
 | 
						|
		{
 | 
						|
			IP: "2000::1",
 | 
						|
		},
 | 
						|
	}
 | 
						|
 | 
						|
	newPod.Status.PodIP = "1.2.3.4"
 | 
						|
	newPod.Status.PodIPs = []v1.PodIP{
 | 
						|
		{
 | 
						|
			IP: "1.2.3.4",
 | 
						|
		},
 | 
						|
	}
 | 
						|
 | 
						|
	// reset
 | 
						|
	oldPod.Status.PodIPs = nil
 | 
						|
	newPod.Status.PodIPs = nil
 | 
						|
	oldPod.Status.PodIP = saved
 | 
						|
	newPod.Status.PodIP = saved
 | 
						|
	// case 3: change secondary
 | 
						|
	// case 2: removing a secondary IP
 | 
						|
	saved = oldPod.Status.PodIP
 | 
						|
	oldPod.Status.PodIP = "1.1.3.1"
 | 
						|
	oldPod.Status.PodIPs = []v1.PodIP{
 | 
						|
		{
 | 
						|
			IP: "1.1.3.1",
 | 
						|
		},
 | 
						|
		{
 | 
						|
			IP: "2000::1",
 | 
						|
		},
 | 
						|
	}
 | 
						|
 | 
						|
	newPod.Status.PodIP = "1.2.3.4"
 | 
						|
	newPod.Status.PodIPs = []v1.PodIP{
 | 
						|
		{
 | 
						|
			IP: "1.2.3.4",
 | 
						|
		},
 | 
						|
		{
 | 
						|
			IP: "2000::2",
 | 
						|
		},
 | 
						|
	}
 | 
						|
 | 
						|
	// reset
 | 
						|
	oldPod.Status.PodIPs = nil
 | 
						|
	newPod.Status.PodIPs = nil
 | 
						|
	oldPod.Status.PodIP = saved
 | 
						|
	newPod.Status.PodIP = saved
 | 
						|
 | 
						|
	/* end dual stack testing */
 | 
						|
 | 
						|
	newPod.ObjectMeta.Name = "wrong-name"
 | 
						|
	if !podChangedHelper(oldPod, newPod, endpointChanged) {
 | 
						|
		t.Errorf("Expected pod to be changed with pod name change")
 | 
						|
	}
 | 
						|
	newPod.ObjectMeta.Name = oldPod.ObjectMeta.Name
 | 
						|
 | 
						|
	saveConditions := oldPod.Status.Conditions
 | 
						|
	oldPod.Status.Conditions = nil
 | 
						|
	if !podChangedHelper(oldPod, newPod, endpointChanged) {
 | 
						|
		t.Errorf("Expected pod to be changed with pod readiness change")
 | 
						|
	}
 | 
						|
	oldPod.Status.Conditions = saveConditions
 | 
						|
 | 
						|
	now := metav1.NewTime(time.Now().UTC())
 | 
						|
	newPod.ObjectMeta.DeletionTimestamp = &now
 | 
						|
	if !podChangedHelper(oldPod, newPod, endpointChanged) {
 | 
						|
		t.Errorf("Expected pod to be changed with DeletionTimestamp change")
 | 
						|
	}
 | 
						|
	newPod.ObjectMeta.DeletionTimestamp = oldPod.ObjectMeta.DeletionTimestamp.DeepCopy()
 | 
						|
}
 | 
						|
 | 
						|
func TestLastTriggerChangeTimeAnnotation(t *testing.T) {
 | 
						|
	ns := "other"
 | 
						|
	testServer, endpointsHandler := makeTestServer(t, ns)
 | 
						|
	defer testServer.Close()
 | 
						|
	endpoints := newController(testServer.URL, 0*time.Second)
 | 
						|
	endpoints.endpointsStore.Add(&v1.Endpoints{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{
 | 
						|
			Name:            "foo",
 | 
						|
			Namespace:       ns,
 | 
						|
			ResourceVersion: "1",
 | 
						|
		},
 | 
						|
		Subsets: []v1.EndpointSubset{{
 | 
						|
			Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
 | 
						|
			Ports:     []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
 | 
						|
		}},
 | 
						|
	})
 | 
						|
	addPods(endpoints.podStore, ns, 1, 1, 0, false)
 | 
						|
	endpoints.serviceStore.Add(&v1.Service{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, CreationTimestamp: metav1.NewTime(triggerTime)},
 | 
						|
		Spec: v1.ServiceSpec{
 | 
						|
			Selector: map[string]string{},
 | 
						|
			Ports:    []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "TCP"}},
 | 
						|
		},
 | 
						|
	})
 | 
						|
	endpoints.syncService(ns + "/foo")
 | 
						|
 | 
						|
	endpointsHandler.ValidateRequestCount(t, 1)
 | 
						|
	data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{
 | 
						|
			Name:            "foo",
 | 
						|
			Namespace:       ns,
 | 
						|
			ResourceVersion: "1",
 | 
						|
			Annotations: map[string]string{
 | 
						|
				v1.EndpointsLastChangeTriggerTime: triggerTimeString,
 | 
						|
			},
 | 
						|
			Labels: map[string]string{
 | 
						|
				v1.IsHeadlessService: "",
 | 
						|
			},
 | 
						|
		},
 | 
						|
		Subsets: []v1.EndpointSubset{{
 | 
						|
			Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
 | 
						|
			Ports:     []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
 | 
						|
		}},
 | 
						|
	})
 | 
						|
	endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
 | 
						|
}
 | 
						|
 | 
						|
func TestLastTriggerChangeTimeAnnotation_AnnotationOverridden(t *testing.T) {
 | 
						|
	ns := "other"
 | 
						|
	testServer, endpointsHandler := makeTestServer(t, ns)
 | 
						|
	defer testServer.Close()
 | 
						|
	endpoints := newController(testServer.URL, 0*time.Second)
 | 
						|
	endpoints.endpointsStore.Add(&v1.Endpoints{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{
 | 
						|
			Name:            "foo",
 | 
						|
			Namespace:       ns,
 | 
						|
			ResourceVersion: "1",
 | 
						|
			Annotations: map[string]string{
 | 
						|
				v1.EndpointsLastChangeTriggerTime: oldTriggerTimeString,
 | 
						|
			},
 | 
						|
		},
 | 
						|
		Subsets: []v1.EndpointSubset{{
 | 
						|
			Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
 | 
						|
			Ports:     []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
 | 
						|
		}},
 | 
						|
	})
 | 
						|
	addPods(endpoints.podStore, ns, 1, 1, 0, false)
 | 
						|
	endpoints.serviceStore.Add(&v1.Service{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, CreationTimestamp: metav1.NewTime(triggerTime)},
 | 
						|
		Spec: v1.ServiceSpec{
 | 
						|
			Selector: map[string]string{},
 | 
						|
			Ports:    []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "TCP"}},
 | 
						|
		},
 | 
						|
	})
 | 
						|
	endpoints.syncService(ns + "/foo")
 | 
						|
 | 
						|
	endpointsHandler.ValidateRequestCount(t, 1)
 | 
						|
	data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{
 | 
						|
			Name:            "foo",
 | 
						|
			Namespace:       ns,
 | 
						|
			ResourceVersion: "1",
 | 
						|
			Annotations: map[string]string{
 | 
						|
				v1.EndpointsLastChangeTriggerTime: triggerTimeString,
 | 
						|
			},
 | 
						|
			Labels: map[string]string{
 | 
						|
				v1.IsHeadlessService: "",
 | 
						|
			},
 | 
						|
		},
 | 
						|
		Subsets: []v1.EndpointSubset{{
 | 
						|
			Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
 | 
						|
			Ports:     []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
 | 
						|
		}},
 | 
						|
	})
 | 
						|
	endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
 | 
						|
}
 | 
						|
 | 
						|
func TestLastTriggerChangeTimeAnnotation_AnnotationCleared(t *testing.T) {
 | 
						|
	ns := "other"
 | 
						|
	testServer, endpointsHandler := makeTestServer(t, ns)
 | 
						|
	defer testServer.Close()
 | 
						|
	endpoints := newController(testServer.URL, 0*time.Second)
 | 
						|
	endpoints.endpointsStore.Add(&v1.Endpoints{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{
 | 
						|
			Name:            "foo",
 | 
						|
			Namespace:       ns,
 | 
						|
			ResourceVersion: "1",
 | 
						|
			Annotations: map[string]string{
 | 
						|
				v1.EndpointsLastChangeTriggerTime: triggerTimeString,
 | 
						|
			},
 | 
						|
		},
 | 
						|
		Subsets: []v1.EndpointSubset{{
 | 
						|
			Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
 | 
						|
			Ports:     []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
 | 
						|
		}},
 | 
						|
	})
 | 
						|
	// Neither pod nor service has trigger time, this should cause annotation to be cleared.
 | 
						|
	addPods(endpoints.podStore, ns, 1, 1, 0, false)
 | 
						|
	endpoints.serviceStore.Add(&v1.Service{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
 | 
						|
		Spec: v1.ServiceSpec{
 | 
						|
			Selector: map[string]string{},
 | 
						|
			Ports:    []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "TCP"}},
 | 
						|
		},
 | 
						|
	})
 | 
						|
	endpoints.syncService(ns + "/foo")
 | 
						|
 | 
						|
	endpointsHandler.ValidateRequestCount(t, 1)
 | 
						|
	data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{
 | 
						|
			Name:            "foo",
 | 
						|
			Namespace:       ns,
 | 
						|
			ResourceVersion: "1",
 | 
						|
			Labels: map[string]string{
 | 
						|
				v1.IsHeadlessService: "",
 | 
						|
			}, // Annotation not set anymore.
 | 
						|
		},
 | 
						|
		Subsets: []v1.EndpointSubset{{
 | 
						|
			Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
 | 
						|
			Ports:     []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
 | 
						|
		}},
 | 
						|
	})
 | 
						|
	endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
 | 
						|
}
 | 
						|
 | 
						|
// TestPodUpdatesBatching verifies that endpoint updates caused by pod updates are batched together.
 | 
						|
// This test uses real time.Sleep, as there is no easy way to mock time in endpoints controller now.
 | 
						|
// TODO(mborsz): Migrate this test to mock clock when possible.
 | 
						|
func TestPodUpdatesBatching(t *testing.T) {
 | 
						|
	type podUpdate struct {
 | 
						|
		delay   time.Duration
 | 
						|
		podName string
 | 
						|
		podIP   string
 | 
						|
	}
 | 
						|
 | 
						|
	tests := []struct {
 | 
						|
		name             string
 | 
						|
		batchPeriod      time.Duration
 | 
						|
		podsCount        int
 | 
						|
		updates          []podUpdate
 | 
						|
		finalDelay       time.Duration
 | 
						|
		wantRequestCount int
 | 
						|
	}{
 | 
						|
		{
 | 
						|
			name:        "three updates with no batching",
 | 
						|
			batchPeriod: 0 * time.Second,
 | 
						|
			podsCount:   10,
 | 
						|
			updates: []podUpdate{
 | 
						|
				{
 | 
						|
					// endpoints.Run needs ~100 ms to start processing updates.
 | 
						|
					delay:   200 * time.Millisecond,
 | 
						|
					podName: "pod0",
 | 
						|
					podIP:   "10.0.0.0",
 | 
						|
				},
 | 
						|
				{
 | 
						|
					delay:   100 * time.Millisecond,
 | 
						|
					podName: "pod1",
 | 
						|
					podIP:   "10.0.0.1",
 | 
						|
				},
 | 
						|
				{
 | 
						|
					delay:   100 * time.Millisecond,
 | 
						|
					podName: "pod2",
 | 
						|
					podIP:   "10.0.0.2",
 | 
						|
				},
 | 
						|
			},
 | 
						|
			finalDelay:       3 * time.Second,
 | 
						|
			wantRequestCount: 3,
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name:        "three updates in one batch",
 | 
						|
			batchPeriod: 1 * time.Second,
 | 
						|
			podsCount:   10,
 | 
						|
			updates: []podUpdate{
 | 
						|
				{
 | 
						|
					// endpoints.Run needs ~100 ms to start processing updates.
 | 
						|
					delay:   200 * time.Millisecond,
 | 
						|
					podName: "pod0",
 | 
						|
					podIP:   "10.0.0.0",
 | 
						|
				},
 | 
						|
				{
 | 
						|
					delay:   100 * time.Millisecond,
 | 
						|
					podName: "pod1",
 | 
						|
					podIP:   "10.0.0.1",
 | 
						|
				},
 | 
						|
				{
 | 
						|
					delay:   100 * time.Millisecond,
 | 
						|
					podName: "pod2",
 | 
						|
					podIP:   "10.0.0.2",
 | 
						|
				},
 | 
						|
			},
 | 
						|
			finalDelay:       3 * time.Second,
 | 
						|
			wantRequestCount: 1,
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name:        "three updates in two batches",
 | 
						|
			batchPeriod: 1 * time.Second,
 | 
						|
			podsCount:   10,
 | 
						|
			updates: []podUpdate{
 | 
						|
				{
 | 
						|
					// endpoints.Run needs ~100 ms to start processing updates.
 | 
						|
					delay:   200 * time.Millisecond,
 | 
						|
					podName: "pod0",
 | 
						|
					podIP:   "10.0.0.0",
 | 
						|
				},
 | 
						|
				{
 | 
						|
					delay:   100 * time.Millisecond,
 | 
						|
					podName: "pod1",
 | 
						|
					podIP:   "10.0.0.1",
 | 
						|
				},
 | 
						|
				{
 | 
						|
					delay:   1 * time.Second,
 | 
						|
					podName: "pod2",
 | 
						|
					podIP:   "10.0.0.2",
 | 
						|
				},
 | 
						|
			},
 | 
						|
			finalDelay:       3 * time.Second,
 | 
						|
			wantRequestCount: 2,
 | 
						|
		},
 | 
						|
	}
 | 
						|
 | 
						|
	for _, tc := range tests {
 | 
						|
		t.Run(tc.name, func(t *testing.T) {
 | 
						|
			ns := "other"
 | 
						|
			resourceVersion := 1
 | 
						|
			testServer, endpointsHandler := makeTestServer(t, ns)
 | 
						|
			defer testServer.Close()
 | 
						|
			endpoints := newController(testServer.URL, tc.batchPeriod)
 | 
						|
			stopCh := make(chan struct{})
 | 
						|
			defer close(stopCh)
 | 
						|
			endpoints.podsSynced = alwaysReady
 | 
						|
			endpoints.servicesSynced = alwaysReady
 | 
						|
			endpoints.endpointsSynced = alwaysReady
 | 
						|
			endpoints.workerLoopPeriod = 10 * time.Millisecond
 | 
						|
 | 
						|
			go endpoints.Run(1, stopCh)
 | 
						|
 | 
						|
			addPods(endpoints.podStore, ns, tc.podsCount, 1, 0, false)
 | 
						|
 | 
						|
			endpoints.serviceStore.Add(&v1.Service{
 | 
						|
				ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
 | 
						|
				Spec: v1.ServiceSpec{
 | 
						|
					Selector: map[string]string{"foo": "bar"},
 | 
						|
					Ports:    []v1.ServicePort{{Port: 80}},
 | 
						|
				},
 | 
						|
			})
 | 
						|
 | 
						|
			for _, update := range tc.updates {
 | 
						|
				time.Sleep(update.delay)
 | 
						|
 | 
						|
				old, exists, err := endpoints.podStore.GetByKey(fmt.Sprintf("%s/%s", ns, update.podName))
 | 
						|
				if err != nil {
 | 
						|
					t.Fatalf("Error while retrieving old value of %q: %v", update.podName, err)
 | 
						|
				}
 | 
						|
				if !exists {
 | 
						|
					t.Fatalf("Pod %q doesn't exist", update.podName)
 | 
						|
				}
 | 
						|
				oldPod := old.(*v1.Pod)
 | 
						|
				newPod := oldPod.DeepCopy()
 | 
						|
				newPod.Status.PodIP = update.podIP
 | 
						|
				newPod.ResourceVersion = strconv.Itoa(resourceVersion)
 | 
						|
				resourceVersion++
 | 
						|
 | 
						|
				endpoints.podStore.Update(newPod)
 | 
						|
				endpoints.updatePod(oldPod, newPod)
 | 
						|
			}
 | 
						|
 | 
						|
			time.Sleep(tc.finalDelay)
 | 
						|
			endpointsHandler.ValidateRequestCount(t, tc.wantRequestCount)
 | 
						|
		})
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// TestPodAddsBatching verifies that endpoint updates caused by pod addition are batched together.
 | 
						|
// This test uses real time.Sleep, as there is no easy way to mock time in endpoints controller now.
 | 
						|
// TODO(mborsz): Migrate this test to mock clock when possible.
 | 
						|
func TestPodAddsBatching(t *testing.T) {
 | 
						|
	type podAdd struct {
 | 
						|
		delay time.Duration
 | 
						|
	}
 | 
						|
 | 
						|
	tests := []struct {
 | 
						|
		name             string
 | 
						|
		batchPeriod      time.Duration
 | 
						|
		adds             []podAdd
 | 
						|
		finalDelay       time.Duration
 | 
						|
		wantRequestCount int
 | 
						|
	}{
 | 
						|
		{
 | 
						|
			name:        "three adds with no batching",
 | 
						|
			batchPeriod: 0 * time.Second,
 | 
						|
			adds: []podAdd{
 | 
						|
				{
 | 
						|
					// endpoints.Run needs ~100 ms to start processing updates.
 | 
						|
					delay: 200 * time.Millisecond,
 | 
						|
				},
 | 
						|
				{
 | 
						|
					delay: 100 * time.Millisecond,
 | 
						|
				},
 | 
						|
				{
 | 
						|
					delay: 100 * time.Millisecond,
 | 
						|
				},
 | 
						|
			},
 | 
						|
			finalDelay:       3 * time.Second,
 | 
						|
			wantRequestCount: 3,
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name:        "three adds in one batch",
 | 
						|
			batchPeriod: 1 * time.Second,
 | 
						|
			adds: []podAdd{
 | 
						|
				{
 | 
						|
					// endpoints.Run needs ~100 ms to start processing updates.
 | 
						|
					delay: 200 * time.Millisecond,
 | 
						|
				},
 | 
						|
				{
 | 
						|
					delay: 100 * time.Millisecond,
 | 
						|
				},
 | 
						|
				{
 | 
						|
					delay: 100 * time.Millisecond,
 | 
						|
				},
 | 
						|
			},
 | 
						|
			finalDelay:       3 * time.Second,
 | 
						|
			wantRequestCount: 1,
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name:        "three adds in two batches",
 | 
						|
			batchPeriod: 1 * time.Second,
 | 
						|
			adds: []podAdd{
 | 
						|
				{
 | 
						|
					// endpoints.Run needs ~100 ms to start processing updates.
 | 
						|
					delay: 200 * time.Millisecond,
 | 
						|
				},
 | 
						|
				{
 | 
						|
					delay: 100 * time.Millisecond,
 | 
						|
				},
 | 
						|
				{
 | 
						|
					delay: 1 * time.Second,
 | 
						|
				},
 | 
						|
			},
 | 
						|
			finalDelay:       3 * time.Second,
 | 
						|
			wantRequestCount: 2,
 | 
						|
		},
 | 
						|
	}
 | 
						|
 | 
						|
	for _, tc := range tests {
 | 
						|
		t.Run(tc.name, func(t *testing.T) {
 | 
						|
			ns := "other"
 | 
						|
			testServer, endpointsHandler := makeTestServer(t, ns)
 | 
						|
			defer testServer.Close()
 | 
						|
			endpoints := newController(testServer.URL, tc.batchPeriod)
 | 
						|
			stopCh := make(chan struct{})
 | 
						|
			defer close(stopCh)
 | 
						|
			endpoints.podsSynced = alwaysReady
 | 
						|
			endpoints.servicesSynced = alwaysReady
 | 
						|
			endpoints.endpointsSynced = alwaysReady
 | 
						|
			endpoints.workerLoopPeriod = 10 * time.Millisecond
 | 
						|
 | 
						|
			go endpoints.Run(1, stopCh)
 | 
						|
 | 
						|
			endpoints.serviceStore.Add(&v1.Service{
 | 
						|
				ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
 | 
						|
				Spec: v1.ServiceSpec{
 | 
						|
					Selector: map[string]string{"foo": "bar"},
 | 
						|
					Ports:    []v1.ServicePort{{Port: 80}},
 | 
						|
				},
 | 
						|
			})
 | 
						|
 | 
						|
			for i, add := range tc.adds {
 | 
						|
				time.Sleep(add.delay)
 | 
						|
 | 
						|
				p := testPod(ns, i, 1, true, false)
 | 
						|
				endpoints.podStore.Add(p)
 | 
						|
				endpoints.addPod(p)
 | 
						|
			}
 | 
						|
 | 
						|
			time.Sleep(tc.finalDelay)
 | 
						|
			endpointsHandler.ValidateRequestCount(t, tc.wantRequestCount)
 | 
						|
		})
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// TestPodDeleteBatching verifies that endpoint updates caused by pod deletion are batched together.
 | 
						|
// This test uses real time.Sleep, as there is no easy way to mock time in endpoints controller now.
 | 
						|
// TODO(mborsz): Migrate this test to mock clock when possible.
 | 
						|
func TestPodDeleteBatching(t *testing.T) {
 | 
						|
	type podDelete struct {
 | 
						|
		delay   time.Duration
 | 
						|
		podName string
 | 
						|
	}
 | 
						|
 | 
						|
	tests := []struct {
 | 
						|
		name             string
 | 
						|
		batchPeriod      time.Duration
 | 
						|
		podsCount        int
 | 
						|
		deletes          []podDelete
 | 
						|
		finalDelay       time.Duration
 | 
						|
		wantRequestCount int
 | 
						|
	}{
 | 
						|
		{
 | 
						|
			name:        "three deletes with no batching",
 | 
						|
			batchPeriod: 0 * time.Second,
 | 
						|
			podsCount:   10,
 | 
						|
			deletes: []podDelete{
 | 
						|
				{
 | 
						|
					// endpoints.Run needs ~100 ms to start processing updates.
 | 
						|
					delay:   200 * time.Millisecond,
 | 
						|
					podName: "pod0",
 | 
						|
				},
 | 
						|
				{
 | 
						|
					delay:   100 * time.Millisecond,
 | 
						|
					podName: "pod1",
 | 
						|
				},
 | 
						|
				{
 | 
						|
					delay:   100 * time.Millisecond,
 | 
						|
					podName: "pod2",
 | 
						|
				},
 | 
						|
			},
 | 
						|
			finalDelay:       3 * time.Second,
 | 
						|
			wantRequestCount: 3,
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name:        "three deletes in one batch",
 | 
						|
			batchPeriod: 1 * time.Second,
 | 
						|
			podsCount:   10,
 | 
						|
			deletes: []podDelete{
 | 
						|
				{
 | 
						|
					// endpoints.Run needs ~100 ms to start processing updates.
 | 
						|
					delay:   200 * time.Millisecond,
 | 
						|
					podName: "pod0",
 | 
						|
				},
 | 
						|
				{
 | 
						|
					delay:   100 * time.Millisecond,
 | 
						|
					podName: "pod1",
 | 
						|
				},
 | 
						|
				{
 | 
						|
					delay:   100 * time.Millisecond,
 | 
						|
					podName: "pod2",
 | 
						|
				},
 | 
						|
			},
 | 
						|
			finalDelay:       3 * time.Second,
 | 
						|
			wantRequestCount: 1,
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name:        "three deletes in two batches",
 | 
						|
			batchPeriod: 1 * time.Second,
 | 
						|
			podsCount:   10,
 | 
						|
			deletes: []podDelete{
 | 
						|
				{
 | 
						|
					// endpoints.Run needs ~100 ms to start processing updates.
 | 
						|
					delay:   200 * time.Millisecond,
 | 
						|
					podName: "pod0",
 | 
						|
				},
 | 
						|
				{
 | 
						|
					delay:   100 * time.Millisecond,
 | 
						|
					podName: "pod1",
 | 
						|
				},
 | 
						|
				{
 | 
						|
					delay:   1 * time.Second,
 | 
						|
					podName: "pod2",
 | 
						|
				},
 | 
						|
			},
 | 
						|
			finalDelay:       3 * time.Second,
 | 
						|
			wantRequestCount: 2,
 | 
						|
		},
 | 
						|
	}
 | 
						|
 | 
						|
	for _, tc := range tests {
 | 
						|
		t.Run(tc.name, func(t *testing.T) {
 | 
						|
			ns := "other"
 | 
						|
			testServer, endpointsHandler := makeTestServer(t, ns)
 | 
						|
			defer testServer.Close()
 | 
						|
			endpoints := newController(testServer.URL, tc.batchPeriod)
 | 
						|
			stopCh := make(chan struct{})
 | 
						|
			defer close(stopCh)
 | 
						|
			endpoints.podsSynced = alwaysReady
 | 
						|
			endpoints.servicesSynced = alwaysReady
 | 
						|
			endpoints.endpointsSynced = alwaysReady
 | 
						|
			endpoints.workerLoopPeriod = 10 * time.Millisecond
 | 
						|
 | 
						|
			go endpoints.Run(1, stopCh)
 | 
						|
 | 
						|
			addPods(endpoints.podStore, ns, tc.podsCount, 1, 0, false)
 | 
						|
 | 
						|
			endpoints.serviceStore.Add(&v1.Service{
 | 
						|
				ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
 | 
						|
				Spec: v1.ServiceSpec{
 | 
						|
					Selector: map[string]string{"foo": "bar"},
 | 
						|
					Ports:    []v1.ServicePort{{Port: 80}},
 | 
						|
				},
 | 
						|
			})
 | 
						|
 | 
						|
			for _, update := range tc.deletes {
 | 
						|
				time.Sleep(update.delay)
 | 
						|
 | 
						|
				old, exists, err := endpoints.podStore.GetByKey(fmt.Sprintf("%s/%s", ns, update.podName))
 | 
						|
				if err != nil {
 | 
						|
					t.Fatalf("Error while retrieving old value of %q: %v", update.podName, err)
 | 
						|
				}
 | 
						|
				if !exists {
 | 
						|
					t.Fatalf("Pod %q doesn't exist", update.podName)
 | 
						|
				}
 | 
						|
				endpoints.podStore.Delete(old)
 | 
						|
				endpoints.deletePod(old)
 | 
						|
			}
 | 
						|
 | 
						|
			time.Sleep(tc.finalDelay)
 | 
						|
			endpointsHandler.ValidateRequestCount(t, tc.wantRequestCount)
 | 
						|
		})
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func TestSyncEndpointsServiceNotFound(t *testing.T) {
 | 
						|
	ns := metav1.NamespaceDefault
 | 
						|
	testServer, endpointsHandler := makeTestServer(t, ns)
 | 
						|
	defer testServer.Close()
 | 
						|
	endpoints := newController(testServer.URL, 0)
 | 
						|
	endpoints.endpointsStore.Add(&v1.Endpoints{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{
 | 
						|
			Name:            "foo",
 | 
						|
			Namespace:       ns,
 | 
						|
			ResourceVersion: "1",
 | 
						|
		},
 | 
						|
	})
 | 
						|
	endpoints.syncService(ns + "/foo")
 | 
						|
	endpointsHandler.ValidateRequestCount(t, 1)
 | 
						|
	endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "DELETE", nil)
 | 
						|
}
 | 
						|
 | 
						|
func podChangedHelper(oldPod, newPod *v1.Pod, endpointChanged endpointutil.EndpointsMatch) bool {
 | 
						|
	podChanged, _ := endpointutil.PodChanged(oldPod, newPod, endpointChanged)
 | 
						|
	return podChanged
 | 
						|
}
 |