mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-04 12:18:16 +00:00 
			
		
		
		
	Merge pull request #30126 from mwielgus/federated_updater
Automatic merge from submit-queue Federation - common libs - FedratedUpdater A helper for executing multiple add/update/del operations on federation clusters. Contains a workaround against missing #28921. cc @nikhiljindal @wojtek-t @madhusudancs @kubernetes/sig-cluster-federation Fixes: #29869 #30030 Ref: #29347 <!-- Reviewable:start --> --- This change is [<img src="https://reviewable.kubernetes.io/review_button.svg" height="34" align="absmiddle" alt="Reviewable"/>](https://reviewable.kubernetes.io/reviews/kubernetes/kubernetes/30126) <!-- Reviewable:end -->
This commit is contained in:
		@@ -0,0 +1,59 @@
 | 
			
		||||
/*
 | 
			
		||||
Copyright 2016 The Kubernetes Authors.
 | 
			
		||||
 | 
			
		||||
Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
you may not use this file except in compliance with the License.
 | 
			
		||||
You may obtain a copy of the License at
 | 
			
		||||
 | 
			
		||||
    http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 | 
			
		||||
Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
See the License for the specific language governing permissions and
 | 
			
		||||
limitations under the License.
 | 
			
		||||
*/
 | 
			
		||||
 | 
			
		||||
package util
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
 | 
			
		||||
	"k8s.io/kubernetes/pkg/api/meta"
 | 
			
		||||
	pkg_runtime "k8s.io/kubernetes/pkg/runtime"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	//TODO: This will be removed once cluster name field is added to ObjectMeta.
 | 
			
		||||
	ClusterNameAnnotation = "federation.io/name"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// TODO: This will be refactored once cluster name field is added to ObjectMeta.
 | 
			
		||||
