From a11106edd3aa0c2d675042c2e1e911a7f81ea9b2 Mon Sep 17 00:00:00 2001 From: Alex Robinson Date: Mon, 23 Mar 2015 22:59:38 +0000 Subject: [PATCH 1/6] Put the node controller in its own package rather than in the generic cloudprovider/controller package. --- cmd/integration/integration.go | 4 ++-- cmd/kube-controller-manager/app/controllermanager.go | 4 ++-- cmd/kubernetes/kubernetes.go | 4 ++-- pkg/cloudprovider/{controller => nodecontroller}/doc.go | 4 ++-- .../{controller => nodecontroller}/nodecontroller.go | 2 +- .../{controller => nodecontroller}/nodecontroller_test.go | 2 +- 6 files changed, 10 insertions(+), 10 deletions(-) rename pkg/cloudprovider/{controller => nodecontroller}/doc.go (86%) rename pkg/cloudprovider/{controller => nodecontroller}/nodecontroller.go (99%) rename pkg/cloudprovider/{controller => nodecontroller}/nodecontroller_test.go (99%) diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index 9b0a52a0739..8518ab16bd6 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -41,7 +41,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" - nodeControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/controller" + "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/nodecontroller" replicationControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/controller" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/cadvisor" @@ -221,7 +221,7 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st api.ResourceName(api.ResourceMemory): resource.MustParse("10G"), }} - nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, 10, 5*time.Minute, util.NewFakeRateLimiter(), 40*time.Second, 60*time.Second, 5*time.Second, 
"") + nodeController := nodecontroller.NewNodeController(nil, "", machineList, nodeResources, cl, 10, 5*time.Minute, util.NewFakeRateLimiter(), 40*time.Second, 60*time.Second, 5*time.Second, "") nodeController.Run(5*time.Second, true) cadvisorInterface := new(cadvisor.Fake) diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index e3914197383..db71f46c8ca 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -30,7 +30,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" - nodeControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/controller" + "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/nodecontroller" replicationControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/controller" "github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports" "github.com/GoogleCloudPlatform/kubernetes/pkg/namespace" @@ -187,7 +187,7 @@ func (s *CMServer) Run(_ []string) error { glog.Warning("DEPRECATION NOTICE: sync_node_status flag is being deprecated. 
It has no effect now and it will be removed in a future version.") } - nodeController := nodeControllerPkg.NewNodeController(cloud, s.MinionRegexp, s.MachineList, nodeResources, + nodeController := nodecontroller.NewNodeController(cloud, s.MinionRegexp, s.MachineList, nodeResources, kubeClient, s.RegisterRetryCount, s.PodEvictionTimeout, util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst), s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, s.ClusterName) nodeController.Run(s.NodeSyncPeriod, s.SyncNodeList) diff --git a/cmd/kubernetes/kubernetes.go b/cmd/kubernetes/kubernetes.go index 61708b9b815..1f4f1f9948e 100644 --- a/cmd/kubernetes/kubernetes.go +++ b/cmd/kubernetes/kubernetes.go @@ -34,7 +34,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi" "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" - nodeControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/controller" + "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/nodecontroller" "github.com/GoogleCloudPlatform/kubernetes/pkg/controller" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/cadvisor" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" @@ -128,7 +128,7 @@ func runControllerManager(machineList []string, cl *client.Client, nodeMilliCPU, }, } - nodeController := nodeControllerPkg.NewNodeController( + nodeController := nodecontroller.NewNodeController( nil, "", machineList, nodeResources, cl, 10, 5*time.Minute, util.NewTokenBucketRateLimiter(*deletingPodsQps, *deletingPodsBurst), 40*time.Second, 60*time.Second, 5*time.Second, "") nodeController.Run(10*time.Second, true) diff --git a/pkg/cloudprovider/controller/doc.go b/pkg/cloudprovider/nodecontroller/doc.go similarity index 86% rename from pkg/cloudprovider/controller/doc.go rename to pkg/cloudprovider/nodecontroller/doc.go index 64b73b3b216..097cac95d8c 100644 --- 
a/pkg/cloudprovider/controller/doc.go +++ b/pkg/cloudprovider/nodecontroller/doc.go @@ -14,6 +14,6 @@ See the License for the specific language governing permissions and limitations under the License. */ -// Package controller contains code for syncing cloud instances with +// Package nodecontroller contains code for syncing cloud instances with // minion registry -package controller +package nodecontroller diff --git a/pkg/cloudprovider/controller/nodecontroller.go b/pkg/cloudprovider/nodecontroller/nodecontroller.go similarity index 99% rename from pkg/cloudprovider/controller/nodecontroller.go rename to pkg/cloudprovider/nodecontroller/nodecontroller.go index e88d27c4db1..2d1f03b30d2 100644 --- a/pkg/cloudprovider/controller/nodecontroller.go +++ b/pkg/cloudprovider/nodecontroller/nodecontroller.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package controller +package nodecontroller import ( "errors" diff --git a/pkg/cloudprovider/controller/nodecontroller_test.go b/pkg/cloudprovider/nodecontroller/nodecontroller_test.go similarity index 99% rename from pkg/cloudprovider/controller/nodecontroller_test.go rename to pkg/cloudprovider/nodecontroller/nodecontroller_test.go index fafdfd0dcbe..b32456864b2 100644 --- a/pkg/cloudprovider/controller/nodecontroller_test.go +++ b/pkg/cloudprovider/nodecontroller/nodecontroller_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package controller +package nodecontroller import ( "errors" From ccc300289f2014c57013d9ea95ace173d8d70c0f Mon Sep 17 00:00:00 2001 From: Alex Robinson Date: Tue, 24 Mar 2015 17:32:43 +0000 Subject: [PATCH 2/6] Implement a ServiceController that watches services and handles keeping external load balancers up-to-date based on the service's specs, using the new DeltaFIFO watch queue class. 
Remove the old registry REST handler code for creating/updating/deleting load balancers. Also clean up a bunch of the GCE cloudprovider code related to load balancers. --- .../salt/kube-controller-manager/default | 6 +- .../app/controllermanager.go | 8 + cmd/kubernetes/kubernetes.go | 6 + pkg/cloudprovider/cloud.go | 2 + pkg/cloudprovider/gce/gce.go | 74 ++- pkg/cloudprovider/servicecontroller/doc.go | 19 + .../servicecontroller/servicecontroller.go | 434 ++++++++++++++++++ .../servicecontroller_test.go | 126 +++++ pkg/registry/service/doc.go | 2 +- pkg/registry/service/rest.go | 162 +------ pkg/registry/service/rest_test.go | 53 +-- test/e2e/service.go | 11 +- 12 files changed, 683 insertions(+), 220 deletions(-) create mode 100644 pkg/cloudprovider/servicecontroller/doc.go create mode 100644 pkg/cloudprovider/servicecontroller/servicecontroller.go create mode 100644 pkg/cloudprovider/servicecontroller/servicecontroller_test.go diff --git a/cluster/saltbase/salt/kube-controller-manager/default b/cluster/saltbase/salt/kube-controller-manager/default index c10a3976ac8..17a7b5d39cc 100644 --- a/cluster/saltbase/salt/kube-controller-manager/default +++ b/cluster/saltbase/salt/kube-controller-manager/default @@ -6,12 +6,16 @@ {% set master="--master=127.0.0.1:8080" -%} {% set machines = ""-%} +{% set cluster_name = "" -%} {% set minion_regexp = "--minion_regexp=.*" -%} {% set sync_nodes = "--sync_nodes=true" -%} {% if pillar['node_instance_prefix'] is defined -%} {% set minion_regexp = "--minion_regexp='" + pillar['node_instance_prefix'] + ".*'" -%} {% endif -%} +{% if pillar['instance_prefix'] is defined -%} + {% set cluster_name = "--cluster_name=" + pillar['instance_prefix'] -%} +{% endif -%} {% set cloud_provider = "" -%} {% set cloud_config = "" -%} @@ -49,4 +53,4 @@ {% endif -%} # grains.cloud is defined -DAEMON_ARGS="{{daemon_args}} {{master}} {{machines}} {{ minion_regexp }} {{ cloud_provider }} {{ sync_nodes }} {{ cloud_config }} {{pillar['log_level']}}" 
+DAEMON_ARGS="{{daemon_args}} {{master}} {{machines}} {{cluster_name}} {{ minion_regexp }} {{ cloud_provider }} {{ sync_nodes }} {{ cloud_config }} {{pillar['log_level']}}" diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index db71f46c8ca..06f800bd48d 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -31,6 +31,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/nodecontroller" + "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/servicecontroller" replicationControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/controller" "github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports" "github.com/GoogleCloudPlatform/kubernetes/pkg/namespace" @@ -49,6 +50,7 @@ type CMServer struct { ClientConfig client.Config CloudProvider string CloudConfigFile string + ClusterName string MinionRegexp string NodeSyncPeriod time.Duration ResourceQuotaSyncPeriod time.Duration @@ -100,6 +102,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) { client.BindClientConfigFlags(fs, &s.ClientConfig) fs.StringVar(&s.CloudProvider, "cloud_provider", s.CloudProvider, "The provider for cloud services. Empty string for no provider.") fs.StringVar(&s.CloudConfigFile, "cloud_config", s.CloudConfigFile, "The path to the cloud provider configuration file. Empty string for no configuration file.") + fs.StringVar(&s.ClusterName, "cluster_name", s.ClusterName, "The instance prefix for the cluster") fs.StringVar(&s.MinionRegexp, "minion_regexp", s.MinionRegexp, "If non empty, and --cloud_provider is specified, a regular expression for matching minion VMs.") fs.DurationVar(&s.NodeSyncPeriod, "node_sync_period", s.NodeSyncPeriod, ""+ "The period for syncing nodes from cloudprovider. 
Longer periods will result in "+ @@ -192,6 +195,11 @@ func (s *CMServer) Run(_ []string) error { s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, s.ClusterName) nodeController.Run(s.NodeSyncPeriod, s.SyncNodeList) + serviceController := servicecontroller.New(cloud, kubeClient, s.ClusterName) + if err := serviceController.Run(); err != nil { + glog.Errorf("Failed to start service controller: %v", err) + } + resourceQuotaManager := resourcequota.NewResourceQuotaManager(kubeClient) resourceQuotaManager.Run(s.ResourceQuotaSyncPeriod) diff --git a/cmd/kubernetes/kubernetes.go b/cmd/kubernetes/kubernetes.go index 1f4f1f9948e..8ec247d9b30 100644 --- a/cmd/kubernetes/kubernetes.go +++ b/cmd/kubernetes/kubernetes.go @@ -35,6 +35,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/nodecontroller" + "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/servicecontroller" "github.com/GoogleCloudPlatform/kubernetes/pkg/controller" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/cadvisor" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" @@ -132,6 +133,11 @@ func runControllerManager(machineList []string, cl *client.Client, nodeMilliCPU, nil, "", machineList, nodeResources, cl, 10, 5*time.Minute, util.NewTokenBucketRateLimiter(*deletingPodsQps, *deletingPodsBurst), 40*time.Second, 60*time.Second, 5*time.Second, "") nodeController.Run(10*time.Second, true) + serviceController := servicecontroller.New(nil, cl, "kubernetes") + if err := serviceController.Run(); err != nil { + glog.Warningf("Running without a service controller: %v", err) + } + endpoints := service.NewEndpointController(cl) go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10) diff --git a/pkg/cloudprovider/cloud.go b/pkg/cloudprovider/cloud.go index 3f76e80147f..ddf74af7da0 100644 --- 
a/pkg/cloudprovider/cloud.go +++ b/pkg/cloudprovider/cloud.go @@ -50,6 +50,8 @@ func GetLoadBalancerName(clusterName, serviceNamespace, serviceName string) stri type TCPLoadBalancer interface { // TCPLoadBalancerExists returns whether the specified load balancer exists. // TODO: Break this up into different interfaces (LB, etc) when we have more than one type of service + // TODO: This should really return the details of the load balancer so we can + // determine if it matches the needs of a service rather than if it exists. TCPLoadBalancerExists(name, region string) (bool, error) // CreateTCPLoadBalancer creates a new tcp load balancer. Returns the IP address or hostname of the balancer CreateTCPLoadBalancer(name, region string, externalIP net.IP, ports []int, hosts []string, affinityType api.AffinityType) (string, error) diff --git a/pkg/cloudprovider/gce/gce.go b/pkg/cloudprovider/gce/gce.go index 188aa34ba52..f5f577bce38 100644 --- a/pkg/cloudprovider/gce/gce.go +++ b/pkg/cloudprovider/gce/gce.go @@ -17,7 +17,6 @@ limitations under the License. 
package gce_cloud import ( - "errors" "fmt" "io" "io/ioutil" @@ -37,6 +36,7 @@ import ( "code.google.com/p/gcfg" compute "code.google.com/p/google-api-go-client/compute/v1" container "code.google.com/p/google-api-go-client/container/v1beta1" + "code.google.com/p/google-api-go-client/googleapi" "github.com/golang/glog" "golang.org/x/oauth2" "golang.org/x/oauth2/google" @@ -196,7 +196,7 @@ const ( GCEAffinityTypeClientIPProto GCEAffinityType = "CLIENT_IP_PROTO" ) -func (gce *GCECloud) makeTargetPool(name, region string, hosts []string, affinityType GCEAffinityType) (string, error) { +func (gce *GCECloud) makeTargetPool(name, region string, hosts []string, affinityType GCEAffinityType) error { var instances []string for _, host := range hosts { instances = append(instances, makeHostLink(gce.projectID, gce.zone, host)) @@ -208,13 +208,16 @@ func (gce *GCECloud) makeTargetPool(name, region string, hosts []string, affinit } op, err := gce.service.TargetPools.Insert(gce.projectID, region, pool).Do() if err != nil { - return "", err + return err } if err = gce.waitForRegionOp(op, region); err != nil { - return "", err + return err } - link := fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/regions/%s/targetPools/%s", gce.projectID, region, name) - return link, nil + return nil +} + +func (gce *GCECloud) targetPoolURL(name, region string) string { + return fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/regions/%s/targetPools/%s", gce.projectID, region, name) } func (gce *GCECloud) waitForRegionOp(op *compute.Operation, region string) error { @@ -228,7 +231,10 @@ func (gce *GCECloud) waitForRegionOp(op *compute.Operation, region string) error } } if pollOp.Error != nil && len(pollOp.Error.Errors) > 0 { - return errors.New(pollOp.Error.Errors[0].Message) + return &googleapi.Error{ + Code: int(pollOp.HttpErrorStatusCode), + Message: pollOp.Error.Errors[0].Message, + } } return nil } @@ -236,10 +242,21 @@ func (gce *GCECloud) waitForRegionOp(op 
*compute.Operation, region string) error // TCPLoadBalancerExists is an implementation of TCPLoadBalancer.TCPLoadBalancerExists. func (gce *GCECloud) TCPLoadBalancerExists(name, region string) (bool, error) { _, err := gce.service.ForwardingRules.Get(gce.projectID, region, name).Do() + if err == nil { + return true, nil + } + if isHTTPErrorCode(err, http.StatusNotFound) { + return false, nil + } return false, err } -//translate from what K8s supports to what the cloud provider supports for session affinity. +func isHTTPErrorCode(err error, code int) bool { + apiErr, ok := err.(*googleapi.Error) + return ok && apiErr.Code == code +} + +// translate from what K8s supports to what the cloud provider supports for session affinity. func translateAffinityType(affinityType api.AffinityType) GCEAffinityType { switch affinityType { case api.AffinityTypeClientIP: @@ -253,10 +270,15 @@ func translateAffinityType(affinityType api.AffinityType) GCEAffinityType { } // CreateTCPLoadBalancer is an implementation of TCPLoadBalancer.CreateTCPLoadBalancer. +// TODO(a-robinson): Don't just ignore specified IP addresses. Check if they're +// owned by the project and available to be used, and use them if they are. 
func (gce *GCECloud) CreateTCPLoadBalancer(name, region string, externalIP net.IP, ports []int, hosts []string, affinityType api.AffinityType) (string, error) { - pool, err := gce.makeTargetPool(name, region, hosts, translateAffinityType(affinityType)) + err := gce.makeTargetPool(name, region, hosts, translateAffinityType(affinityType)) if err != nil { - return "", err + if !isHTTPErrorCode(err, http.StatusConflict) { + return "", err + } + glog.Infof("Creating forwarding rule pointing at target pool that already exists: %v", err) } if len(ports) == 0 { @@ -276,18 +298,17 @@ func (gce *GCECloud) CreateTCPLoadBalancer(name, region string, externalIP net.I Name: name, IPProtocol: "TCP", PortRange: fmt.Sprintf("%d-%d", minPort, maxPort), - Target: pool, - } - if len(externalIP) > 0 { - req.IPAddress = externalIP.String() + Target: gce.targetPoolURL(name, region), } op, err := gce.service.ForwardingRules.Insert(gce.projectID, region, req).Do() - if err != nil { + if err != nil && !isHTTPErrorCode(err, http.StatusConflict) { return "", err } - err = gce.waitForRegionOp(op, region) - if err != nil { - return "", err + if op != nil { + err = gce.waitForRegionOp(op, region) + if err != nil && !isHTTPErrorCode(err, http.StatusConflict) { + return "", err + } } fwd, err := gce.service.ForwardingRules.Get(gce.projectID, region, name).Do() if err != nil { @@ -343,23 +364,28 @@ func (gce *GCECloud) UpdateTCPLoadBalancer(name, region string, hosts []string) // DeleteTCPLoadBalancer is an implementation of TCPLoadBalancer.DeleteTCPLoadBalancer. func (gce *GCECloud) DeleteTCPLoadBalancer(name, region string) error { op, err := gce.service.ForwardingRules.Delete(gce.projectID, region, name).Do() - if err != nil { - glog.Warningln("Failed to delete Forwarding Rules %s: got error %s. Trying to delete Target Pool", name, err.Error()) + if err != nil && isHTTPErrorCode(err, http.StatusNotFound) { + glog.Infof("Forwarding rule %s already deleted. 
Continuing to delete target pool.", name) + } else if err != nil { + glog.Warningf("Failed to delete Forwarding Rules %s: got error %s.", name, err.Error()) return err } else { err = gce.waitForRegionOp(op, region) if err != nil { - glog.Warningln("Failed waiting for Forwarding Rule %s to be deleted: got error %s. Trying to delete Target Pool", name, err.Error()) + glog.Warningf("Failed waiting for Forwarding Rule %s to be deleted: got error %s.", name, err.Error()) } } op, err = gce.service.TargetPools.Delete(gce.projectID, region, name).Do() - if err != nil { - glog.Warningln("Failed to delete Target Pool %s, got error %s.", name, err.Error()) + if err != nil && isHTTPErrorCode(err, http.StatusNotFound) { + glog.Infof("Target pool %s already deleted.", name) + return nil + } else if err != nil { + glog.Warningf("Failed to delete Target Pool %s, got error %s.", name, err.Error()) return err } err = gce.waitForRegionOp(op, region) if err != nil { - glog.Warningln("Failed waiting for Target Pool %s to be deleted: got error %s.", name, err.Error()) + glog.Warningf("Failed waiting for Target Pool %s to be deleted: got error %s.", name, err.Error()) } return err } diff --git a/pkg/cloudprovider/servicecontroller/doc.go b/pkg/cloudprovider/servicecontroller/doc.go new file mode 100644 index 00000000000..697b65d991f --- /dev/null +++ b/pkg/cloudprovider/servicecontroller/doc.go @@ -0,0 +1,19 @@ +/* +Copyright 2015 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +// Package servicecontroller contains code for syncing cloud load balancers +// with the service registry. +package servicecontroller diff --git a/pkg/cloudprovider/servicecontroller/servicecontroller.go b/pkg/cloudprovider/servicecontroller/servicecontroller.go new file mode 100644 index 00000000000..d77e6241d3e --- /dev/null +++ b/pkg/cloudprovider/servicecontroller/servicecontroller.go @@ -0,0 +1,434 @@ +/* +Copyright 2015 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package servicecontroller + +import ( + "fmt" + "net" + "sync" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" + "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" + "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/golang/glog" +) + +const ( + clientRetryCount = 5 + clientRetryInterval = 5 * time.Second + + retryable = true + notRetryable = false +) + +type ServiceController struct { + cloud cloudprovider.Interface + kubeClient client.Interface + clusterName string + balancer cloudprovider.TCPLoadBalancer + zone cloudprovider.Zone + mu sync.Mutex // protects serviceMap + serviceMap map[string]*api.Service // keys generated by cache.MetaNamespaceKeyFunc +} + +// New returns a new service controller to keep cloud provider service resources +// (like external load balancers) in sync with the registry. +func New(cloud cloudprovider.Interface, kubeClient client.Interface, clusterName string) *ServiceController { + return &ServiceController{ + cloud: cloud, + kubeClient: kubeClient, + clusterName: clusterName, + serviceMap: make(map[string]*api.Service), + } +} + +// Run starts a background goroutine that watches for changes to services that +// have (or had) externalLoadBalancers=true and ensures that they have external +// load balancers created and deleted appropriately. +func (s *ServiceController) Run() error { + if err := s.init(); err != nil { + return err + } + + // We have to make this check because the ListWatch that we use in + // WatchServices requires Client functions that aren't in the interface + // for some reason. 
+ if _, ok := s.kubeClient.(*client.Client); !ok { + return fmt.Errorf("ServiceController only works with real Client objects, but was passed something else satisfying the client Interface.") + } + + go s.watchServices() + return nil +} + +func (s *ServiceController) init() error { + if s.cloud == nil { + return fmt.Errorf("ServiceController should not be run without a cloudprovider.") + } + + balancer, ok := s.cloud.TCPLoadBalancer() + if !ok { + return fmt.Errorf("the cloud provider does not support external TCP load balancers.") + } + s.balancer = balancer + + zones, ok := s.cloud.Zones() + if !ok { + return fmt.Errorf("the cloud provider does not support zone enumeration, which is required for creating external load balancers.") + } + zone, err := zones.GetZone() + if err != nil { + return fmt.Errorf("failed to get zone from cloud provider, will not be able to create external load balancers: %v", err) + } + s.zone = zone + return nil +} + +func (s *ServiceController) watchServices() { + // Get the currently existing set of services and then all future creates + // and updates of services. + // TODO: Add a compressor that intelligently squashes together updates? + keyLister := cache.KeyListerFunc(func() []string { return s.listKeys() }) + serviceQueue := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, keyLister) + lw := cache.NewListWatchFromClient(s.kubeClient.(*client.Client), "services", api.NamespaceAll, fields.Everything()) + cache.NewReflector(lw, &api.Service{}, serviceQueue, 0).Run() + // TODO: Add proper retries rather than just re-adding to the queue? 
+ for { + newItem := serviceQueue.Pop() + deltas, ok := newItem.(cache.Deltas) + if !ok { + glog.Errorf("Received object from service watcher that wasn't Deltas: %+v", newItem) + } + delta := deltas.Newest() + if delta == nil { + glog.Errorf("Received nil delta from watcher queue.") + continue + } + err, shouldRetry := s.processDelta(delta) + if shouldRetry { + // Add the failed service back to the queue so we'll retry it. + glog.Errorf("Failed to process service delta. Retrying: %v", err) + time.Sleep(5 * time.Second) + serviceQueue.AddIfNotPresent(deltas) + } else if err != nil { + glog.Errorf("Failed to process service delta. Not retrying: %v", err) + } + } +} + +// Returns an error if processing the delta failed, along with a boolean +// indicator of whether the processing should be retried. +func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) { + service, ok := delta.Object.(*api.Service) + if !ok { + // If the DeltaFIFO saw a key in our cache that it didn't know about, it + // can send a deletion with an unknown state. Grab the service from our + // cache for deleting. + key, ok := delta.Object.(cache.DeletedFinalStateUnknown) + if !ok { + return fmt.Errorf("Delta contained object that wasn't a service or a deleted key: %+v", delta), notRetryable + } + service, ok = s.getService(key.Key) + if !ok { + return fmt.Errorf("Service %s not in cache even though the watcher thought it was. Ignoring the deletion.", key), notRetryable + } + delta.Object = service + } + glog.V(2).Infof("Got new %s delta for service: %+v", delta.Type, service) + + // TODO: Make this more parallel. The only things that need to serialized + // are changes to services with the same namespace and name. + // TODO: Handle added, updated, and sync differently? 
+ switch delta.Type { + case cache.Added: + fallthrough + case cache.Updated: + fallthrough + case cache.Sync: + return s.createLoadBalancerIfNeeded(service) + case cache.Deleted: + return s.handleDelete(service) + default: + glog.Errorf("Unexpected delta type: %v", delta.Type) + } + return nil, notRetryable +} + +// Returns whatever error occurred along with a boolean indicator of whether it +// should be retried. +func (s *ServiceController) createLoadBalancerIfNeeded(service *api.Service) (error, bool) { + namespacedName, err := cache.MetaNamespaceKeyFunc(service) + if err != nil { + return fmt.Errorf("Couldn't generate namespaced name for service: %v", err), notRetryable + } + + cachedService, cached := s.getService(namespacedName) + if cached && !needsUpdate(cachedService, service) { + glog.Infof("LB already exists and doesn't need update for service %s", namespacedName) + return nil, notRetryable + } + if cached { + // If the service already exists but needs to be updated, delete it so that + // we can recreate it cleanly. + if cachedService.Spec.CreateExternalLoadBalancer { + glog.Infof("Deleting existing load balancer for service %s that needs an updated load balancer.", namespacedName) + if err := s.ensureLBDeleted(cachedService); err != nil { + return err, retryable + } + } + } else { + // If we don't have any cached memory of the load balancer and it already + // exists, optimistically consider our work done. + // TODO: If we could read the spec of the existing load balancer, we could + // determine if an update is necessary. + exists, err := s.balancer.TCPLoadBalancerExists(s.loadBalancerName(service), s.zone.Region) + if err != nil { + return fmt.Errorf("Error getting LB for service %s", namespacedName), retryable + } + if exists && len(service.Spec.PublicIPs) == 0 { + // The load balancer exists, but we apparently don't know about its public + // IPs, so just delete it and recreate it to get back to a sane state. 
+ // TODO: Ideally the cloud provider interface would return the IP for us. + glog.Infof("Deleting old LB for service with no public IPs %s", namespacedName) + if err := s.ensureLBDeleted(service); err != nil { + return err, retryable + } + } else if exists { + // TODO: Better handle updates for non-cached services, this is optimistic. + glog.Infof("LB already exists for service %s", namespacedName) + return nil, notRetryable + } + } + + if !service.Spec.CreateExternalLoadBalancer { + glog.Infof("Not creating LB for service %s that doesn't want one.", namespacedName) + return nil, notRetryable + } + + glog.V(2).Infof("Creating LB for service %s", namespacedName) + + // The load balancer doesn't exist yet, so create it. + publicIPstring := fmt.Sprint(service.Spec.PublicIPs) + err = s.createExternalLoadBalancer(service) + if err != nil { + return fmt.Errorf("failed to create external load balancer for service %s: %v", namespacedName, err), retryable + } + + if publicIPstring == fmt.Sprint(service.Spec.PublicIPs) { + glog.Infof("Not persisting unchanged service to registry.") + return nil, notRetryable + } + s.setService(namespacedName, service) + + // If creating the load balancer succeeded, persist the updated service. + if err = s.persistUpdate(service); err != nil { + return fmt.Errorf("Failed to persist updated publicIPs to apiserver, even after retries. Giving up: %v", err), notRetryable + } + return nil, notRetryable +} + +// TODO(a-robinson): Handle repeated failures due to ResourceVersion changes or +// the object having been deleted. 
+func (s *ServiceController) persistUpdate(service *api.Service) error { + var err error + for i := 0; i < clientRetryCount; i++ { + _, err = s.kubeClient.Services(service.Namespace).Update(service) + if err == nil { + return nil + } + glog.Warningf("Failed to persist updated PublicIPs to service %s after creating its external load balancer: %v", + service.Name, err) + time.Sleep(clientRetryInterval) + } + return err +} + +func (s *ServiceController) createExternalLoadBalancer(service *api.Service) error { + ports, err := getTCPPorts(service) + if err != nil { + return err + } + nodes, err := s.kubeClient.Nodes().List(labels.Everything()) + if err != nil { + return err + } + name := s.loadBalancerName(service) + if len(service.Spec.PublicIPs) > 0 { + for _, publicIP := range service.Spec.PublicIPs { + // TODO: Make this actually work for multiple IPs by using different + // names for each. For now, we'll just create the first and break. + endpoint, err := s.balancer.CreateTCPLoadBalancer(name, s.zone.Region, net.ParseIP(publicIP), + ports, hostsFromNodeList(nodes), service.Spec.SessionAffinity) + if err != nil { + return err + } + service.Spec.PublicIPs = []string{endpoint} + break + } + } else { + endpoint, err := s.balancer.CreateTCPLoadBalancer(name, s.zone.Region, nil, + ports, hostsFromNodeList(nodes), service.Spec.SessionAffinity) + if err != nil { + return err + } + service.Spec.PublicIPs = []string{endpoint} + } + return nil +} + +// Returns whatever error occurred along with a boolean indicator of whether it +// should be retried. +func (s *ServiceController) handleDelete(service *api.Service) (error, bool) { + if err := s.ensureLBDeleted(service); err != nil { + return err, retryable + } + namespacedName, err := cache.MetaNamespaceKeyFunc(service) + if err != nil { + // This is panic-worthy, since the queue shouldn't have been able to + // handle the service if it couldn't generate a name for it. 
+ return fmt.Errorf("Couldn't generate namespaced name for service: %v", err), notRetryable + } + s.deleteService(namespacedName) + return nil, notRetryable +} + +// Ensures that the load balancer associated with the given service is deleted, +// doing the deletion if necessary. +func (s *ServiceController) ensureLBDeleted(service *api.Service) error { + // This is only needed because not all delete load balancer implementations + // are currently idempotent to the LB not existing. + if exists, err := s.balancer.TCPLoadBalancerExists(s.loadBalancerName(service), s.zone.Region); err != nil { + return err + } else if !exists { + return nil + } + + if err := s.balancer.DeleteTCPLoadBalancer(s.loadBalancerName(service), s.zone.Region); err != nil { + return err + } + return nil +} + +// listKeys implements the interface required by DeltaFIFO to list the keys we +// already know about. +func (s *ServiceController) listKeys() []string { + s.mu.Lock() + defer s.mu.Unlock() + keys := make([]string, 0, len(s.serviceMap)) + for k := range s.serviceMap { + keys = append(keys, k) + } + return keys +} + +func (s *ServiceController) getService(serviceName string) (*api.Service, bool) { + s.mu.Lock() + defer s.mu.Unlock() + info, ok := s.serviceMap[serviceName] + return info, ok +} + +func (s *ServiceController) setService(serviceName string, info *api.Service) { + s.mu.Lock() + defer s.mu.Unlock() + s.serviceMap[serviceName] = info +} + +func (s *ServiceController) deleteService(serviceName string) { + s.mu.Lock() + defer s.mu.Unlock() + delete(s.serviceMap, serviceName) +} + +func needsUpdate(oldService *api.Service, newService *api.Service) bool { + if !oldService.Spec.CreateExternalLoadBalancer && !newService.Spec.CreateExternalLoadBalancer { + return false + } + if oldService.Spec.CreateExternalLoadBalancer != newService.Spec.CreateExternalLoadBalancer { + return true + } + if !portsEqual(oldService, newService) || oldService.Spec.SessionAffinity != 
newService.Spec.SessionAffinity { + return true + } + if len(oldService.Spec.PublicIPs) != len(newService.Spec.PublicIPs) { + return true + } + for i := range oldService.Spec.PublicIPs { + if oldService.Spec.PublicIPs[i] != newService.Spec.PublicIPs[i] { + return true + } + } + return false +} + +// TODO: Use a shorter name that's less likely to be longer than cloud +// providers' length limits. +func (s *ServiceController) loadBalancerName(service *api.Service) string { + return s.cloud.GetLoadBalancerName(s.clusterName, service.Namespace, service.Name) +} + +// TODO: Deduplicate this with the copy in pkg/registry/service/rest.go. +func getTCPPorts(service *api.Service) ([]int, error) { + ports := []int{} + for i := range service.Spec.Ports { + // TODO: Support UDP. Remove the check from the API validation package once + // it's supported. + sp := &service.Spec.Ports[i] + if sp.Protocol != api.ProtocolTCP { + return nil, fmt.Errorf("external load balancers for non TCP services are not currently supported.") + } + ports = append(ports, sp.Port) + } + return ports, nil +} + +func portsEqual(x, y *api.Service) bool { + xPorts, err := getTCPPorts(x) + if err != nil { + return false + } + yPorts, err := getTCPPorts(y) + if err != nil { + return false + } + if len(xPorts) != len(yPorts) { + return false + } + // Use a map for comparison since port slices aren't necessarily sorted. 
+ xPortMap := make(map[int]bool) + for _, xPort := range xPorts { + xPortMap[xPort] = true + } + for _, yPort := range yPorts { + if !xPortMap[yPort] { + return false + } + } + return true +} + +func hostsFromNodeList(list *api.NodeList) []string { + result := make([]string, len(list.Items)) + for ix := range list.Items { + result[ix] = list.Items[ix].Name + } + return result +} diff --git a/pkg/cloudprovider/servicecontroller/servicecontroller_test.go b/pkg/cloudprovider/servicecontroller/servicecontroller_test.go new file mode 100644 index 00000000000..73aa5ff08b4 --- /dev/null +++ b/pkg/cloudprovider/servicecontroller/servicecontroller_test.go @@ -0,0 +1,126 @@ +/* +Copyright 2015 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package servicecontroller + +import ( + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/testclient" + fake_cloud "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/fake" +) + +const region = "us-central" + +func TestCreateExternalLoadBalancer(t *testing.T) { + table := []struct { + service *api.Service + expectErr bool + expectCreateAttempt bool + }{ + { + service: &api.Service{ + ObjectMeta: api.ObjectMeta{ + Name: "no-external-balancer", + Namespace: "default", + }, + Spec: api.ServiceSpec{ + CreateExternalLoadBalancer: false, + }, + }, + expectErr: false, + expectCreateAttempt: false, + }, + { + service: &api.Service{ + ObjectMeta: api.ObjectMeta{ + Name: "udp-service", + Namespace: "default", + }, + Spec: api.ServiceSpec{ + Ports: []api.ServicePort{{ + Port: 80, + Protocol: api.ProtocolUDP, + }}, + CreateExternalLoadBalancer: true, + }, + }, + expectErr: true, + expectCreateAttempt: false, + }, + { + service: &api.Service{ + ObjectMeta: api.ObjectMeta{ + Name: "basic-service1", + Namespace: "default", + }, + Spec: api.ServiceSpec{ + Ports: []api.ServicePort{{ + Port: 80, + Protocol: api.ProtocolTCP, + }}, + CreateExternalLoadBalancer: true, + }, + }, + expectErr: false, + expectCreateAttempt: true, + }, + } + + for _, item := range table { + cloud := &fake_cloud.FakeCloud{} + cloud.Region = region + client := &testclient.Fake{} + controller := New(cloud, client, "test-cluster") + controller.init() + cloud.Calls = nil // ignore any cloud calls made in init() + client.Actions = nil // ignore any client calls made in init() + err, _ := controller.createLoadBalancerIfNeeded(item.service) + if !item.expectErr && err != nil { + t.Errorf("unexpected error: %v", err) + } else if item.expectErr && err == nil { + t.Errorf("expected error creating %v, got nil", item.service) + } + if !item.expectCreateAttempt { + if len(cloud.Calls) > 0 { + t.Errorf("unexpected cloud provider calls: 
%v", cloud.Calls) + } + if len(client.Actions) > 0 { + t.Errorf("unexpected client actions: %v", client.Actions) + } + } else { + if len(cloud.Balancers) != 1 { + t.Errorf("expected one load balancer to be created, got %v", cloud.Balancers) + } else if cloud.Balancers[0].Name != controller.loadBalancerName(item.service) || + cloud.Balancers[0].Region != region || + cloud.Balancers[0].Ports[0] != item.service.Spec.Ports[0].Port { + t.Errorf("created load balancer has incorrect parameters: %v", cloud.Balancers[0]) + } + actionFound := false + for _, action := range client.Actions { + if action.Action == "update-service" { + actionFound = true + } + } + if !actionFound { + t.Errorf("expected updated service to be sent to client, got these actions instead: %v", client.Actions) + } + } + } +} + +// TODO(a-robinson): Add tests for update/sync/delete. diff --git a/pkg/registry/service/doc.go b/pkg/registry/service/doc.go index bd503f2b0e1..1022a6e73c7 100644 --- a/pkg/registry/service/doc.go +++ b/pkg/registry/service/doc.go @@ -14,6 +14,6 @@ See the License for the specific language governing permissions and limitations under the License. */ -// Package service provides Registry interface and it's RESTStorage +// Package service provides the Registry interface and its RESTStorage // implementation for storing Service api objects. package service diff --git a/pkg/registry/service/rest.go b/pkg/registry/service/rest.go index f84e53ef6ad..2402f31d3e7 100644 --- a/pkg/registry/service/rest.go +++ b/pkg/registry/service/rest.go @@ -42,7 +42,8 @@ import ( // REST adapts a service registry into apiserver's RESTStorage model. 
type REST struct { - registry Registry + registry Registry + // TODO(a-robinson): Remove cloud cloud cloudprovider.Interface machines minion.Registry endpoints endpoint.Registry @@ -97,6 +98,14 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, err return nil, err } + // Make sure that we'll be able to create a load balancer for the service, + // even though it'll be created by the ServiceController. + if service.Spec.CreateExternalLoadBalancer { + if _, err := getTCPPorts(service); err != nil { + return nil, err + } + } + releaseServiceIP := false defer func() { if releaseServiceIP { @@ -123,15 +132,6 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, err releaseServiceIP = true } - // TODO: Move this to post-creation rectification loop, so that we make/remove external load balancers - // correctly no matter what http operations happen. - if service.Spec.CreateExternalLoadBalancer { - err := rs.createExternalLoadBalancer(ctx, service) - if err != nil { - return nil, err - } - } - out, err := rs.registry.CreateService(ctx, service) if err != nil { err = rest.CheckGeneratedNameError(rest.Services, err, service) @@ -144,14 +144,6 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, err return out, err } -func hostsFromMinionList(list *api.NodeList) []string { - result := make([]string, len(list.Items)) - for ix := range list.Items { - result[ix] = list.Items[ix].Name - } - return result -} - func (rs *REST) Delete(ctx api.Context, id string) (runtime.Object, error) { service, err := rs.registry.GetService(ctx, id) if err != nil { @@ -160,9 +152,6 @@ func (rs *REST) Delete(ctx api.Context, id string) (runtime.Object, error) { if api.IsServiceIPSet(service) { rs.portalMgr.Release(net.ParseIP(service.Spec.PortalIP)) } - if service.Spec.CreateExternalLoadBalancer { - rs.deleteExternalLoadBalancer(ctx, service) - } return &api.Status{Status: api.StatusSuccess}, 
rs.registry.DeleteService(ctx, id) } @@ -219,22 +208,6 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, boo if errs := validation.ValidateServiceUpdate(oldService, service); len(errs) > 0 { return nil, false, errors.NewInvalid("service", service.Name, errs) } - // Recreate external load balancer if changed. - if externalLoadBalancerNeedsUpdate(oldService, service) { - // TODO: support updating existing balancers - if oldService.Spec.CreateExternalLoadBalancer { - err = rs.deleteExternalLoadBalancer(ctx, oldService) - if err != nil { - return nil, false, err - } - } - if service.Spec.CreateExternalLoadBalancer { - err = rs.createExternalLoadBalancer(ctx, service) - if err != nil { - return nil, false, err - } - } - } out, err := rs.registry.UpdateService(ctx, service) return out, false, err } @@ -283,52 +256,7 @@ func (rs *REST) ResourceLocation(ctx api.Context, id string) (*url.URL, http.Rou return nil, nil, fmt.Errorf("no endpoints available for %q", id) } -func (rs *REST) createExternalLoadBalancer(ctx api.Context, service *api.Service) error { - if rs.cloud == nil { - return fmt.Errorf("requested an external service, but no cloud provider supplied.") - } - - ports, err := getTCPPorts(service) - if err != nil { - return err - } - - balancer, ok := rs.cloud.TCPLoadBalancer() - if !ok { - return fmt.Errorf("the cloud provider does not support external TCP load balancers.") - } - zones, ok := rs.cloud.Zones() - if !ok { - return fmt.Errorf("the cloud provider does not support zone enumeration.") - } - hosts, err := rs.machines.ListMinions(ctx, labels.Everything(), fields.Everything()) - if err != nil { - return err - } - zone, err := zones.GetZone() - if err != nil { - return err - } - name := cloudprovider.GetLoadBalancerName(rs.clusterName, api.NamespaceValue(ctx), service.Name) - var affinityType api.AffinityType = service.Spec.SessionAffinity - if len(service.Spec.PublicIPs) > 0 { - for _, publicIP := range service.Spec.PublicIPs { 
-			_, err = balancer.CreateTCPLoadBalancer(name, zone.Region, net.ParseIP(publicIP), ports, hostsFromMinionList(hosts), affinityType)
-			if err != nil {
-				// TODO: have to roll-back any successful calls.
-				return err
-			}
-		}
-	} else {
-		endpoint, err := balancer.CreateTCPLoadBalancer(name, zone.Region, nil, ports, hostsFromMinionList(hosts), affinityType)
-		if err != nil {
-			return err
-		}
-		service.Spec.PublicIPs = []string{endpoint}
-	}
-	return nil
-}
-
+// TODO: Deduplicate with the copy of this in pkg/cloudprovider/servicecontroller/servicecontroller.go
 func getTCPPorts(service *api.Service) ([]int, error) {
 	ports := []int{}
 	for i := range service.Spec.Ports {
@@ -341,71 +269,3 @@
 	}
 	return ports, nil
 }
-
-func portsEqual(x, y *api.Service) bool {
-	xPorts, err := getTCPPorts(x)
-	if err != nil {
-		return false
-	}
-	yPorts, err := getTCPPorts(y)
-	if err != nil {
-		return false
-	}
-	if len(xPorts) != len(yPorts) {
-		return false
-	}
-	for i := range xPorts {
-		if xPorts[i] != yPorts[i] {
-			return false
-		}
-	}
-	return true
-}
-
-func (rs *REST) deleteExternalLoadBalancer(ctx api.Context, service *api.Service) error {
-	if rs.cloud == nil {
-		return fmt.Errorf("requested an external service, but no cloud provider supplied.")
-	}
-	zones, ok := rs.cloud.Zones()
-	if !ok {
-		// We failed to get zone enumerator.
-		// As this should have failed when we tried in "create" too,
-		// assume external load balancer was never created.
-		return nil
-	}
-	balancer, ok := rs.cloud.TCPLoadBalancer()
-	if !ok {
-		// See comment above.
- return nil - } - zone, err := zones.GetZone() - if err != nil { - return err - } - name := cloudprovider.GetLoadBalancerName(rs.clusterName, api.NamespaceValue(ctx), service.Name) - if err := balancer.DeleteTCPLoadBalancer(name, zone.Region); err != nil { - return err - } - return nil -} - -func externalLoadBalancerNeedsUpdate(oldService, newService *api.Service) bool { - if !oldService.Spec.CreateExternalLoadBalancer && !newService.Spec.CreateExternalLoadBalancer { - return false - } - if oldService.Spec.CreateExternalLoadBalancer != newService.Spec.CreateExternalLoadBalancer { - return true - } - if !portsEqual(oldService, newService) || oldService.Spec.SessionAffinity != newService.Spec.SessionAffinity { - return true - } - if len(oldService.Spec.PublicIPs) != len(newService.Spec.PublicIPs) { - return true - } - for i := range oldService.Spec.PublicIPs { - if oldService.Spec.PublicIPs[i] != newService.Spec.PublicIPs[i] { - return true - } - } - return false -} diff --git a/pkg/registry/service/rest_test.go b/pkg/registry/service/rest_test.go index b96c17c24e2..cbf3f29f87c 100644 --- a/pkg/registry/service/rest_test.go +++ b/pkg/registry/service/rest_test.go @@ -19,7 +19,6 @@ package service import ( "bytes" "encoding/gob" - "fmt" "net" "strings" "testing" @@ -257,10 +256,11 @@ func TestServiceRegistryExternalService(t *testing.T) { }}, }, } - if _, err := storage.Create(ctx, svc); err != nil { - t.Fatalf("Unexpected error: %v", err) + _, err := storage.Create(ctx, svc) + if err != nil { + t.Errorf("Failed to create service: %#v", err) } - if len(fakeCloud.Calls) != 2 || fakeCloud.Calls[0] != "get-zone" || fakeCloud.Calls[1] != "create" { + if len(fakeCloud.Calls) != 0 { t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) } srv, err := registry.GetService(ctx, svc.Name) @@ -270,38 +270,11 @@ func TestServiceRegistryExternalService(t *testing.T) { if srv == nil { t.Errorf("Failed to find service: %s", svc.Name) } - if len(fakeCloud.Balancers) != 1 || 
fakeCloud.Balancers[0].Name != "kubernetes-default-foo" || fakeCloud.Balancers[0].Ports[0] != 6502 { + if len(fakeCloud.Balancers) != 0 { t.Errorf("Unexpected balancer created: %v", fakeCloud.Balancers) } } -func TestServiceRegistryExternalServiceError(t *testing.T) { - storage, registry, fakeCloud := NewTestREST(t, nil) - fakeCloud.Err = fmt.Errorf("test error") - svc := &api.Service{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, - Spec: api.ServiceSpec{ - Selector: map[string]string{"bar": "baz"}, - CreateExternalLoadBalancer: true, - SessionAffinity: api.AffinityTypeNone, - Ports: []api.ServicePort{{ - Port: 6502, - Protocol: api.ProtocolTCP, - }}, - }, - } - ctx := api.NewDefaultContext() - if _, err := storage.Create(ctx, svc); err == nil { - t.Fatalf("Unexpected success") - } - if len(fakeCloud.Calls) != 1 || fakeCloud.Calls[0] != "get-zone" { - t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) - } - if registry.Service != nil { - t.Errorf("Expected registry.CreateService to not get called, but it got %#v", registry.Service) - } -} - func TestServiceRegistryDelete(t *testing.T) { ctx := api.NewDefaultContext() storage, registry, fakeCloud := NewTestREST(t, nil) @@ -343,7 +316,7 @@ func TestServiceRegistryDeleteExternal(t *testing.T) { } registry.CreateService(ctx, svc) storage.Delete(ctx, svc.Name) - if len(fakeCloud.Calls) != 2 || fakeCloud.Calls[0] != "get-zone" || fakeCloud.Calls[1] != "delete" { + if len(fakeCloud.Calls) != 0 { t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) } if e, a := "foo", registry.DeletedID; e != a { @@ -381,7 +354,7 @@ func TestServiceRegistryUpdateExternalService(t *testing.T) { if _, _, err := storage.Update(ctx, svc2); err != nil { t.Fatalf("Unexpected error: %v", err) } - if len(fakeCloud.Calls) != 2 || fakeCloud.Calls[0] != "get-zone" || fakeCloud.Calls[1] != "create" { + if len(fakeCloud.Calls) != 0 { t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) } @@ -391,9 +364,7 @@ func 
TestServiceRegistryUpdateExternalService(t *testing.T) { if _, _, err := storage.Update(ctx, svc3); err != nil { t.Fatalf("Unexpected error: %v", err) } - if len(fakeCloud.Calls) != 6 || fakeCloud.Calls[0] != "get-zone" || fakeCloud.Calls[1] != "create" || - fakeCloud.Calls[2] != "get-zone" || fakeCloud.Calls[3] != "delete" || - fakeCloud.Calls[4] != "get-zone" || fakeCloud.Calls[5] != "create" { + if len(fakeCloud.Calls) != 0 { t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) } } @@ -423,7 +394,7 @@ func TestServiceRegistryUpdateMultiPortExternalService(t *testing.T) { if _, err := storage.Create(ctx, svc1); err != nil { t.Fatalf("Unexpected error: %v", err) } - if len(fakeCloud.Calls) != 2 || fakeCloud.Calls[0] != "get-zone" || fakeCloud.Calls[1] != "create" { + if len(fakeCloud.Calls) != 0 { t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) } @@ -433,9 +404,7 @@ func TestServiceRegistryUpdateMultiPortExternalService(t *testing.T) { if _, _, err := storage.Update(ctx, svc2); err != nil { t.Fatalf("Unexpected error: %v", err) } - if len(fakeCloud.Calls) != 6 || fakeCloud.Calls[0] != "get-zone" || fakeCloud.Calls[1] != "create" || - fakeCloud.Calls[2] != "get-zone" || fakeCloud.Calls[3] != "delete" || - fakeCloud.Calls[4] != "get-zone" || fakeCloud.Calls[5] != "create" { + if len(fakeCloud.Calls) != 0 { t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) } } @@ -744,7 +713,7 @@ func TestServiceRegistryIPExternalLoadBalancer(t *testing.T) { if err != nil { t.Errorf("Unexpected error %v", err) } - if len(fakeCloud.Balancers) != 1 || fakeCloud.Balancers[0].Name != "kubernetes-default-foo" || fakeCloud.Balancers[0].Ports[0] != 6502 { + if len(fakeCloud.Balancers) != 0 { t.Errorf("Unexpected balancer created: %v", fakeCloud.Balancers) } } diff --git a/test/e2e/service.go b/test/e2e/service.go index 2cce8805e79..d98efe50d3e 100644 --- a/test/e2e/service.go +++ b/test/e2e/service.go @@ -286,6 +286,15 @@ var _ = Describe("Services", func() { 
Expect(err).NotTo(HaveOccurred()) }(ns, serviceName) + // Wait for the load balancer to be created asynchronously, which is + // (unfortunately) currently indicated by a public IP address being + // added to the spec. + for t := time.Now(); time.Since(t) < 4*time.Minute; time.Sleep(5 * time.Second) { + result, _ = c.Services(ns).Get(serviceName) + if len(result.Spec.PublicIPs) == 1 { + break + } + } if len(result.Spec.PublicIPs) != 1 { Failf("got unexpected number (%d) of public IPs for externally load balanced service: %v", result.Spec.PublicIPs, result) } @@ -325,7 +334,7 @@ var _ = Describe("Services", func() { By("hitting the pod through the service's external load balancer") var resp *http.Response - for t := time.Now(); time.Since(t) < 4*time.Minute; time.Sleep(5 * time.Second) { + for t := time.Now(); time.Since(t) < time.Minute; time.Sleep(5 * time.Second) { resp, err = http.Get(fmt.Sprintf("http://%s:%d", ip, port)) if err == nil { break From 7b647c5dbc02d16d121718ed5bbd0f3b93840e63 Mon Sep 17 00:00:00 2001 From: Alex Robinson Date: Wed, 1 Apr 2015 02:55:23 +0000 Subject: [PATCH 3/6] Fix the services namespace test to wait for the services' load balancers to be created. --- test/e2e/service.go | 37 ++++++++++++++++++++++++++++--------- 1 file changed, 28 insertions(+), 9 deletions(-) diff --git a/test/e2e/service.go b/test/e2e/service.go index d98efe50d3e..22df0aa4536 100644 --- a/test/e2e/service.go +++ b/test/e2e/service.go @@ -287,14 +287,9 @@ var _ = Describe("Services", func() { }(ns, serviceName) // Wait for the load balancer to be created asynchronously, which is - // (unfortunately) currently indicated by a public IP address being - // added to the spec. - for t := time.Now(); time.Since(t) < 4*time.Minute; time.Sleep(5 * time.Second) { - result, _ = c.Services(ns).Get(serviceName) - if len(result.Spec.PublicIPs) == 1 { - break - } - } + // currently indicated by a public IP address being added to the spec. 
+ result, err = waitForPublicIPs(c, serviceName, ns) + Expect(err).NotTo(HaveOccurred()) if len(result.Spec.PublicIPs) != 1 { Failf("got unexpected number (%d) of public IPs for externally load balanced service: %v", result.Spec.PublicIPs, result) } @@ -378,13 +373,19 @@ var _ = Describe("Services", func() { service.ObjectMeta.Name = serviceName service.ObjectMeta.Namespace = namespace By("creating service " + serviceName + " in namespace " + namespace) - result, err := c.Services(namespace).Create(service) + _, err := c.Services(namespace).Create(service) Expect(err).NotTo(HaveOccurred()) defer func(namespace, serviceName string) { // clean up when we're done By("deleting service " + serviceName + " in namespace " + namespace) err := c.Services(namespace).Delete(serviceName) Expect(err).NotTo(HaveOccurred()) }(namespace, serviceName) + } + } + for _, namespace := range namespaces { + for _, serviceName := range serviceNames { + result, err := waitForPublicIPs(c, serviceName, namespace) + Expect(err).NotTo(HaveOccurred()) publicIPs = append(publicIPs, result.Spec.PublicIPs...) 
// Save 'em to check uniqueness
 		}
 	}
@@ -392,6 +393,24 @@
 	})
 })
 
+func waitForPublicIPs(c *client.Client, serviceName, namespace string) (*api.Service, error) {
+	const timeout = 4 * time.Minute
+	var service *api.Service
+	By(fmt.Sprintf("waiting up to %v for service %s in namespace %s to have a public IP", timeout, serviceName, namespace))
+	for start := time.Now(); time.Since(start) < timeout; time.Sleep(5 * time.Second) {
+		service, err := c.Services(namespace).Get(serviceName)
+		if err != nil {
+			Logf("Get service failed, ignoring for 5s: %v", err)
+			continue
+		}
+		if len(service.Spec.PublicIPs) > 0 {
+			return service, nil
+		}
+		Logf("Waiting for service %s in namespace %s to have a public IP (%v)", serviceName, namespace, time.Since(start))
+	}
+	return service, fmt.Errorf("service %s in namespace %s doesn't have a public IP after %.2f seconds", serviceName, namespace, timeout.Seconds())
+}
+
 func validateUniqueOrFail(s []string) {
 	By(fmt.Sprintf("validating unique: %v", s))
 	sort.Strings(s)
From 2b14fc1d14e467c07dc8fcbbb3278310f1b2c80d Mon Sep 17 00:00:00 2001
From: Alex Robinson
Date: Wed, 1 Apr 2015 06:39:00 +0000
Subject: [PATCH 4/6] Remove the cloud provider field from the services REST
 handler and the master now that load balancers are handled by the
 ServiceController.
--- cmd/kube-apiserver/app/server.go | 1 - pkg/master/master.go | 4 +- pkg/registry/service/rest.go | 8 +-- pkg/registry/service/rest_test.go | 86 ++++++++----------------------- 4 files changed, 25 insertions(+), 74 deletions(-) diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index 867faf788ef..9f18878a6ba 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -272,7 +272,6 @@ func (s *APIServer) Run(_ []string) error { } config := &master.Config{ - Cloud: cloud, EtcdHelper: helper, EventTTL: s.EventTTL, KubeletClient: kubeletClient, diff --git a/pkg/master/master.go b/pkg/master/master.go index a14dd4393d0..8d750dfbed6 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -40,7 +40,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/auth/authorizer" "github.com/GoogleCloudPlatform/kubernetes/pkg/auth/handlers" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" - "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports" @@ -72,7 +71,6 @@ import ( // Config is a structure used to configure a Master. 
type Config struct { - Cloud cloudprovider.Interface EtcdHelper tools.EtcdHelper EventTTL time.Duration MinionRegexp string @@ -396,7 +394,7 @@ func (m *Master) init(c *Config) { "bindings": podStorage.Binding, "replicationControllers": controllerStorage, - "services": service.NewStorage(m.serviceRegistry, c.Cloud, m.nodeRegistry, m.endpointRegistry, m.portalNet, c.ClusterName), + "services": service.NewStorage(m.serviceRegistry, m.nodeRegistry, m.endpointRegistry, m.portalNet, c.ClusterName), "endpoints": endpointsStorage, "minions": nodeStorage, "minions/status": nodeStatusStorage, diff --git a/pkg/registry/service/rest.go b/pkg/registry/service/rest.go index 2402f31d3e7..5050eaa7096 100644 --- a/pkg/registry/service/rest.go +++ b/pkg/registry/service/rest.go @@ -29,7 +29,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation" - "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/endpoint" @@ -42,9 +41,7 @@ import ( // REST adapts a service registry into apiserver's RESTStorage model. type REST struct { - registry Registry - // TODO(a-robinson): Remove cloud - cloud cloudprovider.Interface + registry Registry machines minion.Registry endpoints endpoint.Registry portalMgr *ipAllocator @@ -52,7 +49,7 @@ type REST struct { } // NewStorage returns a new REST. -func NewStorage(registry Registry, cloud cloudprovider.Interface, machines minion.Registry, endpoints endpoint.Registry, portalNet *net.IPNet, +func NewStorage(registry Registry, machines minion.Registry, endpoints endpoint.Registry, portalNet *net.IPNet, clusterName string) *REST { // TODO: Before we can replicate masters, this has to be synced (e.g. 
lives in etcd) ipa := newIPAllocator(portalNet) @@ -63,7 +60,6 @@ func NewStorage(registry Registry, cloud cloudprovider.Interface, machines minio return &REST{ registry: registry, - cloud: cloud, machines: machines, endpoints: endpoints, portalMgr: ipa, diff --git a/pkg/registry/service/rest_test.go b/pkg/registry/service/rest_test.go index cbf3f29f87c..31468f2d4d1 100644 --- a/pkg/registry/service/rest_test.go +++ b/pkg/registry/service/rest_test.go @@ -27,22 +27,20 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest/resttest" - cloud "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/fake" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" ) -func NewTestREST(t *testing.T, endpoints *api.EndpointsList) (*REST, *registrytest.ServiceRegistry, *cloud.FakeCloud) { +func NewTestREST(t *testing.T, endpoints *api.EndpointsList) (*REST, *registrytest.ServiceRegistry) { registry := registrytest.NewServiceRegistry() - fakeCloud := &cloud.FakeCloud{} machines := []string{"foo", "bar", "baz"} endpointRegistry := ®istrytest.EndpointRegistry{ Endpoints: endpoints, } nodeRegistry := registrytest.NewMinionRegistry(machines, api.NodeResources{}) - storage := NewStorage(registry, fakeCloud, nodeRegistry, endpointRegistry, makeIPNet(t), "kubernetes") - return storage, registry, fakeCloud + storage := NewStorage(registry, nodeRegistry, endpointRegistry, makeIPNet(t), "kubernetes") + return storage, registry } func makeIPNet(t *testing.T) *net.IPNet { @@ -64,7 +62,7 @@ func deepCloneService(svc *api.Service) *api.Service { } func TestServiceRegistryCreate(t *testing.T) { - storage, registry, fakeCloud := NewTestREST(t, nil) + storage, registry := NewTestREST(t, nil) storage.portalMgr.randomAttempts = 0 svc := 
&api.Service{ @@ -96,9 +94,6 @@ func TestServiceRegistryCreate(t *testing.T) { if created_service.Spec.PortalIP != "1.2.3.1" { t.Errorf("Unexpected PortalIP: %s", created_service.Spec.PortalIP) } - if len(fakeCloud.Calls) != 0 { - t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) - } srv, err := registry.GetService(ctx, svc.Name) if err != nil { t.Errorf("unexpected error: %v", err) @@ -109,7 +104,7 @@ func TestServiceRegistryCreate(t *testing.T) { } func TestServiceStorageValidatesCreate(t *testing.T) { - storage, _, _ := NewTestREST(t, nil) + storage, _ := NewTestREST(t, nil) failureCases := map[string]api.Service{ "empty ID": { ObjectMeta: api.ObjectMeta{Name: ""}, @@ -147,7 +142,7 @@ func TestServiceStorageValidatesCreate(t *testing.T) { func TestServiceRegistryUpdate(t *testing.T) { ctx := api.NewDefaultContext() - storage, registry, _ := NewTestREST(t, nil) + storage, registry := NewTestREST(t, nil) svc, err := registry.CreateService(ctx, &api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", ResourceVersion: "1", Namespace: api.NamespaceDefault}, Spec: api.ServiceSpec{ @@ -195,7 +190,7 @@ func TestServiceRegistryUpdate(t *testing.T) { func TestServiceStorageValidatesUpdate(t *testing.T) { ctx := api.NewDefaultContext() - storage, registry, _ := NewTestREST(t, nil) + storage, registry := NewTestREST(t, nil) registry.CreateService(ctx, &api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo"}, Spec: api.ServiceSpec{ @@ -243,7 +238,7 @@ func TestServiceStorageValidatesUpdate(t *testing.T) { func TestServiceRegistryExternalService(t *testing.T) { ctx := api.NewDefaultContext() - storage, registry, fakeCloud := NewTestREST(t, nil) + storage, registry := NewTestREST(t, nil) svc := &api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo"}, Spec: api.ServiceSpec{ @@ -260,9 +255,6 @@ func TestServiceRegistryExternalService(t *testing.T) { if err != nil { t.Errorf("Failed to create service: %#v", err) } - if len(fakeCloud.Calls) != 0 { - t.Errorf("Unexpected call(s): %#v", 
fakeCloud.Calls) - } srv, err := registry.GetService(ctx, svc.Name) if err != nil { t.Errorf("Unexpected error: %v", err) @@ -270,14 +262,11 @@ func TestServiceRegistryExternalService(t *testing.T) { if srv == nil { t.Errorf("Failed to find service: %s", svc.Name) } - if len(fakeCloud.Balancers) != 0 { - t.Errorf("Unexpected balancer created: %v", fakeCloud.Balancers) - } } func TestServiceRegistryDelete(t *testing.T) { ctx := api.NewDefaultContext() - storage, registry, fakeCloud := NewTestREST(t, nil) + storage, registry := NewTestREST(t, nil) svc := &api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo"}, Spec: api.ServiceSpec{ @@ -291,9 +280,6 @@ func TestServiceRegistryDelete(t *testing.T) { } registry.CreateService(ctx, svc) storage.Delete(ctx, svc.Name) - if len(fakeCloud.Calls) != 0 { - t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) - } if e, a := "foo", registry.DeletedID; e != a { t.Errorf("Expected %v, but got %v", e, a) } @@ -301,7 +287,7 @@ func TestServiceRegistryDelete(t *testing.T) { func TestServiceRegistryDeleteExternal(t *testing.T) { ctx := api.NewDefaultContext() - storage, registry, fakeCloud := NewTestREST(t, nil) + storage, registry := NewTestREST(t, nil) svc := &api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo"}, Spec: api.ServiceSpec{ @@ -316,9 +302,6 @@ func TestServiceRegistryDeleteExternal(t *testing.T) { } registry.CreateService(ctx, svc) storage.Delete(ctx, svc.Name) - if len(fakeCloud.Calls) != 0 { - t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) - } if e, a := "foo", registry.DeletedID; e != a { t.Errorf("Expected %v, but got %v", e, a) } @@ -326,7 +309,7 @@ func TestServiceRegistryDeleteExternal(t *testing.T) { func TestServiceRegistryUpdateExternalService(t *testing.T) { ctx := api.NewDefaultContext() - storage, _, fakeCloud := NewTestREST(t, nil) + storage, _ := NewTestREST(t, nil) // Create non-external load balancer. 
svc1 := &api.Service{ @@ -344,9 +327,6 @@ func TestServiceRegistryUpdateExternalService(t *testing.T) { if _, err := storage.Create(ctx, svc1); err != nil { t.Fatalf("Unexpected error: %v", err) } - if len(fakeCloud.Calls) != 0 { - t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) - } // Modify load balancer to be external. svc2 := deepCloneService(svc1) @@ -354,9 +334,6 @@ func TestServiceRegistryUpdateExternalService(t *testing.T) { if _, _, err := storage.Update(ctx, svc2); err != nil { t.Fatalf("Unexpected error: %v", err) } - if len(fakeCloud.Calls) != 0 { - t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) - } // Change port. svc3 := deepCloneService(svc2) @@ -364,14 +341,11 @@ func TestServiceRegistryUpdateExternalService(t *testing.T) { if _, _, err := storage.Update(ctx, svc3); err != nil { t.Fatalf("Unexpected error: %v", err) } - if len(fakeCloud.Calls) != 0 { - t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) - } } func TestServiceRegistryUpdateMultiPortExternalService(t *testing.T) { ctx := api.NewDefaultContext() - storage, _, fakeCloud := NewTestREST(t, nil) + storage, _ := NewTestREST(t, nil) // Create external load balancer. 
svc1 := &api.Service{ @@ -394,9 +368,6 @@ func TestServiceRegistryUpdateMultiPortExternalService(t *testing.T) { if _, err := storage.Create(ctx, svc1); err != nil { t.Fatalf("Unexpected error: %v", err) } - if len(fakeCloud.Calls) != 0 { - t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) - } // Modify ports svc2 := deepCloneService(svc1) @@ -404,14 +375,11 @@ func TestServiceRegistryUpdateMultiPortExternalService(t *testing.T) { if _, _, err := storage.Update(ctx, svc2); err != nil { t.Fatalf("Unexpected error: %v", err) } - if len(fakeCloud.Calls) != 0 { - t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) - } } func TestServiceRegistryGet(t *testing.T) { ctx := api.NewDefaultContext() - storage, registry, fakeCloud := NewTestREST(t, nil) + storage, registry := NewTestREST(t, nil) registry.CreateService(ctx, &api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo"}, Spec: api.ServiceSpec{ @@ -419,9 +387,6 @@ func TestServiceRegistryGet(t *testing.T) { }, }) storage.Get(ctx, "foo") - if len(fakeCloud.Calls) != 0 { - t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) - } if e, a := "foo", registry.GottenID; e != a { t.Errorf("Expected %v, but got %v", e, a) } @@ -443,7 +408,7 @@ func TestServiceRegistryResourceLocation(t *testing.T) { }, }, } - storage, registry, _ := NewTestREST(t, endpoints) + storage, registry := NewTestREST(t, endpoints) registry.CreateService(ctx, &api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo"}, Spec: api.ServiceSpec{ @@ -490,7 +455,7 @@ func TestServiceRegistryResourceLocation(t *testing.T) { func TestServiceRegistryList(t *testing.T) { ctx := api.NewDefaultContext() - storage, registry, fakeCloud := NewTestREST(t, nil) + storage, registry := NewTestREST(t, nil) registry.CreateService(ctx, &api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault}, Spec: api.ServiceSpec{ @@ -506,9 +471,6 @@ func TestServiceRegistryList(t *testing.T) { registry.List.ResourceVersion = "1" s, _ := storage.List(ctx, 
labels.Everything(), fields.Everything()) sl := s.(*api.ServiceList) - if len(fakeCloud.Calls) != 0 { - t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) - } if len(sl.Items) != 2 { t.Fatalf("Expected 2 services, but got %v", len(sl.Items)) } @@ -524,7 +486,7 @@ func TestServiceRegistryList(t *testing.T) { } func TestServiceRegistryIPAllocation(t *testing.T) { - rest, _, _ := NewTestREST(t, nil) + rest, _ := NewTestREST(t, nil) rest.portalMgr.randomAttempts = 0 svc1 := &api.Service{ @@ -589,7 +551,7 @@ func TestServiceRegistryIPAllocation(t *testing.T) { } func TestServiceRegistryIPReallocation(t *testing.T) { - rest, _, _ := NewTestREST(t, nil) + rest, _ := NewTestREST(t, nil) rest.portalMgr.randomAttempts = 0 svc1 := &api.Service{ @@ -638,7 +600,7 @@ func TestServiceRegistryIPReallocation(t *testing.T) { } func TestServiceRegistryIPUpdate(t *testing.T) { - rest, _, _ := NewTestREST(t, nil) + rest, _ := NewTestREST(t, nil) rest.portalMgr.randomAttempts = 0 svc := &api.Service{ @@ -682,7 +644,7 @@ func TestServiceRegistryIPUpdate(t *testing.T) { } func TestServiceRegistryIPExternalLoadBalancer(t *testing.T) { - rest, _, fakeCloud := NewTestREST(t, nil) + rest, _ := NewTestREST(t, nil) rest.portalMgr.randomAttempts = 0 svc := &api.Service{ @@ -713,18 +675,14 @@ func TestServiceRegistryIPExternalLoadBalancer(t *testing.T) { if err != nil { t.Errorf("Unexpected error %v", err) } - if len(fakeCloud.Balancers) != 0 { - t.Errorf("Unexpected balancer created: %v", fakeCloud.Balancers) - } } func TestServiceRegistryIPReloadFromStorage(t *testing.T) { registry := registrytest.NewServiceRegistry() - fakeCloud := &cloud.FakeCloud{} machines := []string{"foo", "bar", "baz"} nodeRegistry := registrytest.NewMinionRegistry(machines, api.NodeResources{}) endpoints := ®istrytest.EndpointRegistry{} - rest1 := NewStorage(registry, fakeCloud, nodeRegistry, endpoints, makeIPNet(t), "kubernetes") + rest1 := NewStorage(registry, nodeRegistry, endpoints, makeIPNet(t), "kubernetes") 
rest1.portalMgr.randomAttempts = 0 svc := &api.Service{ @@ -755,7 +713,7 @@ func TestServiceRegistryIPReloadFromStorage(t *testing.T) { // This will reload from storage, finding the previous 2 nodeRegistry = registrytest.NewMinionRegistry(machines, api.NodeResources{}) - rest2 := NewStorage(registry, fakeCloud, nodeRegistry, endpoints, makeIPNet(t), "kubernetes") + rest2 := NewStorage(registry, nodeRegistry, endpoints, makeIPNet(t), "kubernetes") rest2.portalMgr.randomAttempts = 0 svc = &api.Service{ @@ -814,7 +772,7 @@ func TestUpdateServiceWithConflictingNamespace(t *testing.T) { } func TestCreate(t *testing.T) { - rest, registry, _ := NewTestREST(t, nil) + rest, registry := NewTestREST(t, nil) rest.portalMgr.randomAttempts = 0 test := resttest.New(t, rest, registry.SetError) From 9a351e3670ee19b0485d28728d81e5e1d83ab7bd Mon Sep 17 00:00:00 2001 From: Alex Robinson Date: Fri, 3 Apr 2015 19:06:25 +0000 Subject: [PATCH 5/6] Move validation of load balancers only supporting TCP ports to validation.go. 
--- pkg/api/validation/validation.go | 8 +++++ pkg/api/validation/validation_test.go | 31 +++++++++++++++++++ .../servicecontroller/servicecontroller.go | 1 - pkg/registry/service/rest.go | 22 ------------- 4 files changed, 39 insertions(+), 23 deletions(-) diff --git a/pkg/api/validation/validation.go b/pkg/api/validation/validation.go index 19de235a949..b49e883796b 100644 --- a/pkg/api/validation/validation.go +++ b/pkg/api/validation/validation.go @@ -927,6 +927,14 @@ func ValidateService(service *api.Service) errs.ValidationErrorList { } } + if service.Spec.CreateExternalLoadBalancer { + for i := range service.Spec.Ports { + if service.Spec.Ports[i].Protocol != api.ProtocolTCP { + allErrs = append(allErrs, errs.NewFieldInvalid("spec.ports", service.Spec.Ports[i], "cannot create an external load balancer with non-TCP ports")) + } + } + } + return allErrs } diff --git a/pkg/api/validation/validation_test.go b/pkg/api/validation/validation_test.go index 8eda725f2ba..ddc27439946 100644 --- a/pkg/api/validation/validation_test.go +++ b/pkg/api/validation/validation_test.go @@ -1583,6 +1583,22 @@ func TestValidateService(t *testing.T) { }, numErrs: 1, }, + { + name: "invalid load balancer protocol 1", + tweakSvc: func(s *api.Service) { + s.Spec.CreateExternalLoadBalancer = true + s.Spec.Ports[0].Protocol = "UDP" + }, + numErrs: 1, + }, + { + name: "invalid load balancer protocol 2", + tweakSvc: func(s *api.Service) { + s.Spec.CreateExternalLoadBalancer = true + s.Spec.Ports = append(s.Spec.Ports, api.ServicePort{Name: "p", Port: 12345, Protocol: "UDP"}) + }, + numErrs: 1, + }, { name: "valid 1", tweakSvc: func(s *api.Service) { @@ -1620,6 +1636,21 @@ func TestValidateService(t *testing.T) { }, numErrs: 0, }, + { + name: "valid external load balancer", + tweakSvc: func(s *api.Service) { + s.Spec.CreateExternalLoadBalancer = true + }, + numErrs: 0, + }, + { + name: "valid external load balancer 2 ports", + tweakSvc: func(s *api.Service) { + 
s.Spec.CreateExternalLoadBalancer = true + s.Spec.Ports = append(s.Spec.Ports, api.ServicePort{Name: "p", Port: 12345, Protocol: "TCP"}) + }, + numErrs: 0, + }, } for _, tc := range testCases { diff --git a/pkg/cloudprovider/servicecontroller/servicecontroller.go b/pkg/cloudprovider/servicecontroller/servicecontroller.go index d77e6241d3e..9a4c98a0d2d 100644 --- a/pkg/cloudprovider/servicecontroller/servicecontroller.go +++ b/pkg/cloudprovider/servicecontroller/servicecontroller.go @@ -385,7 +385,6 @@ func (s *ServiceController) loadBalancerName(service *api.Service) string { return s.cloud.GetLoadBalancerName(s.clusterName, service.Namespace, service.Name) } -// TODO: Deduplicate this with the copy in pkg/registry/service/rest.go. func getTCPPorts(service *api.Service) ([]int, error) { ports := []int{} for i := range service.Spec.Ports { diff --git a/pkg/registry/service/rest.go b/pkg/registry/service/rest.go index 5050eaa7096..fb5b1195ae8 100644 --- a/pkg/registry/service/rest.go +++ b/pkg/registry/service/rest.go @@ -94,14 +94,6 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, err return nil, err } - // Make sure that we'll be able to create a load balancer for the service, - // even though it'll be created by the ServiceController. - if service.Spec.CreateExternalLoadBalancer { - if _, err := getTCPPorts(service); err != nil { - return nil, err - } - } - releaseServiceIP := false defer func() { if releaseServiceIP { @@ -251,17 +243,3 @@ func (rs *REST) ResourceLocation(ctx api.Context, id string) (*url.URL, http.Rou } return nil, nil, fmt.Errorf("no endpoints available for %q", id) } - -// TODO: Deduplicate with the copy of this in pkg/registry/service/rest.go -func getTCPPorts(service *api.Service) ([]int, error) { - ports := []int{} - for i := range service.Spec.Ports { - sp := &service.Spec.Ports[i] - if sp.Protocol != api.ProtocolTCP { - // TODO: Support UDP here too. 
- return nil, fmt.Errorf("external load balancers for non TCP services are not currently supported.") - } - ports = append(ports, sp.Port) - } - return ports, nil -} From fc08a0a71b7a8b00271da28c14c4d0897c502988 Mon Sep 17 00:00:00 2001 From: Alex Robinson Date: Thu, 9 Apr 2015 20:48:27 +0000 Subject: [PATCH 6/6] Do service creation/update/deletion work in a pool of goroutines, protecting each service with a lock to ensure that no two goroutines will process a service at the same time. This is needed to avoid weird race conditions. --- .../app/controllermanager.go | 2 - pkg/cloudprovider/cloud.go | 2 + .../servicecontroller/servicecontroller.go | 172 +++++++++++------- .../servicecontroller_test.go | 3 +- 4 files changed, 107 insertions(+), 72 deletions(-) diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 06f800bd48d..060c26239c0 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -50,7 +50,6 @@ type CMServer struct { ClientConfig client.Config CloudProvider string CloudConfigFile string - ClusterName string MinionRegexp string NodeSyncPeriod time.Duration ResourceQuotaSyncPeriod time.Duration @@ -102,7 +101,6 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) { client.BindClientConfigFlags(fs, &s.ClientConfig) fs.StringVar(&s.CloudProvider, "cloud_provider", s.CloudProvider, "The provider for cloud services. Empty string for no provider.") fs.StringVar(&s.CloudConfigFile, "cloud_config", s.CloudConfigFile, "The path to the cloud provider configuration file. 
Empty string for no configuration file.") - fs.StringVar(&s.ClusterName, "cluster_name", s.ClusterName, "The instance prefix for the cluster") fs.StringVar(&s.MinionRegexp, "minion_regexp", s.MinionRegexp, "If non empty, and --cloud_provider is specified, a regular expression for matching minion VMs.") fs.DurationVar(&s.NodeSyncPeriod, "node_sync_period", s.NodeSyncPeriod, ""+ "The period for syncing nodes from cloudprovider. Longer periods will result in "+ diff --git a/pkg/cloudprovider/cloud.go b/pkg/cloudprovider/cloud.go index ddf74af7da0..f3c3b18491d 100644 --- a/pkg/cloudprovider/cloud.go +++ b/pkg/cloudprovider/cloud.go @@ -42,6 +42,8 @@ type Clusters interface { Master(clusterName string) (string, error) } +// TODO(#6812): Use a shorter name that's less likely to be longer than cloud +// providers' name length limits. func GetLoadBalancerName(clusterName, serviceNamespace, serviceName string) string { return clusterName + "-" + serviceNamespace + "-" + serviceName } diff --git a/pkg/cloudprovider/servicecontroller/servicecontroller.go b/pkg/cloudprovider/servicecontroller/servicecontroller.go index 9a4c98a0d2d..1d17323f10d 100644 --- a/pkg/cloudprovider/servicecontroller/servicecontroller.go +++ b/pkg/cloudprovider/servicecontroller/servicecontroller.go @@ -19,19 +19,25 @@ package servicecontroller import ( "fmt" "net" + "sort" "sync" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/types" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/golang/glog" ) const ( + workerGoroutines = 10 + clientRetryCount = 5 clientRetryInterval = 5 * 
time.Second @@ -39,14 +45,24 @@ const ( notRetryable = false ) +type cachedService struct { + service *api.Service + // Ensures only one goroutine can operate on this service at any given time. + mu sync.Mutex +} + +type serviceCache struct { + mu sync.Mutex // protects serviceMap + serviceMap map[string]*cachedService +} + type ServiceController struct { cloud cloudprovider.Interface kubeClient client.Interface clusterName string balancer cloudprovider.TCPLoadBalancer zone cloudprovider.Zone - mu sync.Mutex // protects serviceMap - serviceMap map[string]*api.Service // keys generated by cache.MetaNamespaceKeyFunc + cache *serviceCache } // New returns a new service controller to keep cloud provider service resources @@ -56,7 +72,7 @@ func New(cloud cloudprovider.Interface, kubeClient client.Interface, clusterName cloud: cloud, kubeClient: kubeClient, clusterName: clusterName, - serviceMap: make(map[string]*api.Service), + cache: &serviceCache{serviceMap: make(map[string]*cachedService)}, } } @@ -75,7 +91,16 @@ func (s *ServiceController) Run() error { return fmt.Errorf("ServiceController only works with real Client objects, but was passed something else satisfying the client Interface.") } - go s.watchServices() + // Get the currently existing set of services and then all future creates + // and updates of services. + // No delta compressor is needed for the DeltaFIFO queue because we only ever + // care about the most recent state. 
+ serviceQueue := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, s.cache) + lw := cache.NewListWatchFromClient(s.kubeClient.(*client.Client), "services", api.NamespaceAll, fields.Everything()) + cache.NewReflector(lw, &api.Service{}, serviceQueue, 0).Run() + for i := 0; i < workerGoroutines; i++ { + go s.watchServices(serviceQueue) + } return nil } @@ -102,15 +127,8 @@ func (s *ServiceController) init() error { return nil } -func (s *ServiceController) watchServices() { - // Get the currently existing set of services and then all future creates - // and updates of services. - // TODO: Add a compressor that intelligently squashes together updates? - keyLister := cache.KeyListerFunc(func() []string { return s.listKeys() }) - serviceQueue := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, keyLister) - lw := cache.NewListWatchFromClient(s.kubeClient.(*client.Client), "services", api.NamespaceAll, fields.Everything()) - cache.NewReflector(lw, &api.Service{}, serviceQueue, 0).Run() - // TODO: Add proper retries rather than just re-adding to the queue? +// Loop infinitely, processing all service updates provided by the queue. +func (s *ServiceController) watchServices(serviceQueue *cache.DeltaFIFO) { for { newItem := serviceQueue.Pop() deltas, ok := newItem.(cache.Deltas) @@ -129,7 +147,7 @@ func (s *ServiceController) watchServices() { time.Sleep(5 * time.Second) serviceQueue.AddIfNotPresent(deltas) } else if err != nil { - glog.Errorf("Failed to process service delta. Not retrying: %v", err) + util.HandleError(fmt.Errorf("Failed to process service delta. Not retrying: %v", err)) } } } @@ -138,6 +156,8 @@ func (s *ServiceController) watchServices() { // indicator of whether the processing should be retried. 
func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) { service, ok := delta.Object.(*api.Service) + var namespacedName types.NamespacedName + var cachedService *cachedService if !ok { // If the DeltaFIFO saw a key in our cache that it didn't know about, it // can send a deletion with an unknown state. Grab the service from our @@ -146,16 +166,25 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) { if !ok { return fmt.Errorf("Delta contained object that wasn't a service or a deleted key: %+v", delta), notRetryable } - service, ok = s.getService(key.Key) + cachedService, ok = s.cache.get(key.Key) if !ok { return fmt.Errorf("Service %s not in cache even though the watcher thought it was. Ignoring the deletion.", key), notRetryable } - delta.Object = service + namespacedName = types.NamespacedName{service.Namespace, service.Name} + service = cachedService.service + delta.Object = cachedService.service + } else { + namespacedName.Namespace = service.Namespace + namespacedName.Name = service.Name + cachedService = s.cache.getOrCreate(namespacedName.String()) } glog.V(2).Infof("Got new %s delta for service: %+v", delta.Type, service) - // TODO: Make this more parallel. The only things that need to serialized - // are changes to services with the same namespace and name. + // Ensure that no other goroutine will interfere with our processing of the + // service. + cachedService.mu.Lock() + defer cachedService.mu.Unlock() + // TODO: Handle added, updated, and sync differently? 
switch delta.Type { case cache.Added: @@ -163,9 +192,19 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) { case cache.Updated: fallthrough case cache.Sync: - return s.createLoadBalancerIfNeeded(service) + err, retry := s.createLoadBalancerIfNeeded(namespacedName, service, cachedService.service) + if err != nil { + return err, retry + } + // Always update the cache upon success + cachedService.service = service + s.cache.set(namespacedName.String(), cachedService) case cache.Deleted: - return s.handleDelete(service) + err := s.ensureLBDeleted(service) + if err != nil { + return err, retryable + } + s.cache.delete(namespacedName.String()) default: glog.Errorf("Unexpected delta type: %v", delta.Type) } @@ -174,18 +213,12 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) { // Returns whatever error occurred along with a boolean indicator of whether it // should be retried. -func (s *ServiceController) createLoadBalancerIfNeeded(service *api.Service) (error, bool) { - namespacedName, err := cache.MetaNamespaceKeyFunc(service) - if err != nil { - return fmt.Errorf("Couldn't generate namespaced name for service: %v", err), notRetryable - } - - cachedService, cached := s.getService(namespacedName) - if cached && !needsUpdate(cachedService, service) { +func (s *ServiceController) createLoadBalancerIfNeeded(namespacedName types.NamespacedName, service, cachedService *api.Service) (error, bool) { + if cachedService != nil && !needsUpdate(cachedService, service) { glog.Infof("LB already exists and doesn't need update for service %s", namespacedName) return nil, notRetryable } - if cached { + if cachedService != nil { // If the service already exists but needs to be updated, delete it so that // we can recreate it cleanly. 
if cachedService.Spec.CreateExternalLoadBalancer { @@ -227,7 +260,7 @@ func (s *ServiceController) createLoadBalancerIfNeeded(service *api.Service) (er // The load balancer doesn't exist yet, so create it. publicIPstring := fmt.Sprint(service.Spec.PublicIPs) - err = s.createExternalLoadBalancer(service) + err := s.createExternalLoadBalancer(service) if err != nil { return fmt.Errorf("failed to create external load balancer for service %s: %v", namespacedName, err), retryable } @@ -236,7 +269,6 @@ func (s *ServiceController) createLoadBalancerIfNeeded(service *api.Service) (er glog.Infof("Not persisting unchanged service to registry.") return nil, notRetryable } - s.setService(namespacedName, service) // If creating the load balancer succeeded, persist the updated service. if err = s.persistUpdate(service); err != nil { @@ -245,8 +277,6 @@ func (s *ServiceController) createLoadBalancerIfNeeded(service *api.Service) (er return nil, notRetryable } -// TODO(a-robinson): Handle repeated failures due to ResourceVersion changes or -// the object having been deleted. func (s *ServiceController) persistUpdate(service *api.Service) error { var err error for i := 0; i < clientRetryCount; i++ { @@ -254,6 +284,20 @@ func (s *ServiceController) persistUpdate(service *api.Service) error { if err == nil { return nil } + // If the object no longer exists, we don't want to recreate it. Just bail + // out so that we can process the delete, which we should soon be receiving + // if we haven't already. + if errors.IsNotFound(err) { + glog.Infof("Not persisting update to service that no longer exists: %v", err) + return nil + } + // TODO: Try to resolve the conflict if the change was unrelated to load + // balancers and public IPs. For now, just rely on the fact that we'll + // also process the update that caused the resource version to change. 
+ if errors.IsConflict(err) { + glog.Infof("Not persisting update to service that has been changed since we received it: %v", err) + return nil + } glog.Warningf("Failed to persist updated PublicIPs to service %s after creating its external load balancer: %v", service.Name, err) time.Sleep(clientRetryInterval) @@ -266,7 +310,7 @@ func (s *ServiceController) createExternalLoadBalancer(service *api.Service) err if err != nil { return err } - nodes, err := s.kubeClient.Nodes().List(labels.Everything()) + nodes, err := s.kubeClient.Nodes().List(labels.Everything(), fields.Everything()) if err != nil { return err } @@ -294,24 +338,8 @@ func (s *ServiceController) createExternalLoadBalancer(service *api.Service) err return nil } -// Returns whatever error occurred along with a boolean indicator of whether it -// should be retried. -func (s *ServiceController) handleDelete(service *api.Service) (error, bool) { - if err := s.ensureLBDeleted(service); err != nil { - return err, retryable - } - namespacedName, err := cache.MetaNamespaceKeyFunc(service) - if err != nil { - // This is panic-worthy, since the queue shouldn't have been able to - // handle the service if it couldn't generate a name for it. - return fmt.Errorf("Couldn't generate namespaced name for service: %v", err), notRetryable - } - s.deleteService(namespacedName) - return nil, notRetryable -} - // Ensures that the load balancer associated with the given service is deleted, -// doing the deletion if necessary. +// doing the deletion if necessary. Should always be retried upon failure. func (s *ServiceController) ensureLBDeleted(service *api.Service) error { // This is only needed because not all delete load balancer implementations // are currently idempotent to the LB not existing. 
@@ -327,9 +355,9 @@ func (s *ServiceController) ensureLBDeleted(service *api.Service) error { return nil } -// listKeys implements the interface required by DeltaFIFO to list the keys we +// ListKeys implements the interface required by DeltaFIFO to list the keys we // already know about. -func (s *ServiceController) listKeys() []string { +func (s *serviceCache) ListKeys() []string { s.mu.Lock() defer s.mu.Unlock() keys := make([]string, 0, len(s.serviceMap)) @@ -339,20 +367,31 @@ func (s *ServiceController) listKeys() []string { return keys } -func (s *ServiceController) getService(serviceName string) (*api.Service, bool) { +func (s *serviceCache) get(serviceName string) (*cachedService, bool) { s.mu.Lock() defer s.mu.Unlock() - info, ok := s.serviceMap[serviceName] - return info, ok + service, ok := s.serviceMap[serviceName] + return service, ok } -func (s *ServiceController) setService(serviceName string, info *api.Service) { +func (s *serviceCache) getOrCreate(serviceName string) *cachedService { s.mu.Lock() defer s.mu.Unlock() - s.serviceMap[serviceName] = info + service, ok := s.serviceMap[serviceName] + if !ok { + service = &cachedService{} + s.serviceMap[serviceName] = service + } + return service } -func (s *ServiceController) deleteService(serviceName string) { +func (s *serviceCache) set(serviceName string, service *cachedService) { + s.mu.Lock() + defer s.mu.Unlock() + s.serviceMap[serviceName] = service +} + +func (s *serviceCache) delete(serviceName string) { s.mu.Lock() defer s.mu.Unlock() delete(s.serviceMap, serviceName) @@ -379,10 +418,8 @@ func needsUpdate(oldService *api.Service, newService *api.Service) bool { return false } -// TODO: Use a shorter name that's less likely to be longer than cloud -// providers' length limits. 
func (s *ServiceController) loadBalancerName(service *api.Service) string { - return s.cloud.GetLoadBalancerName(s.clusterName, service.Namespace, service.Name) + return cloudprovider.GetLoadBalancerName(s.clusterName, service.Namespace, service.Name) } func getTCPPorts(service *api.Service) ([]int, error) { @@ -411,13 +448,10 @@ func portsEqual(x, y *api.Service) bool { if len(xPorts) != len(yPorts) { return false } - // Use a map for comparison since port slices aren't necessarily sorted. - xPortMap := make(map[int]bool) - for _, xPort := range xPorts { - xPortMap[xPort] = true - } - for _, yPort := range yPorts { - if !xPortMap[yPort] { + sort.Ints(xPorts) + sort.Ints(yPorts) + for i := range xPorts { + if xPorts[i] != yPorts[i] { return false } } diff --git a/pkg/cloudprovider/servicecontroller/servicecontroller_test.go b/pkg/cloudprovider/servicecontroller/servicecontroller_test.go index 73aa5ff08b4..d1a5a46a817 100644 --- a/pkg/cloudprovider/servicecontroller/servicecontroller_test.go +++ b/pkg/cloudprovider/servicecontroller/servicecontroller_test.go @@ -22,6 +22,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/testclient" fake_cloud "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/fake" + "github.com/GoogleCloudPlatform/kubernetes/pkg/types" ) const region = "us-central" @@ -89,7 +90,7 @@ func TestCreateExternalLoadBalancer(t *testing.T) { controller.init() cloud.Calls = nil // ignore any cloud calls made in init() client.Actions = nil // ignore any client calls made in init() - err, _ := controller.createLoadBalancerIfNeeded(item.service) + err, _ := controller.createLoadBalancerIfNeeded(types.NamespacedName{"foo", "bar"}, item.service, nil) if !item.expectErr && err != nil { t.Errorf("unexpected error: %v", err) } else if item.expectErr && err == nil {