/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package service

import (
	"fmt"
	"reflect"
	"sync"
	"time"

	"github.com/golang/glog"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/conversion"
	pkgruntime "k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/apimachinery/pkg/watch"
	cache "k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
	v1beta1 "k8s.io/kubernetes/federation/apis/federation/v1beta1"
	federationcache "k8s.io/kubernetes/federation/client/cache"
	fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
	"k8s.io/kubernetes/federation/pkg/dnsprovider"
	"k8s.io/kubernetes/federation/pkg/federation-controller/util"
	fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util"
	"k8s.io/kubernetes/federation/pkg/federation-controller/util/deletionhelper"
	"k8s.io/kubernetes/pkg/api"
	v1 "k8s.io/kubernetes/pkg/api/v1"
	kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
	"k8s.io/kubernetes/pkg/client/legacylisters"
	"k8s.io/kubernetes/pkg/client/record"
	"k8s.io/kubernetes/pkg/controller"
)

const (
	serviceSyncPeriod = 10 * time.Minute
	clusterSyncPeriod = 10 * time.Minute

	// How long to wait before retrying the processing of a service change.
	// If this changes, the sleep in hack/jenkins/e2e.sh before downing a cluster
	// should be changed appropriately.
	minRetryDelay = 5 * time.Second
	maxRetryDelay = 300 * time.Second

	// clientRetryCount is how many times a call to a remote kube-apiserver or
	// federation apiserver is attempted when a failure occurs. The retries are
	// expected to happen within a short window, so no exponential backoff is used.
	clientRetryCount = 5

	retryable = true

	doNotRetry = time.Duration(0)

	UserAgentName = "federation-service-controller"
	KubeAPIQPS    = 20.0
	KubeAPIBurst  = 30

	maxNoOfClusters = 100

	updateTimeout         = 30 * time.Second
	allClustersKey        = "ALL_CLUSTERS"
	clusterAvailableDelay = time.Second * 20
)

type cachedService struct {
	lastState *v1.Service
	// The state as successfully applied to the DNS server
	appliedState *v1.Service
	// endpointMap holds endpoint readiness info from the kubernetes clusters,
	// keyed on clusterName; a value of 1 means the cluster has at least one
	// ready address for the service.
	endpointMap map[string]int
	// serviceStatusMap holds service status info from the kubernetes clusters, keyed on clusterName
	serviceStatusMap map[string]v1.LoadBalancerStatus
	// Ensures only one goroutine can operate on this service at any given time.
	rwlock sync.Mutex
	// Controls error back-off when propagating the federation service to k8s clusters
	lastRetryDelay time.Duration
	// Controls error back-off when updating the federation service back to the federation apiserver
	lastFedUpdateDelay time.Duration
	// Controls error back-off for DNS record updates
	lastDNSUpdateDelay time.Duration
}

type serviceCache struct {
	rwlock sync.Mutex // protects fedServiceMap
	// fedServiceMap contains all services received from the federation apiserver,
	// keyed on serviceName
	fedServiceMap map[string]*cachedService
}

type ServiceController struct {
	dns              dnsprovider.Interface
	federationClient fedclientset.Interface
	federationName   string
	// serviceDnsSuffix is the DNS suffix we use when publishing service DNS names
	serviceDnsSuffix string
	// zoneName and zoneID are used to identify the zone in which to put records
	zoneName string
	zoneID   string
	// each federation should be configured with a single zone (e.g. "mycompany.com")
	dnsZones     dnsprovider.Zones
	serviceCache *serviceCache
	clusterCache *clusterClientCache
	// A store of services, populated by serviceController
	serviceStore listers.StoreToServiceLister
	// Watches changes to all services
	serviceController cache.Controller
	federatedInformer fedutil.FederatedInformer
	// A store of clusters, populated by clusterController
	clusterStore federationcache.StoreToClusterLister
	// Watches changes to all clusters
	clusterController cache.Controller
	eventBroadcaster  record.EventBroadcaster
	eventRecorder     record.EventRecorder
	// services that need to be synced
	queue           *workqueue.Type
	knownClusterSet sets.String
	// endpointWorkerMap tracks, per registered cluster, whether an endpoint
	// worker exists; keyed on clusterName
	endpointWorkerMap map[string]bool
	// channel for an endpoint worker to signal that it is going out of existence
	endpointWorkerDoneChan chan string
	// serviceWorkerMap tracks, per registered cluster, whether a service worker
	// exists; keyed on clusterName
	serviceWorkerMap map[string]bool
	// channel for a service worker to signal that it is going out of existence
	serviceWorkerDoneChan chan string

	// For triggering reconciliation of all services. This is used when
	// a new cluster becomes available.
	clusterDeliverer *util.DelayingDeliverer

	deletionHelper *deletionhelper.DeletionHelper
}

// New returns a new service controller to keep DNS provider service resources
// (like Kubernetes Services and DNS server records for service discovery) in sync with the registry.
func New(federationClient fedclientset.Interface, dns dnsprovider.Interface,
	federationName, serviceDnsSuffix, zoneName string, zoneID string) *ServiceController {
	broadcaster := record.NewBroadcaster()
	// federationClient event is not supported yet
	// broadcaster.StartRecordingToSink(&unversioned_core.EventSinkImpl{Interface: kubeClient.Core().Events("")})
	recorder := broadcaster.NewRecorder(v1.EventSource{Component: UserAgentName})

	s := &ServiceController{
		dns:              dns,
		federationClient: federationClient,
		federationName:   federationName,
		serviceDnsSuffix: serviceDnsSuffix,
		zoneName:         zoneName,
		zoneID:           zoneID,
		serviceCache:     &serviceCache{fedServiceMap: make(map[string]*cachedService)},
		clusterCache: &clusterClientCache{
			rwlock:    sync.Mutex{},
			clientMap: make(map[string]*clusterCache),
		},
		eventBroadcaster: broadcaster,
		eventRecorder:    recorder,
		queue:            workqueue.New(),
		knownClusterSet:  make(sets.String),
	}
	s.clusterDeliverer = util.NewDelayingDeliverer()
	s.serviceStore.Indexer, s.serviceController = cache.NewIndexerInformer(
		&cache.ListWatch{
			ListFunc: func(options metav1.ListOptions) (pkgruntime.Object, error) {
				return s.federationClient.Core().Services(metav1.NamespaceAll).List(options)
			},
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				return s.federationClient.Core().Services(metav1.NamespaceAll).Watch(options)
			},
		},
		&v1.Service{},
		serviceSyncPeriod,
		cache.ResourceEventHandlerFuncs{
			AddFunc: s.enqueueService,
			UpdateFunc: func(old, cur interface{}) {
				// There are cases where old and cur are equal but we still receive the
				// event; only enqueue when something actually changed.
				if !reflect.DeepEqual(old, cur) {
					s.enqueueService(cur)
				}
			},
			DeleteFunc: s.enqueueService,
		},
		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
	)
	s.clusterStore.Store, s.clusterController = cache.NewInformer(
		&cache.ListWatch{
			ListFunc: func(options metav1.ListOptions) (pkgruntime.Object, error) {
				return s.federationClient.Federation().Clusters().List(options)
			},
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				return s.federationClient.Federation().Clusters().Watch(options)
			},
		},
		&v1beta1.Cluster{},
		clusterSyncPeriod,
		cache.ResourceEventHandlerFuncs{
			DeleteFunc: s.clusterCache.delFromClusterSet,
			AddFunc:    s.clusterCache.addToClientMap,
			UpdateFunc: func(old, cur interface{}) {
				oldCluster, ok := old.(*v1beta1.Cluster)
				if !ok {
					return
				}
				curCluster, ok := cur.(*v1beta1.Cluster)
				if !ok {
					return
				}
				if !reflect.DeepEqual(oldCluster.Spec, curCluster.Spec) {
					// update when the spec has changed
					s.clusterCache.addToClientMap(cur)
				}

				pred := getClusterConditionPredicate()
				// only update when the condition changed from not-ready to ready
				if !pred(*oldCluster) && pred(*curCluster) {
					s.clusterCache.addToClientMap(cur)
				}
				// ready -> not-ready is not handled here.
				// TODO: How could we stop a controller?
			},
		},
	)

	clusterLifecycle := fedutil.ClusterLifecycleHandlerFuncs{
		ClusterAvailable: func(cluster *v1beta1.Cluster) {
			s.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterAvailableDelay)
		},
	}
	fedInformerFactory := func(cluster *v1beta1.Cluster, targetClient kubeclientset.Interface) (cache.Store, cache.Controller) {
		return cache.NewInformer(
			&cache.ListWatch{
				ListFunc: func(options metav1.ListOptions) (pkgruntime.Object, error) {
					return targetClient.Core().Services(metav1.NamespaceAll).List(options)
				},
				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
					return targetClient.Core().Services(metav1.NamespaceAll).Watch(options)
				},
			},
			&v1.Service{},
			controller.NoResyncPeriodFunc(),
			// Trigger reconciliation whenever something in a federated cluster changes. In most cases it
			// would be just a confirmation that some service operation succeeded.
			util.NewTriggerOnAllChanges(
				func(obj pkgruntime.Object) {
					// TODO: Use this to enqueue services.
				},
			))
	}

	s.federatedInformer = fedutil.NewFederatedInformer(federationClient, fedInformerFactory, &clusterLifecycle)

	federatedUpdater := fedutil.NewFederatedUpdater(s.federatedInformer,
		func(client kubeclientset.Interface, obj pkgruntime.Object) error {
			svc := obj.(*v1.Service)
			_, err := client.Core().Services(svc.Namespace).Create(svc)
			return err
		},
		func(client kubeclientset.Interface, obj pkgruntime.Object) error {
			svc := obj.(*v1.Service)
			_, err := client.Core().Services(svc.Namespace).Update(svc)
			return err
		},
		func(client kubeclientset.Interface, obj pkgruntime.Object) error {
			svc := obj.(*v1.Service)
			err := client.Core().Services(svc.Namespace).Delete(svc.Name, &metav1.DeleteOptions{})
			return err
		})

	s.deletionHelper = deletionhelper.NewDeletionHelper(
		s.hasFinalizerFunc,
		s.removeFinalizerFunc,
		s.addFinalizerFunc,
		// objNameFunc
		func(obj pkgruntime.Object) string {
			service := obj.(*v1.Service)
			return service.Name
		},
		updateTimeout,
		s.eventRecorder,
		s.federatedInformer,
		federatedUpdater,
	)

	s.endpointWorkerMap = make(map[string]bool)
	s.serviceWorkerMap = make(map[string]bool)
	s.endpointWorkerDoneChan = make(chan string, maxNoOfClusters)
	s.serviceWorkerDoneChan = make(chan string, maxNoOfClusters)
	return s
}

// Returns true if the given object has the given finalizer in its ObjectMeta.
func (s *ServiceController) hasFinalizerFunc(obj pkgruntime.Object, finalizer string) bool {
	service := obj.(*v1.Service)
	for i := range service.ObjectMeta.Finalizers {
		if service.ObjectMeta.Finalizers[i] == finalizer {
			return true
		}
	}
	return false
}

// Removes the finalizer from the given object's ObjectMeta.
// Assumes that the given object is a service.
func (s *ServiceController) removeFinalizerFunc(obj pkgruntime.Object, finalizer string) (pkgruntime.Object, error) {
	service := obj.(*v1.Service)
	newFinalizers := []string{}
	hasFinalizer := false
	for i := range service.ObjectMeta.Finalizers {
		if service.ObjectMeta.Finalizers[i] != finalizer {
			newFinalizers = append(newFinalizers, service.ObjectMeta.Finalizers[i])
		} else {
			hasFinalizer = true
		}
	}
	if !hasFinalizer {
		// Nothing to do.
		return obj, nil
	}
	service.ObjectMeta.Finalizers = newFinalizers
	service, err := s.federationClient.Core().Services(service.Namespace).Update(service)
	if err != nil {
		return nil, fmt.Errorf("failed to remove finalizer %s from service %s: %v", finalizer, service.Name, err)
	}
	return service, nil
}

// Adds the given finalizer to the given object's ObjectMeta.
// Assumes that the given object is a service.
func (s *ServiceController) addFinalizerFunc(obj pkgruntime.Object, finalizer string) (pkgruntime.Object, error) {
	service := obj.(*v1.Service)
	service.ObjectMeta.Finalizers = append(service.ObjectMeta.Finalizers, finalizer)
	service, err := s.federationClient.Core().Services(service.Namespace).Update(service)
	if err != nil {
		return nil, fmt.Errorf("failed to add finalizer %s to service %s: %v", finalizer, service.Name, err)
	}
	return service, nil
}

// obj could be a *v1.Service, or a DeletedFinalStateUnknown marker item.
func (s *ServiceController) enqueueService(obj interface{}) {
	key, err := controller.KeyFunc(obj)
	if err != nil {
		glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
		return
	}
	s.queue.Add(key)
}

// Run starts a background goroutine that watches for changes to federation services
// and ensures that they have Kubernetes services created, updated or deleted appropriately.
// federationSyncPeriod controls how often we check the federation's services to
// ensure that the correct Kubernetes services (and associated DNS entries) exist.
// This is only necessary to fudge over failed watches.
// clusterSyncPeriod controls how often we check the federation's underlying clusters and
// their Kubernetes services to ensure that matching services created independently of the Federation
// (e.g. directly via the underlying cluster's API) are correctly accounted for.
//
// It's an error to call Run() more than once for a given ServiceController
// object.
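//
// A minimal usage sketch (hypothetical wiring; the client, DNS provider and
// stop-channel construction below are assumptions, not part of this file):
//
//	sc := New(fedClient, dnsProvider, "myfederation", "federation.example.com", "example.com", "")
//	if err := sc.Run(5, stopCh); err != nil {
//		glog.Fatalf("service controller failed: %v", err)
//	}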
func (s *ServiceController) Run(workers int, stopCh <-chan struct{}) error {
	if err := s.init(); err != nil {
		return err
	}
	defer runtime.HandleCrash()
	s.federatedInformer.Start()
	s.clusterDeliverer.StartWithHandler(func(_ *util.DelayingDelivererItem) {
		// TODO: Use this new clusterDeliverer to reconcile services in new clusters.
	})
	go s.serviceController.Run(stopCh)
	go s.clusterController.Run(stopCh)
	for i := 0; i < workers; i++ {
		go wait.Until(s.fedServiceWorker, time.Second, stopCh)
	}
	go wait.Until(s.clusterEndpointWorker, time.Second, stopCh)
	go wait.Until(s.clusterServiceWorker, time.Second, stopCh)
	go wait.Until(s.clusterSyncLoop, time.Second, stopCh)
	<-stopCh
	glog.Infof("Shutting down Federation Service Controller")
	s.queue.ShutDown()
	s.federatedInformer.Stop()
	return nil
}

func (s *ServiceController) init() error {
	if s.federationName == "" {
		return fmt.Errorf("ServiceController should not be run without federationName.")
	}
	if s.zoneName == "" && s.zoneID == "" {
		return fmt.Errorf("ServiceController must be run with either zoneName or zoneID.")
	}
	if s.serviceDnsSuffix == "" {
		// TODO: Is this the right place to do defaulting?
		if s.zoneName == "" {
			return fmt.Errorf("ServiceController must be run with zoneName, if serviceDnsSuffix is not set.")
		}
		s.serviceDnsSuffix = s.zoneName
	}
	if s.dns == nil {
		return fmt.Errorf("ServiceController should not be run without a dnsprovider.")
	}
	zones, ok := s.dns.Zones()
	if !ok {
		return fmt.Errorf("the dns provider does not support zone enumeration, which is required for creating dns records.")
	}
	s.dnsZones = zones
	matchingZones, err := getDnsZones(s.zoneName, s.zoneID, s.dnsZones)
	if err != nil {
		return fmt.Errorf("error querying for DNS zones: %v", err)
	}
	if len(matchingZones) == 0 {
		if s.zoneName == "" {
			return fmt.Errorf("ServiceController must be run with zoneName to create zone automatically.")
		}
		glog.Infof("DNS zone %q not found. Creating DNS zone %q.", s.zoneName, s.zoneName)
		managedZone, err := s.dnsZones.New(s.zoneName)
		if err != nil {
			return err
		}
		zone, err := s.dnsZones.Add(managedZone)
		if err != nil {
			return err
		}
		glog.Infof("DNS zone %q successfully created. Note that DNS resolution will not work until you have registered this name with "+
			"a DNS registrar and they have changed the authoritative name servers for your domain to point to your DNS provider.", zone.Name())
	}
	if len(matchingZones) > 1 {
		return fmt.Errorf("Multiple matching DNS zones found for %q; please specify zoneID", s.zoneName)
	}
	return nil
}

// fedServiceWorker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that syncService is never invoked concurrently with the same key.
func (s *ServiceController) fedServiceWorker() {
	for {
		key, quit := s.queue.Get()
		if quit {
			// Returning here (rather than from inside an inner closure) actually
			// stops the worker once the queue has been shut down.
			return
		}
		func() {
			defer s.queue.Done(key)
			err := s.syncService(key.(string))
			if err != nil {
				glog.Errorf("Error syncing service: %v", err)
			}
		}()
	}
}

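// wantsDNSRecords returns true if the given service requests a LoadBalancer,
// and therefore wants federated DNS records to be programmed for it.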
func wantsDNSRecords(service *v1.Service) bool {
	return service.Spec.Type == v1.ServiceTypeLoadBalancer
}

// processServiceForCluster creates or updates the given federation service in the
// named registered running cluster, and returns any error caught.
func (s *ServiceController) processServiceForCluster(cachedService *cachedService, clusterName string, service *v1.Service, client *kubeclientset.Clientset) error {
	glog.V(4).Infof("Process service %s/%s for cluster %s", service.Namespace, service.Name, clusterName)
	// Create or update the k8s service.
	err := s.ensureClusterService(cachedService, clusterName, service, client)
	if err != nil {
		glog.V(4).Infof("Failed to process service %s/%s for cluster %s", service.Namespace, service.Name, clusterName)
		return err
	}
	glog.V(4).Infof("Successfully processed service %s/%s for cluster %s", service.Namespace, service.Name, clusterName)
	return nil
}

// updateFederationService returns whatever error occurred along with a boolean indicator
// of whether it should be retried.
func (s *ServiceController) updateFederationService(key string, cachedService *cachedService) (error, bool) {
	// Clone the federation service, and create it in the underlying k8s clusters.
	desiredService := &v1.Service{
		ObjectMeta: util.DeepCopyRelevantObjectMeta(cachedService.lastState.ObjectMeta),
		Spec:       *(util.DeepCopyApiTypeOrPanic(&cachedService.lastState.Spec).(*v1.ServiceSpec)),
	}

	// Handle the available clusters concurrently, one goroutine per cluster.
	var wg sync.WaitGroup
	var mu sync.Mutex
	var hasErr bool
	for clusterName, cache := range s.clusterCache.clientMap {
		wg.Add(1)
		go func(cache *clusterCache, clusterName string) {
			defer wg.Done()
			err := s.processServiceForCluster(cachedService, clusterName, desiredService, cache.clientset)
			if err != nil {
				mu.Lock()
				hasErr = true
				mu.Unlock()
			}
		}(cache, clusterName)
	}
	// Wait for all clusters to be processed; without this, checking hasErr below
	// would race with the goroutines above.
	wg.Wait()
	if hasErr {
		// Detailed errors have already been logged inside the per-cluster processing.
		return fmt.Errorf("Service %s/%s was not successfully updated to all clusters", desiredService.Namespace, desiredService.Name), retryable
	}
	return nil, !retryable
}

func (s *ServiceController) ensureClusterService(cachedService *cachedService, clusterName string, service *v1.Service, client *kubeclientset.Clientset) error {
	var err error
	var needUpdate bool
	for i := 0; i < clientRetryCount; i++ {
		var svc *v1.Service
		// Assign with "=" rather than ":=" so that the error remains visible to
		// the "return err" at the bottom of the function.
		svc, err = client.Core().Services(service.Namespace).Get(service.Name, metav1.GetOptions{})
		if err == nil {
			// service exists
			glog.V(5).Infof("Found service %s/%s from cluster %s", service.Namespace, service.Name, clusterName)
			// preserve immutable fields
			service.Spec.ClusterIP = svc.Spec.ClusterIP

			// preserve auto-assigned fields
			for i, oldPort := range svc.Spec.Ports {
				for _, port := range service.Spec.Ports {
					if port.NodePort == 0 {
						if !portEqualExcludeNodePort(&oldPort, &port) {
							svc.Spec.Ports[i] = port
							needUpdate = true
						}
					} else {
						if !portEqualForLB(&oldPort, &port) {
							svc.Spec.Ports[i] = port
							needUpdate = true
						}
					}
				}
			}

			if needUpdate {
				// we only apply a spec update
				svc.Spec = service.Spec
				_, err = client.Core().Services(svc.Namespace).Update(svc)
				if err == nil {
					glog.V(5).Infof("Service %s/%s successfully updated to cluster %s", svc.Namespace, svc.Name, clusterName)
					return nil
				}
				glog.V(4).Infof("Failed to update %+v", err)
			} else {
				glog.V(5).Infof("Service %s/%s is not updated to cluster %s as the specs are identical", svc.Namespace, svc.Name, clusterName)
				return nil
			}
		} else if errors.IsNotFound(err) {
			// Create the service if it is not found.
			glog.Infof("Service '%s/%s' is not found in cluster %s, trying to create a new one",
				service.Namespace, service.Name, clusterName)
			service.ResourceVersion = ""
			_, err = client.Core().Services(service.Namespace).Create(service)
			if err == nil {
				glog.V(5).Infof("Service %s/%s successfully created in cluster %s", service.Namespace, service.Name, clusterName)
				return nil
			}
			glog.V(4).Infof("Failed to create %+v", err)
			if errors.IsAlreadyExists(err) {
				glog.V(5).Infof("service %s/%s already exists in cluster %s", service.Namespace, service.Name, clusterName)
				return nil
			}
		}
		if errors.IsConflict(err) {
			glog.V(4).Infof("Not persisting update to service '%s/%s' that has been changed since we received it: %v",
				service.Namespace, service.Name, err)
		}
		// TODO: Should we reuse the same retry delay for all clusters?
		time.Sleep(cachedService.nextRetryDelay())
	}
	return err
}

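// allServices returns a snapshot slice of all services currently in the cache.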
func (s *serviceCache) allServices() []*cachedService {
	s.rwlock.Lock()
	defer s.rwlock.Unlock()
	services := make([]*cachedService, 0, len(s.fedServiceMap))
	for _, v := range s.fedServiceMap {
		services = append(services, v)
	}
	return services
}

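// get returns the cached service for the given name, and whether it exists.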
func (s *serviceCache) get(serviceName string) (*cachedService, bool) {
	s.rwlock.Lock()
	defer s.rwlock.Unlock()
	service, ok := s.fedServiceMap[serviceName]
	return service, ok
}

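// getOrCreate returns the cached service for the given name, creating and
// registering an empty entry if none exists yet.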
func (s *serviceCache) getOrCreate(serviceName string) *cachedService {
	s.rwlock.Lock()
	defer s.rwlock.Unlock()
	service, ok := s.fedServiceMap[serviceName]
	if !ok {
		service = &cachedService{
			endpointMap:      make(map[string]int),
			serviceStatusMap: make(map[string]v1.LoadBalancerStatus),
		}
		s.fedServiceMap[serviceName] = service
	}
	return service
}

func (s *serviceCache) set(serviceName string, service *cachedService) {
	s.rwlock.Lock()
	defer s.rwlock.Unlock()
	s.fedServiceMap[serviceName] = service
}

func (s *serviceCache) delete(serviceName string) {
	s.rwlock.Lock()
	defer s.rwlock.Unlock()
	delete(s.fedServiceMap, serviceName)
}

// needsUpdateDNS checks whether the DNS records of the given service should be updated.
func (s *ServiceController) needsUpdateDNS(oldService *v1.Service, newService *v1.Service) bool {
	if !wantsDNSRecords(oldService) && !wantsDNSRecords(newService) {
		return false
	}
	if wantsDNSRecords(oldService) != wantsDNSRecords(newService) {
		s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "Type", "%v -> %v",
			oldService.Spec.Type, newService.Spec.Type)
		return true
	}
	if !portsEqualForLB(oldService, newService) || oldService.Spec.SessionAffinity != newService.Spec.SessionAffinity {
		return true
	}
	if !LoadBalancerIPsAreEqual(oldService, newService) {
		s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "LoadbalancerIP", "%v -> %v",
			oldService.Spec.LoadBalancerIP, newService.Spec.LoadBalancerIP)
		return true
	}
	if len(oldService.Spec.ExternalIPs) != len(newService.Spec.ExternalIPs) {
		s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "ExternalIP", "Count: %v -> %v",
			len(oldService.Spec.ExternalIPs), len(newService.Spec.ExternalIPs))
		return true
	}
	for i := range oldService.Spec.ExternalIPs {
		if oldService.Spec.ExternalIPs[i] != newService.Spec.ExternalIPs[i] {
			s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "ExternalIP", "Added: %v",
				newService.Spec.ExternalIPs[i])
			return true
		}
	}
	if !reflect.DeepEqual(oldService.Annotations, newService.Annotations) {
		return true
	}
	if oldService.UID != newService.UID {
		s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "UID", "%v -> %v",
			oldService.UID, newService.UID)
		return true
	}

	return false
}

func getPortsForLB(service *v1.Service) ([]*v1.ServicePort, error) {
	// TODO: quinton: Probably applies for DNS SRV records. Come back to this.
	//var protocol api.Protocol

	ports := []*v1.ServicePort{}
	for i := range service.Spec.Ports {
		sp := &service.Spec.Ports[i]
		// The check on protocol was removed here. The DNS provider itself is now responsible for all protocol validation.
		ports = append(ports, sp)
	}
	return ports, nil
}

func portsEqualForLB(x, y *v1.Service) bool {
	xPorts, err := getPortsForLB(x)
	if err != nil {
		return false
	}
	yPorts, err := getPortsForLB(y)
	if err != nil {
		return false
	}
	return portSlicesEqualForLB(xPorts, yPorts)
}

func portSlicesEqualForLB(x, y []*v1.ServicePort) bool {
	if len(x) != len(y) {
		return false
	}

	for i := range x {
		if !portEqualForLB(x[i], y[i]) {
			return false
		}
	}
	return true
}

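// portEqualForLB compares two service ports for load-balancer purposes,
// including the NodePort (unlike portEqualExcludeNodePort below).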
func portEqualForLB(x, y *v1.ServicePort) bool {
	// TODO: Should we check name? (In theory, an LB could expose it)
	if x.Name != y.Name {
		return false
	}

	if x.Protocol != y.Protocol {
		return false
	}

	if x.Port != y.Port {
		return false
	}
	if x.NodePort != y.NodePort {
		return false
	}
	return true
}

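// portEqualExcludeNodePort compares two service ports for load-balancer
// purposes, ignoring the auto-assigned NodePort.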
func portEqualExcludeNodePort(x, y *v1.ServicePort) bool {
	// TODO: Should we check name? (In theory, an LB could expose it)
	if x.Name != y.Name {
		return false
	}

	if x.Protocol != y.Protocol {
		return false
	}

	if x.Port != y.Port {
		return false
	}
	return true
}

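// clustersFromList returns the names of the clusters in the given list.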
func clustersFromList(list *v1beta1.ClusterList) []string {
	result := []string{}
	for ix := range list.Items {
		result = append(result, list.Items[ix].Name)
	}
	return result
}

// getClusterConditionPredicate returns a predicate that matches clusters whose
// Ready condition has status True.
func getClusterConditionPredicate() federationcache.ClusterConditionPredicate {
	return func(cluster v1beta1.Cluster) bool {
		// If we have no info, don't accept.
		if len(cluster.Status.Conditions) == 0 {
			return false
		}
		for _, cond := range cluster.Status.Conditions {
			// We consider the cluster for load balancing only when its ClusterReady
			// condition status is ConditionTrue.
			if cond.Type == v1beta1.ClusterReady && cond.Status != v1.ConditionTrue {
				glog.V(4).Infof("Ignoring cluster %v with %v condition status %v", cluster.Name, cond.Type, cond.Status)
				return false
			}
		}
		return true
	}
}

// clusterSyncLoop observes changes to the set of running clusters, applies all
// services to newly added clusters, and adds DNS records for the changes.
func (s *ServiceController) clusterSyncLoop() {
	var servicesToUpdate []*cachedService
	// TODO: Should we remove the cache for a cluster that goes from ready to
	// not-ready? If not, the condition predicate should be removed.
	clusters, err := s.clusterStore.ClusterCondition(getClusterConditionPredicate()).List()
	if err != nil {
		glog.Infof("Failed to get cluster list")
		return
	}
	newClusters := clustersFromList(&clusters)
	var newSet, increase sets.String
	newSet = sets.NewString(newClusters...)
	if newSet.Equal(s.knownClusterSet) {
		// The set of cluster names in the federation hasn't changed, but we can retry
		// updating any services that we failed to update last time around.
		servicesToUpdate = s.updateDNSRecords(servicesToUpdate, newClusters)
		return
	}
	glog.Infof("Detected change in list of cluster names. New set: %v, Old set: %v", newSet, s.knownClusterSet)
	increase = newSet.Difference(s.knownClusterSet)
	// Do nothing when a cluster is removed.
	if increase.Len() > 0 {
		// Try updating all services, and save the ones that fail to try again next
		// round.
		servicesToUpdate = s.serviceCache.allServices()
		numServices := len(servicesToUpdate)
		for newCluster := range increase {
			glog.Infof("New cluster observed %s", newCluster)
			s.updateAllServicesToCluster(servicesToUpdate, newCluster)
		}
		servicesToUpdate = s.updateDNSRecords(servicesToUpdate, newClusters)
		glog.Infof("Successfully updated %d out of %d DNS records to direct traffic to the updated cluster",
			numServices-len(servicesToUpdate), numServices)
	}
	s.knownClusterSet = newSet
}

func (s *ServiceController) updateAllServicesToCluster(services []*cachedService, clusterName string) {
	cluster, ok := s.clusterCache.clientMap[clusterName]
	if ok {
		for _, cachedService := range services {
			appliedState := cachedService.lastState
			s.processServiceForCluster(cachedService, clusterName, appliedState, cluster.clientset)
		}
	}
}

// updateDNSRecords updates all existing federation service DNS Records so that
// they will match the list of cluster names provided.
// Returns the list of services that couldn't be updated.
func (s *ServiceController) updateDNSRecords(services []*cachedService, clusters []string) (servicesToRetry []*cachedService) {
	for _, service := range services {
		func() {
			service.rwlock.Lock()
			defer service.rwlock.Unlock()
			// If the applied state is nil, that means it hasn't yet been successfully dealt
			// with by the DNS Record reconciler. We can trust the DNS Record
			// reconciler to ensure the federation service's DNS records are created to target
			// the correct backend service IPs.
			if service.appliedState == nil {
				return
			}
			if err := s.lockedUpdateDNSRecords(service, clusters); err != nil {
				glog.Errorf("External error while updating DNS Records: %v.", err)
				servicesToRetry = append(servicesToRetry, service)
			}
		}()
	}
	return servicesToRetry
}

// lockedUpdateDNSRecords updates the DNS records of a service, assuming we hold
// the mutex associated with the service.
func (s *ServiceController) lockedUpdateDNSRecords(service *cachedService, clusterNames []string) error {
	if !wantsDNSRecords(service.appliedState) {
		return nil
	}

	ensuredCount := 0
	unensuredCount := 0
	for key := range s.clusterCache.clientMap {
		for _, clusterName := range clusterNames {
			if key == clusterName {
				err := s.ensureDnsRecords(clusterName, service)
				if err != nil {
					unensuredCount++
					glog.V(4).Infof("Failed to update DNS records for service %v from cluster %s: %v", service, clusterName, err)
				} else {
					ensuredCount++
				}
			}
		}
	}
	missedCount := len(clusterNames) - ensuredCount - unensuredCount
	if missedCount > 0 || unensuredCount > 0 {
		return fmt.Errorf("Failed to update DNS records for %d clusters for service %v due to missing clients [missed count: %d] and/or failing to ensure DNS records [unensured count: %d]",
			len(clusterNames), service, missedCount, unensuredCount)
	}
	return nil
}

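// LoadBalancerIPsAreEqual returns true if the two services request the same
// explicit load balancer IP.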
func LoadBalancerIPsAreEqual(oldService, newService *v1.Service) bool {
	return oldService.Spec.LoadBalancerIP == newService.Spec.LoadBalancerIP
}

// Computes the next retry, using exponential backoff.
// mutex must be held.
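// For example, starting from a zero delay, the successive delays are
// 5s, 10s, 20s, 40s, 80s, 160s, and then 300s from there on (the result is
// clamped to [minRetryDelay, maxRetryDelay]).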
func (s *cachedService) nextRetryDelay() time.Duration {
	s.lastRetryDelay = s.lastRetryDelay * 2
	if s.lastRetryDelay < minRetryDelay {
		s.lastRetryDelay = minRetryDelay
	}
	if s.lastRetryDelay > maxRetryDelay {
		s.lastRetryDelay = maxRetryDelay
	}
	return s.lastRetryDelay
}

// resetRetryDelay resets the retry exponential backoff. mutex must be held.
func (s *cachedService) resetRetryDelay() {
	s.lastRetryDelay = time.Duration(0)
}

// Computes the next federation update retry, using exponential backoff.
// mutex must be held.
func (s *cachedService) nextFedUpdateDelay() time.Duration {
	s.lastFedUpdateDelay = s.lastFedUpdateDelay * 2
	if s.lastFedUpdateDelay < minRetryDelay {
		s.lastFedUpdateDelay = minRetryDelay
	}
	if s.lastFedUpdateDelay > maxRetryDelay {
		s.lastFedUpdateDelay = maxRetryDelay
	}
	return s.lastFedUpdateDelay
}

// resetFedUpdateDelay resets the federation update exponential backoff. mutex must be held.
func (s *cachedService) resetFedUpdateDelay() {
	s.lastFedUpdateDelay = time.Duration(0)
}

// Computes the next DNS update retry, using exponential backoff.
// mutex must be held.
func (s *cachedService) nextDNSUpdateDelay() time.Duration {
	s.lastDNSUpdateDelay = s.lastDNSUpdateDelay * 2
	if s.lastDNSUpdateDelay < minRetryDelay {
		s.lastDNSUpdateDelay = minRetryDelay
	}
	if s.lastDNSUpdateDelay > maxRetryDelay {
		s.lastDNSUpdateDelay = maxRetryDelay
	}
	return s.lastDNSUpdateDelay
}

// resetDNSUpdateDelay resets the DNS update exponential backoff. mutex must be held.
func (s *cachedService) resetDNSUpdateDelay() {
	s.lastDNSUpdateDelay = time.Duration(0)
}

// syncService will sync the Service with the given key. This function is not
// meant to be invoked concurrently with the same key.
func (s *ServiceController) syncService(key string) error {
	startTime := time.Now()
	var cachedService *cachedService
	var retryDelay time.Duration
	defer func() {
		glog.V(4).Infof("Finished syncing service %q (%v)", key, time.Since(startTime))
	}()
	// objFromStore holds the latest service info from the apiserver.
	objFromStore, exists, err := s.serviceStore.Indexer.GetByKey(key)
	if err != nil {
		glog.Errorf("Unable to retrieve service %v from store: %v", key, err)
		s.queue.Add(key)
		return err
	}
	if !exists {
		// Service absence in the store means the watcher caught the deletion;
		// ensure LB info is cleaned.
		glog.Infof("Service has been deleted %v", key)
		err, retryDelay = s.processServiceDeletion(key)
	}
	// Create a copy before modifying the obj to prevent a race condition with
	// other readers of obj from the store.
	obj, err := conversion.NewCloner().DeepCopy(objFromStore)
	if err != nil {
		glog.Errorf("Error in deep copying service %v retrieved from store: %v", key, err)
		s.queue.Add(key)
		return err
	}

	if exists {
		service, ok := obj.(*v1.Service)
		if ok {
			cachedService = s.serviceCache.getOrCreate(key)
			err, retryDelay = s.processServiceUpdate(cachedService, service, key)
		} else {
			tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
			if !ok {
				return fmt.Errorf("object contained wasn't a service or a deleted key: %+v", obj)
			}
			glog.Infof("Found tombstone for %v", key)
			err, retryDelay = s.processServiceDeletion(tombstone.Key)
		}
	}

	if retryDelay != 0 {
		s.enqueueService(obj)
	} else if err != nil {
		runtime.HandleError(fmt.Errorf("Failed to process service. Not retrying: %v", err))
	}
	return nil
}

// processServiceUpdate returns an error if processing the service update failed, along with a time.Duration
// indicating whether processing should be retried; zero means no retry; otherwise
// we should retry after that Duration.
func (s *ServiceController) processServiceUpdate(cachedService *cachedService, service *v1.Service, key string) (error, time.Duration) {
	// Ensure that no other goroutine will interfere with our processing of the
	// service.
	cachedService.rwlock.Lock()
	defer cachedService.rwlock.Unlock()

	if service.DeletionTimestamp != nil {
		if err := s.delete(service); err != nil {
			glog.Errorf("Failed to delete %s: %v", service, err)
			s.eventRecorder.Eventf(service, api.EventTypeNormal, "DeleteFailed",
				"Service delete failed: %v", err)
			return err, cachedService.nextRetryDelay()
		}
		return nil, doNotRetry
	}

	glog.V(3).Infof("Ensuring delete object from underlying clusters finalizer for service: %s",
		service.Name)
	// Add the required finalizers before creating the service in the underlying clusters.
	updatedServiceObj, err := s.deletionHelper.EnsureFinalizers(service)
	if err != nil {
		glog.Errorf("Failed to ensure delete object from underlying clusters finalizer in service %s: %v",
			service.Name, err)
		return err, cachedService.nextRetryDelay()
	}
	service = updatedServiceObj.(*v1.Service)

	glog.V(3).Infof("Syncing service %s in underlying clusters", service.Name)

	// Update the cached service (used above for populating synthetic deletes).
	// Always trust the service retrieved from serviceStore, which holds the latest
	// service info from the apiserver; if the same service is changed before this
	// goroutine finishes, there will be another queue entry to handle that.
	cachedService.lastState = service
	err, retry := s.updateFederationService(key, cachedService)
	if err != nil {
		message := "Error occurred while updating the service to all clusters"
		if retry {
			message += " (will retry): "
		} else {
			message += " (will not retry): "
		}
		message += err.Error()
		s.eventRecorder.Event(service, v1.EventTypeWarning, "UpdateServiceFail", message)
		return err, cachedService.nextRetryDelay()
	}
	// Always update the cache upon success.
	// NOTE: Since we update the cached service if and only if we successfully
	// processed it, a cached service being nil implies that it hasn't yet
	// been successfully processed.
	cachedService.appliedState = service
	s.serviceCache.set(key, cachedService)
	glog.V(4).Infof("Successfully processed service %s", key)
	cachedService.resetRetryDelay()
	return nil, doNotRetry
}

// delete deletes the given service or returns an error if the deletion was not complete.
func (s *ServiceController) delete(service *v1.Service) error {
	glog.V(3).Infof("Handling deletion of service: %v", *service)
	_, err := s.deletionHelper.HandleObjectInUnderlyingClusters(service)
	if err != nil {
		return err
	}

	err = s.federationClient.Core().Services(service.Namespace).Delete(service.Name, nil)
	if err != nil {
		// It's all good if the error is a not-found error: the service is deleted already and we do not have to do anything.
		// This is expected when we are processing an update as a result of service finalizer deletion.
		// The process that deleted the last finalizer is also going to delete the service and we do not have to do anything.
		if !errors.IsNotFound(err) {
			return fmt.Errorf("failed to delete service: %v", err)
		}
	}
	return nil
}

// processServiceDeletion returns an error if processing the service deletion failed, along with a time.Duration
// indicating whether processing should be retried; zero means no retry; otherwise
// we should retry after that Duration.
func (s *ServiceController) processServiceDeletion(key string) (error, time.Duration) {
	glog.V(2).Infof("Process service deletion for %v", key)
	s.serviceCache.delete(key)
	return nil, doNotRetry
}