func GetClusterName(obj pkg_runtime.Object) (string, error) {
 | 
			
		||||
	accessor, err := meta.Accessor(obj)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return "", err
 | 
			
		||||
	}
 | 
			
		||||
	annotations := accessor.GetAnnotations()
 | 
			
		||||
	if annotations != nil {
 | 
			
		||||
		if value, found := annotations[ClusterNameAnnotation]; found {
 | 
			
		||||
			return value, nil
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return "", fmt.Errorf("Cluster information not available")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// TODO: This will be removed once cluster name field is added to ObjectMeta.
 | 
			
		||||
func SetClusterName(obj pkg_runtime.Object, clusterName string) error {
 | 
			
		||||
	accessor, err := meta.Accessor(obj)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	annotations := accessor.GetAnnotations()
 | 
			
		||||
	if annotations == nil {
 | 
			
		||||
		annotations = make(map[string]string)
 | 
			
		||||
		accessor.SetAnnotations(annotations)
 | 
			
		||||
	}
 | 
			
		||||
	annotations[ClusterNameAnnotation] = clusterName
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
@@ -0,0 +1,55 @@
 | 
			
		||||
/*
 | 
			
		||||
Copyright 2016 The Kubernetes Authors.
 | 
			
		||||
 | 
			
		||||
Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
you may not use this file except in compliance with the License.
 | 
			
		||||
You may obtain a copy of the License at
 | 
			
		||||
 | 
			
		||||
    http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 | 
			
		||||
Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
See the License for the specific language governing permissions and
 | 
			
		||||
limitations under the License.
 | 
			
		||||
*/
 | 
			
		||||
 | 
			
		||||
package util
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"testing"
 | 
			
		||||
 | 
			
		||||
	api_v1 "k8s.io/kubernetes/pkg/api/v1"
 | 
			
		||||
 | 
			
		||||
	"github.com/stretchr/testify/assert"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func TestGetClusterName(t *testing.T) {
 | 
			
		||||
	// There is a single service ns1/s1 in cluster mycluster.
 | 
			
		||||
	service := api_v1.Service{
 | 
			
		||||
		ObjectMeta: api_v1.ObjectMeta{
 | 
			
		||||
			Namespace: "ns1",
 | 
			
		||||
			Name:      "s1",
 | 
			
		||||
			Annotations: map[string]string{
 | 
			
		||||
				ClusterNameAnnotation: "mycluster",
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	name, err := GetClusterName(&service)
 | 
			
		||||
	assert.NoError(t, err)
 | 
			
		||||
	assert.Equal(t, "mycluster", name)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestSetClusterName(t *testing.T) {
 | 
			
		||||
	// There is a single service ns1/s1 in cluster mycluster.
 | 
			
		||||
	service := api_v1.Service{
 | 
			
		||||
		ObjectMeta: api_v1.ObjectMeta{
 | 
			
		||||
			Namespace: "ns1",
 | 
			
		||||
			Name:      "s1",
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	err := SetClusterName(&service, "mytestname")
 | 
			
		||||
	assert.NoError(t, err)
 | 
			
		||||
	clusterName := service.Annotations[ClusterNameAnnotation]
 | 
			
		||||
	assert.Equal(t, "mytestname", clusterName)
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										116
									
								
								federation/pkg/federation-controller/util/federated_updater.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										116
									
								
								federation/pkg/federation-controller/util/federated_updater.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,116 @@
 | 
			
		||||
/*
 | 
			
		||||
Copyright 2016 The Kubernetes Authors.
 | 
			
		||||
 | 
			
		||||
Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
you may not use this file except in compliance with the License.
 | 
			
		||||
You may obtain a copy of the License at
 | 
			
		||||
 | 
			
		||||
    http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 | 
			
		||||
Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
See the License for the specific language governing permissions and
 | 
			
		||||
limitations under the License.
 | 
			
		||||
*/
 | 
			
		||||
 | 
			
		||||
package util
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	federation_release_1_4 "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4"
 | 
			
		||||
	pkg_runtime "k8s.io/kubernetes/pkg/runtime"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// Type of the operation that can be executed in Federated.
 | 
			
		||||
type FederatedOperationType string
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	OperationTypeAdd    = "add"
 | 
			
		||||
	OperationTypeUpdate = "update"
 | 
			
		||||
	OperationTypeDelete = "delete"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// FederatedOperation definition contains type (add/update/delete) and the object itself.
 | 
			
		||||
type FederatedOperation struct {
 | 
			
		||||
	Type FederatedOperationType
 | 
			
		||||
	Obj  pkg_runtime.Object
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// A helper that executes the given set of updates on federation, in parallel.
 | 
			
		||||
type FederatedUpdater interface {
 | 
			
		||||
	// Executes the given set of operations within the specified timeout.
 | 
			
		||||
	// Timeout is best-effort. There is no guarantee that the underlying operations are
 | 
			
		||||
	// stopped when it is reached. However the function will return after the timeout
 | 
			
		||||
	// with a non-nil error.
 | 
			
		||||
	Update([]FederatedOperation, time.Duration) error
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// A function that executes some operation using the passed client and object.
 | 
			
		||||
type FederatedOperationHandler func(federation_release_1_4.Interface, pkg_runtime.Object) error
 | 
			
		||||
 | 
			
		||||
type federatedUpdaterImpl struct {
 | 
			
		||||
	federation FederationView
 | 
			
		||||
 | 
			
		||||
	addFunction    FederatedOperationHandler
 | 
			
		||||
	updateFunction FederatedOperationHandler
 | 
			
		||||
	deleteFunction FederatedOperationHandler
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewFederatedUpdater(federation FederationView, add, update, del FederatedOperationHandler) FederatedUpdater {
 | 
			
		||||
	return &federatedUpdaterImpl{
 | 
			
		||||
		federation:     federation,
 | 
			
		||||
		addFunction:    add,
 | 
			
		||||
		updateFunction: update,
 | 
			
		||||
		deleteFunction: del,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (fu *federatedUpdaterImpl) Update(ops []FederatedOperation, timeout time.Duration) error {
 | 
			
		||||
	done := make(chan error, len(ops))
 | 
			
		||||
	for _, op := range ops {
 | 
			
		||||
		go func(op FederatedOperation) {
 | 
			
		||||
			clusterName, err := GetClusterName(op.Obj)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				done <- err
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			// TODO: Ensure that the clientset has reasonable timeout.
 | 
			
		||||
			clientset, err := fu.federation.GetClientsetForCluster(clusterName)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				done <- err
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			switch op.Type {
 | 
			
		||||
			case OperationTypeAdd:
 | 
			
		||||
				err = fu.addFunction(clientset, op.Obj)
 | 
			
		||||
			case OperationTypeUpdate:
 | 
			
		||||
				err = fu.updateFunction(clientset, op.Obj)
 | 
			
		||||
			case OperationTypeDelete:
 | 
			
		||||
				err = fu.deleteFunction(clientset, op.Obj)
 | 
			
		||||
			}
 | 
			
		||||
			done <- err
 | 
			
		||||
		}(op)
 | 
			
		||||
	}
 | 
			
		||||
	start := time.Now()
 | 
			
		||||
	for i := 0; i < len(ops); i++ {
 | 
			
		||||
		now := time.Now()
 | 
			
		||||
		if !now.Before(start.Add(timeout)) {
 | 
			
		||||
			return fmt.Errorf("failed to finish all operations in %v", timeout)
 | 
			
		||||
		}
 | 
			
		||||
		select {
 | 
			
		||||
		case err := <-done:
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return err
 | 
			
		||||
			}
 | 
			
		||||
		case <-time.After(start.Add(timeout).Sub(now)):
 | 
			
		||||
			return fmt.Errorf("failed to finish all operations in %v", timeout)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	// All operations finished in time.
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
@@ -0,0 +1,144 @@
 | 
			
		||||
/*
 | 
			
		||||
Copyright 2016 The Kubernetes Authors.
 | 
			
		||||
 | 
			
		||||
Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
you may not use this file except in compliance with the License.
 | 
			
		||||
You may obtain a copy of the License at
 | 
			
		||||
 | 
			
		||||
    http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 | 
			
		||||
Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
See the License for the specific language governing permissions and
 | 
			
		||||
limitations under the License.
 | 
			
		||||
*/
 | 
			
		||||
 | 
			
		||||
package util
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"testing"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	federation_api "k8s.io/kubernetes/federation/apis/federation/v1beta1"
 | 
			
		||||
	federation_release_1_4 "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4"
 | 
			
		||||
	fake_federation_release_1_4 "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4/fake"
 | 
			
		||||
	api_v1 "k8s.io/kubernetes/pkg/api/v1"
 | 
			
		||||
	pkg_runtime "k8s.io/kubernetes/pkg/runtime"
 | 
			
		||||
 | 
			
		||||
	"github.com/stretchr/testify/assert"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// Fake federation view.
 | 
			
		||||
type fakeFederationView struct {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (f fakeFederationView) GetClientsetForCluster(clusterName string) (federation_release_1_4.Interface, error) {
 | 
			
		||||
	return &fake_federation_release_1_4.Clientset{}, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (f *fakeFederationView) GetReadyClusters() ([]*federation_api.Cluster, error) {
 | 
			
		||||
	return []*federation_api.Cluster{}, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (f *fakeFederationView) GetReadyCluster(name string) (*federation_api.Cluster, bool, error) {
 | 
			
		||||
	return nil, false, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (f *fakeFederationView) ClustersSynced() bool {
 | 
			
		||||
	return true
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestFederatedUpdaterOK(t *testing.T) {
 | 
			
		||||
	addChan := make(chan string, 5)
 | 
			
		||||
	updateChan := make(chan string, 5)
 | 
			
		||||
 | 
			
		||||
	updater := NewFederatedUpdater(&fakeFederationView{},
 | 
			
		||||
		func(_ federation_release_1_4.Interface, obj pkg_runtime.Object) error {
 | 
			
		||||
			clusterName, _ := GetClusterName(obj)
 | 
			
		||||
			addChan <- clusterName
 | 
			
		||||
			return nil
 | 
			
		||||
		},
 | 
			
		||||
		func(_ federation_release_1_4.Interface, obj pkg_runtime.Object) error {
 | 
			
		||||
			clusterName, _ := GetClusterName(obj)
 | 
			
		||||
			updateChan <- clusterName
 | 
			
		||||
			return nil
 | 
			
		||||
		},
 | 
			
		||||
		noop)
 | 
			
		||||
 | 
			
		||||
	err := updater.Update([]FederatedOperation{
 | 
			
		||||
		{
 | 
			
		||||
			Type: OperationTypeAdd,
 | 
			
		||||
			Obj:  makeService("A", "s1"),
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			Type: OperationTypeUpdate,
 | 
			
		||||
			Obj:  makeService("B", "s1"),
 | 
			
		||||
		},
 | 
			
		||||
	}, time.Minute)
 | 
			
		||||
	assert.NoError(t, err)
 | 
			
		||||
	add := <-addChan
 | 
			
		||||
	update := <-updateChan
 | 
			
		||||
	assert.Equal(t, "A", add)
 | 
			
		||||
	assert.Equal(t, "B", update)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestFederatedUpdaterError(t *testing.T) {
 | 
			
		||||
	updater := NewFederatedUpdater(&fakeFederationView{},
 | 
			
		||||
		func(_ federation_release_1_4.Interface, obj pkg_runtime.Object) error {
 | 
			
		||||
			return fmt.Errorf("boom")
 | 
			
		||||
		}, noop, noop)
 | 
			
		||||
 | 
			
		||||
	err := updater.Update([]FederatedOperation{
 | 
			
		||||
		{
 | 
			
		||||
			Type: OperationTypeAdd,
 | 
			
		||||
			Obj:  makeService("A", "s1"),
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			Type: OperationTypeUpdate,
 | 
			
		||||
			Obj:  makeService("B", "s1"),
 | 
			
		||||
		},
 | 
			
		||||
	}, time.Minute)
 | 
			
		||||
	assert.Error(t, err)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestFederatedUpdaterTimeout(t *testing.T) {
 | 
			
		||||
	start := time.Now()
 | 
			
		||||
	updater := NewFederatedUpdater(&fakeFederationView{},
 | 
			
		||||
		func(_ federation_release_1_4.Interface, obj pkg_runtime.Object) error {
 | 
			
		||||
			time.Sleep(time.Minute)
 | 
			
		||||
			return nil
 | 
			
		||||
		},
 | 
			
		||||
		noop, noop)
 | 
			
		||||
 | 
			
		||||
	err := updater.Update([]FederatedOperation{
 | 
			
		||||
		{
 | 
			
		||||
			Type: OperationTypeAdd,
 | 
			
		||||
			Obj:  makeService("A", "s1"),
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			Type: OperationTypeUpdate,
 | 
			
		||||
			Obj:  makeService("B", "s1"),
 | 
			
		||||
		},
 | 
			
		||||
	}, time.Second)
 | 
			
		||||
	end := time.Now()
 | 
			
		||||
	assert.Error(t, err)
 | 
			
		||||
	assert.True(t, start.Add(10*time.Second).After(end))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func makeService(cluster, name string) *api_v1.Service {
 | 
			
		||||
	return &api_v1.Service{
 | 
			
		||||
		ObjectMeta: api_v1.ObjectMeta{
 | 
			
		||||
			Namespace: "ns1",
 | 
			
		||||
			Name:      name,
 | 
			
		||||
			Annotations: map[string]string{
 | 
			
		||||
				ClusterNameAnnotation: cluster,
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func noop(_ federation_release_1_4.Interface, _ pkg_runtime.Object) error {
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
		Reference in New Issue
	
	Block a user