mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-03 03:38:15 +00:00 
			
		
		
		
	switch to clients for bootstrap controller
This commit is contained in:
		@@ -24,7 +24,6 @@ go_library(
 | 
			
		||||
        "//pkg/api/endpoints:go_default_library",
 | 
			
		||||
        "//pkg/api/errors:go_default_library",
 | 
			
		||||
        "//pkg/api/install:go_default_library",
 | 
			
		||||
        "//pkg/api/rest:go_default_library",
 | 
			
		||||
        "//pkg/api/v1:go_default_library",
 | 
			
		||||
        "//pkg/apimachinery/registered:go_default_library",
 | 
			
		||||
        "//pkg/apis/apps/install:go_default_library",
 | 
			
		||||
@@ -62,10 +61,8 @@ go_library(
 | 
			
		||||
        "//pkg/registry/autoscaling/rest:go_default_library",
 | 
			
		||||
        "//pkg/registry/batch/rest:go_default_library",
 | 
			
		||||
        "//pkg/registry/certificates/rest:go_default_library",
 | 
			
		||||
        "//pkg/registry/core/namespace:go_default_library",
 | 
			
		||||
        "//pkg/registry/core/rangeallocation:go_default_library",
 | 
			
		||||
        "//pkg/registry/core/rest:go_default_library",
 | 
			
		||||
        "//pkg/registry/core/service:go_default_library",
 | 
			
		||||
        "//pkg/registry/core/service/ipallocator/controller:go_default_library",
 | 
			
		||||
        "//pkg/registry/core/service/portallocator/controller:go_default_library",
 | 
			
		||||
        "//pkg/registry/extensions/rest:go_default_library",
 | 
			
		||||
 
 | 
			
		||||
@@ -25,13 +25,10 @@ import (
 | 
			
		||||
	"k8s.io/kubernetes/pkg/api"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/api/endpoints"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/api/errors"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/api/rest"
 | 
			
		||||
	coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/genericapiserver"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/registry/core/namespace"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/registry/core/rangeallocation"
 | 
			
		||||
	corerest "k8s.io/kubernetes/pkg/registry/core/rest"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/registry/core/service"
 | 
			
		||||
	servicecontroller "k8s.io/kubernetes/pkg/registry/core/service/ipallocator/controller"
 | 
			
		||||
	portallocatorcontroller "k8s.io/kubernetes/pkg/registry/core/service/portallocator/controller"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/util/async"
 | 
			
		||||
@@ -41,13 +38,14 @@ import (
 | 
			
		||||
	"k8s.io/kubernetes/pkg/util/wait"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const kubernetesServiceName = "kubernetes"
 | 
			
		||||
 | 
			
		||||
// Controller is the controller manager for the core bootstrap Kubernetes controller
 | 
			
		||||
// loops, which manage creating the "kubernetes" service, the "default" and "kube-system"
 | 
			
		||||
// namespace, and provide the IP repair check on service IPs
 | 
			
		||||
type Controller struct {
 | 
			
		||||
	ServiceClient   coreclient.ServicesGetter
 | 
			
		||||
	NamespaceRegistry namespace.Registry
 | 
			
		||||
	ServiceRegistry   service.Registry
 | 
			
		||||
	NamespaceClient coreclient.NamespacesGetter
 | 
			
		||||
 | 
			
		||||
	ServiceClusterIPRegistry rangeallocation.RangeRegistry
 | 
			
		||||
	ServiceClusterIPInterval time.Duration
 | 
			
		||||
@@ -65,6 +63,7 @@ type Controller struct {
 | 
			
		||||
 | 
			
		||||
	PublicIP net.IP
 | 
			
		||||
 | 
			
		||||
	// ServiceIP indicates where the kubernetes service will live.  It may not be nil.
 | 
			
		||||
	ServiceIP                 net.IP
 | 
			
		||||
	ServicePort               int
 | 
			
		||||
	ExtraServicePorts         []api.ServicePort
 | 
			
		||||
@@ -76,11 +75,10 @@ type Controller struct {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewBootstrapController returns a controller for watching the core capabilities of the master
 | 
			
		||||
func (c *Config) NewBootstrapController(legacyRESTStorage corerest.LegacyRESTStorage, serviceClient coreclient.ServicesGetter) *Controller {
 | 
			
		||||
func (c *Config) NewBootstrapController(legacyRESTStorage corerest.LegacyRESTStorage, serviceClient coreclient.ServicesGetter, nsClient coreclient.NamespacesGetter) *Controller {
 | 
			
		||||
	return &Controller{
 | 
			
		||||
		ServiceClient:   serviceClient,
 | 
			
		||||
		NamespaceRegistry: legacyRESTStorage.NamespaceRegistry,
 | 
			
		||||
		ServiceRegistry:   legacyRESTStorage.ServiceRegistry,
 | 
			
		||||
		NamespaceClient: nsClient,
 | 
			
		||||
 | 
			
		||||
		EndpointReconciler: c.EndpointReconcilerConfig.Reconciler,
 | 
			
		||||
		EndpointInterval:   c.EndpointReconcilerConfig.Interval,
 | 
			
		||||
@@ -119,8 +117,8 @@ func (c *Controller) Start() {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	repairClusterIPs := servicecontroller.NewRepair(c.ServiceClusterIPInterval, c.ServiceRegistry, &c.ServiceClusterIPRange, c.ServiceClusterIPRegistry)
 | 
			
		||||
	repairNodePorts := portallocatorcontroller.NewRepair(c.ServiceNodePortInterval, c.ServiceRegistry, c.ServiceNodePortRange, c.ServiceNodePortRegistry)
 | 
			
		||||
	repairClusterIPs := servicecontroller.NewRepair(c.ServiceClusterIPInterval, c.ServiceClient, &c.ServiceClusterIPRange, c.ServiceClusterIPRegistry)
 | 
			
		||||
	repairNodePorts := portallocatorcontroller.NewRepair(c.ServiceNodePortInterval, c.ServiceClient, c.ServiceNodePortRange, c.ServiceNodePortRegistry)
 | 
			
		||||
 | 
			
		||||
	// run all of the controllers once prior to returning from Start.
 | 
			
		||||
	if err := repairClusterIPs.RunOnce(); err != nil {
 | 
			
		||||
@@ -173,23 +171,21 @@ func (c *Controller) UpdateKubernetesService(reconcile bool) error {
 | 
			
		||||
	if err := c.CreateNamespaceIfNeeded(api.NamespaceDefault); err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	if c.ServiceIP != nil {
 | 
			
		||||
 | 
			
		||||
	servicePorts, serviceType := createPortAndServiceSpec(c.ServicePort, c.KubernetesServiceNodePort, "https", c.ExtraServicePorts)
 | 
			
		||||
		if err := c.CreateOrUpdateMasterServiceIfNeeded("kubernetes", c.ServiceIP, servicePorts, serviceType, reconcile); err != nil {
 | 
			
		||||
	if err := c.CreateOrUpdateMasterServiceIfNeeded(kubernetesServiceName, c.ServiceIP, servicePorts, serviceType, reconcile); err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https", c.ExtraEndpointPorts)
 | 
			
		||||
		if err := c.EndpointReconciler.ReconcileEndpoints("kubernetes", c.PublicIP, endpointPorts, reconcile); err != nil {
 | 
			
		||||
	if err := c.EndpointReconciler.ReconcileEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts, reconcile); err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// CreateNamespaceIfNeeded will create a namespace if it doesn't already exist
 | 
			
		||||
func (c *Controller) CreateNamespaceIfNeeded(ns string) error {
 | 
			
		||||
	ctx := api.NewContext()
 | 
			
		||||
	if _, err := c.NamespaceRegistry.GetNamespace(ctx, ns); err == nil {
 | 
			
		||||
	if _, err := c.NamespaceClient.Namespaces().Get(ns); err == nil {
 | 
			
		||||
		// the namespace already exists
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
@@ -199,7 +195,7 @@ func (c *Controller) CreateNamespaceIfNeeded(ns string) error {
 | 
			
		||||
			Namespace: "",
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	err := c.NamespaceRegistry.CreateNamespace(ctx, newNs)
 | 
			
		||||
	_, err := c.NamespaceClient.Namespaces().Create(newNs)
 | 
			
		||||
	if err != nil && errors.IsAlreadyExists(err) {
 | 
			
		||||
		err = nil
 | 
			
		||||
	}
 | 
			
		||||
@@ -241,7 +237,6 @@ func createEndpointPortSpec(endpointPort int, endpointPortName string, extraEndp
 | 
			
		||||
// CreateMasterServiceIfNeeded will create the specified service if it
 | 
			
		||||
// doesn't already exist.
 | 
			
		||||
func (c *Controller) CreateOrUpdateMasterServiceIfNeeded(serviceName string, serviceIP net.IP, servicePorts []api.ServicePort, serviceType api.ServiceType, reconcile bool) error {
 | 
			
		||||
	ctx := api.NewDefaultContext()
 | 
			
		||||
	if s, err := c.ServiceClient.Services(api.NamespaceDefault).Get(serviceName); err == nil {
 | 
			
		||||
		// The service already exists.
 | 
			
		||||
		if reconcile {
 | 
			
		||||
@@ -268,9 +263,6 @@ func (c *Controller) CreateOrUpdateMasterServiceIfNeeded(serviceName string, ser
 | 
			
		||||
			Type:            serviceType,
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	if err := rest.BeforeCreate(service.Strategy, ctx, svc); err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	_, err := c.ServiceClient.Services(api.NamespaceDefault).Create(svc)
 | 
			
		||||
	if err != nil && errors.IsAlreadyExists(err) {
 | 
			
		||||
 
 | 
			
		||||
@@ -272,8 +272,8 @@ func (m *Master) InstallLegacyAPI(c *Config, restOptionsGetter genericapiserver.
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if c.EnableCoreControllers {
 | 
			
		||||
		serviceClient := coreclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
 | 
			
		||||
		bootstrapController := c.NewBootstrapController(legacyRESTStorage, serviceClient)
 | 
			
		||||
		coreClient := coreclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
 | 
			
		||||
		bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient)
 | 
			
		||||
		if err := m.GenericAPIServer.AddPostStartHook("bootstrap-controller", bootstrapController.PostStartHook); err != nil {
 | 
			
		||||
			glog.Fatalf("Error registering PostStartHook %q: %v", "bootstrap-controller", err)
 | 
			
		||||
		}
 | 
			
		||||
 
 | 
			
		||||
@@ -31,9 +31,7 @@ go_library(
 | 
			
		||||
        "//pkg/registry/core/endpoint/etcd:go_default_library",
 | 
			
		||||
        "//pkg/registry/core/event/etcd:go_default_library",
 | 
			
		||||
        "//pkg/registry/core/limitrange/etcd:go_default_library",
 | 
			
		||||
        "//pkg/registry/core/namespace:go_default_library",
 | 
			
		||||
        "//pkg/registry/core/namespace/etcd:go_default_library",
 | 
			
		||||
        "//pkg/registry/core/node:go_default_library",
 | 
			
		||||
        "//pkg/registry/core/node/etcd:go_default_library",
 | 
			
		||||
        "//pkg/registry/core/persistentvolume/etcd:go_default_library",
 | 
			
		||||
        "//pkg/registry/core/persistentvolumeclaim/etcd:go_default_library",
 | 
			
		||||
 
 | 
			
		||||
@@ -43,9 +43,7 @@ import (
 | 
			
		||||
	endpointsetcd "k8s.io/kubernetes/pkg/registry/core/endpoint/etcd"
 | 
			
		||||
	eventetcd "k8s.io/kubernetes/pkg/registry/core/event/etcd"
 | 
			
		||||
	limitrangeetcd "k8s.io/kubernetes/pkg/registry/core/limitrange/etcd"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/registry/core/namespace"
 | 
			
		||||
	namespaceetcd "k8s.io/kubernetes/pkg/registry/core/namespace/etcd"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/registry/core/node"
 | 
			
		||||
	nodeetcd "k8s.io/kubernetes/pkg/registry/core/node/etcd"
 | 
			
		||||
	pvetcd "k8s.io/kubernetes/pkg/registry/core/persistentvolume/etcd"
 | 
			
		||||
	pvcetcd "k8s.io/kubernetes/pkg/registry/core/persistentvolumeclaim/etcd"
 | 
			
		||||
@@ -86,10 +84,6 @@ type LegacyRESTStorageProvider struct {
 | 
			
		||||
// master.go for wiring controllers.
 | 
			
		||||
// TODO remove this by running the controller as a poststarthook
 | 
			
		||||
type LegacyRESTStorage struct {
 | 
			
		||||
	NodeRegistry              node.Registry
 | 
			
		||||
	NamespaceRegistry         namespace.Registry
 | 
			
		||||
	ServiceRegistry           service.Registry
 | 
			
		||||
	EndpointRegistry          endpoint.Registry
 | 
			
		||||
	ServiceClusterIPAllocator rangeallocation.RangeRegistry
 | 
			
		||||
	ServiceNodePortAllocator  rangeallocation.RangeRegistry
 | 
			
		||||
}
 | 
			
		||||
@@ -132,16 +126,14 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generi
 | 
			
		||||
	configMapStorage := configmapetcd.NewREST(restOptionsGetter(api.Resource("configMaps")))
 | 
			
		||||
 | 
			
		||||
	namespaceStorage, namespaceStatusStorage, namespaceFinalizeStorage := namespaceetcd.NewREST(restOptionsGetter(api.Resource("namespaces")))
 | 
			
		||||
	restStorage.NamespaceRegistry = namespace.NewRegistry(namespaceStorage)
 | 
			
		||||
 | 
			
		||||
	endpointsStorage := endpointsetcd.NewREST(restOptionsGetter(api.Resource("endpoints")))
 | 
			
		||||
	restStorage.EndpointRegistry = endpoint.NewRegistry(endpointsStorage)
 | 
			
		||||
	endpointRegistry := endpoint.NewRegistry(endpointsStorage)
 | 
			
		||||
 | 
			
		||||
	nodeStorage, err := nodeetcd.NewStorage(restOptionsGetter(api.Resource("nodes")), c.KubeletClientConfig, c.ProxyTransport)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
 | 
			
		||||
	}
 | 
			
		||||
	restStorage.NodeRegistry = node.NewRegistry(nodeStorage.Node)
 | 
			
		||||
 | 
			
		||||
	podStorage := podetcd.NewStorage(
 | 
			
		||||
		restOptionsGetter(api.Resource("pods")),
 | 
			
		||||
@@ -151,7 +143,7 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generi
 | 
			
		||||
	)
 | 
			
		||||
 | 
			
		||||
	serviceRESTStorage, serviceStatusStorage := serviceetcd.NewREST(restOptionsGetter(api.Resource("services")))
 | 
			
		||||
	restStorage.ServiceRegistry = service.NewRegistry(serviceRESTStorage)
 | 
			
		||||
	serviceRegistry := service.NewRegistry(serviceRESTStorage)
 | 
			
		||||
 | 
			
		||||
	var serviceClusterIPRegistry rangeallocation.RangeRegistry
 | 
			
		||||
	serviceClusterIPRange := c.ServiceIPRange
 | 
			
		||||
@@ -185,7 +177,7 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generi
 | 
			
		||||
 | 
			
		||||
	controllerStorage := controlleretcd.NewStorage(restOptionsGetter(api.Resource("replicationControllers")))
 | 
			
		||||
 | 
			
		||||
	serviceRest := service.NewStorage(restStorage.ServiceRegistry, restStorage.EndpointRegistry, ServiceClusterIPAllocator, ServiceNodePortAllocator, c.ProxyTransport)
 | 
			
		||||
	serviceRest := service.NewStorage(serviceRegistry, endpointRegistry, ServiceClusterIPAllocator, ServiceNodePortAllocator, c.ProxyTransport)
 | 
			
		||||
 | 
			
		||||
	restStorageMap := map[string]rest.Storage{
 | 
			
		||||
		"pods":             podStorage.Pod,
 | 
			
		||||
 
 | 
			
		||||
@@ -17,9 +17,9 @@ go_library(
 | 
			
		||||
    deps = [
 | 
			
		||||
        "//pkg/api:go_default_library",
 | 
			
		||||
        "//pkg/api/errors:go_default_library",
 | 
			
		||||
        "//pkg/client/clientset_generated/internalclientset/typed/core/internalversion:go_default_library",
 | 
			
		||||
        "//pkg/client/retry:go_default_library",
 | 
			
		||||
        "//pkg/registry/core/rangeallocation:go_default_library",
 | 
			
		||||
        "//pkg/registry/core/service:go_default_library",
 | 
			
		||||
        "//pkg/registry/core/service/ipallocator:go_default_library",
 | 
			
		||||
        "//pkg/util/runtime:go_default_library",
 | 
			
		||||
        "//pkg/util/wait:go_default_library",
 | 
			
		||||
@@ -33,7 +33,7 @@ go_test(
 | 
			
		||||
    tags = ["automanaged"],
 | 
			
		||||
    deps = [
 | 
			
		||||
        "//pkg/api:go_default_library",
 | 
			
		||||
        "//pkg/client/clientset_generated/internalclientset/fake:go_default_library",
 | 
			
		||||
        "//pkg/registry/core/service/ipallocator:go_default_library",
 | 
			
		||||
        "//pkg/registry/registrytest:go_default_library",
 | 
			
		||||
    ],
 | 
			
		||||
)
 | 
			
		||||
 
 | 
			
		||||
@@ -23,9 +23,9 @@ import (
 | 
			
		||||
 | 
			
		||||
	"k8s.io/kubernetes/pkg/api"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/api/errors"
 | 
			
		||||
	coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/client/retry"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/registry/core/rangeallocation"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/registry/core/service"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/util/runtime"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/util/wait"
 | 
			
		||||
@@ -48,17 +48,17 @@ import (
 | 
			
		||||
// TODO: perform repair?
 | 
			
		||||
type Repair struct {
 | 
			
		||||
	interval      time.Duration
 | 
			
		||||
	registry service.Registry
 | 
			
		||||
	serviceClient coreclient.ServicesGetter
 | 
			
		||||
	network       *net.IPNet
 | 
			
		||||
	alloc         rangeallocation.RangeRegistry
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewRepair creates a controller that periodically ensures that all clusterIPs are uniquely allocated across the cluster
 | 
			
		||||
// and generates informational warnings for a cluster that is not in sync.
 | 
			
		||||
func NewRepair(interval time.Duration, registry service.Registry, network *net.IPNet, alloc rangeallocation.RangeRegistry) *Repair {
 | 
			
		||||
func NewRepair(interval time.Duration, serviceClient coreclient.ServicesGetter, network *net.IPNet, alloc rangeallocation.RangeRegistry) *Repair {
 | 
			
		||||
	return &Repair{
 | 
			
		||||
		interval:      interval,
 | 
			
		||||
		registry: registry,
 | 
			
		||||
		serviceClient: serviceClient,
 | 
			
		||||
		network:       network,
 | 
			
		||||
		alloc:         alloc,
 | 
			
		||||
	}
 | 
			
		||||
@@ -99,13 +99,12 @@ func (c *Repair) runOnce() error {
 | 
			
		||||
		return fmt.Errorf("unable to refresh the service IP block: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	ctx := api.WithNamespace(api.NewDefaultContext(), api.NamespaceAll)
 | 
			
		||||
	// We explicitly send no resource version, since the resource version
 | 
			
		||||
	// of 'latest' is from a different collection, it's not comparable to
 | 
			
		||||
	// the service collection. The caching layer keeps per-collection RVs,
 | 
			
		||||
	// and this is proper, since in theory the collections could be hosted
 | 
			
		||||
	// in separate etcd (or even non-etcd) instances.
 | 
			
		||||
	list, err := c.registry.ListServices(ctx, &api.ListOptions{})
 | 
			
		||||
	list, err := c.serviceClient.Services(api.NamespaceAll).List(api.ListOptions{})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return fmt.Errorf("unable to refresh the service IP block: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 
 | 
			
		||||
@@ -23,8 +23,8 @@ import (
 | 
			
		||||
	"testing"
 | 
			
		||||
 | 
			
		||||
	"k8s.io/kubernetes/pkg/api"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/registry/registrytest"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type mockRangeRegistry struct {
 | 
			
		||||
@@ -49,12 +49,12 @@ func (r *mockRangeRegistry) CreateOrUpdate(alloc *api.RangeAllocation) error {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestRepair(t *testing.T) {
 | 
			
		||||
	registry := registrytest.NewServiceRegistry()
 | 
			
		||||
	fakeClient := fake.NewSimpleClientset()
 | 
			
		||||
	_, cidr, _ := net.ParseCIDR("192.168.1.0/24")
 | 
			
		||||
	ipregistry := &mockRangeRegistry{
 | 
			
		||||
		item: &api.RangeAllocation{},
 | 
			
		||||
	}
 | 
			
		||||
	r := NewRepair(0, registry, cidr, ipregistry)
 | 
			
		||||
	r := NewRepair(0, fakeClient.Core(), cidr, ipregistry)
 | 
			
		||||
 | 
			
		||||
	if err := r.RunOnce(); err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
@@ -67,7 +67,7 @@ func TestRepair(t *testing.T) {
 | 
			
		||||
		item:      &api.RangeAllocation{},
 | 
			
		||||
		updateErr: fmt.Errorf("test error"),
 | 
			
		||||
	}
 | 
			
		||||
	r = NewRepair(0, registry, cidr, ipregistry)
 | 
			
		||||
	r = NewRepair(0, fakeClient.Core(), cidr, ipregistry)
 | 
			
		||||
	if err := r.RunOnce(); !strings.Contains(err.Error(), ": test error") {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
@@ -84,7 +84,7 @@ func TestRepairEmpty(t *testing.T) {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	registry := registrytest.NewServiceRegistry()
 | 
			
		||||
	fakeClient := fake.NewSimpleClientset()
 | 
			
		||||
	ipregistry := &mockRangeRegistry{
 | 
			
		||||
		item: &api.RangeAllocation{
 | 
			
		||||
			ObjectMeta: api.ObjectMeta{
 | 
			
		||||
@@ -94,7 +94,7 @@ func TestRepairEmpty(t *testing.T) {
 | 
			
		||||
			Data:  dst.Data,
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	r := NewRepair(0, registry, cidr, ipregistry)
 | 
			
		||||
	r := NewRepair(0, fakeClient.Core(), cidr, ipregistry)
 | 
			
		||||
	if err := r.RunOnce(); err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
@@ -117,29 +117,32 @@ func TestRepairWithExisting(t *testing.T) {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	registry := registrytest.NewServiceRegistry()
 | 
			
		||||
	registry.List = api.ServiceList{
 | 
			
		||||
		Items: []api.Service{
 | 
			
		||||
			{
 | 
			
		||||
	fakeClient := fake.NewSimpleClientset(
 | 
			
		||||
		&api.Service{
 | 
			
		||||
			ObjectMeta: api.ObjectMeta{Namespace: "one", Name: "one"},
 | 
			
		||||
			Spec:       api.ServiceSpec{ClusterIP: "192.168.1.1"},
 | 
			
		||||
		},
 | 
			
		||||
			{
 | 
			
		||||
		&api.Service{
 | 
			
		||||
			ObjectMeta: api.ObjectMeta{Namespace: "two", Name: "two"},
 | 
			
		||||
			Spec:       api.ServiceSpec{ClusterIP: "192.168.1.100"},
 | 
			
		||||
		},
 | 
			
		||||
			{ // outside CIDR, will be dropped
 | 
			
		||||
		&api.Service{ // outside CIDR, will be dropped
 | 
			
		||||
			ObjectMeta: api.ObjectMeta{Namespace: "three", Name: "three"},
 | 
			
		||||
			Spec:       api.ServiceSpec{ClusterIP: "192.168.0.1"},
 | 
			
		||||
		},
 | 
			
		||||
			{ // empty, ignored
 | 
			
		||||
		&api.Service{ // empty, ignored
 | 
			
		||||
			ObjectMeta: api.ObjectMeta{Namespace: "four", Name: "four"},
 | 
			
		||||
			Spec:       api.ServiceSpec{ClusterIP: ""},
 | 
			
		||||
		},
 | 
			
		||||
			{ // duplicate, dropped
 | 
			
		||||
		&api.Service{ // duplicate, dropped
 | 
			
		||||
			ObjectMeta: api.ObjectMeta{Namespace: "five", Name: "five"},
 | 
			
		||||
			Spec:       api.ServiceSpec{ClusterIP: "192.168.1.1"},
 | 
			
		||||
		},
 | 
			
		||||
			{ // headless
 | 
			
		||||
		&api.Service{ // headless
 | 
			
		||||
			ObjectMeta: api.ObjectMeta{Namespace: "six", Name: "six"},
 | 
			
		||||
			Spec:       api.ServiceSpec{ClusterIP: "None"},
 | 
			
		||||
		},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	)
 | 
			
		||||
 | 
			
		||||
	ipregistry := &mockRangeRegistry{
 | 
			
		||||
		item: &api.RangeAllocation{
 | 
			
		||||
@@ -150,7 +153,7 @@ func TestRepairWithExisting(t *testing.T) {
 | 
			
		||||
			Data:  dst.Data,
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	r := NewRepair(0, registry, cidr, ipregistry)
 | 
			
		||||
	r := NewRepair(0, fakeClient.Core(), cidr, ipregistry)
 | 
			
		||||
	if err := r.RunOnce(); err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
 
 | 
			
		||||
@@ -17,6 +17,7 @@ go_library(
 | 
			
		||||
    deps = [
 | 
			
		||||
        "//pkg/api:go_default_library",
 | 
			
		||||
        "//pkg/api/errors:go_default_library",
 | 
			
		||||
        "//pkg/client/clientset_generated/internalclientset/typed/core/internalversion:go_default_library",
 | 
			
		||||
        "//pkg/client/retry:go_default_library",
 | 
			
		||||
        "//pkg/registry/core/rangeallocation:go_default_library",
 | 
			
		||||
        "//pkg/registry/core/service:go_default_library",
 | 
			
		||||
 
 | 
			
		||||
@@ -22,6 +22,7 @@ import (
 | 
			
		||||
 | 
			
		||||
	"k8s.io/kubernetes/pkg/api"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/api/errors"
 | 
			
		||||
	coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/client/retry"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/registry/core/rangeallocation"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/registry/core/service"
 | 
			
		||||
@@ -34,17 +35,17 @@ import (
 | 
			
		||||
// See ipallocator/controller/repair.go; this is a copy for ports.
 | 
			
		||||
type Repair struct {
 | 
			
		||||
	interval      time.Duration
 | 
			
		||||
	registry  service.Registry
 | 
			
		||||
	serviceClient coreclient.ServicesGetter
 | 
			
		||||
	portRange     net.PortRange
 | 
			
		||||
	alloc         rangeallocation.RangeRegistry
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewRepair creates a controller that periodically ensures that all ports are uniquely allocated across the cluster
 | 
			
		||||
// and generates informational warnings for a cluster that is not in sync.
 | 
			
		||||
func NewRepair(interval time.Duration, registry service.Registry, portRange net.PortRange, alloc rangeallocation.RangeRegistry) *Repair {
 | 
			
		||||
func NewRepair(interval time.Duration, serviceClient coreclient.ServicesGetter, portRange net.PortRange, alloc rangeallocation.RangeRegistry) *Repair {
 | 
			
		||||
	return &Repair{
 | 
			
		||||
		interval:      interval,
 | 
			
		||||
		registry:  registry,
 | 
			
		||||
		serviceClient: serviceClient,
 | 
			
		||||
		portRange:     portRange,
 | 
			
		||||
		alloc:         alloc,
 | 
			
		||||
	}
 | 
			
		||||
@@ -88,13 +89,12 @@ func (c *Repair) runOnce() error {
 | 
			
		||||
		return fmt.Errorf("unable to refresh the port block: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	ctx := api.WithNamespace(api.NewDefaultContext(), api.NamespaceAll)
 | 
			
		||||
	// We explicitly send no resource version, since the resource version
 | 
			
		||||
	// of 'latest' is from a different collection, it's not comparable to
 | 
			
		||||
	// the service collection. The caching layer keeps per-collection RVs,
 | 
			
		||||
	// and this is proper, since in theory the collections could be hosted
 | 
			
		||||
	// in separate etcd (or even non-etcd) instances.
 | 
			
		||||
	list, err := c.registry.ListServices(ctx, &api.ListOptions{})
 | 
			
		||||
	list, err := c.serviceClient.Services(api.NamespaceAll).List(api.ListOptions{})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return fmt.Errorf("unable to refresh the port block: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user