/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cloud

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	"k8s.io/klog"

	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
	cloudprovider "k8s.io/cloud-provider"
	v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
	"k8s.io/kubernetes/pkg/controller"
	kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
	volumeutil "k8s.io/kubernetes/pkg/volume/util"
)

const initializerName = "pvlabel.kubernetes.io"

// PersistentVolumeLabelController handles adding labels to persistent volumes when they are created
type PersistentVolumeLabelController struct {
	cloud         cloudprovider.Interface
	kubeClient    kubernetes.Interface
	pvlController cache.Controller
	pvlIndexer    cache.Indexer
	volumeLister  corelisters.PersistentVolumeLister

	syncHandler func(key string) error

	// queue is where incoming work is placed to de-dup and to allow "easy" rate limited requeues on errors
	queue workqueue.RateLimitingInterface
}

// NewPersistentVolumeLabelController creates a PersistentVolumeLabelController object
func NewPersistentVolumeLabelController(
	kubeClient kubernetes.Interface,
	cloud cloudprovider.Interface) *PersistentVolumeLabelController {

	pvlc := &PersistentVolumeLabelController{
		cloud:      cloud,
		kubeClient: kubeClient,
		queue:      workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pvLabels"),
	}
	pvlc.syncHandler = pvlc.addLabelsAndAffinity
	pvlc.pvlIndexer, pvlc.pvlController = cache.NewIndexerInformer(
		&cache.ListWatch{
			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
				options.IncludeUninitialized = true
				return kubeClient.CoreV1().PersistentVolumes().List(options)
			},
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				options.IncludeUninitialized = true
				return kubeClient.CoreV1().PersistentVolumes().Watch(options)
			},
		},
		&v1.PersistentVolume{},
		0,
		cache.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) {
				key, err := cache.MetaNamespaceKeyFunc(obj)
				if err == nil {
					pvlc.queue.Add(key)
				}
			},
		},
		cache.Indexers{},
	)
	pvlc.volumeLister = corelisters.NewPersistentVolumeLister(pvlc.pvlIndexer)

	return pvlc
}

// Run starts a controller that adds labels to persistent volumes
func (pvlc *PersistentVolumeLabelController) Run(threadiness int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer pvlc.queue.ShutDown()

	klog.Infof("Starting PersistentVolumeLabelController")
	defer klog.Infof("Shutting down PersistentVolumeLabelController")

	go pvlc.pvlController.Run(stopCh)

	if !controller.WaitForCacheSync("persistent volume label", stopCh, pvlc.pvlController.HasSynced) {
		return
	}

	// start up your worker threads based on threadiness.  Some controllers have multiple kinds of workers
	for i := 0; i < threadiness; i++ {
		// runWorker will loop until "something bad" happens.  The .Until will then rekick the worker
		// after one second
		go wait.Until(pvlc.runWorker, time.Second, stopCh)
	}

	// wait until we're told to stop
	<-stopCh
}
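
// runWorker processes items from the work queue until the queue is shut down.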
func (pvlc *PersistentVolumeLabelController) runWorker() {
	// hot loop until we're told to stop.  processNextWorkItem will automatically wait until there's work
	// available, so we don't worry about secondary waits
	for pvlc.processNextWorkItem() {
	}
}

// processNextWorkItem deals with one key off the queue.  It returns false when it's time to quit.
func (pvlc *PersistentVolumeLabelController) processNextWorkItem() bool {
	// pull the next work item from queue.  It should be a key we use to lookup something in a cache
	keyObj, quit := pvlc.queue.Get()
	if quit {
		return false
	}
	// you always have to indicate to the queue that you've completed a piece of work
	defer pvlc.queue.Done(keyObj)

	key := keyObj.(string)
	// do your work on the key.  This method contains your "do stuff" logic
	err := pvlc.syncHandler(key)
	if err == nil {
		// if you had no error, tell the queue to stop tracking history for your key.  This will
		// reset things like failure counts for per-item rate limiting
		pvlc.queue.Forget(key)
		return true
	}

	// there was a failure so be sure to report it.  This method allows for pluggable error handling
	// which can be used for things like cluster-monitoring
	utilruntime.HandleError(fmt.Errorf("%v failed with: %v", key, err))

	// since we failed, we should requeue the item to work on later.  This method will add a backoff
	// to avoid hotlooping on particular items (they're probably still not going to work right away)
	// and overall controller protection (everything I've done is broken, this controller needs to
	// calm down or it can starve other useful work) cases.
	pvlc.queue.AddRateLimited(key)

	return true
}

// addLabelsAndAffinity looks up the persistent volume for the given key and adds the
// appropriate cloud-provider labels and node affinity to it.
func (pvlc *PersistentVolumeLabelController) addLabelsAndAffinity(key string) error {
	_, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return fmt.Errorf("error getting name of volume %q to get volume from informer: %v", key, err)
	}
	volume, err := pvlc.volumeLister.Get(name)
	if errors.IsNotFound(err) {
		return nil
	} else if err != nil {
		return fmt.Errorf("error getting volume %s from informer: %v", name, err)
	}

	return pvlc.addLabelsAndAffinityToVolume(volume)
}
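
// addLabelsAndAffinityToVolume queries the cloud provider for the volume's labels and applies
// them, along with the matching node affinity, when this controller's initializer is still pending.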
func (pvlc *PersistentVolumeLabelController) addLabelsAndAffinityToVolume(vol *v1.PersistentVolume) error {
	var volumeLabels map[string]string
	// Only add labels if this controller's initializer is the next pending one.
	if needsInitialization(vol.Initializers, initializerName) {
		if labeler, ok := (pvlc.cloud).(cloudprovider.PVLabeler); ok {
			labels, err := labeler.GetLabelsForVolume(context.TODO(), vol)
			if err != nil {
				return fmt.Errorf("error querying volume %v: %v", vol.Spec, err)
			}
			volumeLabels = labels
		} else {
			klog.V(4).Info("cloud provider does not support PVLabeler")
		}
		return pvlc.updateVolume(vol, volumeLabels)
	}
	return nil
}
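
// createPatch builds a strategic merge patch that adds the given cloud labels and the
// corresponding node affinity requirements to the volume and removes this controller's
// pending initializer.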
func (pvlc *PersistentVolumeLabelController) createPatch(vol *v1.PersistentVolume, volLabels map[string]string) ([]byte, error) {
	volName := vol.Name
	newVolume := vol.DeepCopyObject().(*v1.PersistentVolume)
	populateAffinity := len(volLabels) != 0

	if newVolume.Labels == nil {
		newVolume.Labels = make(map[string]string)
	}

	requirements := make([]v1.NodeSelectorRequirement, 0)
	for k, v := range volLabels {
		newVolume.Labels[k] = v
		// Set NodeSelectorRequirements based on the labels
		if populateAffinity {
			var values []string
			if k == kubeletapis.LabelZoneFailureDomain {
				zones, err := volumeutil.LabelZonesToSet(v)
				if err != nil {
					return nil, fmt.Errorf("failed to convert label string for Zone: %s to a Set", v)
				}
				values = zones.List()
			} else {
				values = []string{v}
			}
			requirements = append(requirements, v1.NodeSelectorRequirement{Key: k, Operator: v1.NodeSelectorOpIn, Values: values})
		}
	}
	if populateAffinity {
		if newVolume.Spec.NodeAffinity == nil {
			newVolume.Spec.NodeAffinity = new(v1.VolumeNodeAffinity)
		}
		if newVolume.Spec.NodeAffinity.Required == nil {
			newVolume.Spec.NodeAffinity.Required = new(v1.NodeSelector)
		}
		if len(newVolume.Spec.NodeAffinity.Required.NodeSelectorTerms) == 0 {
			// Need at least one term pre-allocated whose MatchExpressions can be appended to
			newVolume.Spec.NodeAffinity.Required.NodeSelectorTerms = make([]v1.NodeSelectorTerm, 1)
		}
		// Populate NodeAffinity with requirements if there are no conflicting keys found
		if v1helper.NodeSelectorRequirementKeysExistInNodeSelectorTerms(requirements, newVolume.Spec.NodeAffinity.Required.NodeSelectorTerms) {
			klog.V(4).Infof("NodeSelectorRequirements for cloud labels %v conflict with existing NodeAffinity %v. Skipping addition of NodeSelectorRequirements for cloud labels.",
				requirements, newVolume.Spec.NodeAffinity)
		} else {
			for _, req := range requirements {
				for i := range newVolume.Spec.NodeAffinity.Required.NodeSelectorTerms {
					newVolume.Spec.NodeAffinity.Required.NodeSelectorTerms[i].MatchExpressions = append(newVolume.Spec.NodeAffinity.Required.NodeSelectorTerms[i].MatchExpressions, req)
				}
			}
		}
	}
	newVolume.Initializers = removeInitializer(newVolume.Initializers, initializerName)
	klog.V(4).Infof("removed initializer on PersistentVolume %s", newVolume.Name)

	oldData, err := json.Marshal(vol)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal old persistentvolume %#v for persistentvolume %q: %v", vol, volName, err)
	}

	newData, err := json.Marshal(newVolume)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal new persistentvolume %#v for persistentvolume %q: %v", newVolume, volName, err)
	}

	patch, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.PersistentVolume{})
	if err != nil {
		return nil, fmt.Errorf("failed to create patch for persistentvolume %q: %v", volName, err)
	}
	return patch, nil
}
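
// updateVolume patches the PersistentVolume in the API server with the label, node affinity,
// and initializer changes produced by createPatch.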
func (pvlc *PersistentVolumeLabelController) updateVolume(vol *v1.PersistentVolume, volLabels map[string]string) error {
	volName := vol.Name
	klog.V(4).Infof("updating PersistentVolume %s", volName)
	patchBytes, err := pvlc.createPatch(vol, volLabels)
	if err != nil {
		return err
	}

	_, err = pvlc.kubeClient.CoreV1().PersistentVolumes().Patch(string(volName), types.StrategicMergePatchType, patchBytes)
	if err != nil {
		return fmt.Errorf("failed to update PersistentVolume %s: %v", volName, err)
	}
	klog.V(4).Infof("updated PersistentVolume %s", volName)

	return nil
}
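
// removeInitializer returns the given initializers with the named pending initializer removed.
// It returns nil when no pending initializers remain, and the original value unchanged when the
// named initializer is not pending.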
func removeInitializer(initializers *metav1.Initializers, name string) *metav1.Initializers {
	if initializers == nil {
		return nil
	}

	var updated []metav1.Initializer
	for _, pending := range initializers.Pending {
		if pending.Name != name {
			updated = append(updated, pending)
		}
	}
	if len(updated) == len(initializers.Pending) {
		return initializers
	}
	if len(updated) == 0 {
		return nil
	}

	return &metav1.Initializers{Pending: updated}
}

// needsInitialization checks whether or not the PVL is the next pending initializer.
func needsInitialization(initializers *metav1.Initializers, name string) bool {
	if initializers == nil {
		return false
	}

	if len(initializers.Pending) == 0 {
		return false
	}

	// There is at least one initializer still pending so check to
	// see if the PVL is the next in line.
	return initializers.Pending[0].Name == name
}