mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-03 19:58:17 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			302 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			302 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
/*
 | 
						|
Copyright 2014 The Kubernetes Authors All rights reserved.
 | 
						|
 | 
						|
Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
you may not use this file except in compliance with the License.
 | 
						|
You may obtain a copy of the License at
 | 
						|
 | 
						|
    http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
 | 
						|
Unless required by applicable law or agreed to in writing, software
 | 
						|
distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
See the License for the specific language governing permissions and
 | 
						|
limitations under the License.
 | 
						|
*/
 | 
						|
 | 
						|
package persistentvolume
 | 
						|
 | 
						|
import (
 | 
						|
	"fmt"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/golang/glog"
 | 
						|
	"k8s.io/kubernetes/pkg/api"
 | 
						|
	"k8s.io/kubernetes/pkg/api/unversioned"
 | 
						|
	"k8s.io/kubernetes/pkg/client/cache"
 | 
						|
	client "k8s.io/kubernetes/pkg/client/unversioned"
 | 
						|
	"k8s.io/kubernetes/pkg/cloudprovider"
 | 
						|
	"k8s.io/kubernetes/pkg/controller/framework"
 | 
						|
	"k8s.io/kubernetes/pkg/fields"
 | 
						|
	"k8s.io/kubernetes/pkg/labels"
 | 
						|
	"k8s.io/kubernetes/pkg/runtime"
 | 
						|
	"k8s.io/kubernetes/pkg/types"
 | 
						|
	ioutil "k8s.io/kubernetes/pkg/util/io"
 | 
						|
	"k8s.io/kubernetes/pkg/util/mount"
 | 
						|
	"k8s.io/kubernetes/pkg/volume"
 | 
						|
	"k8s.io/kubernetes/pkg/watch"
 | 
						|
)
 | 
						|
 | 
						|
var _ volume.VolumeHost = &PersistentVolumeRecycler{}
 | 
						|
 | 
						|
// PersistentVolumeRecycler is a controller that watches for PersistentVolumes that are released from their claims.
 | 
						|
// This controller will Recycle those volumes whose reclaim policy is set to PersistentVolumeReclaimRecycle and make them
 | 
						|
// available again for a new claim.
 | 
						|
type PersistentVolumeRecycler struct {
 | 
						|
	volumeController *framework.Controller
 | 
						|
	stopChannel      chan struct{}
 | 
						|
	client           recyclerClient
 | 
						|
	kubeClient       client.Interface
 | 
						|
	pluginMgr        volume.VolumePluginMgr
 | 
						|
}
 | 
						|
 | 
						|
// PersistentVolumeRecycler creates a new PersistentVolumeRecycler
 | 
						|
func NewPersistentVolumeRecycler(kubeClient client.Interface, syncPeriod time.Duration, plugins []volume.VolumePlugin) (*PersistentVolumeRecycler, error) {
 | 
						|
	recyclerClient := NewRecyclerClient(kubeClient)
 | 
						|
	recycler := &PersistentVolumeRecycler{
 | 
						|
		client:     recyclerClient,
 | 
						|
		kubeClient: kubeClient,
 | 
						|
	}
 | 
						|
 | 
						|
	if err := recycler.pluginMgr.InitPlugins(plugins, recycler); err != nil {
 | 
						|
		return nil, fmt.Errorf("Could not initialize volume plugins for PVClaimBinder: %+v", err)
 | 
						|
	}
 | 
						|
 | 
						|
	_, volumeController := framework.NewInformer(
 | 
						|
		&cache.ListWatch{
 | 
						|
			ListFunc: func() (runtime.Object, error) {
 | 
						|
				return kubeClient.PersistentVolumes().List(labels.Everything(), fields.Everything(), unversioned.ListOptions{})
 | 
						|
			},
 | 
						|
			WatchFunc: func(options unversioned.ListOptions) (watch.Interface, error) {
 | 
						|
				return kubeClient.PersistentVolumes().Watch(labels.Everything(), fields.Everything(), options)
 | 
						|
			},
 | 
						|
		},
 | 
						|
		&api.PersistentVolume{},
 | 
						|
		// TODO: Can we have much longer period here?
 | 
						|
		syncPeriod,
 | 
						|
		framework.ResourceEventHandlerFuncs{
 | 
						|
			AddFunc: func(obj interface{}) {
 | 
						|
				pv := obj.(*api.PersistentVolume)
 | 
						|
				recycler.reclaimVolume(pv)
 | 
						|
			},
 | 
						|
			UpdateFunc: func(oldObj, newObj interface{}) {
 | 
						|
				pv := newObj.(*api.PersistentVolume)
 | 
						|
				recycler.reclaimVolume(pv)
 | 
						|
			},
 | 
						|
		},
 | 
						|
	)
 | 
						|
 | 
						|
	recycler.volumeController = volumeController
 | 
						|
	return recycler, nil
 | 
						|
}
 | 
						|
 | 
						|
