mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-03 11:48:15 +00:00 
			
		
		
		
	Replicate the persistent volume label admission plugin in a controller in
the cloud-controller-manager
This commit is contained in:
		@@ -44,7 +44,7 @@ import (
 | 
			
		||||
	"k8s.io/kubernetes/pkg/api"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/cloudprovider"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/controller"
 | 
			
		||||
	nodecontroller "k8s.io/kubernetes/pkg/controller/cloud"
 | 
			
		||||
	cloudcontrollers "k8s.io/kubernetes/pkg/controller/cloud"
 | 
			
		||||
	routecontroller "k8s.io/kubernetes/pkg/controller/route"
 | 
			
		||||
	servicecontroller "k8s.io/kubernetes/pkg/controller/service"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/util/configz"
 | 
			
		||||
@@ -211,7 +211,7 @@ func StartControllers(s *options.CloudControllerManagerServer, kubeconfig *restc
 | 
			
		||||
	sharedInformers := informers.NewSharedInformerFactory(versionedClient, resyncPeriod(s)())
 | 
			
		||||
 | 
			
		||||
	// Start the CloudNodeController
 | 
			
		||||
	nodeController := nodecontroller.NewCloudNodeController(
 | 
			
		||||
	nodeController := cloudcontrollers.NewCloudNodeController(
 | 
			
		||||
		sharedInformers.Core().V1().Nodes(),
 | 
			
		||||
		client("cloud-node-controller"), cloud,
 | 
			
		||||
		s.NodeMonitorPeriod.Duration,
 | 
			
		||||
@@ -220,6 +220,12 @@ func StartControllers(s *options.CloudControllerManagerServer, kubeconfig *restc
 | 
			
		||||
	nodeController.Run()
 | 
			
		||||
	time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 | 
			
		||||
 | 
			
		||||
	// Start the PersistentVolumeLabelController
 | 
			
		||||
	pvlController := cloudcontrollers.NewPersistentVolumeLabelController(client("pvl-controller"), cloud)
 | 
			
		||||
	threads := 5
 | 
			
		||||
	go pvlController.Run(threads, stop)
 | 
			
		||||
	time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 | 
			
		||||
 | 
			
		||||
	// Start the service controller
 | 
			
		||||
	serviceController, err := servicecontroller.New(
 | 
			
		||||
		cloud,
 | 
			
		||||
 
 | 
			
		||||
@@ -0,0 +1,13 @@
 | 
			
		||||
kind: InitializerConfiguration
 | 
			
		||||
apiVersion: admissionregistration.k8s.io/v1alpha1
 | 
			
		||||
metadata:
 | 
			
		||||
  name: pvlabel.kubernetes.io
 | 
			
		||||
initializers:
 | 
			
		||||
  - name: pvlabel.kubernetes.io
 | 
			
		||||
    rules:
 | 
			
		||||
    - apiGroups:
 | 
			
		||||
      - ""
 | 
			
		||||
      apiVersions:
 | 
			
		||||
      - "*"
 | 
			
		||||
      resources:
 | 
			
		||||
      - persistentvolumes
 | 
			
		||||
@@ -8,35 +8,53 @@ load(
 | 
			
		||||
 | 
			
		||||
go_library(
 | 
			
		||||
    name = "go_default_library",
 | 
			
		||||
    srcs = ["node_controller.go"],
 | 
			
		||||
    srcs = [
 | 
			
		||||
        "node_controller.go",
 | 
			
		||||
        "pvlcontroller.go",
 | 
			
		||||
    ],
 | 
			
		||||
    tags = ["automanaged"],
 | 
			
		||||
    deps = [
 | 
			
		||||
        "//pkg/api/v1/node:go_default_library",
 | 
			
		||||
        "//pkg/cloudprovider:go_default_library",
 | 
			
		||||
        "//pkg/cloudprovider/providers/aws:go_default_library",
 | 
			
		||||
        "//pkg/cloudprovider/providers/gce:go_default_library",
 | 
			
		||||
        "//pkg/controller:go_default_library",
 | 
			
		||||
        "//pkg/kubelet/apis:go_default_library",
 | 
			
		||||
        "//pkg/util/node:go_default_library",
 | 
			
		||||
        "//pkg/volume:go_default_library",
 | 
			
		||||
        "//plugin/pkg/scheduler/algorithm:go_default_library",
 | 
			
		||||
        "//vendor/github.com/golang/glog:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/api/core/v1:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/client-go/informers/core/v1:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/client-go/kubernetes:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/client-go/tools/cache:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/client-go/tools/record:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/client-go/util/retry:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/client-go/util/workqueue:go_default_library",
 | 
			
		||||
    ],
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
go_test(
 | 
			
		||||
    name = "go_default_test",
 | 
			
		||||
    srcs = ["node_controller_test.go"],
 | 
			
		||||
    srcs = [
 | 
			
		||||
        "node_controller_test.go",
 | 
			
		||||
        "pvlcontroller_test.go",
 | 
			
		||||
    ],
 | 
			
		||||
    library = ":go_default_library",
 | 
			
		||||
    deps = [
 | 
			
		||||
        "//pkg/cloudprovider:go_default_library",
 | 
			
		||||
        "//pkg/cloudprovider/providers/aws:go_default_library",
 | 
			
		||||
        "//pkg/cloudprovider/providers/fake:go_default_library",
 | 
			
		||||
        "//pkg/controller:go_default_library",
 | 
			
		||||
        "//pkg/controller/testutil:go_default_library",
 | 
			
		||||
@@ -45,11 +63,13 @@ go_test(
 | 
			
		||||
        "//vendor/github.com/golang/glog:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/api/core/v1:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/client-go/informers:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/client-go/testing:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/client-go/tools/record:go_default_library",
 | 
			
		||||
    ],
 | 
			
		||||
)
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										373
									
								
								pkg/controller/cloud/pvlcontroller.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										373
									
								
								pkg/controller/cloud/pvlcontroller.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,373 @@
 | 
			
		||||
/*
 | 
			
		||||
Copyright 2017 The Kubernetes Authors.
 | 
			
		||||
 | 
			
		||||
Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
you may not use this file except in compliance with the License.
 | 
			
		||||
You may obtain a copy of the License at
 | 
			
		||||
 | 
			
		||||
    http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 | 
			
		||||
Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
See the License for the specific language governing permissions and
 | 
			
		||||
limitations under the License.
 | 
			
		||||
*/
 | 
			
		||||
 | 
			
		||||
package cloud
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"encoding/json"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/golang/glog"
 | 
			
		||||
 | 
			
		||||
	"k8s.io/api/core/v1"
 | 
			
		||||
 | 
			
		||||
	"k8s.io/apimachinery/pkg/api/errors"
 | 
			
		||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/runtime"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/types"
 | 
			
		||||
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/strategicpatch"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/wait"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/watch"
 | 
			
		||||
 | 
			
		||||
	"k8s.io/client-go/kubernetes"
 | 
			
		||||
	corelisters "k8s.io/client-go/listers/core/v1"
 | 
			
		||||
	"k8s.io/client-go/tools/cache"
 | 
			
		||||
	"k8s.io/client-go/util/workqueue"
 | 
			
		||||
 | 
			
		||||
	"k8s.io/kubernetes/pkg/cloudprovider"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/cloudprovider/providers/aws"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/controller"
 | 
			
		||||
	kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
 | 
			
		||||
	vol "k8s.io/kubernetes/pkg/volume"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const initializerName = "pvlabel.kubernetes.io"
 | 
			
		||||
 | 
			
		||||
// PersistentVolumeLabelController handles adding labels to persistent volumes when they are created
 | 
			
		||||
type PersistentVolumeLabelController struct {
 | 
			
		||||
	// Control access to cloud volumes
 | 
			
		||||
	mutex            sync.Mutex
 | 
			
		||||
	ebsVolumes       aws.Volumes
 | 
			
		||||
	gceCloudProvider *gce.GCECloud
 | 
			
		||||
 | 
			
		||||
	cloud         cloudprovider.Interface
 | 
			
		||||
	kubeClient    kubernetes.Interface
 | 
			
		||||
	pvlController cache.Controller
 | 
			
		||||
	pvlIndexer    cache.Indexer
 | 
			
		||||
	volumeLister  corelisters.PersistentVolumeLister
 | 
			
		||||
 | 
			
		||||
	syncHandler func(key string) error
 | 
			
		||||
 | 
			
		||||
	// queue is where incoming work is placed to de-dup and to allow "easy" rate limited requeues on errors
 | 
			
		||||
	queue workqueue.RateLimitingInterface
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewPersistentVolumeLabelController creates a PersistentVolumeLabelController object
 | 
			
		||||
func NewPersistentVolumeLabelController(
 | 
			
		||||
	kubeClient kubernetes.Interface,
 | 
			
		||||
	cloud cloudprovider.Interface) *PersistentVolumeLabelController {
 | 
			
		||||
 | 
			
		||||
	pvlc := &PersistentVolumeLabelController{
 | 
			
		||||
		cloud:      cloud,
 | 
			
		||||
		kubeClient: kubeClient,
 | 
			
		||||
		queue:      workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pvLabels"),
 | 
			
		||||
	}
 | 
			
		||||
	pvlc.syncHandler = pvlc.addLabels
 | 
			
		||||
	pvlc.pvlIndexer, pvlc.pvlController = cache.NewIndexerInformer(
 | 
			
		||||
		&cache.ListWatch{
 | 
			
		||||
			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
 | 
			
		||||
				options.IncludeUninitialized = true
 | 
			
		||||
				return kubeClient.CoreV1().PersistentVolumes().List(options)
 | 
			
		||||
			},
 | 
			
		||||
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
 | 
			
		||||
				options.IncludeUninitialized = true
 | 
			
		||||
				return kubeClient.CoreV1().PersistentVolumes().Watch(options)
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		&v1.PersistentVolume{},
 | 
			
		||||
		0,
 | 
			
		||||
		cache.ResourceEventHandlerFuncs{
 | 
			
		||||
			AddFunc: func(obj interface{}) {
 | 
			
		||||
				key, err := cache.MetaNamespaceKeyFunc(obj)
 | 
			
		||||
				if err == nil {
 | 
			
		||||
					pvlc.queue.Add(key)
 | 
			
		||||
				}
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		cache.Indexers{},
 | 
			
		||||
	)
 | 
			
		||||
	pvlc.volumeLister = corelisters.NewPersistentVolumeLister(pvlc.pvlIndexer)
 | 
			
		||||
 | 
			
		||||
	return pvlc
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Run starts a controller that adds labels to persistent volumes
 | 
			
		||||
func (pvlc *PersistentVolumeLabelController) Run(threadiness int, stopCh <-chan struct{}) {
 | 
			
		||||
	defer utilruntime.HandleCrash()
 | 
			
		||||
	defer pvlc.queue.ShutDown()
 | 
			
		||||
 | 
			
		||||
	glog.Infof("Starting PersistentVolumeLabelController")
 | 
			
		||||
	defer glog.Infof("Shutting down PersistentVolumeLabelController")
 | 
			
		||||
 | 
			
		||||
	go pvlc.pvlController.Run(stopCh)
 | 
			
		||||
 | 
			
		||||
	if !controller.WaitForCacheSync("persistent volume label", stopCh, pvlc.pvlController.HasSynced) {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// start up your worker threads based on threadiness.  Some controllers have multiple kinds of workers
 | 
			
		||||
	for i := 0; i < threadiness; i++ {
 | 
			
		||||
		// runWorker will loop until "something bad" happens.  The .Until will then rekick the worker
 | 
			
		||||
		// after one second
 | 
			
		||||
		go wait.Until(pvlc.runWorker, time.Second, stopCh)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// wait until we're told to stop
 | 
			
		||||
	<-stopCh
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (pvlc *PersistentVolumeLabelController) runWorker() {
 | 
			
		||||
	// hot loop until we're told to stop.  processNextWorkItem will automatically wait until there's work
 | 
			
		||||
	// available, so we don't worry about secondary waits
 | 
			
		||||
	for pvlc.processNextWorkItem() {
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// processNextWorkItem deals with one key off the queue.  It returns false when it's time to quit.
 | 
			
		||||
func (pvlc *PersistentVolumeLabelController) processNextWorkItem() bool {
 | 
			
		||||
	// pull the next work item from queue.  It should be a key we use to lookup something in a cache
 | 
			
		||||
	keyObj, quit := pvlc.queue.Get()
 | 
			
		||||
	if quit {
 | 
			
		||||
		return false
 | 
			
		||||
	}
 | 
			
		||||
	// you always have to indicate to the queue that you've completed a piece of work
 | 
			
		||||
	defer pvlc.queue.Done(keyObj)
 | 
			
		||||
 | 
			
		||||
	key := keyObj.(string)
 | 
			
		||||
	// do your work on the key.  This method will contains your "do stuff" logic
 | 
			
		||||
	err := pvlc.syncHandler(key)
 | 
			
		||||
	if err == nil {
 | 
			
		||||
		// if you had no error, tell the queue to stop tracking history for your key.  This will
 | 
			
		||||
		// reset things like failure counts for per-item rate limiting
 | 
			
		||||
		pvlc.queue.Forget(key)
 | 
			
		||||
		return true
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// there was a failure so be sure to report it.  This method allows for pluggable error handling
 | 
			
		||||
	// which can be used for things like cluster-monitoring
 | 
			
		||||
	utilruntime.HandleError(fmt.Errorf("%v failed with : %v", key, err))
 | 
			
		||||
 | 
			
		||||
	// since we failed, we should requeue the item to work on later.  This method will add a backoff
 | 
			
		||||
	// to avoid hotlooping on particular items (they're probably still not going to work right away)
 | 
			
		||||
	// and overall controller protection (everything I've done is broken, this controller needs to
 | 
			
		||||
	// calm down or it can starve other useful work) cases.
 | 
			
		||||
	pvlc.queue.AddRateLimited(key)
 | 
			
		||||
 | 
			
		||||
	return true
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// AddLabels adds appropriate labels to persistent volumes and sets the
 | 
			
		||||
// volume as available if successful.
 | 
			
		||||
func (pvlc *PersistentVolumeLabelController) addLabels(key string) error {
 | 
			
		||||
	_, name, err := cache.SplitMetaNamespaceKey(key)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return fmt.Errorf("error getting name of volume %q to get volume from informer: %v", key, err)
 | 
			
		||||
	}
 | 
			
		||||
	volume, err := pvlc.volumeLister.Get(name)
 | 
			
		||||
	if errors.IsNotFound(err) {
 | 
			
		||||
		return nil
 | 
			
		||||
	} else if err != nil {
 | 
			
		||||
		return fmt.Errorf("error getting volume %s from informer: %v", name, err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return pvlc.addLabelsToVolume(volume)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (pvlc *PersistentVolumeLabelController) addLabelsToVolume(vol *v1.PersistentVolume) error {
 | 
			
		||||
	var volumeLabels map[string]string
 | 
			
		||||
 | 
			
		||||
	// Only add labels if in the list of initializers
 | 
			
		||||
	if needsInitialization(vol.Initializers, initializerName) {
 | 
			
		||||
		if vol.Spec.AWSElasticBlockStore != nil {
 | 
			
		||||
			labels, err := pvlc.findAWSEBSLabels(vol)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return fmt.Errorf("error querying AWS EBS volume %s: %v", vol.Spec.AWSElasticBlockStore.VolumeID, err)
 | 
			
		||||
			}
 | 
			
		||||
			volumeLabels = labels
 | 
			
		||||
		}
 | 
			
		||||
		if vol.Spec.GCEPersistentDisk != nil {
 | 
			
		||||
			labels, err := pvlc.findGCEPDLabels(vol)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return fmt.Errorf("error querying GCE PD volume %s: %v", vol.Spec.GCEPersistentDisk.PDName, err)
 | 
			
		||||
			}
 | 
			
		||||
			volumeLabels = labels
 | 
			
		||||
		}
 | 
			
		||||
		return pvlc.updateVolume(vol, volumeLabels)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (pvlc *PersistentVolumeLabelController) findAWSEBSLabels(volume *v1.PersistentVolume) (map[string]string, error) {
 | 
			
		||||
	// Ignore any volumes that are being provisioned
 | 
			
		||||
	if volume.Spec.AWSElasticBlockStore.VolumeID == vol.ProvisionedVolumeName {
 | 
			
		||||
		return nil, nil
 | 
			
		||||
	}
 | 
			
		||||
	ebsVolumes, err := pvlc.getEBSVolumes()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// TODO: GetVolumeLabels is actually a method on the Volumes interface
 | 
			
		||||
	// If that gets standardized we can refactor to reduce code duplication
 | 
			
		||||
	spec := aws.KubernetesVolumeID(volume.Spec.AWSElasticBlockStore.VolumeID)
 | 
			
		||||
	labels, err := ebsVolumes.GetVolumeLabels(spec)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return labels, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// getEBSVolumes returns the AWS Volumes interface for ebs
 | 
			
		||||
func (pvlc *PersistentVolumeLabelController) getEBSVolumes() (aws.Volumes, error) {
 | 
			
		||||
	pvlc.mutex.Lock()
 | 
			
		||||
	defer pvlc.mutex.Unlock()
 | 
			
		||||
 | 
			
		||||
	if pvlc.ebsVolumes == nil {
 | 
			
		||||
		awsCloudProvider := pvlc.cloud.(*aws.Cloud)
 | 
			
		||||
		awsCloudProvider, ok := pvlc.cloud.(*aws.Cloud)
 | 
			
		||||
		if !ok {
 | 
			
		||||
			// GetCloudProvider has gone very wrong
 | 
			
		||||
			return nil, fmt.Errorf("error retrieving AWS cloud provider")
 | 
			
		||||
		}
 | 
			
		||||
		pvlc.ebsVolumes = awsCloudProvider
 | 
			
		||||
	}
 | 
			
		||||
	return pvlc.ebsVolumes, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (pvlc *PersistentVolumeLabelController) findGCEPDLabels(volume *v1.PersistentVolume) (map[string]string, error) {
 | 
			
		||||
	// Ignore any volumes that are being provisioned
 | 
			
		||||
	if volume.Spec.GCEPersistentDisk.PDName == vol.ProvisionedVolumeName {
 | 
			
		||||
		return nil, nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	provider, err := pvlc.getGCECloudProvider()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// If the zone is already labeled, honor the hint
 | 
			
		||||
	zone := volume.Labels[kubeletapis.LabelZoneFailureDomain]
 | 
			
		||||
 | 
			
		||||
	labels, err := provider.GetAutoLabelsForPD(volume.Spec.GCEPersistentDisk.PDName, zone)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return labels, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// getGCECloudProvider returns the GCE cloud provider, for use for querying volume labels
 | 
			
		||||
func (pvlc *PersistentVolumeLabelController) getGCECloudProvider() (*gce.GCECloud, error) {
 | 
			
		||||
	pvlc.mutex.Lock()
 | 
			
		||||
	defer pvlc.mutex.Unlock()
 | 
			
		||||
 | 
			
		||||
	if pvlc.gceCloudProvider == nil {
 | 
			
		||||
		gceCloudProvider, ok := pvlc.cloud.(*gce.GCECloud)
 | 
			
		||||
		if !ok {
 | 
			
		||||
			// GetCloudProvider has gone very wrong
 | 
			
		||||
			return nil, fmt.Errorf("error retrieving GCE cloud provider")
 | 
			
		||||
		}
 | 
			
		||||
		pvlc.gceCloudProvider = gceCloudProvider
 | 
			
		||||
	}
 | 
			
		||||
	return pvlc.gceCloudProvider, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (pvlc *PersistentVolumeLabelController) createPatch(vol *v1.PersistentVolume, volLabels map[string]string) ([]byte, error) {
 | 
			
		||||
	volName := vol.Name
 | 
			
		||||
	newVolume := vol.DeepCopyObject().(*v1.PersistentVolume)
 | 
			
		||||
	if newVolume.Labels == nil {
 | 
			
		||||
		newVolume.Labels = make(map[string]string)
 | 
			
		||||
	}
 | 
			
		||||
	for k, v := range volLabels {
 | 
			
		||||
		newVolume.Labels[k] = v
 | 
			
		||||
	}
 | 
			
		||||
	newVolume.Initializers = removeInitializer(newVolume.Initializers, initializerName)
 | 
			
		||||
	glog.V(4).Infof("removed initializer on PersistentVolume %s", newVolume.Name)
 | 
			
		||||
 | 
			
		||||
	oldData, err := json.Marshal(vol)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, fmt.Errorf("failed to marshal old persistentvolume %#v for persistentvolume %q: %v", vol, volName, err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	newData, err := json.Marshal(newVolume)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, fmt.Errorf("failed to marshal new persistentvolume %#v for persistentvolume %q: %v", newVolume, volName, err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	patch, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.PersistentVolume{})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, fmt.Errorf("failed to create patch for persistentvolume %q: %v", volName, err)
 | 
			
		||||
	}
 | 
			
		||||
	return patch, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (pvlc *PersistentVolumeLabelController) updateVolume(vol *v1.PersistentVolume, volLabels map[string]string) error {
 | 
			
		||||
	volName := vol.Name
 | 
			
		||||
	glog.V(4).Infof("updating PersistentVolume %s", volName)
 | 
			
		||||
	patchBytes, err := pvlc.createPatch(vol, volLabels)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	_, err = pvlc.kubeClient.Core().PersistentVolumes().Patch(string(volName), types.StrategicMergePatchType, patchBytes)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return fmt.Errorf("failed to update PersistentVolume %s: %v", volName, err)
 | 
			
		||||
	}
 | 
			
		||||
	glog.V(4).Infof("updated PersistentVolume %s", volName)
 | 
			
		||||
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func removeInitializer(initializers *metav1.Initializers, name string) *metav1.Initializers {
 | 
			
		||||
	if initializers == nil {
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	var updated []metav1.Initializer
 | 
			
		||||
	for _, pending := range initializers.Pending {
 | 
			
		||||
		if pending.Name != name {
 | 
			
		||||
			updated = append(updated, pending)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	if len(updated) == len(initializers.Pending) {
 | 
			
		||||
		return initializers
 | 
			
		||||
	}
 | 
			
		||||
	if len(updated) == 0 {
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return &metav1.Initializers{Pending: updated}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func needsInitialization(initializers *metav1.Initializers, name string) bool {
 | 
			
		||||
	hasInitializer := false
 | 
			
		||||
 | 
			
		||||
	if initializers != nil {
 | 
			
		||||
		for _, pending := range initializers.Pending {
 | 
			
		||||
			if pending.Name == name {
 | 
			
		||||
				hasInitializer = true
 | 
			
		||||
				break
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return hasInitializer
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										226
									
								
								pkg/controller/cloud/pvlcontroller_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										226
									
								
								pkg/controller/cloud/pvlcontroller_test.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,226 @@
 | 
			
		||||
/*
 | 
			
		||||
Copyright 2017 The Kubernetes Authors.
 | 
			
		||||
 | 
			
		||||
Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
you may not use this file except in compliance with the License.
 | 
			
		||||
You may obtain a copy of the License at
 | 
			
		||||
 | 
			
		||||
    http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 | 
			
		||||
Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
See the License for the specific language governing permissions and
 | 
			
		||||
limitations under the License.
 | 
			
		||||
*/
 | 
			
		||||
 | 
			
		||||
package cloud
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"encoding/json"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"testing"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"k8s.io/api/core/v1"
 | 
			
		||||
 | 
			
		||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/runtime"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/types"
 | 
			
		||||
 | 
			
		||||
	"k8s.io/client-go/kubernetes/fake"
 | 
			
		||||
	core "k8s.io/client-go/testing"
 | 
			
		||||
 | 
			
		||||
	"k8s.io/kubernetes/pkg/cloudprovider/providers/aws"
 | 
			
		||||
	fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type mockVolumes struct {
 | 
			
		||||
	volumeLabels      map[string]string
 | 
			
		||||
	volumeLabelsError error
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var _ aws.Volumes = &mockVolumes{}
 | 
			
		||||
 | 
			
		||||
func (v *mockVolumes) AttachDisk(diskName aws.KubernetesVolumeID, nodeName types.NodeName, readOnly bool) (string, error) {
 | 
			
		||||
	return "", fmt.Errorf("not implemented")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (v *mockVolumes) DetachDisk(diskName aws.KubernetesVolumeID, nodeName types.NodeName) (string, error) {
 | 
			
		||||
	return "", fmt.Errorf("not implemented")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (v *mockVolumes) CreateDisk(volumeOptions *aws.VolumeOptions) (volumeName aws.KubernetesVolumeID, err error) {
 | 
			
		||||
	return "", fmt.Errorf("not implemented")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (v *mockVolumes) DeleteDisk(volumeName aws.KubernetesVolumeID) (bool, error) {
 | 
			
		||||
	return false, fmt.Errorf("not implemented")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (v *mockVolumes) GetVolumeLabels(volumeName aws.KubernetesVolumeID) (map[string]string, error) {
 | 
			
		||||
	return v.volumeLabels, v.volumeLabelsError
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *mockVolumes) GetDiskPath(volumeName aws.KubernetesVolumeID) (string, error) {
 | 
			
		||||
	return "", fmt.Errorf("not implemented")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *mockVolumes) DiskIsAttached(volumeName aws.KubernetesVolumeID, nodeName types.NodeName) (bool, error) {
 | 
			
		||||
	return false, fmt.Errorf("not implemented")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *mockVolumes) DisksAreAttached(nodeDisks map[types.NodeName][]aws.KubernetesVolumeID) (map[types.NodeName]map[aws.KubernetesVolumeID]bool, error) {
 | 
			
		||||
	return nil, fmt.Errorf("not implemented")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestCreatePatch(t *testing.T) {
 | 
			
		||||
	ignoredPV := v1.PersistentVolume{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
			Name: "noncloud",
 | 
			
		||||
			Initializers: &metav1.Initializers{
 | 
			
		||||
				Pending: []metav1.Initializer{
 | 
			
		||||
					{
 | 
			
		||||
						Name: initializerName,
 | 
			
		||||
					},
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		Spec: v1.PersistentVolumeSpec{
 | 
			
		||||
			PersistentVolumeSource: v1.PersistentVolumeSource{
 | 
			
		||||
				HostPath: &v1.HostPathVolumeSource{
 | 
			
		||||
					Path: "/",
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	awsPV := v1.PersistentVolume{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
			Name: "awsPV",
 | 
			
		||||
			Initializers: &metav1.Initializers{
 | 
			
		||||
				Pending: []metav1.Initializer{
 | 
			
		||||
					{
 | 
			
		||||
						Name: initializerName,
 | 
			
		||||
					},
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		Spec: v1.PersistentVolumeSpec{
 | 
			
		||||
			PersistentVolumeSource: v1.PersistentVolumeSource{
 | 
			
		||||
				AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{
 | 
			
		||||
					VolumeID: "123",
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	testCases := map[string]struct {
 | 
			
		||||
		vol    v1.PersistentVolume
 | 
			
		||||
		labels map[string]string
 | 
			
		||||
	}{
 | 
			
		||||
		"non-cloud PV": {
 | 
			
		||||
			vol:    ignoredPV,
 | 
			
		||||
			labels: nil,
 | 
			
		||||
		},
 | 
			
		||||
		"no labels": {
 | 
			
		||||
			vol:    awsPV,
 | 
			
		||||
			labels: nil,
 | 
			
		||||
		},
 | 
			
		||||
		"cloudprovider returns nil, nil": {
 | 
			
		||||
			vol:    awsPV,
 | 
			
		||||
			labels: nil,
 | 
			
		||||
		},
 | 
			
		||||
		"cloudprovider labels": {
 | 
			
		||||
			vol:    awsPV,
 | 
			
		||||
			labels: map[string]string{"a": "1", "b": "2"},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for d, tc := range testCases {
 | 
			
		||||
		cloud := &fakecloud.FakeCloud{}
 | 
			
		||||
		client := fake.NewSimpleClientset()
 | 
			
		||||
		pvlController := NewPersistentVolumeLabelController(client, cloud)
 | 
			
		||||
		patch, err := pvlController.createPatch(&tc.vol, tc.labels)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			t.Errorf("%s: createPatch returned err: %v", d, err)
 | 
			
		||||
		}
 | 
			
		||||
		obj := &v1.PersistentVolume{}
 | 
			
		||||
		json.Unmarshal(patch, obj)
 | 
			
		||||
		if tc.labels != nil {
 | 
			
		||||
			for k, v := range tc.labels {
 | 
			
		||||
				if obj.ObjectMeta.Labels[k] != v {
 | 
			
		||||
					t.Errorf("%s: label %s expected %s got %s", d, k, v, obj.ObjectMeta.Labels[k])
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		if obj.ObjectMeta.Initializers != nil {
 | 
			
		||||
			t.Errorf("%s: initializer wasn't removed: %v", d, obj.ObjectMeta.Initializers)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestAddLabelsToVolume(t *testing.T) {
 | 
			
		||||
	pv := v1.PersistentVolume{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
			Name: "awsPV",
 | 
			
		||||
		},
 | 
			
		||||
		Spec: v1.PersistentVolumeSpec{
 | 
			
		||||
			PersistentVolumeSource: v1.PersistentVolumeSource{
 | 
			
		||||
				AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{
 | 
			
		||||
					VolumeID: "123",
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	testCases := map[string]struct {
 | 
			
		||||
		vol          v1.PersistentVolume
 | 
			
		||||
		initializers *metav1.Initializers
 | 
			
		||||
		shouldLabel  bool
 | 
			
		||||
	}{
 | 
			
		||||
		"PV without initializer": {
 | 
			
		||||
			vol:          pv,
 | 
			
		||||
			initializers: nil,
 | 
			
		||||
			shouldLabel:  false,
 | 
			
		||||
		},
 | 
			
		||||
		"PV with initializer to remove": {
 | 
			
		||||
			vol:          pv,
 | 
			
		||||
			initializers: &metav1.Initializers{Pending: []metav1.Initializer{{Name: initializerName}}},
 | 
			
		||||
			shouldLabel:  true,
 | 
			
		||||
		},
 | 
			
		||||
		"PV with other initializers": {
 | 
			
		||||
			vol:          pv,
 | 
			
		||||
			initializers: &metav1.Initializers{Pending: []metav1.Initializer{{Name: "OtherInit"}}},
 | 
			
		||||
			shouldLabel:  false,
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for d, tc := range testCases {
 | 
			
		||||
		labeledCh := make(chan bool, 1)
 | 
			
		||||
		client := fake.NewSimpleClientset()
 | 
			
		||||
		client.PrependReactor("patch", "persistentvolumes", func(action core.Action) (handled bool, ret runtime.Object, err error) {
 | 
			
		||||
			patch := action.(core.PatchActionImpl).GetPatch()
 | 
			
		||||
			obj := &v1.PersistentVolume{}
 | 
			
		||||
			json.Unmarshal(patch, obj)
 | 
			
		||||
			if obj.ObjectMeta.Labels["a"] != "1" {
 | 
			
		||||
				return false, nil, nil
 | 
			
		||||
			}
 | 
			
		||||
			labeledCh <- true
 | 
			
		||||
			return true, nil, nil
 | 
			
		||||
		})
 | 
			
		||||
		pvlController := &PersistentVolumeLabelController{kubeClient: client, ebsVolumes: &mockVolumes{volumeLabels: map[string]string{"a": "1"}}}
 | 
			
		||||
		tc.vol.ObjectMeta.Initializers = tc.initializers
 | 
			
		||||
		pvlController.addLabelsToVolume(&tc.vol)
 | 
			
		||||
 | 
			
		||||
		select {
 | 
			
		||||
		case l := <-labeledCh:
 | 
			
		||||
			if l != tc.shouldLabel {
 | 
			
		||||
				t.Errorf("%s: label of pv failed.  expected %t got %t", d, tc.shouldLabel, l)
 | 
			
		||||
			}
 | 
			
		||||
		case <-time.After(500 * time.Millisecond):
 | 
			
		||||
			if tc.shouldLabel != false {
 | 
			
		||||
				t.Errorf("%s: timed out waiting for label notification", d)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
		Reference in New Issue
	
	Block a user