mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-04 04:08:16 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			436 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			436 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
/*
 | 
						|
Copyright 2014 The Kubernetes Authors.
 | 
						|
 | 
						|
Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
you may not use this file except in compliance with the License.
 | 
						|
You may obtain a copy of the License at
 | 
						|
 | 
						|
    http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
 | 
						|
Unless required by applicable law or agreed to in writing, software
 | 
						|
distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
See the License for the specific language governing permissions and
 | 
						|
limitations under the License.
 | 
						|
*/
 | 
						|
 | 
						|
package config
 | 
						|
 | 
						|
import (
 | 
						|
	"reflect"
 | 
						|
	"sort"
 | 
						|
	"sync"
 | 
						|
	"testing"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"k8s.io/api/core/v1"
 | 
						|
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
						|
	"k8s.io/apimachinery/pkg/types"
 | 
						|
	"k8s.io/apimachinery/pkg/util/wait"
 | 
						|
	"k8s.io/apimachinery/pkg/watch"
 | 
						|
	informers "k8s.io/client-go/informers"
 | 
						|
	"k8s.io/client-go/kubernetes/fake"
 | 
						|
	ktesting "k8s.io/client-go/testing"
 | 
						|
)
 | 
						|
 | 
						|
type sortedServices []*v1.Service
 | 
						|
 | 
						|
func (s sortedServices) Len() int {
 | 
						|
	return len(s)
 | 
						|
}
 | 
						|
func (s sortedServices) Swap(i, j int) {
 | 
						|
	s[i], s[j] = s[j], s[i]
 | 
						|
}
 | 
						|
func (s sortedServices) Less(i, j int) bool {
 | 
						|
	return s[i].Name < s[j].Name
 | 
						|
}
 | 
						|
 | 
						|
type ServiceHandlerMock struct {
 | 
						|
	lock sync.Mutex
 | 
						|
 | 
						|
	state   map[types.NamespacedName]*v1.Service
 | 
						|
	synced  bool
 | 
						|
	updated chan []*v1.Service
 | 
						|
	process func([]*v1.Service)
 | 
						|
}
 | 
						|
 | 
						|
func NewServiceHandlerMock() *ServiceHandlerMock {
 | 
						|
	shm := &ServiceHandlerMock{
 | 
						|
		state:   make(map[types.NamespacedName]*v1.Service),
 | 
						|
		updated: make(chan []*v1.Service, 5),
 | 
						|
	}
 | 
						|
	shm.process = func(services []*v1.Service) {
 | 
						|
		shm.updated <- services
 | 
						|
	}
 | 
						|
	return shm
 | 
						|
}
 | 
						|
 | 
						|
func (h *ServiceHandlerMock) OnServiceAdd(service *v1.Service) {
 | 
						|
	h.lock.Lock()
 | 
						|
	defer h.lock.Unlock()
 | 
						|
	namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
 | 
						|
	h.state[namespacedName] = service
 | 
						|
	h.sendServices()
 | 
						|
}
 | 
						|
 | 
						|
func (h *ServiceHandlerMock) OnServiceUpdate(oldService, service *v1.Service) {
 | 
						|
	h.lock.Lock()
 | 
						|
	defer h.lock.Unlock()
 | 
						|
	namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
 | 
						|
	h.state[namespacedName] = service
 | 
						|
	h.sendServices()
 | 
						|
}
 | 
						|
 | 
						|
func (h *ServiceHandlerMock) OnServiceDelete(service *v1.Service) {
 | 
						|
	h.lock.Lock()
 | 
						|
	defer h.lock.Unlock()
 | 
						|
	namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
 | 
						|
	delete(h.state, namespacedName)
 | 
						|
	h.sendServices()
 | 
						|
}
 | 
						|
 | 
						|
func (h *ServiceHandlerMock) OnServiceSynced() {
 | 
						|
	h.lock.Lock()
 | 
						|
	defer h.lock.Unlock()
 | 
						|
	h.synced = true
 | 
						|
	h.sendServices()
 | 
						|
}
 | 
						|
 | 
						|