func (recycler *PersistentVolumeRecycler) reclaimVolume(pv *api.PersistentVolume) error {
 | 
						|
	if pv.Status.Phase == api.VolumeReleased && pv.Spec.ClaimRef != nil {
 | 
						|
		glog.V(5).Infof("Reclaiming PersistentVolume[%s]\n", pv.Name)
 | 
						|
 | 
						|
		latest, err := recycler.client.GetPersistentVolume(pv.Name)
 | 
						|
		if err != nil {
 | 
						|
			return fmt.Errorf("Could not find PersistentVolume %s", pv.Name)
 | 
						|
		}
 | 
						|
		if latest.Status.Phase != api.VolumeReleased {
 | 
						|
			return fmt.Errorf("PersistentVolume[%s] phase is %s, expected %s.  Skipping.", pv.Name, latest.Status.Phase, api.VolumeReleased)
 | 
						|
		}
 | 
						|
 | 
						|
		// both handleRecycle and handleDelete block until completion
 | 
						|
		// TODO: allow parallel recycling operations to increase throughput
 | 
						|
		switch pv.Spec.PersistentVolumeReclaimPolicy {
 | 
						|
		case api.PersistentVolumeReclaimRecycle:
 | 
						|
			err = recycler.handleRecycle(pv)
 | 
						|
		case api.PersistentVolumeReclaimDelete:
 | 
						|
			err = recycler.handleDelete(pv)
 | 
						|
		case api.PersistentVolumeReclaimRetain:
 | 
						|
			glog.V(5).Infof("Volume %s is set to retain after release.  Skipping.\n", pv.Name)
 | 
						|
		default:
 | 
						|
			err = fmt.Errorf("No PersistentVolumeReclaimPolicy defined for spec: %+v", pv)
 | 
						|
		}
 | 
						|
		if err != nil {
 | 
						|
			errMsg := fmt.Sprintf("Could not recycle volume spec: %+v", err)
 | 
						|
			glog.Errorf(errMsg)
 | 
						|
			return fmt.Errorf(errMsg)
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (recycler *PersistentVolumeRecycler) handleRecycle(pv *api.PersistentVolume) error {
 | 
						|
	glog.V(5).Infof("Recycling PersistentVolume[%s]\n", pv.Name)
 | 
						|
 | 
						|
	currentPhase := pv.Status.Phase
 | 
						|
	nextPhase := currentPhase
 | 
						|
 | 
						|
	spec := volume.NewSpecFromPersistentVolume(pv, false)
 | 
						|
	plugin, err := recycler.pluginMgr.FindRecyclablePluginBySpec(spec)
 | 
						|
	if err != nil {
 | 
						|
		return fmt.Errorf("Could not find recyclable volume plugin for spec: %+v", err)
 | 
						|
	}
 | 
						|
	volRecycler, err := plugin.NewRecycler(spec)
 | 
						|
	if err != nil {
 | 
						|
		return fmt.Errorf("Could not obtain Recycler for spec: %+v", err)
 | 
						|
	}
 | 
						|
	// blocks until completion
 | 
						|
	err = volRecycler.Recycle()
 | 
						|
	if err != nil {
 | 
						|
		glog.Errorf("PersistentVolume[%s] failed recycling: %+v", pv.Name, err)
 | 
						|
		pv.Status.Message = fmt.Sprintf("Recycling error: %s", err)
 | 
						|
		nextPhase = api.VolumeFailed
 | 
						|
	} else {
 | 
						|
		glog.V(5).Infof("PersistentVolume[%s] successfully recycled\n", pv.Name)
 | 
						|
		nextPhase = api.VolumePending
 | 
						|
		if err != nil {
 | 
						|
			glog.Errorf("Error updating pv.Status: %+v", err)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	if currentPhase != nextPhase {
 | 
						|
		glog.V(5).Infof("PersistentVolume[%s] changing phase from %s to %s\n", pv.Name, currentPhase, nextPhase)
 | 
						|
		pv.Status.Phase = nextPhase
 | 
						|
		_, err := recycler.client.UpdatePersistentVolumeStatus(pv)
 | 
						|
		if err != nil {
 | 
						|
			// Rollback to previous phase
 | 
						|
			pv.Status.Phase = currentPhase
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (recycler *PersistentVolumeRecycler) handleDelete(pv *api.PersistentVolume) error {
 | 
						|
	glog.V(5).Infof("Deleting PersistentVolume[%s]\n", pv.Name)
 | 
						|
 | 
						|
	currentPhase := pv.Status.Phase
 | 
						|
	nextPhase := currentPhase
 | 
						|
 | 
						|
	spec := volume.NewSpecFromPersistentVolume(pv, false)
 | 
						|
	plugin, err := recycler.pluginMgr.FindDeletablePluginBySpec(spec)
 | 
						|
	if err != nil {
 | 
						|
		return fmt.Errorf("Could not find deletable volume plugin for spec: %+v", err)
 | 
						|
	}
 | 
						|
	deleter, err := plugin.NewDeleter(spec)
 | 
						|
	if err != nil {
 | 
						|
		return fmt.Errorf("could not obtain Deleter for spec: %+v", err)
 | 
						|
	}
 | 
						|
	// blocks until completion
 | 
						|
	err = deleter.Delete()
 | 
						|
	if err != nil {
 | 
						|
		glog.Errorf("PersistentVolume[%s] failed deletion: %+v", pv.Name, err)
 | 
						|
		pv.Status.Message = fmt.Sprintf("Deletion error: %s", err)
 | 
						|
		nextPhase = api.VolumeFailed
 | 
						|
	} else {
 | 
						|
		glog.V(5).Infof("PersistentVolume[%s] successfully deleted through plugin\n", pv.Name)
 | 
						|
		// after successful deletion through the plugin, we can also remove the PV from the cluster
 | 
						|
		err = recycler.client.DeletePersistentVolume(pv)
 | 
						|
		if err != nil {
 | 
						|
			return fmt.Errorf("error deleting persistent volume: %+v", err)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	if currentPhase != nextPhase {
 | 
						|
		glog.V(5).Infof("PersistentVolume[%s] changing phase from %s to %s\n", pv.Name, currentPhase, nextPhase)
 | 
						|
		pv.Status.Phase = nextPhase
 | 
						|
		_, err := recycler.client.UpdatePersistentVolumeStatus(pv)
 | 
						|
		if err != nil {
 | 
						|
			// Rollback to previous phase
 | 
						|
			pv.Status.Phase = currentPhase
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// Run starts this recycler's control loops
 | 
						|
func (recycler *PersistentVolumeRecycler) Run() {
 | 
						|
	glog.V(5).Infof("Starting PersistentVolumeRecycler\n")
 | 
						|
	if recycler.stopChannel == nil {
 | 
						|
		recycler.stopChannel = make(chan struct{})
 | 
						|
		go recycler.volumeController.Run(recycler.stopChannel)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// Stop gracefully shuts down this binder
 | 
						|
func (recycler *PersistentVolumeRecycler) Stop() {
 | 
						|
	glog.V(5).Infof("Stopping PersistentVolumeRecycler\n")
 | 
						|
	if recycler.stopChannel != nil {
 | 
						|
		close(recycler.stopChannel)
 | 
						|
		recycler.stopChannel = nil
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// recyclerClient abstracts access to PVs
 | 
						|
type recyclerClient interface {
 | 
						|
	GetPersistentVolume(name string) (*api.PersistentVolume, error)
 | 
						|
	UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error)
 | 
						|
	DeletePersistentVolume(volume *api.PersistentVolume) error
 | 
						|
	UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error)
 | 
						|
}
 | 
						|
 | 
						|
func NewRecyclerClient(c client.Interface) recyclerClient {
 | 
						|
	return &realRecyclerClient{c}
 | 
						|
}
 | 
						|
 | 
						|
type realRecyclerClient struct {
 | 
						|
	client client.Interface
 | 
						|
}
 | 
						|
 | 
						|
func (c *realRecyclerClient) GetPersistentVolume(name string) (*api.PersistentVolume, error) {
 | 
						|
	return c.client.PersistentVolumes().Get(name)
 | 
						|
}
 | 
						|
 | 
						|
func (c *realRecyclerClient) UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error) {
 | 
						|
	return c.client.PersistentVolumes().Update(volume)
 | 
						|
}
 | 
						|
 | 
						|
func (c *realRecyclerClient) DeletePersistentVolume(volume *api.PersistentVolume) error {
 | 
						|
	return c.client.PersistentVolumes().Delete(volume.Name)
 | 
						|
}
 | 
						|
 | 
						|
func (c *realRecyclerClient) UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error) {
 | 
						|
	return c.client.PersistentVolumes().UpdateStatus(volume)
 | 
						|
}
 | 
						|
 | 
						|
// PersistentVolumeRecycler is host to the volume plugins, but does not actually mount any volumes.
 | 
						|
// Because no mounting is performed, most of the VolumeHost methods are not implemented.
 | 
						|
func (f *PersistentVolumeRecycler) GetPluginDir(podUID string) string {
 | 
						|
	return ""
 | 
						|
}
 | 
						|
 | 
						|
func (f *PersistentVolumeRecycler) GetPodVolumeDir(podUID types.UID, pluginName, volumeName string) string {
 | 
						|
	return ""
 | 
						|
}
 | 
						|
 | 
						|
func (f *PersistentVolumeRecycler) GetPodPluginDir(podUID types.UID, pluginName string) string {
 | 
						|
	return ""
 | 
						|
}
 | 
						|
 | 
						|
func (f *PersistentVolumeRecycler) GetKubeClient() client.Interface {
 | 
						|
	return f.kubeClient
 | 
						|
}
 | 
						|
 | 
						|
func (f *PersistentVolumeRecycler) NewWrapperBuilder(spec *volume.Spec, pod *api.Pod, opts volume.VolumeOptions) (volume.Builder, error) {
 | 
						|
	return nil, fmt.Errorf("NewWrapperBuilder not supported by PVClaimBinder's VolumeHost implementation")
 | 
						|
}
 | 
						|
 | 
						|
func (f *PersistentVolumeRecycler) NewWrapperCleaner(spec *volume.Spec, podUID types.UID) (volume.Cleaner, error) {
 | 
						|
	return nil, fmt.Errorf("NewWrapperCleaner not supported by PVClaimBinder's VolumeHost implementation")
 | 
						|
}
 | 
						|
 | 
						|
func (f *PersistentVolumeRecycler) GetCloudProvider() cloudprovider.Interface {
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (f *PersistentVolumeRecycler) GetMounter() mount.Interface {
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (f *PersistentVolumeRecycler) GetWriter() ioutil.Writer {
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (f *PersistentVolumeRecycler) GetHostName() string {
 | 
						|
	return ""
 | 
						|
}
 |