mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-04 12:18:16 +00:00 
			
		
		
		
	fed: Make federated updater responsible for recording events
This commit is contained in:
		@@ -188,7 +188,7 @@ func NewDeploymentController(federationClient fedclientset.Interface) *Deploymen
 | 
			
		||||
		),
 | 
			
		||||
	)
 | 
			
		||||
 | 
			
		||||
	fdc.fedUpdater = fedutil.NewFederatedUpdater(fdc.fedDeploymentInformer,
 | 
			
		||||
	fdc.fedUpdater = fedutil.NewFederatedUpdater(fdc.fedDeploymentInformer, "deployment", fdc.eventRecorder,
 | 
			
		||||
		func(client kubeclientset.Interface, obj runtime.Object) error {
 | 
			
		||||
			rs := obj.(*extensionsv1.Deployment)
 | 
			
		||||
			_, err := client.Extensions().Deployments(rs.Namespace).Create(rs)
 | 
			
		||||
@@ -214,7 +214,6 @@ func NewDeploymentController(federationClient fedclientset.Interface) *Deploymen
 | 
			
		||||
			return fmt.Sprintf("%s/%s", deployment.Namespace, deployment.Name)
 | 
			
		||||
		},
 | 
			
		||||
		updateTimeout,
 | 
			
		||||
		fdc.eventRecorder,
 | 
			
		||||
		fdc.fedDeploymentInformer,
 | 
			
		||||
		fdc.fedUpdater,
 | 
			
		||||
	)
 | 
			
		||||
@@ -526,13 +525,11 @@ func (fdc *DeploymentController) reconcileDeployment(key string) (reconciliation
 | 
			
		||||
 | 
			
		||||
		if !exists {
 | 
			
		||||
			if replicas > 0 {
 | 
			
		||||
				fdc.eventRecorder.Eventf(fd, api.EventTypeNormal, "CreateInCluster",
 | 
			
		||||
					"Creating deployment in cluster %s", clusterName)
 | 
			
		||||
 | 
			
		||||
				operations = append(operations, fedutil.FederatedOperation{
 | 
			
		||||
					Type:        fedutil.OperationTypeAdd,
 | 
			
		||||
					Obj:         ld,
 | 
			
		||||
					ClusterName: clusterName,
 | 
			
		||||
					Key:         key,
 | 
			
		||||
				})
 | 
			
		||||
			}
 | 
			
		||||
		} else {
 | 
			
		||||
@@ -541,13 +538,11 @@ func (fdc *DeploymentController) reconcileDeployment(key string) (reconciliation
 | 
			
		||||
			currentLd := ldObj.(*extensionsv1.Deployment)
 | 
			
		||||
			// Update existing replica set, if needed.
 | 
			
		||||
			if !fedutil.DeploymentEquivalent(ld, currentLd) {
 | 
			
		||||
				fdc.eventRecorder.Eventf(fd, api.EventTypeNormal, "UpdateInCluster",
 | 
			
		||||
					"Updating deployment in cluster %s", clusterName)
 | 
			
		||||
 | 
			
		||||
				operations = append(operations, fedutil.FederatedOperation{
 | 
			
		||||
					Type:        fedutil.OperationTypeUpdate,
 | 
			
		||||
					Obj:         ld,
 | 
			
		||||
					ClusterName: clusterName,
 | 
			
		||||
					Key:         key,
 | 
			
		||||
				})
 | 
			
		||||
				glog.Infof("Updating %s in %s", currentLd.Name, clusterName)
 | 
			
		||||
			}
 | 
			
		||||
@@ -572,10 +567,7 @@ func (fdc *DeploymentController) reconcileDeployment(key string) (reconciliation
 | 
			
		||||
		// Everything is in order
 | 
			
		||||
		return statusAllOk, nil
 | 
			
		||||
	}
 | 
			
		||||
	err = fdc.fedUpdater.UpdateWithOnError(operations, updateTimeout, func(op fedutil.FederatedOperation, operror error) {
 | 
			
		||||
		fdc.eventRecorder.Eventf(fd, api.EventTypeWarning, "FailedUpdateInCluster",
 | 
			
		||||
			"Deployment update in cluster %s failed: %v", op.ClusterName, operror)
 | 
			
		||||
	})
 | 
			
		||||
	err = fdc.fedUpdater.Update(operations, updateTimeout)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		glog.Errorf("Failed to execute updates for %s: %v", key, err)
 | 
			
		||||
		return statusError, err
 | 
			
		||||
 
 | 
			
		||||
@@ -229,7 +229,7 @@ func NewIngressController(client federationclientset.Interface) *IngressControll
 | 
			
		||||
	)
 | 
			
		||||
 | 
			
		||||
	// Federated ingress updater along with Create/Update/Delete operations.
 | 
			
		||||
	ic.federatedIngressUpdater = util.NewFederatedUpdater(ic.ingressFederatedInformer,
 | 
			
		||||
	ic.federatedIngressUpdater = util.NewFederatedUpdater(ic.ingressFederatedInformer, "ingress", ic.eventRecorder,
 | 
			
		||||
		func(client kubeclientset.Interface, obj pkgruntime.Object) error {
 | 
			
		||||
			ingress := obj.(*extensionsv1beta1.Ingress)
 | 
			
		||||
			glog.V(4).Infof("Attempting to create Ingress: %v", ingress)
 | 
			
		||||
@@ -261,7 +261,7 @@ func NewIngressController(client federationclientset.Interface) *IngressControll
 | 
			
		||||
		})
 | 
			
		||||
 | 
			
		||||
	// Federated configmap updater along with Create/Update/Delete operations.  Only Update should ever be called.
 | 
			
		||||
	ic.federatedConfigMapUpdater = util.NewFederatedUpdater(ic.configMapFederatedInformer,
 | 
			
		||||
	ic.federatedConfigMapUpdater = util.NewFederatedUpdater(ic.configMapFederatedInformer, "configmap", ic.eventRecorder,
 | 
			
		||||
		func(client kubeclientset.Interface, obj pkgruntime.Object) error {
 | 
			
		||||
			configMap := obj.(*v1.ConfigMap)
 | 
			
		||||
			configMapName := types.NamespacedName{Name: configMap.Name, Namespace: configMap.Namespace}
 | 
			
		||||
@@ -297,7 +297,6 @@ func NewIngressController(client federationclientset.Interface) *IngressControll
 | 
			
		||||
			return fmt.Sprintf("%s/%s", ingress.Namespace, ingress.Name)
 | 
			
		||||
		},
 | 
			
		||||
		ic.updateTimeout,
 | 
			
		||||
		ic.eventRecorder,
 | 
			
		||||
		ic.ingressFederatedInformer,
 | 
			
		||||
		ic.federatedIngressUpdater,
 | 
			
		||||
	)
 | 
			
		||||
@@ -566,12 +565,12 @@ func (ic *IngressController) reconcileConfigMap(cluster *federationapi.Cluster,
 | 
			
		||||
			Type:        util.OperationTypeUpdate,
 | 
			
		||||
			Obj:         configMap,
 | 
			
		||||
			ClusterName: cluster.Name,
 | 
			
		||||
			Key:         configMapNsName.String(),
 | 
			
		||||
		}}
 | 
			
		||||
		glog.V(4).Infof("Calling federatedConfigMapUpdater.Update() - operations: %v", operations)
 | 
			
		||||
		err := ic.federatedConfigMapUpdater.Update(operations, ic.updateTimeout)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			configMapName := types.NamespacedName{Name: configMap.Name, Namespace: configMap.Namespace}
 | 
			
		||||
			glog.Errorf("Failed to execute update of ConfigMap %q on cluster %q: %v", configMapName, cluster.Name, err)
 | 
			
		||||
			glog.Errorf("Failed to execute update of ConfigMap %q on cluster %q: %v", configMapNsName, cluster.Name, err)
 | 
			
		||||
			ic.configMapDeliverer.DeliverAfter(cluster.Name, nil, ic.configMapReviewDelay)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
@@ -770,8 +769,6 @@ func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) {
 | 
			
		||||
			glog.V(4).Infof("No existing Ingress %s in cluster %s - checking if appropriate to queue a create operation", ingress, cluster.Name)
 | 
			
		||||
			// We can't supply server-created fields when creating a new object.
 | 
			
		||||
			desiredIngress.ObjectMeta = util.DeepCopyRelevantObjectMeta(baseIngress.ObjectMeta)
 | 
			
		||||
			ic.eventRecorder.Eventf(baseIngress, api.EventTypeNormal, "CreateInCluster",
 | 
			
		||||
				"Creating ingress in cluster %s", cluster.Name)
 | 
			
		||||
 | 
			
		||||
			// We always first create an ingress in the first available cluster. Once that ingress
 | 
			
		||||
			// has been created and allocated a global IP (visible via an annotation),
 | 
			
		||||
@@ -797,6 +794,7 @@ func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) {
 | 
			
		||||
					Type:        util.OperationTypeAdd,
 | 
			
		||||
					Obj:         desiredIngress,
 | 
			
		||||
					ClusterName: cluster.Name,
 | 
			
		||||
					Key:         key,
 | 
			
		||||
				})
 | 
			
		||||
			} else {
 | 
			
		||||
				glog.V(4).Infof("No annotation %q exists on ingress %q in federation and waiting for ingress in cluster %s. Not queueing create operation for ingress until annotation exists", staticIPNameKeyWritable, ingress, firstClusterName)
 | 
			
		||||
@@ -867,13 +865,12 @@ func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) {
 | 
			
		||||
				for key, val := range baseIngress.ObjectMeta.Labels {
 | 
			
		||||
					desiredIngress.ObjectMeta.Labels[key] = val
 | 
			
		||||
				}
 | 
			
		||||
				ic.eventRecorder.Eventf(baseIngress, api.EventTypeNormal, "UpdateInCluster",
 | 
			
		||||
					"Updating ingress in cluster %s", cluster.Name)
 | 
			
		||||
 | 
			
		||||
				operations = append(operations, util.FederatedOperation{
 | 
			
		||||
					Type:        util.OperationTypeUpdate,
 | 
			
		||||
					Obj:         desiredIngress,
 | 
			
		||||
					ClusterName: cluster.Name,
 | 
			
		||||
					Key:         key,
 | 
			
		||||
				})
 | 
			
		||||
				// TODO: Transfer any readonly (target-proxy, url-map etc) annotations from the master cluster to the federation, if this is the master cluster.
 | 
			
		||||
				// This is only for consistency, so that the federation ingress metadata matches the underlying clusters.  It's not actually required				}
 | 
			
		||||
@@ -887,10 +884,7 @@ func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	glog.V(4).Infof("Calling federatedUpdater.Update() - operations: %v", operations)
 | 
			
		||||
	err = ic.federatedIngressUpdater.UpdateWithOnError(operations, ic.updateTimeout, func(op util.FederatedOperation, operror error) {
 | 
			
		||||
		ic.eventRecorder.Eventf(baseIngress, api.EventTypeWarning, "FailedClusterUpdate",
 | 
			
		||||
			"Ingress update in cluster %s failed: %v", op.ClusterName, operror)
 | 
			
		||||
	})
 | 
			
		||||
	err = ic.federatedIngressUpdater.Update(operations, ic.updateTimeout)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		glog.Errorf("Failed to execute updates for %s: %v", ingress, err)
 | 
			
		||||
		ic.deliverIngress(ingress, ic.ingressReviewDelay, true)
 | 
			
		||||
 
 | 
			
		||||
@@ -156,7 +156,7 @@ func NewNamespaceController(client federationclientset.Interface, dynamicClientP
 | 
			
		||||
	)
 | 
			
		||||
 | 
			
		||||
	// Federated updater along with Create/Update/Delete operations.
 | 
			
		||||
	nc.federatedUpdater = util.NewFederatedUpdater(nc.namespaceFederatedInformer,
 | 
			
		||||
	nc.federatedUpdater = util.NewFederatedUpdater(nc.namespaceFederatedInformer, "namespace", nc.eventRecorder,
 | 
			
		||||
		func(client kubeclientset.Interface, obj runtime.Object) error {
 | 
			
		||||
			namespace := obj.(*apiv1.Namespace)
 | 
			
		||||
			_, err := client.Core().Namespaces().Create(namespace)
 | 
			
		||||
@@ -186,7 +186,6 @@ func NewNamespaceController(client federationclientset.Interface, dynamicClientP
 | 
			
		||||
			return fmt.Sprintf("%s/%s", namespace.Namespace, namespace.Name)
 | 
			
		||||
		},
 | 
			
		||||
		nc.updateTimeout,
 | 
			
		||||
		nc.eventRecorder,
 | 
			
		||||
		nc.namespaceFederatedInformer,
 | 
			
		||||
		nc.federatedUpdater,
 | 
			
		||||
	)
 | 
			
		||||
@@ -370,26 +369,22 @@ func (nc *NamespaceController) reconcileNamespace(namespace string) {
 | 
			
		||||
		glog.V(5).Infof("Desired namespace in underlying clusters: %+v", desiredNamespace)
 | 
			
		||||
 | 
			
		||||
		if !found {
 | 
			
		||||
			nc.eventRecorder.Eventf(baseNamespace, api.EventTypeNormal, "CreateInCluster",
 | 
			
		||||
				"Creating namespace in cluster %s", cluster.Name)
 | 
			
		||||
 | 
			
		||||
			operations = append(operations, util.FederatedOperation{
 | 
			
		||||
				Type:        util.OperationTypeAdd,
 | 
			
		||||
				Obj:         desiredNamespace,
 | 
			
		||||
				ClusterName: cluster.Name,
 | 
			
		||||
				Key:         namespace,
 | 
			
		||||
			})
 | 
			
		||||
		} else {
 | 
			
		||||
			clusterNamespace := clusterNamespaceObj.(*apiv1.Namespace)
 | 
			
		||||
 | 
			
		||||
			// Update existing namespace, if needed.
 | 
			
		||||
			if !util.ObjectMetaAndSpecEquivalent(desiredNamespace, clusterNamespace) {
 | 
			
		||||
				nc.eventRecorder.Eventf(baseNamespace, api.EventTypeNormal, "UpdateInCluster",
 | 
			
		||||
					"Updating namespace in cluster %s. Desired: %+v\n Actual: %+v\n", cluster.Name, desiredNamespace, clusterNamespace)
 | 
			
		||||
 | 
			
		||||
				operations = append(operations, util.FederatedOperation{
 | 
			
		||||
					Type:        util.OperationTypeUpdate,
 | 
			
		||||
					Obj:         desiredNamespace,
 | 
			
		||||
					ClusterName: cluster.Name,
 | 
			
		||||
					Key:         namespace,
 | 
			
		||||
				})
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
@@ -401,10 +396,7 @@ func (nc *NamespaceController) reconcileNamespace(namespace string) {
 | 
			
		||||
	}
 | 
			
		||||
	glog.V(2).Infof("Updating namespace %s in underlying clusters. Operations: %d", baseNamespace.Name, len(operations))
 | 
			
		||||
 | 
			
		||||
	err = nc.federatedUpdater.UpdateWithOnError(operations, nc.updateTimeout, func(op util.FederatedOperation, operror error) {
 | 
			
		||||
		nc.eventRecorder.Eventf(baseNamespace, api.EventTypeWarning, "UpdateInClusterFailed",
 | 
			
		||||
			"Namespace update in cluster %s failed: %v", op.ClusterName, operror)
 | 
			
		||||
	})
 | 
			
		||||
	err = nc.federatedUpdater.Update(operations, nc.updateTimeout)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		glog.Errorf("Failed to execute updates for %s: %v", namespace, err)
 | 
			
		||||
		nc.deliverNamespace(namespace, 0, true)
 | 
			
		||||
 
 | 
			
		||||
@@ -196,7 +196,7 @@ func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSe
 | 
			
		||||
	)
 | 
			
		||||
	frsc.replicaSetLister = extensionslisters.NewReplicaSetLister(replicaSetIndexer)
 | 
			
		||||
 | 
			
		||||
	frsc.fedUpdater = fedutil.NewFederatedUpdater(frsc.fedReplicaSetInformer,
 | 
			
		||||
	frsc.fedUpdater = fedutil.NewFederatedUpdater(frsc.fedReplicaSetInformer, "replicaset", frsc.eventRecorder,
 | 
			
		||||
		func(client kubeclientset.Interface, obj runtime.Object) error {
 | 
			
		||||
			rs := obj.(*extensionsv1.ReplicaSet)
 | 
			
		||||
			_, err := client.Extensions().ReplicaSets(rs.Namespace).Create(rs)
 | 
			
		||||
@@ -222,7 +222,6 @@ func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSe
 | 
			
		||||
			return fmt.Sprintf("%s/%s", replicaset.Namespace, replicaset.Name)
 | 
			
		||||
		},
 | 
			
		||||
		updateTimeout,
 | 
			
		||||
		frsc.eventRecorder,
 | 
			
		||||
		frsc.fedReplicaSetInformer,
 | 
			
		||||
		frsc.fedUpdater,
 | 
			
		||||
	)
 | 
			
		||||
@@ -543,26 +542,22 @@ func (frsc *ReplicaSetController) reconcileReplicaSet(key string) (reconciliatio
 | 
			
		||||
 | 
			
		||||
		if !exists {
 | 
			
		||||
			if replicas > 0 {
 | 
			
		||||
				frsc.eventRecorder.Eventf(frs, api.EventTypeNormal, "CreateInCluster",
 | 
			
		||||
					"Creating replicaset in cluster %s", clusterName)
 | 
			
		||||
 | 
			
		||||
				operations = append(operations, fedutil.FederatedOperation{
 | 
			
		||||
					Type:        fedutil.OperationTypeAdd,
 | 
			
		||||
					Obj:         lrs,
 | 
			
		||||
					ClusterName: clusterName,
 | 
			
		||||
					Key:         key,
 | 
			
		||||
				})
 | 
			
		||||
			}
 | 
			
		||||
		} else {
 | 
			
		||||
			currentLrs := lrsObj.(*extensionsv1.ReplicaSet)
 | 
			
		||||
			// Update existing replica set, if needed.
 | 
			
		||||
			if !fedutil.ObjectMetaAndSpecEquivalent(lrs, currentLrs) {
 | 
			
		||||
				frsc.eventRecorder.Eventf(frs, api.EventTypeNormal, "UpdateInCluster",
 | 
			
		||||
					"Updating replicaset in cluster %s", clusterName)
 | 
			
		||||
 | 
			
		||||
				operations = append(operations, fedutil.FederatedOperation{
 | 
			
		||||
					Type:        fedutil.OperationTypeUpdate,
 | 
			
		||||
					Obj:         lrs,
 | 
			
		||||
					ClusterName: clusterName,
 | 
			
		||||
					Key:         key,
 | 
			
		||||
				})
 | 
			
		||||
			}
 | 
			
		||||
			fedStatus.Replicas += currentLrs.Status.Replicas
 | 
			
		||||
@@ -584,10 +579,7 @@ func (frsc *ReplicaSetController) reconcileReplicaSet(key string) (reconciliatio
 | 
			
		||||
		// Everything is in order
 | 
			
		||||
		return statusAllOk, nil
 | 
			
		||||
	}
 | 
			
		||||
	err = frsc.fedUpdater.UpdateWithOnError(operations, updateTimeout, func(op fedutil.FederatedOperation, operror error) {
 | 
			
		||||
		frsc.eventRecorder.Eventf(frs, api.EventTypeWarning, "FailedUpdateInCluster",
 | 
			
		||||
			"Replicaset update in cluster %s failed: %v", op.ClusterName, operror)
 | 
			
		||||
	})
 | 
			
		||||
	err = frsc.fedUpdater.Update(operations, updateTimeout)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		glog.Errorf("Failed to execute updates for %s: %v", key, err)
 | 
			
		||||
		return statusError, err
 | 
			
		||||
 
 | 
			
		||||
@@ -185,7 +185,7 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface,
 | 
			
		||||
 | 
			
		||||
	s.federatedInformer = fedutil.NewFederatedInformer(federationClient, fedInformerFactory, &clusterLifecycle)
 | 
			
		||||
 | 
			
		||||
	s.federatedUpdater = fedutil.NewFederatedUpdater(s.federatedInformer,
 | 
			
		||||
	s.federatedUpdater = fedutil.NewFederatedUpdater(s.federatedInformer, "service", s.eventRecorder,
 | 
			
		||||
		func(client kubeclientset.Interface, obj pkgruntime.Object) error {
 | 
			
		||||
			svc := obj.(*v1.Service)
 | 
			
		||||
			_, err := client.Core().Services(svc.Namespace).Create(svc)
 | 
			
		||||
@@ -243,7 +243,6 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface,
 | 
			
		||||
			return fmt.Sprintf("%s/%s", service.Namespace, service.Name)
 | 
			
		||||
		},
 | 
			
		||||
		updateTimeout,
 | 
			
		||||
		s.eventRecorder,
 | 
			
		||||
		s.federatedInformer,
 | 
			
		||||
		s.federatedUpdater,
 | 
			
		||||
	)
 | 
			
		||||
@@ -601,11 +600,7 @@ func (s *ServiceController) reconcileService(key string) reconciliationStatus {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if len(operations) != 0 {
 | 
			
		||||
		err = s.federatedUpdater.UpdateWithOnError(operations, s.updateTimeout,
 | 
			
		||||
			func(op fedutil.FederatedOperation, operror error) {
 | 
			
		||||
				runtime.HandleError(fmt.Errorf("Service update in cluster %s failed: %v", op.ClusterName, operror))
 | 
			
		||||
				s.eventRecorder.Eventf(fedService, api.EventTypeWarning, "UpdateInClusterFailed", "Service update in cluster %s failed: %v", op.ClusterName, operror)
 | 
			
		||||
			})
 | 
			
		||||
		err = s.federatedUpdater.Update(operations, s.updateTimeout)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			if !errors.IsAlreadyExists(err) {
 | 
			
		||||
				runtime.HandleError(fmt.Errorf("Failed to execute updates for %s: %v", key, err))
 | 
			
		||||
@@ -642,12 +637,12 @@ func (s *ServiceController) getOperationsToPerformOnCluster(cluster *v1beta1.Clu
 | 
			
		||||
		desiredService.ResourceVersion = ""
 | 
			
		||||
 | 
			
		||||
		glog.V(4).Infof("Creating service in underlying cluster %s: %+v", cluster.Name, desiredService)
 | 
			
		||||
		s.eventRecorder.Eventf(fedService, api.EventTypeNormal, "CreateInCluster", "Creating service in cluster %s", cluster.Name)
 | 
			
		||||
 | 
			
		||||
		operation = &fedutil.FederatedOperation{
 | 
			
		||||
			Type:        fedutil.OperationTypeAdd,
 | 
			
		||||
			Obj:         desiredService,
 | 
			
		||||
			ClusterName: cluster.Name,
 | 
			
		||||
			Key:         key,
 | 
			
		||||
		}
 | 
			
		||||
	} else {
 | 
			
		||||
		clusterService, ok := clusterServiceObj.(*v1.Service)
 | 
			
		||||
@@ -674,7 +669,6 @@ func (s *ServiceController) getOperationsToPerformOnCluster(cluster *v1beta1.Clu
 | 
			
		||||
		// Update existing service, if needed.
 | 
			
		||||
		if !Equivalent(desiredService, clusterService) {
 | 
			
		||||
			glog.V(4).Infof("Service in underlying cluster %s does not match, Desired: %+v, Existing: %+v", cluster.Name, desiredService, clusterService)
 | 
			
		||||
			s.eventRecorder.Eventf(fedService, api.EventTypeNormal, "UpdateInCluster", "Updating service in cluster %s. Desired: %+v\n Actual: %+v\n", cluster.Name, desiredService, clusterService)
 | 
			
		||||
 | 
			
		||||
			// ResourceVersion of cluster service can be different from federated service,
 | 
			
		||||
			// so do not update ResourceVersion while updating cluster service
 | 
			
		||||
@@ -684,6 +678,7 @@ func (s *ServiceController) getOperationsToPerformOnCluster(cluster *v1beta1.Clu
 | 
			
		||||
				Type:        fedutil.OperationTypeUpdate,
 | 
			
		||||
				Obj:         desiredService,
 | 
			
		||||
				ClusterName: cluster.Name,
 | 
			
		||||
				Key:         key,
 | 
			
		||||
			}
 | 
			
		||||
		} else {
 | 
			
		||||
			glog.V(5).Infof("Service in underlying cluster %s is up to date: %+v", cluster.Name, desiredService)
 | 
			
		||||
 
 | 
			
		||||
@@ -166,7 +166,7 @@ func newFederationSyncController(client federationclientset.Interface, adapter f
 | 
			
		||||
	)
 | 
			
		||||
 | 
			
		||||
	// Federated updeater along with Create/Update/Delete operations.
 | 
			
		||||
	s.updater = util.NewFederatedUpdater(s.informer,
 | 
			
		||||
	s.updater = util.NewFederatedUpdater(s.informer, adapter.Kind(), s.eventRecorder,
 | 
			
		||||
		func(client kubeclientset.Interface, obj pkgruntime.Object) error {
 | 
			
		||||
			_, err := adapter.ClusterCreate(client, obj)
 | 
			
		||||
			return err
 | 
			
		||||
@@ -189,7 +189,6 @@ func newFederationSyncController(client federationclientset.Interface, adapter f
 | 
			
		||||
			return adapter.NamespacedName(obj).String()
 | 
			
		||||
		},
 | 
			
		||||
		s.updateTimeout,
 | 
			
		||||
		s.eventRecorder,
 | 
			
		||||
		s.informer,
 | 
			
		||||
		s.updater,
 | 
			
		||||
	)
 | 
			
		||||
@@ -352,25 +351,22 @@ func (s *FederationSyncController) reconcile(namespacedName types.NamespacedName
 | 
			
		||||
		desiredObj := s.adapter.Copy(obj)
 | 
			
		||||
 | 
			
		||||
		if !found {
 | 
			
		||||
			s.eventRecorder.Eventf(obj, api.EventTypeNormal, "CreateInCluster",
 | 
			
		||||
				"Creating %s in cluster %s", kind, cluster.Name)
 | 
			
		||||
 | 
			
		||||
			operations = append(operations, util.FederatedOperation{
 | 
			
		||||
				Type:        util.OperationTypeAdd,
 | 
			
		||||
				Obj:         desiredObj,
 | 
			
		||||
				ClusterName: cluster.Name,
 | 
			
		||||
				Key:         key,
 | 
			
		||||
			})
 | 
			
		||||
		} else {
 | 
			
		||||
			clusterObj := clusterObj.(pkgruntime.Object)
 | 
			
		||||
 | 
			
		||||
			// Update existing resource, if needed.
 | 
			
		||||
			if !s.adapter.Equivalent(desiredObj, clusterObj) {
 | 
			
		||||
				s.eventRecorder.Eventf(obj, api.EventTypeNormal, "UpdateInCluster",
 | 
			
		||||
					"Updating %s in cluster %s", kind, cluster.Name)
 | 
			
		||||
				operations = append(operations, util.FederatedOperation{
 | 
			
		||||
					Type:        util.OperationTypeUpdate,
 | 
			
		||||
					Obj:         desiredObj,
 | 
			
		||||
					ClusterName: cluster.Name,
 | 
			
		||||
					Key:         key,
 | 
			
		||||
				})
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
@@ -380,11 +376,7 @@ func (s *FederationSyncController) reconcile(namespacedName types.NamespacedName
 | 
			
		||||
		// Everything is in order
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	err = s.updater.UpdateWithOnError(operations, s.updateTimeout,
 | 
			
		||||
		func(op util.FederatedOperation, operror error) {
 | 
			
		||||
			s.eventRecorder.Eventf(obj, api.EventTypeWarning, "UpdateInClusterFailed",
 | 
			
		||||
				"%s update in cluster %s failed: %v", strings.ToTitle(kind), op.ClusterName, operror)
 | 
			
		||||
		})
 | 
			
		||||
	err = s.updater.Update(operations, s.updateTimeout)
 | 
			
		||||
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		glog.Errorf("Failed to execute updates for %s: %v", key, err)
 | 
			
		||||
 
 | 
			
		||||
@@ -43,6 +43,7 @@ go_library(
 | 
			
		||||
        "//vendor/k8s.io/client-go/tools/cache:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/client-go/tools/clientcmd:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/client-go/tools/clientcmd/api:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/client-go/tools/record:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/client-go/util/flowcontrol:go_default_library",
 | 
			
		||||
    ],
 | 
			
		||||
)
 | 
			
		||||
 
 | 
			
		||||
@@ -14,12 +14,10 @@ go_library(
 | 
			
		||||
    deps = [
 | 
			
		||||
        "//federation/pkg/federation-controller/util:go_default_library",
 | 
			
		||||
        "//federation/pkg/federation-controller/util/finalizers:go_default_library",
 | 
			
		||||
        "//pkg/api:go_default_library",
 | 
			
		||||
        "//vendor/github.com/golang/glog:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/client-go/tools/record:go_default_library",
 | 
			
		||||
    ],
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -27,10 +27,8 @@ import (
 | 
			
		||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/runtime"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/sets"
 | 
			
		||||
	"k8s.io/client-go/tools/record"
 | 
			
		||||
	"k8s.io/kubernetes/federation/pkg/federation-controller/util"
 | 
			
		||||
	finalizersutil "k8s.io/kubernetes/federation/pkg/federation-controller/util/finalizers"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/api"
 | 
			
		||||
 | 
			
		||||
	"github.com/golang/glog"
 | 
			
		||||
)
 | 
			
		||||
@@ -53,20 +51,17 @@ type DeletionHelper struct {
 | 
			
		||||
	updateObjFunc UpdateObjFunc
 | 
			
		||||
	objNameFunc   ObjNameFunc
 | 
			
		||||
	updateTimeout time.Duration
 | 
			
		||||
	eventRecorder record.EventRecorder
 | 
			
		||||
	informer      util.FederatedInformer
 | 
			
		||||
	updater       util.FederatedUpdater
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewDeletionHelper(
 | 
			
		||||
	updateObjFunc UpdateObjFunc, objNameFunc ObjNameFunc,
 | 
			
		||||
	updateTimeout time.Duration, eventRecorder record.EventRecorder,
 | 
			
		||||
	updateObjFunc UpdateObjFunc, objNameFunc ObjNameFunc, updateTimeout time.Duration,
 | 
			
		||||
	informer util.FederatedInformer, updater util.FederatedUpdater) *DeletionHelper {
 | 
			
		||||
	return &DeletionHelper{
 | 
			
		||||
		updateObjFunc: updateObjFunc,
 | 
			
		||||
		objNameFunc:   objNameFunc,
 | 
			
		||||
		updateTimeout: updateTimeout,
 | 
			
		||||
		eventRecorder: eventRecorder,
 | 
			
		||||
		informer:      informer,
 | 
			
		||||
		updater:       updater,
 | 
			
		||||
	}
 | 
			
		||||
@@ -157,13 +152,10 @@ func (dh *DeletionHelper) HandleObjectInUnderlyingClusters(obj runtime.Object) (
 | 
			
		||||
			Type:        util.OperationTypeDelete,
 | 
			
		||||
			ClusterName: clusterNsObj.ClusterName,
 | 
			
		||||
			Obj:         clusterNsObj.Object.(runtime.Object),
 | 
			
		||||
			Key:         objName,
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
	err = dh.updater.UpdateWithOnError(operations, dh.updateTimeout, func(op util.FederatedOperation, operror error) {
 | 
			
		||||
		objName := dh.objNameFunc(op.Obj)
 | 
			
		||||
		dh.eventRecorder.Eventf(obj, api.EventTypeWarning, "DeleteInClusterFailed",
 | 
			
		||||
			"Failed to delete obj %s in cluster %s: %v", objName, op.ClusterName, operror)
 | 
			
		||||
	})
 | 
			
		||||
	err = dh.updater.Update(operations, dh.updateTimeout)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, fmt.Errorf("failed to execute updates for obj %s: %v", objName, err)
 | 
			
		||||
	}
 | 
			
		||||
 
 | 
			
		||||
@@ -18,10 +18,14 @@ package util
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"strings"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"k8s.io/apimachinery/pkg/api/errors"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/runtime"
 | 
			
		||||
	pkgruntime "k8s.io/apimachinery/pkg/runtime"
 | 
			
		||||
	"k8s.io/client-go/tools/record"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/api"
 | 
			
		||||
	kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
@@ -39,6 +43,7 @@ type FederatedOperation struct {
 | 
			
		||||
	Type        FederatedOperationType
 | 
			
		||||
	ClusterName string
 | 
			
		||||
	Obj         pkgruntime.Object
 | 
			
		||||
	Key         string
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// A helper that executes the given set of updates on federation, in parallel.
 | 
			
		||||
@@ -48,8 +53,6 @@ type FederatedUpdater interface {
 | 
			
		||||
	// stopped when it is reached. However the function will return after the timeout
 | 
			
		||||
	// with a non-nil error.
 | 
			
		||||
	Update([]FederatedOperation, time.Duration) error
 | 
			
		||||
 | 
			
		||||
	UpdateWithOnError([]FederatedOperation, time.Duration, func(FederatedOperation, error)) error
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// A function that executes some operation using the passed client and object.
 | 
			
		||||
@@ -58,25 +61,32 @@ type FederatedOperationHandler func(kubeclientset.Interface, pkgruntime.Object)
 | 
			
		||||
type federatedUpdaterImpl struct {
 | 
			
		||||
	federation FederationView
 | 
			
		||||
 | 
			
		||||
	kind string
 | 
			
		||||
 | 
			
		||||
	eventRecorder record.EventRecorder
 | 
			
		||||
 | 
			
		||||
	addFunction    FederatedOperationHandler
 | 
			
		||||
	updateFunction FederatedOperationHandler
 | 
			
		||||
	deleteFunction FederatedOperationHandler
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewFederatedUpdater(federation FederationView, add, update, del FederatedOperationHandler) FederatedUpdater {
 | 
			
		||||
func NewFederatedUpdater(federation FederationView, kind string, recorder record.EventRecorder, add, update, del FederatedOperationHandler) FederatedUpdater {
 | 
			
		||||
	return &federatedUpdaterImpl{
 | 
			
		||||
		federation:     federation,
 | 
			
		||||
		kind:           kind,
 | 
			
		||||
		eventRecorder:  recorder,
 | 
			
		||||
		addFunction:    add,
 | 
			
		||||
		updateFunction: update,
 | 
			
		||||
		deleteFunction: del,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (fu *federatedUpdaterImpl) Update(ops []FederatedOperation, timeout time.Duration) error {
 | 
			
		||||
	return fu.UpdateWithOnError(ops, timeout, nil)
 | 
			
		||||
func (fu *federatedUpdaterImpl) recordEvent(obj runtime.Object, eventType, eventVerb string, args ...interface{}) {
 | 
			
		||||
	messageFmt := eventVerb + " %s %q in cluster %s"
 | 
			
		||||
	fu.eventRecorder.Eventf(obj, api.EventTypeNormal, eventType, messageFmt, args...)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (fu *federatedUpdaterImpl) UpdateWithOnError(ops []FederatedOperation, timeout time.Duration, onError func(FederatedOperation, error)) error {
 | 
			
		||||
func (fu *federatedUpdaterImpl) Update(ops []FederatedOperation, timeout time.Duration) error {
 | 
			
		||||
	done := make(chan error, len(ops))
 | 
			
		||||
	for _, op := range ops {
 | 
			
		||||
		go func(op FederatedOperation) {
 | 
			
		||||
@@ -89,21 +99,37 @@ func (fu *federatedUpdaterImpl) UpdateWithOnError(ops []FederatedOperation, time
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			eventArgs := []interface{}{fu.kind, op.Key, clusterName}
 | 
			
		||||
			baseEventType := fmt.Sprintf("%s", op.Type)
 | 
			
		||||
			eventType := fmt.Sprintf("%sInCluster", strings.Title(baseEventType))
 | 
			
		||||
 | 
			
		||||
			switch op.Type {
 | 
			
		||||
			case OperationTypeAdd:
 | 
			
		||||
				// TODO s+OperationTypeAdd+OperationTypeCreate+
 | 
			
		||||
				baseEventType = "create"
 | 
			
		||||
				eventType := "CreateInCluster"
 | 
			
		||||
 | 
			
		||||
				fu.recordEvent(op.Obj, eventType, "Creating", eventArgs...)
 | 
			
		||||
				err = fu.addFunction(clientset, op.Obj)
 | 
			
		||||
			case OperationTypeUpdate:
 | 
			
		||||
				fu.recordEvent(op.Obj, eventType, "Updating", eventArgs...)
 | 
			
		||||
				err = fu.updateFunction(clientset, op.Obj)
 | 
			
		||||
			case OperationTypeDelete:
 | 
			
		||||
				fu.recordEvent(op.Obj, eventType, "Deleting", eventArgs...)
 | 
			
		||||
				err = fu.deleteFunction(clientset, op.Obj)
 | 
			
		||||
				// IsNotFound error is fine since that means the object is deleted already.
 | 
			
		||||
				if errors.IsNotFound(err) {
 | 
			
		||||
				if err != nil && !errors.IsNotFound(err) {
 | 
			
		||||
					err = nil
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
			if err != nil && onError != nil {
 | 
			
		||||
				onError(op, err)
 | 
			
		||||
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				eventType := eventType + "Failed"
 | 
			
		||||
				messageFmt := "Failed to " + baseEventType + " %s %q in cluster %s: %v"
 | 
			
		||||
				eventArgs = append(eventArgs, err)
 | 
			
		||||
				fu.eventRecorder.Eventf(op.Obj, api.EventTypeWarning, eventType, messageFmt, eventArgs...)
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			done <- err
 | 
			
		||||
		}(op)
 | 
			
		||||
	}
 | 
			
		||||
 
 | 
			
		||||
@@ -58,11 +58,19 @@ func (f *fakeFederationView) ClustersSynced() bool {
 | 
			
		||||
	return true
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type fakeEventRecorder struct{}
 | 
			
		||||
 | 
			
		||||
func (f *fakeEventRecorder) Event(object pkgruntime.Object, eventtype, reason, message string) {}
 | 
			
		||||
func (f *fakeEventRecorder) Eventf(object pkgruntime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
 | 
			
		||||
}
 | 
			
		||||
func (f *fakeEventRecorder) PastEventf(object pkgruntime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args ...interface{}) {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestFederatedUpdaterOK(t *testing.T) {
 | 
			
		||||
	addChan := make(chan string, 5)
 | 
			
		||||
	updateChan := make(chan string, 5)
 | 
			
		||||
 | 
			
		||||
	updater := NewFederatedUpdater(&fakeFederationView{},
 | 
			
		||||
	updater := NewFederatedUpdater(&fakeFederationView{}, "foo", &fakeEventRecorder{},
 | 
			
		||||
		func(_ kubeclientset.Interface, obj pkgruntime.Object) error {
 | 
			
		||||
			service := obj.(*apiv1.Service)
 | 
			
		||||
			addChan <- service.Name
 | 
			
		||||
@@ -93,7 +101,7 @@ func TestFederatedUpdaterOK(t *testing.T) {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestFederatedUpdaterError(t *testing.T) {
 | 
			
		||||
	updater := NewFederatedUpdater(&fakeFederationView{},
 | 
			
		||||
	updater := NewFederatedUpdater(&fakeFederationView{}, "foo", &fakeEventRecorder{},
 | 
			
		||||
		func(_ kubeclientset.Interface, obj pkgruntime.Object) error {
 | 
			
		||||
			return fmt.Errorf("boom")
 | 
			
		||||
		}, noop, noop)
 | 
			
		||||
@@ -113,7 +121,7 @@ func TestFederatedUpdaterError(t *testing.T) {
 | 
			
		||||
 | 
			
		||||
func TestFederatedUpdaterTimeout(t *testing.T) {
 | 
			
		||||
	start := time.Now()
 | 
			
		||||
	updater := NewFederatedUpdater(&fakeFederationView{},
 | 
			
		||||
	updater := NewFederatedUpdater(&fakeFederationView{}, "foo", &fakeEventRecorder{},
 | 
			
		||||
		func(_ kubeclientset.Interface, obj pkgruntime.Object) error {
 | 
			
		||||
			time.Sleep(time.Minute)
 | 
			
		||||
			return nil
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user