func (h *ServiceHandlerMock) sendServices() {
 | 
						|
	if !h.synced {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	services := make([]*v1.Service, 0, len(h.state))
 | 
						|
	for _, svc := range h.state {
 | 
						|
		services = append(services, svc)
 | 
						|
	}
 | 
						|
	sort.Sort(sortedServices(services))
 | 
						|
	h.process(services)
 | 
						|
}
 | 
						|
 | 
						|
func (h *ServiceHandlerMock) ValidateServices(t *testing.T, expectedServices []*v1.Service) {
 | 
						|
	// We might get 1 or more updates for N service updates, because we
 | 
						|
	// over write older snapshots of services from the producer go-routine
 | 
						|
	// if the consumer falls behind.
 | 
						|
	var services []*v1.Service
 | 
						|
	for {
 | 
						|
		select {
 | 
						|
		case services = <-h.updated:
 | 
						|
			if reflect.DeepEqual(services, expectedServices) {
 | 
						|
				return
 | 
						|
			}
 | 
						|
		// Unittests will hard timeout in 5m with a stack trace, prevent that
 | 
						|
		// and surface a clearer reason for failure.
 | 
						|
		case <-time.After(wait.ForeverTestTimeout):
 | 
						|
			t.Errorf("Timed out. Expected %#v, Got %#v", expectedServices, services)
 | 
						|
			return
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
type sortedEndpoints []*v1.Endpoints
 | 
						|
 | 
						|
func (s sortedEndpoints) Len() int {
 | 
						|
	return len(s)
 | 
						|
}
 | 
						|
func (s sortedEndpoints) Swap(i, j int) {
 | 
						|
	s[i], s[j] = s[j], s[i]
 | 
						|
}
 | 
						|
func (s sortedEndpoints) Less(i, j int) bool {
 | 
						|
	return s[i].Name < s[j].Name
 | 
						|
}
 | 
						|
 | 
						|
type EndpointsHandlerMock struct {
 | 
						|
	lock sync.Mutex
 | 
						|
 | 
						|
	state   map[types.NamespacedName]*v1.Endpoints
 | 
						|
	synced  bool
 | 
						|
	updated chan []*v1.Endpoints
 | 
						|
	process func([]*v1.Endpoints)
 | 
						|
}
 | 
						|
 | 
						|
func NewEndpointsHandlerMock() *EndpointsHandlerMock {
 | 
						|
	ehm := &EndpointsHandlerMock{
 | 
						|
		state:   make(map[types.NamespacedName]*v1.Endpoints),
 | 
						|
		updated: make(chan []*v1.Endpoints, 5),
 | 
						|
	}
 | 
						|
	ehm.process = func(endpoints []*v1.Endpoints) {
 | 
						|
		ehm.updated <- endpoints
 | 
						|
	}
 | 
						|
	return ehm
 | 
						|
}
 | 
						|
 | 
						|
func (h *EndpointsHandlerMock) OnEndpointsAdd(endpoints *v1.Endpoints) {
 | 
						|
	h.lock.Lock()
 | 
						|
	defer h.lock.Unlock()
 | 
						|
	namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
 | 
						|
	h.state[namespacedName] = endpoints
 | 
						|
	h.sendEndpoints()
 | 
						|
}
 | 
						|
 | 
						|
func (h *EndpointsHandlerMock) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {
 | 
						|
	h.lock.Lock()
 | 
						|
	defer h.lock.Unlock()
 | 
						|
	namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
 | 
						|
	h.state[namespacedName] = endpoints
 | 
						|
	h.sendEndpoints()
 | 
						|
}
 | 
						|
 | 
						|
func (h *EndpointsHandlerMock) OnEndpointsDelete(endpoints *v1.Endpoints) {
 | 
						|
	h.lock.Lock()
 | 
						|
	defer h.lock.Unlock()
 | 
						|
	namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
 | 
						|
	delete(h.state, namespacedName)
 | 
						|
	h.sendEndpoints()
 | 
						|
}
 | 
						|
 | 
						|
func (h *EndpointsHandlerMock) OnEndpointsSynced() {
 | 
						|
	h.lock.Lock()
 | 
						|
	defer h.lock.Unlock()
 | 
						|
	h.synced = true
 | 
						|
	h.sendEndpoints()
 | 
						|
}
 | 
						|
 | 
						|
func (h *EndpointsHandlerMock) sendEndpoints() {
 | 
						|
	if !h.synced {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	endpoints := make([]*v1.Endpoints, 0, len(h.state))
 | 
						|
	for _, eps := range h.state {
 | 
						|
		endpoints = append(endpoints, eps)
 | 
						|
	}
 | 
						|
	sort.Sort(sortedEndpoints(endpoints))
 | 
						|
	h.process(endpoints)
 | 
						|
}
 | 
						|
 | 
						|
func (h *EndpointsHandlerMock) ValidateEndpoints(t *testing.T, expectedEndpoints []*v1.Endpoints) {
 | 
						|
	// We might get 1 or more updates for N endpoint updates, because we
 | 
						|
	// over write older snapshots of endpoints from the producer go-routine
 | 
						|
	// if the consumer falls behind. Unittests will hard timeout in 5m.
 | 
						|
	var endpoints []*v1.Endpoints
 | 
						|
	for {
 | 
						|
		select {
 | 
						|
		case endpoints = <-h.updated:
 | 
						|
			if reflect.DeepEqual(endpoints, expectedEndpoints) {
 | 
						|
				return
 | 
						|
			}
 | 
						|
		// Unittests will hard timeout in 5m with a stack trace, prevent that
 | 
						|
		// and surface a clearer reason for failure.
 | 
						|
		case <-time.After(wait.ForeverTestTimeout):
 | 
						|
			t.Errorf("Timed out. Expected %#v, Got %#v", expectedEndpoints, endpoints)
 | 
						|
			return
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func TestNewServiceAddedAndNotified(t *testing.T) {
 | 
						|
	client := fake.NewSimpleClientset()
 | 
						|
	fakeWatch := watch.NewFake()
 | 
						|
	client.PrependWatchReactor("services", ktesting.DefaultWatchReactor(fakeWatch, nil))
 | 
						|
 | 
						|
	stopCh := make(chan struct{})
 | 
						|
	defer close(stopCh)
 | 
						|
 | 
						|
	sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)
 | 
						|
 | 
						|
	config := NewServiceConfig(sharedInformers.Core().V1().Services(), time.Minute)
 | 
						|
	handler := NewServiceHandlerMock()
 | 
						|
	config.RegisterEventHandler(handler)
 | 
						|
	go sharedInformers.Start(stopCh)
 | 
						|
	go config.Run(stopCh)
 | 
						|
 | 
						|
	service := &v1.Service{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
 | 
						|
		Spec:       v1.ServiceSpec{Ports: []v1.ServicePort{{Protocol: "TCP", Port: 10}}},
 | 
						|
	}
 | 
						|
	fakeWatch.Add(service)
 | 
						|
	handler.ValidateServices(t, []*v1.Service{service})
 | 
						|
}
 | 
						|
 | 
						|
func TestServiceAddedRemovedSetAndNotified(t *testing.T) {
 | 
						|
	client := fake.NewSimpleClientset()
 | 
						|
	fakeWatch := watch.NewFake()
 | 
						|
	client.PrependWatchReactor("services", ktesting.DefaultWatchReactor(fakeWatch, nil))
 | 
						|
 | 
						|
	stopCh := make(chan struct{})
 | 
						|
	defer close(stopCh)
 | 
						|
 | 
						|
	sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)
 | 
						|
 | 
						|
	config := NewServiceConfig(sharedInformers.Core().V1().Services(), time.Minute)
 | 
						|
	handler := NewServiceHandlerMock()
 | 
						|
	config.RegisterEventHandler(handler)
 | 
						|
	go sharedInformers.Start(stopCh)
 | 
						|
	go config.Run(stopCh)
 | 
						|
 | 
						|
	service1 := &v1.Service{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
 | 
						|
		Spec:       v1.ServiceSpec{Ports: []v1.ServicePort{{Protocol: "TCP", Port: 10}}},
 | 
						|
	}
 | 
						|
	fakeWatch.Add(service1)
 | 
						|
	handler.ValidateServices(t, []*v1.Service{service1})
 | 
						|
 | 
						|
	service2 := &v1.Service{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
 | 
						|
		Spec:       v1.ServiceSpec{Ports: []v1.ServicePort{{Protocol: "TCP", Port: 20}}},
 | 
						|
	}
 | 
						|
	fakeWatch.Add(service2)
 | 
						|
	services := []*v1.Service{service2, service1}
 | 
						|
	handler.ValidateServices(t, services)
 | 
						|
 | 
						|
	fakeWatch.Delete(service1)
 | 
						|
	services = []*v1.Service{service2}
 | 
						|
	handler.ValidateServices(t, services)
 | 
						|
}
 | 
						|
 | 
						|
func TestNewServicesMultipleHandlersAddedAndNotified(t *testing.T) {
 | 
						|
	client := fake.NewSimpleClientset()
 | 
						|
	fakeWatch := watch.NewFake()
 | 
						|
	client.PrependWatchReactor("services", ktesting.DefaultWatchReactor(fakeWatch, nil))
 | 
						|
 | 
						|
	stopCh := make(chan struct{})
 | 
						|
	defer close(stopCh)
 | 
						|
 | 
						|
	sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)
 | 
						|
 | 
						|
	config := NewServiceConfig(sharedInformers.Core().V1().Services(), time.Minute)
 | 
						|
	handler := NewServiceHandlerMock()
 | 
						|
	handler2 := NewServiceHandlerMock()
 | 
						|
	config.RegisterEventHandler(handler)
 | 
						|
	config.RegisterEventHandler(handler2)
 | 
						|
	go sharedInformers.Start(stopCh)
 | 
						|
	go config.Run(stopCh)
 | 
						|
 | 
						|
	service1 := &v1.Service{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
 | 
						|
		Spec:       v1.ServiceSpec{Ports: []v1.ServicePort{{Protocol: "TCP", Port: 10}}},
 | 
						|
	}
 | 
						|
	service2 := &v1.Service{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
 | 
						|
		Spec:       v1.ServiceSpec{Ports: []v1.ServicePort{{Protocol: "TCP", Port: 20}}},
 | 
						|
	}
 | 
						|
	fakeWatch.Add(service1)
 | 
						|
	fakeWatch.Add(service2)
 | 
						|
 | 
						|
	services := []*v1.Service{service2, service1}
 | 
						|
	handler.ValidateServices(t, services)
 | 
						|
	handler2.ValidateServices(t, services)
 | 
						|
}
 | 
						|
 | 
						|
func TestNewEndpointsMultipleHandlersAddedAndNotified(t *testing.T) {
 | 
						|
	client := fake.NewSimpleClientset()
 | 
						|
	fakeWatch := watch.NewFake()
 | 
						|
	client.PrependWatchReactor("endpoints", ktesting.DefaultWatchReactor(fakeWatch, nil))
 | 
						|
 | 
						|
	stopCh := make(chan struct{})
 | 
						|
	defer close(stopCh)
 | 
						|
 | 
						|
	sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)
 | 
						|
 | 
						|
	config := NewEndpointsConfig(sharedInformers.Core().V1().Endpoints(), time.Minute)
 | 
						|
	handler := NewEndpointsHandlerMock()
 | 
						|
	handler2 := NewEndpointsHandlerMock()
 | 
						|
	config.RegisterEventHandler(handler)
 | 
						|
	config.RegisterEventHandler(handler2)
 | 
						|
	go sharedInformers.Start(stopCh)
 | 
						|
	go config.Run(stopCh)
 | 
						|
 | 
						|
	endpoints1 := &v1.Endpoints{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
 | 
						|
		Subsets: []v1.EndpointSubset{{
 | 
						|
			Addresses: []v1.EndpointAddress{{IP: "1.1.1.1"}, {IP: "2.2.2.2"}},
 | 
						|
			Ports:     []v1.EndpointPort{{Port: 80}},
 | 
						|
		}},
 | 
						|
	}
 | 
						|
	endpoints2 := &v1.Endpoints{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
 | 
						|
		Subsets: []v1.EndpointSubset{{
 | 
						|
			Addresses: []v1.EndpointAddress{{IP: "3.3.3.3"}, {IP: "4.4.4.4"}},
 | 
						|
			Ports:     []v1.EndpointPort{{Port: 80}},
 | 
						|
		}},
 | 
						|
	}
 | 
						|
	fakeWatch.Add(endpoints1)
 | 
						|
	fakeWatch.Add(endpoints2)
 | 
						|
 | 
						|
	endpoints := []*v1.Endpoints{endpoints2, endpoints1}
 | 
						|
	handler.ValidateEndpoints(t, endpoints)
 | 
						|
	handler2.ValidateEndpoints(t, endpoints)
 | 
						|
}
 | 
						|
 | 
						|
func TestNewEndpointsMultipleHandlersAddRemoveSetAndNotified(t *testing.T) {
 | 
						|
	client := fake.NewSimpleClientset()
 | 
						|
	fakeWatch := watch.NewFake()
 | 
						|
	client.PrependWatchReactor("endpoints", ktesting.DefaultWatchReactor(fakeWatch, nil))
 | 
						|
 | 
						|
	stopCh := make(chan struct{})
 | 
						|
	defer close(stopCh)
 | 
						|
 | 
						|
	sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)
 | 
						|
 | 
						|
	config := NewEndpointsConfig(sharedInformers.Core().V1().Endpoints(), time.Minute)
 | 
						|
	handler := NewEndpointsHandlerMock()
 | 
						|
	handler2 := NewEndpointsHandlerMock()
 | 
						|
	config.RegisterEventHandler(handler)
 | 
						|
	config.RegisterEventHandler(handler2)
 | 
						|
	go sharedInformers.Start(stopCh)
 | 
						|
	go config.Run(stopCh)
 | 
						|
 | 
						|
	endpoints1 := &v1.Endpoints{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
 | 
						|
		Subsets: []v1.EndpointSubset{{
 | 
						|
			Addresses: []v1.EndpointAddress{{IP: "1.1.1.1"}, {IP: "2.2.2.2"}},
 | 
						|
			Ports:     []v1.EndpointPort{{Port: 80}},
 | 
						|
		}},
 | 
						|
	}
 | 
						|
	endpoints2 := &v1.Endpoints{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
 | 
						|
		Subsets: []v1.EndpointSubset{{
 | 
						|
			Addresses: []v1.EndpointAddress{{IP: "3.3.3.3"}, {IP: "4.4.4.4"}},
 | 
						|
			Ports:     []v1.EndpointPort{{Port: 80}},
 | 
						|
		}},
 | 
						|
	}
 | 
						|
	fakeWatch.Add(endpoints1)
 | 
						|
	fakeWatch.Add(endpoints2)
 | 
						|
 | 
						|
	endpoints := []*v1.Endpoints{endpoints2, endpoints1}
 | 
						|
	handler.ValidateEndpoints(t, endpoints)
 | 
						|
	handler2.ValidateEndpoints(t, endpoints)
 | 
						|
 | 
						|
	// Add one more
 | 
						|
	endpoints3 := &v1.Endpoints{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foobar"},
 | 
						|
		Subsets: []v1.EndpointSubset{{
 | 
						|
			Addresses: []v1.EndpointAddress{{IP: "5.5.5.5"}, {IP: "6.6.6.6"}},
 | 
						|
			Ports:     []v1.EndpointPort{{Port: 80}},
 | 
						|
		}},
 | 
						|
	}
 | 
						|
	fakeWatch.Add(endpoints3)
 | 
						|
	endpoints = []*v1.Endpoints{endpoints2, endpoints1, endpoints3}
 | 
						|
	handler.ValidateEndpoints(t, endpoints)
 | 
						|
	handler2.ValidateEndpoints(t, endpoints)
 | 
						|
 | 
						|
	// Update the "foo" service with new endpoints
 | 
						|
	endpoints1v2 := &v1.Endpoints{
 | 
						|
		ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
 | 
						|
		Subsets: []v1.EndpointSubset{{
 | 
						|
			Addresses: []v1.EndpointAddress{{IP: "7.7.7.7"}},
 | 
						|
			Ports:     []v1.EndpointPort{{Port: 80}},
 | 
						|
		}},
 | 
						|
	}
 | 
						|
	fakeWatch.Modify(endpoints1v2)
 | 
						|
	endpoints = []*v1.Endpoints{endpoints2, endpoints1v2, endpoints3}
 | 
						|
	handler.ValidateEndpoints(t, endpoints)
 | 
						|
	handler2.ValidateEndpoints(t, endpoints)
 | 
						|
 | 
						|
	// Remove "bar" endpoints
 | 
						|
	fakeWatch.Delete(endpoints2)
 | 
						|
	endpoints = []*v1.Endpoints{endpoints1v2, endpoints3}
 | 
						|
	handler.ValidateEndpoints(t, endpoints)
 | 
						|
	handler2.ValidateEndpoints(t, endpoints)
 | 
						|
}
 | 
						|
 | 
						|
// TODO: Add a unittest for interrupts getting processed in a timely manner.
 | 
						|
// Currently this module has a circular dependency with config, and so it's
 | 
						|
// named config_test, which means even test methods need to be public. This
 | 
						|
// is refactoring that we can avoid by resolving the dependency.
 |