mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-04 04:08:16 +00:00 
			
		
		
		
	Update for external load balancer.
Implementation of updating external load balancer on service update. This partially fixes #4411.
This commit is contained in:
		@@ -104,45 +104,10 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, err
 | 
			
		||||
	// TODO: Move this to post-creation rectification loop, so that we make/remove external load balancers
 | 
			
		||||
	// correctly no matter what http operations happen.
 | 
			
		||||
	if service.Spec.CreateExternalLoadBalancer {
 | 
			
		||||
		if rs.cloud == nil {
 | 
			
		||||
			return nil, fmt.Errorf("requested an external service, but no cloud provider supplied.")
 | 
			
		||||
		}
 | 
			
		||||
		if service.Spec.Protocol != api.ProtocolTCP {
 | 
			
		||||
			// TODO: Support UDP here too.
 | 
			
		||||
			return nil, fmt.Errorf("external load balancers for non TCP services are not currently supported.")
 | 
			
		||||
		}
 | 
			
		||||
		balancer, ok := rs.cloud.TCPLoadBalancer()
 | 
			
		||||
		if !ok {
 | 
			
		||||
			return nil, fmt.Errorf("the cloud provider does not support external TCP load balancers.")
 | 
			
		||||
		}
 | 
			
		||||
		zones, ok := rs.cloud.Zones()
 | 
			
		||||
		if !ok {
 | 
			
		||||
			return nil, fmt.Errorf("the cloud provider does not support zone enumeration.")
 | 
			
		||||
		}
 | 
			
		||||
		hosts, err := rs.machines.ListMinions(ctx)
 | 
			
		||||
		err := rs.createExternalLoadBalancer(ctx, service)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
		zone, err := zones.GetZone()
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
		var affinityType api.AffinityType = service.Spec.SessionAffinity
 | 
			
		||||
		if len(service.Spec.PublicIPs) > 0 {
 | 
			
		||||
			for _, publicIP := range service.Spec.PublicIPs {
 | 
			
		||||
				_, err = balancer.CreateTCPLoadBalancer(service.Name, zone.Region, net.ParseIP(publicIP), service.Spec.Port, hostsFromMinionList(hosts), affinityType)
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					// TODO: have to roll-back any successful calls.
 | 
			
		||||
					return nil, err
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		} else {
 | 
			
		||||
			ip, err := balancer.CreateTCPLoadBalancer(service.Name, zone.Region, nil, service.Spec.Port, hostsFromMinionList(hosts), affinityType)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return nil, err
 | 
			
		||||
			}
 | 
			
		||||
			service.Spec.PublicIPs = []string{ip.String()}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if err := rs.registry.CreateService(ctx, service); err != nil {
 | 
			
		||||
@@ -166,7 +131,9 @@ func (rs *REST) Delete(ctx api.Context, id string) (runtime.Object, error) {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	rs.portalMgr.Release(net.ParseIP(service.Spec.PortalIP))
 | 
			
		||||
	rs.deleteExternalLoadBalancer(service)
 | 
			
		||||
	if service.Spec.CreateExternalLoadBalancer {
 | 
			
		||||
		rs.deleteExternalLoadBalancer(service)
 | 
			
		||||
	}
 | 
			
		||||
	return &api.Status{Status: api.StatusSuccess}, rs.registry.DeleteService(ctx, id)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -224,7 +191,22 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, boo
 | 
			
		||||
	if errs := validation.ValidateServiceUpdate(oldService, service); len(errs) > 0 {
 | 
			
		||||
		return nil, false, errors.NewInvalid("service", service.Name, errs)
 | 
			
		||||
	}
 | 
			
		||||
	// TODO: check to see if external load balancer status changed
 | 
			
		||||
	// Recreate external load balancer if changed.
 | 
			
		||||
	if externalLoadBalancerNeedsUpdate(oldService, service) {
 | 
			
		||||
		// TODO: support updating existing balancers
 | 
			
		||||
		if oldService.Spec.CreateExternalLoadBalancer {
 | 
			
		||||
			err = rs.deleteExternalLoadBalancer(oldService)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return nil, false, err
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		if service.Spec.CreateExternalLoadBalancer {
 | 
			
		||||
			err = rs.createExternalLoadBalancer(ctx, service)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return nil, false, err
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	err = rs.registry.UpdateService(ctx, service)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, false, err
 | 
			
		||||
@@ -247,9 +229,53 @@ func (rs *REST) ResourceLocation(ctx api.Context, id string) (string, error) {
 | 
			
		||||
	return e.Endpoints[rand.Intn(len(e.Endpoints))], nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (rs *REST) createExternalLoadBalancer(ctx api.Context, service *api.Service) error {
 | 
			
		||||
	if rs.cloud == nil {
 | 
			
		||||
		return fmt.Errorf("requested an external service, but no cloud provider supplied.")
 | 
			
		||||
	}
 | 
			
		||||
	if service.Spec.Protocol != api.ProtocolTCP {
 | 
			
		||||
		// TODO: Support UDP here too.
 | 
			
		||||
		return fmt.Errorf("external load balancers for non TCP services are not currently supported.")
 | 
			
		||||
	}
 | 
			
		||||
	balancer, ok := rs.cloud.TCPLoadBalancer()
 | 
			
		||||
	if !ok {
 | 
			
		||||
		return fmt.Errorf("the cloud provider does not support external TCP load balancers.")
 | 
			
		||||
	}
 | 
			
		||||
	zones, ok := rs.cloud.Zones()
 | 
			
		||||
	if !ok {
 | 
			
		||||
		return fmt.Errorf("the cloud provider does not support zone enumeration.")
 | 
			
		||||
	}
 | 
			
		||||
	hosts, err := rs.machines.ListMinions(ctx)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	zone, err := zones.GetZone()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	// TODO: We should be able to rely on valid input, and not do defaulting here.
 | 
			
		||||
	var affinityType api.AffinityType = service.Spec.SessionAffinity
 | 
			
		||||
	if len(service.Spec.PublicIPs) > 0 {
 | 
			
		||||
		for _, publicIP := range service.Spec.PublicIPs {
 | 
			
		||||
			_, err = balancer.CreateTCPLoadBalancer(service.Name, zone.Region, net.ParseIP(publicIP), service.Spec.Port, hostsFromMinionList(hosts), affinityType)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				// TODO: have to roll-back any successful calls.
 | 
			
		||||
				return err
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	} else {
 | 
			
		||||
		ip, err := balancer.CreateTCPLoadBalancer(service.Name, zone.Region, nil, service.Spec.Port, hostsFromMinionList(hosts), affinityType)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
		service.Spec.PublicIPs = []string{ip.String()}
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (rs *REST) deleteExternalLoadBalancer(service *api.Service) error {
 | 
			
		||||
	if !service.Spec.CreateExternalLoadBalancer || rs.cloud == nil {
 | 
			
		||||
		return nil
 | 
			
		||||
	if rs.cloud == nil {
 | 
			
		||||
		return fmt.Errorf("requested an external service, but no cloud provider supplied.")
 | 
			
		||||
	}
 | 
			
		||||
	zones, ok := rs.cloud.Zones()
 | 
			
		||||
	if !ok {
 | 
			
		||||
@@ -272,3 +298,24 @@ func (rs *REST) deleteExternalLoadBalancer(service *api.Service) error {
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func externalLoadBalancerNeedsUpdate(old, new *api.Service) bool {
 | 
			
		||||
	if !old.Spec.CreateExternalLoadBalancer && !new.Spec.CreateExternalLoadBalancer {
 | 
			
		||||
		return false
 | 
			
		||||
	}
 | 
			
		||||
	if old.Spec.CreateExternalLoadBalancer != new.Spec.CreateExternalLoadBalancer ||
 | 
			
		||||
		old.Spec.Port != new.Spec.Port ||
 | 
			
		||||
		old.Spec.SessionAffinity != new.Spec.SessionAffinity ||
 | 
			
		||||
		old.Spec.Protocol != new.Spec.Protocol {
 | 
			
		||||
		return true
 | 
			
		||||
	}
 | 
			
		||||
	if len(old.Spec.PublicIPs) != len(new.Spec.PublicIPs) {
 | 
			
		||||
		return true
 | 
			
		||||
	}
 | 
			
		||||
	for i := range old.Spec.PublicIPs {
 | 
			
		||||
		if old.Spec.PublicIPs[i] != new.Spec.PublicIPs[i] {
 | 
			
		||||
			return true
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return false
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -302,6 +302,50 @@ func TestServiceRegistryDeleteExternal(t *testing.T) {
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestServiceRegistryUpdateExternalService(t *testing.T) {
 | 
			
		||||
	ctx := api.NewDefaultContext()
 | 
			
		||||
	registry := registrytest.NewServiceRegistry()
 | 
			
		||||
	fakeCloud := &cloud.FakeCloud{}
 | 
			
		||||
	machines := []string{"foo", "bar", "baz"}
 | 
			
		||||
	storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t))
 | 
			
		||||
 | 
			
		||||
	// Create non-external load balancer.
 | 
			
		||||
	svc1 := &api.Service{
 | 
			
		||||
		ObjectMeta: api.ObjectMeta{Name: "foo"},
 | 
			
		||||
		Spec: api.ServiceSpec{
 | 
			
		||||
			Port:                       6502,
 | 
			
		||||
			Selector:                   map[string]string{"bar": "baz"},
 | 
			
		||||
			CreateExternalLoadBalancer: false,
 | 
			
		||||
			Protocol:                   api.ProtocolTCP,
 | 
			
		||||
			SessionAffinity:            api.AffinityTypeNone,
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	storage.Create(ctx, svc1)
 | 
			
		||||
	if len(fakeCloud.Calls) != 0 {
 | 
			
		||||
		t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Modify load balancer to be external.
 | 
			
		||||
	svc2 := new(api.Service)
 | 
			
		||||
	*svc2 = *svc1
 | 
			
		||||
	svc2.Spec.CreateExternalLoadBalancer = true
 | 
			
		||||
	storage.Update(ctx, svc2)
 | 
			
		||||
	if len(fakeCloud.Calls) != 2 || fakeCloud.Calls[0] != "get-zone" || fakeCloud.Calls[1] != "create" {
 | 
			
		||||
		t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Change port.
 | 
			
		||||
	svc3 := new(api.Service)
 | 
			
		||||
	*svc3 = *svc2
 | 
			
		||||
	svc3.Spec.Port = 6504
 | 
			
		||||
	storage.Update(ctx, svc3)
 | 
			
		||||
	if len(fakeCloud.Calls) != 6 || fakeCloud.Calls[0] != "get-zone" || fakeCloud.Calls[1] != "create" ||
 | 
			
		||||
		fakeCloud.Calls[2] != "get-zone" || fakeCloud.Calls[3] != "delete" ||
 | 
			
		||||
		fakeCloud.Calls[4] != "get-zone" || fakeCloud.Calls[5] != "create" {
 | 
			
		||||
		t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestServiceRegistryGet(t *testing.T) {
 | 
			
		||||
	ctx := api.NewDefaultContext()
 | 
			
		||||
	registry := registrytest.NewServiceRegistry()
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user