mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-03 19:58:17 +00:00 
			
		
		
		
	Merge pull request #126745 from hungnguyen243/pvcScalabilityFix
Improve PVC protection controller's scalability by batch-processing PVCs by namespace & caching live pod list results [fixed dead loop issue with idle work queue]
This commit is contained in:
		@@ -19,6 +19,7 @@ package pvcprotection
 | 
				
			|||||||
import (
 | 
					import (
 | 
				
			||||||
	"context"
 | 
						"context"
 | 
				
			||||||
	"fmt"
 | 
						"fmt"
 | 
				
			||||||
 | 
						"sync"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	v1 "k8s.io/api/core/v1"
 | 
						v1 "k8s.io/api/core/v1"
 | 
				
			||||||
@@ -41,6 +42,65 @@ import (
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
// Controller is controller that removes PVCProtectionFinalizer
 | 
					// Controller is controller that removes PVCProtectionFinalizer
 | 
				
			||||||
// from PVCs that are used by no pods.
 | 
					// from PVCs that are used by no pods.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type LazyLivePodList struct {
 | 
				
			||||||
 | 
						cache      []v1.Pod
 | 
				
			||||||
 | 
						controller *Controller
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (ll *LazyLivePodList) getCache() []v1.Pod {
 | 
				
			||||||
 | 
						return ll.cache
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (ll *LazyLivePodList) setCache(pods []v1.Pod) {
 | 
				
			||||||
 | 
						ll.cache = pods
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type pvcData struct {
 | 
				
			||||||
 | 
						pvcKey  string
 | 
				
			||||||
 | 
						pvcName string
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type pvcProcessingStore struct {
 | 
				
			||||||
 | 
						namespaceToPVCsMap map[string][]pvcData
 | 
				
			||||||
 | 
						namespaceQueue     workqueue.TypedInterface[string]
 | 
				
			||||||
 | 
						mu                 sync.Mutex
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func NewPVCProcessingStore() *pvcProcessingStore {
 | 
				
			||||||
 | 
						return &pvcProcessingStore{
 | 
				
			||||||
 | 
							namespaceToPVCsMap: make(map[string][]pvcData),
 | 
				
			||||||
 | 
							namespaceQueue:     workqueue.NewTyped[string](),
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (m *pvcProcessingStore) addOrUpdate(namespace string, pvcKey, pvcName string) {
 | 
				
			||||||
 | 
						m.mu.Lock()
 | 
				
			||||||
 | 
						defer m.mu.Unlock()
 | 
				
			||||||
 | 
						if _, exists := m.namespaceToPVCsMap[namespace]; !exists {
 | 
				
			||||||
 | 
							m.namespaceToPVCsMap[namespace] = make([]pvcData, 0)
 | 
				
			||||||
 | 
							m.namespaceQueue.Add(namespace)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						m.namespaceToPVCsMap[namespace] = append(m.namespaceToPVCsMap[namespace], pvcData{pvcKey: pvcKey, pvcName: pvcName})
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Returns a list of pvcs and the associated namespace to be processed downstream
 | 
				
			||||||
 | 
					func (m *pvcProcessingStore) flushNextPVCsByNamespace() ([]pvcData, string) {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						nextNamespace, quit := m.namespaceQueue.Get()
 | 
				
			||||||
 | 
						if quit {
 | 
				
			||||||
 | 
							return nil, nextNamespace
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						m.mu.Lock()
 | 
				
			||||||
 | 
						defer m.mu.Unlock()
 | 
				
			||||||
 | 
						pvcs := m.namespaceToPVCsMap[nextNamespace]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						delete(m.namespaceToPVCsMap, nextNamespace)
 | 
				
			||||||
 | 
						m.namespaceQueue.Done(nextNamespace)
 | 
				
			||||||
 | 
						return pvcs, nextNamespace
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type Controller struct {
 | 
					type Controller struct {
 | 
				
			||||||
	client clientset.Interface
 | 
						client clientset.Interface
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -51,7 +111,8 @@ type Controller struct {
 | 
				
			|||||||
	podListerSynced cache.InformerSynced
 | 
						podListerSynced cache.InformerSynced
 | 
				
			||||||
	podIndexer      cache.Indexer
 | 
						podIndexer      cache.Indexer
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	queue workqueue.TypedRateLimitingInterface[string]
 | 
						queue              workqueue.TypedRateLimitingInterface[string]
 | 
				
			||||||
 | 
						pvcProcessingStore *pvcProcessingStore
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// NewPVCProtectionController returns a new instance of PVCProtectionController.
 | 
					// NewPVCProtectionController returns a new instance of PVCProtectionController.
 | 
				
			||||||
@@ -62,6 +123,7 @@ func NewPVCProtectionController(logger klog.Logger, pvcInformer coreinformers.Pe
 | 
				
			|||||||
			workqueue.DefaultTypedControllerRateLimiter[string](),
 | 
								workqueue.DefaultTypedControllerRateLimiter[string](),
 | 
				
			||||||
			workqueue.TypedRateLimitingQueueConfig[string]{Name: "pvcprotection"},
 | 
								workqueue.TypedRateLimitingQueueConfig[string]{Name: "pvcprotection"},
 | 
				
			||||||
		),
 | 
							),
 | 
				
			||||||
 | 
							pvcProcessingStore: NewPVCProcessingStore(),
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	e.pvcLister = pvcInformer.Lister()
 | 
						e.pvcLister = pvcInformer.Lister()
 | 
				
			||||||
@@ -100,6 +162,7 @@ func NewPVCProtectionController(logger klog.Logger, pvcInformer coreinformers.Pe
 | 
				
			|||||||
func (c *Controller) Run(ctx context.Context, workers int) {
 | 
					func (c *Controller) Run(ctx context.Context, workers int) {
 | 
				
			||||||
	defer utilruntime.HandleCrash()
 | 
						defer utilruntime.HandleCrash()
 | 
				
			||||||
	defer c.queue.ShutDown()
 | 
						defer c.queue.ShutDown()
 | 
				
			||||||
 | 
						defer c.pvcProcessingStore.namespaceQueue.ShutDown()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	logger := klog.FromContext(ctx)
 | 
						logger := klog.FromContext(ctx)
 | 
				
			||||||
	logger.Info("Starting PVC protection controller")
 | 
						logger.Info("Starting PVC protection controller")
 | 
				
			||||||
@@ -109,25 +172,31 @@ func (c *Controller) Run(ctx context.Context, workers int) {
 | 
				
			|||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						go wait.UntilWithContext(ctx, c.runMainWorker, time.Second)
 | 
				
			||||||
	for i := 0; i < workers; i++ {
 | 
						for i := 0; i < workers; i++ {
 | 
				
			||||||
		go wait.UntilWithContext(ctx, c.runWorker, time.Second)
 | 
							go wait.UntilWithContext(ctx, c.runProcessNamespaceWorker, time.Second)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	<-ctx.Done()
 | 
						<-ctx.Done()
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (c *Controller) runWorker(ctx context.Context) {
 | 
					// Main worker batch-pulls PVC items off informer's work queue and populates namespace queue and namespace-PVCs map
 | 
				
			||||||
	for c.processNextWorkItem(ctx) {
 | 
					func (c *Controller) runMainWorker(ctx context.Context) {
 | 
				
			||||||
 | 
						for c.processNextWorkItem() {
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// processNextWorkItem deals with one pvcKey off the queue.  It returns false when it's time to quit.
 | 
					// Consumer worker pulls items off namespace queue and processes associated PVCs
 | 
				
			||||||
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
 | 
					func (c *Controller) runProcessNamespaceWorker(ctx context.Context) {
 | 
				
			||||||
 | 
						for c.processPVCsByNamespace(ctx) {
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (c *Controller) processNextWorkItem() bool {
 | 
				
			||||||
	pvcKey, quit := c.queue.Get()
 | 
						pvcKey, quit := c.queue.Get()
 | 
				
			||||||
	if quit {
 | 
						if quit {
 | 
				
			||||||
		return false
 | 
							return false
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	defer c.queue.Done(pvcKey)
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	pvcNamespace, pvcName, err := cache.SplitMetaNamespaceKey(pvcKey)
 | 
						pvcNamespace, pvcName, err := cache.SplitMetaNamespaceKey(pvcKey)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
@@ -135,19 +204,32 @@ func (c *Controller) processNextWorkItem(ctx context.Context) bool {
 | 
				
			|||||||
		return true
 | 
							return true
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	err = c.processPVC(ctx, pvcNamespace, pvcName)
 | 
						c.pvcProcessingStore.addOrUpdate(pvcNamespace, pvcKey, pvcName)
 | 
				
			||||||
	if err == nil {
 | 
					 | 
				
			||||||
		c.queue.Forget(pvcKey)
 | 
					 | 
				
			||||||
		return true
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	utilruntime.HandleError(fmt.Errorf("PVC %v failed with : %w", pvcKey, err))
 | 
					 | 
				
			||||||
	c.queue.AddRateLimited(pvcKey)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	return true
 | 
						return true
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (c *Controller) processPVC(ctx context.Context, pvcNamespace, pvcName string) error {
 | 
					func (c *Controller) processPVCsByNamespace(ctx context.Context) bool {
 | 
				
			||||||
 | 
						pvcList, namespace := c.pvcProcessingStore.flushNextPVCsByNamespace()
 | 
				
			||||||
 | 
						if pvcList == nil {
 | 
				
			||||||
 | 
							return false
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						lazyLivePodList := &LazyLivePodList{controller: c}
 | 
				
			||||||
 | 
						for _, item := range pvcList {
 | 
				
			||||||
 | 
							pvcKey, pvcName := item.pvcKey, item.pvcName
 | 
				
			||||||
 | 
							err := c.processPVC(ctx, namespace, pvcName, lazyLivePodList)
 | 
				
			||||||
 | 
							if err == nil {
 | 
				
			||||||
 | 
								c.queue.Forget(pvcKey)
 | 
				
			||||||
 | 
							} else {
 | 
				
			||||||
 | 
								c.queue.AddRateLimited(pvcKey)
 | 
				
			||||||
 | 
								utilruntime.HandleError(fmt.Errorf("PVC %v/%v failed with: %w", pvcName, namespace, err))
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							c.queue.Done(pvcKey)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return true
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (c *Controller) processPVC(ctx context.Context, pvcNamespace, pvcName string, lazyLivePodList *LazyLivePodList) error {
 | 
				
			||||||
	logger := klog.FromContext(ctx)
 | 
						logger := klog.FromContext(ctx)
 | 
				
			||||||
	logger.V(4).Info("Processing PVC", "PVC", klog.KRef(pvcNamespace, pvcName))
 | 
						logger.V(4).Info("Processing PVC", "PVC", klog.KRef(pvcNamespace, pvcName))
 | 
				
			||||||
	startTime := time.Now()
 | 
						startTime := time.Now()
 | 
				
			||||||
@@ -167,7 +249,7 @@ func (c *Controller) processPVC(ctx context.Context, pvcNamespace, pvcName strin
 | 
				
			|||||||
	if protectionutil.IsDeletionCandidate(pvc, volumeutil.PVCProtectionFinalizer) {
 | 
						if protectionutil.IsDeletionCandidate(pvc, volumeutil.PVCProtectionFinalizer) {
 | 
				
			||||||
		// PVC should be deleted. Check if it's used and remove finalizer if
 | 
							// PVC should be deleted. Check if it's used and remove finalizer if
 | 
				
			||||||
		// it's not.
 | 
							// it's not.
 | 
				
			||||||
		isUsed, err := c.isBeingUsed(ctx, pvc)
 | 
							isUsed, err := c.isBeingUsed(ctx, pvc, lazyLivePodList)
 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			return err
 | 
								return err
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
@@ -213,7 +295,7 @@ func (c *Controller) removeFinalizer(ctx context.Context, pvc *v1.PersistentVolu
 | 
				
			|||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (c *Controller) isBeingUsed(ctx context.Context, pvc *v1.PersistentVolumeClaim) (bool, error) {
 | 
					func (c *Controller) isBeingUsed(ctx context.Context, pvc *v1.PersistentVolumeClaim, lazyLivePodList *LazyLivePodList) (bool, error) {
 | 
				
			||||||
	// Look for a Pod using pvc in the Informer's cache. If one is found the
 | 
						// Look for a Pod using pvc in the Informer's cache. If one is found the
 | 
				
			||||||
	// correct decision to keep pvc is taken without doing an expensive live
 | 
						// correct decision to keep pvc is taken without doing an expensive live
 | 
				
			||||||
	// list.
 | 
						// list.
 | 
				
			||||||
@@ -228,8 +310,12 @@ func (c *Controller) isBeingUsed(ctx context.Context, pvc *v1.PersistentVolumeCl
 | 
				
			|||||||
	// Even if no Pod using pvc was found in the Informer's cache it doesn't
 | 
						// Even if no Pod using pvc was found in the Informer's cache it doesn't
 | 
				
			||||||
	// mean such a Pod doesn't exist: it might just not be in the cache yet. To
 | 
						// mean such a Pod doesn't exist: it might just not be in the cache yet. To
 | 
				
			||||||
	// be 100% confident that it is safe to delete pvc make sure no Pod is using
 | 
						// be 100% confident that it is safe to delete pvc make sure no Pod is using
 | 
				
			||||||
	// it among those returned by a live list.
 | 
						// it among those returned by a "lazy" live list.
 | 
				
			||||||
	return c.askAPIServer(ctx, pvc)
 | 
					
 | 
				
			||||||
 | 
						// Use a "lazy" live pod list: lazyLivePodList caches the first successful live pod list response,
 | 
				
			||||||
 | 
						// so for a large number of PVC deletions in a short duration, subsequent requests can use the cached pod list
 | 
				
			||||||
 | 
						// instead of issuing a lot of API requests. The cache is refreshed for each run of processNextWorkItem().
 | 
				
			||||||
 | 
						return c.askAPIServer(ctx, pvc, lazyLivePodList)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (c *Controller) askInformer(logger klog.Logger, pvc *v1.PersistentVolumeClaim) (bool, error) {
 | 
					func (c *Controller) askInformer(logger klog.Logger, pvc *v1.PersistentVolumeClaim) (bool, error) {
 | 
				
			||||||
@@ -258,16 +344,24 @@ func (c *Controller) askInformer(logger klog.Logger, pvc *v1.PersistentVolumeCla
 | 
				
			|||||||
	return false, nil
 | 
						return false, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (c *Controller) askAPIServer(ctx context.Context, pvc *v1.PersistentVolumeClaim) (bool, error) {
 | 
					func (c *Controller) askAPIServer(ctx context.Context, pvc *v1.PersistentVolumeClaim, lazyLivePodList *LazyLivePodList) (bool, error) {
 | 
				
			||||||
	logger := klog.FromContext(ctx)
 | 
						logger := klog.FromContext(ctx)
 | 
				
			||||||
	logger.V(4).Info("Looking for Pods using PVC with a live list", "PVC", klog.KObj(pvc))
 | 
						logger.V(4).Info("Looking for Pods using PVC", "PVC", klog.KObj(pvc))
 | 
				
			||||||
 | 
						if lazyLivePodList.getCache() == nil {
 | 
				
			||||||
 | 
							logger.V(4).Info("Live listing Pods in namespace", "namespace", pvc.Namespace)
 | 
				
			||||||
 | 
							podsList, err := c.client.CoreV1().Pods(pvc.Namespace).List(ctx, metav1.ListOptions{})
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								return false, fmt.Errorf("live list of pods failed: %s", err.Error())
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	podsList, err := c.client.CoreV1().Pods(pvc.Namespace).List(ctx, metav1.ListOptions{})
 | 
							if podsList.Items == nil {
 | 
				
			||||||
	if err != nil {
 | 
								lazyLivePodList.setCache(make([]v1.Pod, 0))
 | 
				
			||||||
		return false, fmt.Errorf("live list of pods failed: %s", err.Error())
 | 
							} else {
 | 
				
			||||||
 | 
								lazyLivePodList.setCache(podsList.Items)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	for _, pod := range podsList.Items {
 | 
						for _, pod := range lazyLivePodList.getCache() {
 | 
				
			||||||
		if c.podUsesPVC(logger, &pod, pvc) {
 | 
							if c.podUsesPVC(logger, &pod, pvc) {
 | 
				
			||||||
			return true, nil
 | 
								return true, nil
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -47,6 +47,7 @@ type reaction struct {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
const (
 | 
					const (
 | 
				
			||||||
	defaultNS       = "default"
 | 
						defaultNS       = "default"
 | 
				
			||||||
 | 
						namespace2      = "namespace-2"
 | 
				
			||||||
	defaultPVCName  = "pvc1"
 | 
						defaultPVCName  = "pvc1"
 | 
				
			||||||
	defaultPodName  = "pod1"
 | 
						defaultPodName  = "pod1"
 | 
				
			||||||
	defaultNodeName = "node1"
 | 
						defaultNodeName = "node1"
 | 
				
			||||||
@@ -69,6 +70,22 @@ func pod() *v1.Pod {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func podWithConfig(name string, namespace string) *v1.Pod {
 | 
				
			||||||
 | 
						return &v1.Pod{
 | 
				
			||||||
 | 
							ObjectMeta: metav1.ObjectMeta{
 | 
				
			||||||
 | 
								Name:      name,
 | 
				
			||||||
 | 
								Namespace: namespace,
 | 
				
			||||||
 | 
								UID:       defaultUID,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							Spec: v1.PodSpec{
 | 
				
			||||||
 | 
								NodeName: defaultNodeName,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							Status: v1.PodStatus{
 | 
				
			||||||
 | 
								Phase: v1.PodPending,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func unscheduled(pod *v1.Pod) *v1.Pod {
 | 
					func unscheduled(pod *v1.Pod) *v1.Pod {
 | 
				
			||||||
	pod.Spec.NodeName = ""
 | 
						pod.Spec.NodeName = ""
 | 
				
			||||||
	return pod
 | 
						return pod
 | 
				
			||||||
@@ -117,6 +134,15 @@ func pvc() *v1.PersistentVolumeClaim {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func pvcWithConfig(name string, namespace string) *v1.PersistentVolumeClaim {
 | 
				
			||||||
 | 
						return &v1.PersistentVolumeClaim{
 | 
				
			||||||
 | 
							ObjectMeta: metav1.ObjectMeta{
 | 
				
			||||||
 | 
								Name:      name,
 | 
				
			||||||
 | 
								Namespace: namespace,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func withProtectionFinalizer(pvc *v1.PersistentVolumeClaim) *v1.PersistentVolumeClaim {
 | 
					func withProtectionFinalizer(pvc *v1.PersistentVolumeClaim) *v1.PersistentVolumeClaim {
 | 
				
			||||||
	pvc.Finalizers = append(pvc.Finalizers, volumeutil.PVCProtectionFinalizer)
 | 
						pvc.Finalizers = append(pvc.Finalizers, volumeutil.PVCProtectionFinalizer)
 | 
				
			||||||
	return pvc
 | 
						return pvc
 | 
				
			||||||
@@ -146,6 +172,23 @@ func generateUpdateErrorFunc(t *testing.T, failures int) clienttesting.ReactionF
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func generatePodListErrorFunc(t *testing.T, failures int) clienttesting.ReactionFunc {
 | 
				
			||||||
 | 
						i := 0
 | 
				
			||||||
 | 
						return func(action clienttesting.Action) (bool, runtime.Object, error) {
 | 
				
			||||||
 | 
							i++
 | 
				
			||||||
 | 
							if i <= failures {
 | 
				
			||||||
 | 
								// Pod List fails
 | 
				
			||||||
 | 
								list, ok := action.(clienttesting.ListAction)
 | 
				
			||||||
 | 
								if !ok {
 | 
				
			||||||
 | 
									t.Fatalf("Reactor got non-list action: %+v", action)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								return true, nil, apierrors.NewForbidden(list.GetResource().GroupResource(), "mock pod", errors.New("Mock error"))
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							// List succeeds
 | 
				
			||||||
 | 
							return false, nil, nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestPVCProtectionController(t *testing.T) {
 | 
					func TestPVCProtectionController(t *testing.T) {
 | 
				
			||||||
	pvcGVR := schema.GroupVersionResource{
 | 
						pvcGVR := schema.GroupVersionResource{
 | 
				
			||||||
		Group:    v1.GroupName,
 | 
							Group:    v1.GroupName,
 | 
				
			||||||
@@ -175,7 +218,7 @@ func TestPVCProtectionController(t *testing.T) {
 | 
				
			|||||||
		reactors []reaction
 | 
							reactors []reaction
 | 
				
			||||||
		// PVC event to simulate. This PVC will be automatically added to
 | 
							// PVC event to simulate. This PVC will be automatically added to
 | 
				
			||||||
		// initialObjects.
 | 
							// initialObjects.
 | 
				
			||||||
		updatedPVC *v1.PersistentVolumeClaim
 | 
							updatedPVCs []*v1.PersistentVolumeClaim
 | 
				
			||||||
		// Pod event to simulate. This Pod will be automatically added to
 | 
							// Pod event to simulate. This Pod will be automatically added to
 | 
				
			||||||
		// initialObjects.
 | 
							// initialObjects.
 | 
				
			||||||
		updatedPod *v1.Pod
 | 
							updatedPod *v1.Pod
 | 
				
			||||||
@@ -190,20 +233,21 @@ func TestPVCProtectionController(t *testing.T) {
 | 
				
			|||||||
		// PVC events
 | 
							// PVC events
 | 
				
			||||||
		//
 | 
							//
 | 
				
			||||||
		{
 | 
							{
 | 
				
			||||||
			name:       "PVC without finalizer -> finalizer is added",
 | 
								name:        "PVC without finalizer -> finalizer is added",
 | 
				
			||||||
			updatedPVC: pvc(),
 | 
								updatedPVCs: []*v1.PersistentVolumeClaim{pvc(), pvcWithConfig("pvc2", "namespace-2")},
 | 
				
			||||||
			expectedActions: []clienttesting.Action{
 | 
								expectedActions: []clienttesting.Action{
 | 
				
			||||||
				clienttesting.NewUpdateAction(pvcGVR, defaultNS, withProtectionFinalizer(pvc())),
 | 
									clienttesting.NewUpdateAction(pvcGVR, defaultNS, withProtectionFinalizer(pvc())),
 | 
				
			||||||
 | 
									clienttesting.NewUpdateAction(pvcGVR, "namespace-2", withProtectionFinalizer(pvcWithConfig("pvc2", "namespace-2"))),
 | 
				
			||||||
			},
 | 
								},
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
		{
 | 
							{
 | 
				
			||||||
			name:            "PVC with finalizer -> no action",
 | 
								name:            "PVC with finalizer -> no action",
 | 
				
			||||||
			updatedPVC:      withProtectionFinalizer(pvc()),
 | 
								updatedPVCs:     []*v1.PersistentVolumeClaim{withProtectionFinalizer(pvc()), withProtectionFinalizer(pvcWithConfig("pvc2", "namespace-2"))},
 | 
				
			||||||
			expectedActions: []clienttesting.Action{},
 | 
								expectedActions: []clienttesting.Action{},
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
		{
 | 
							{
 | 
				
			||||||
			name:       "saving PVC finalizer fails -> controller retries",
 | 
								name:        "saving PVC finalizer fails -> controller retries",
 | 
				
			||||||
			updatedPVC: pvc(),
 | 
								updatedPVCs: []*v1.PersistentVolumeClaim{pvc()},
 | 
				
			||||||
			reactors: []reaction{
 | 
								reactors: []reaction{
 | 
				
			||||||
				{
 | 
									{
 | 
				
			||||||
					verb:      "update",
 | 
										verb:      "update",
 | 
				
			||||||
@@ -221,16 +265,30 @@ func TestPVCProtectionController(t *testing.T) {
 | 
				
			|||||||
			},
 | 
								},
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
		{
 | 
							{
 | 
				
			||||||
			name:       "deleted PVC with finalizer -> finalizer is removed",
 | 
								name: "deleted PVC with finalizers across different namespaces -> finalizer is removed",
 | 
				
			||||||
			updatedPVC: deleted(withProtectionFinalizer(pvc())),
 | 
								updatedPVCs: []*v1.PersistentVolumeClaim{deleted(withProtectionFinalizer(pvc())),
 | 
				
			||||||
 | 
									deleted(withProtectionFinalizer(pvcWithConfig("pvc2", "namespace-2")))},
 | 
				
			||||||
			expectedActions: []clienttesting.Action{
 | 
								expectedActions: []clienttesting.Action{
 | 
				
			||||||
				clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}),
 | 
									clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}),
 | 
				
			||||||
				clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())),
 | 
									clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())),
 | 
				
			||||||
 | 
									clienttesting.NewListAction(podGVR, podGVK, "namespace-2", metav1.ListOptions{}),
 | 
				
			||||||
 | 
									clienttesting.NewUpdateAction(pvcGVR, "namespace-2", deleted(pvcWithConfig("pvc2", "namespace-2"))),
 | 
				
			||||||
			},
 | 
								},
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
		{
 | 
							{
 | 
				
			||||||
			name:       "finalizer removal fails -> controller retries",
 | 
								name: "multiple PVCs with finalizer for the same namespace; no alive pods -> finalizer is removed",
 | 
				
			||||||
			updatedPVC: deleted(withProtectionFinalizer(pvc())),
 | 
								updatedPVCs: []*v1.PersistentVolumeClaim{deleted(withProtectionFinalizer(pvc())),
 | 
				
			||||||
 | 
									deleted(withProtectionFinalizer(pvcWithConfig("pvc2", defaultNS)))},
 | 
				
			||||||
 | 
								expectedActions: []clienttesting.Action{
 | 
				
			||||||
 | 
									clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}),
 | 
				
			||||||
 | 
									clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())),
 | 
				
			||||||
 | 
									clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}),
 | 
				
			||||||
 | 
									clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvcWithConfig("pvc2", defaultNS))),
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name:        "finalizer removal fails -> controller retries",
 | 
				
			||||||
 | 
								updatedPVCs: []*v1.PersistentVolumeClaim{deleted(withProtectionFinalizer(pvc()))},
 | 
				
			||||||
			reactors: []reaction{
 | 
								reactors: []reaction{
 | 
				
			||||||
				{
 | 
									{
 | 
				
			||||||
					verb:      "update",
 | 
										verb:      "update",
 | 
				
			||||||
@@ -250,12 +308,33 @@ func TestPVCProtectionController(t *testing.T) {
 | 
				
			|||||||
				clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())),
 | 
									clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())),
 | 
				
			||||||
			},
 | 
								},
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name:        "delete multiple PVCs of the same namespace; pod list fails for one PVC -> add failing PVC back to queue and continue to the next PVC",
 | 
				
			||||||
 | 
								updatedPVCs: []*v1.PersistentVolumeClaim{deleted(withProtectionFinalizer(pvc())), deleted(withProtectionFinalizer(pvcWithConfig("pvc2", defaultNS)))},
 | 
				
			||||||
 | 
								reactors: []reaction{
 | 
				
			||||||
 | 
									{
 | 
				
			||||||
 | 
										verb:      "list",
 | 
				
			||||||
 | 
										resource:  "pods",
 | 
				
			||||||
 | 
										reactorfn: generatePodListErrorFunc(t, 1 /* update fails twice*/),
 | 
				
			||||||
 | 
									},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								expectedActions: []clienttesting.Action{
 | 
				
			||||||
 | 
									// Fails
 | 
				
			||||||
 | 
									clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}),
 | 
				
			||||||
 | 
									// Succeed with next pvc in queue
 | 
				
			||||||
 | 
									clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}),
 | 
				
			||||||
 | 
									clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvcWithConfig("pvc2", defaultNS))),
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
									clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}),
 | 
				
			||||||
 | 
									clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())),
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
		{
 | 
							{
 | 
				
			||||||
			name: "deleted PVC with finalizer + pod with the PVC exists -> finalizer is not removed",
 | 
								name: "deleted PVC with finalizer + pod with the PVC exists -> finalizer is not removed",
 | 
				
			||||||
			initialObjects: []runtime.Object{
 | 
								initialObjects: []runtime.Object{
 | 
				
			||||||
				withPVC(defaultPVCName, pod()),
 | 
									withPVC(defaultPVCName, pod()),
 | 
				
			||||||
			},
 | 
								},
 | 
				
			||||||
			updatedPVC:      deleted(withProtectionFinalizer(pvc())),
 | 
								updatedPVCs:     []*v1.PersistentVolumeClaim{deleted(withProtectionFinalizer(pvc()))},
 | 
				
			||||||
			expectedActions: []clienttesting.Action{},
 | 
								expectedActions: []clienttesting.Action{},
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
		{
 | 
							{
 | 
				
			||||||
@@ -263,18 +342,36 @@ func TestPVCProtectionController(t *testing.T) {
 | 
				
			|||||||
			initialObjects: []runtime.Object{
 | 
								initialObjects: []runtime.Object{
 | 
				
			||||||
				withEmptyDir(withPVC("unrelatedPVC", pod())),
 | 
									withEmptyDir(withPVC("unrelatedPVC", pod())),
 | 
				
			||||||
			},
 | 
								},
 | 
				
			||||||
			updatedPVC: deleted(withProtectionFinalizer(pvc())),
 | 
								updatedPVCs: []*v1.PersistentVolumeClaim{deleted(withProtectionFinalizer(pvc()))},
 | 
				
			||||||
			expectedActions: []clienttesting.Action{
 | 
								expectedActions: []clienttesting.Action{
 | 
				
			||||||
				clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}),
 | 
									clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}),
 | 
				
			||||||
				clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())),
 | 
									clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())),
 | 
				
			||||||
			},
 | 
								},
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name: "deleted multiple PVCs with finalizer (same namespace) + pod with unrelated PVC and EmptyDir exists",
 | 
				
			||||||
 | 
								initialObjects: []runtime.Object{
 | 
				
			||||||
 | 
									withEmptyDir(withPVC("unrelatedPVC", pod())),
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								updatedPVCs: []*v1.PersistentVolumeClaim{deleted(withProtectionFinalizer(pvc())),
 | 
				
			||||||
 | 
									deleted(withProtectionFinalizer(pvcWithConfig("pvc2", defaultNS))),
 | 
				
			||||||
 | 
									deleted(withProtectionFinalizer(pvcWithConfig("pvc3", defaultNS))),
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								expectedActions: []clienttesting.Action{
 | 
				
			||||||
 | 
									clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}),
 | 
				
			||||||
 | 
									clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())),
 | 
				
			||||||
 | 
									clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}),
 | 
				
			||||||
 | 
									clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvcWithConfig("pvc2", defaultNS))),
 | 
				
			||||||
 | 
									clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}),
 | 
				
			||||||
 | 
									clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvcWithConfig("pvc3", defaultNS))),
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
		{
 | 
							{
 | 
				
			||||||
			name: "deleted PVC with finalizer + pod with the PVC finished but is not deleted -> finalizer is not removed",
 | 
								name: "deleted PVC with finalizer + pod with the PVC finished but is not deleted -> finalizer is not removed",
 | 
				
			||||||
			initialObjects: []runtime.Object{
 | 
								initialObjects: []runtime.Object{
 | 
				
			||||||
				withStatus(v1.PodFailed, withPVC(defaultPVCName, pod())),
 | 
									withStatus(v1.PodFailed, withPVC(defaultPVCName, pod())),
 | 
				
			||||||
			},
 | 
								},
 | 
				
			||||||
			updatedPVC:      deleted(withProtectionFinalizer(pvc())),
 | 
								updatedPVCs:     []*v1.PersistentVolumeClaim{deleted(withProtectionFinalizer(pvc()))},
 | 
				
			||||||
			expectedActions: []clienttesting.Action{},
 | 
								expectedActions: []clienttesting.Action{},
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
		{
 | 
							{
 | 
				
			||||||
@@ -283,11 +380,27 @@ func TestPVCProtectionController(t *testing.T) {
 | 
				
			|||||||
				withPVC(defaultPVCName, pod()),
 | 
									withPVC(defaultPVCName, pod()),
 | 
				
			||||||
			},
 | 
								},
 | 
				
			||||||
			informersAreLate: true,
 | 
								informersAreLate: true,
 | 
				
			||||||
			updatedPVC:       deleted(withProtectionFinalizer(pvc())),
 | 
								updatedPVCs:      []*v1.PersistentVolumeClaim{deleted(withProtectionFinalizer(pvc()))},
 | 
				
			||||||
			expectedActions: []clienttesting.Action{
 | 
								expectedActions: []clienttesting.Action{
 | 
				
			||||||
				clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}),
 | 
									clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}),
 | 
				
			||||||
			},
 | 
								},
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name: "mix of deleted PVCs with some used by a pod and some unused -> finalizer is removed only for unused PVCs",
 | 
				
			||||||
 | 
								initialObjects: []runtime.Object{
 | 
				
			||||||
 | 
									withPVC(defaultPVCName, pod()),
 | 
				
			||||||
 | 
									withPVC("pvc3", podWithConfig("pod2", "namespace-3")),
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								informersAreLate: true,
 | 
				
			||||||
 | 
								updatedPVCs:      []*v1.PersistentVolumeClaim{deleted(withProtectionFinalizer(pvc())), deleted(withProtectionFinalizer(pvcWithConfig("pvc2", defaultNS))), deleted(withProtectionFinalizer(pvcWithConfig("pvc3", "namespace-3")))},
 | 
				
			||||||
 | 
								expectedActions: []clienttesting.Action{
 | 
				
			||||||
 | 
									clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}),
 | 
				
			||||||
 | 
									clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}),
 | 
				
			||||||
 | 
									clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvcWithConfig("pvc2", defaultNS))),
 | 
				
			||||||
 | 
									clienttesting.NewListAction(podGVR, podGVK, "namespace-3", metav1.ListOptions{}),
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
		//
 | 
							//
 | 
				
			||||||
		// Pod events
 | 
							// Pod events
 | 
				
			||||||
		//
 | 
							//
 | 
				
			||||||
@@ -376,9 +489,11 @@ func TestPVCProtectionController(t *testing.T) {
 | 
				
			|||||||
			clientObjs    []runtime.Object
 | 
								clientObjs    []runtime.Object
 | 
				
			||||||
			informersObjs []runtime.Object
 | 
								informersObjs []runtime.Object
 | 
				
			||||||
		)
 | 
							)
 | 
				
			||||||
		if test.updatedPVC != nil {
 | 
							if test.updatedPVCs != nil {
 | 
				
			||||||
			clientObjs = append(clientObjs, test.updatedPVC)
 | 
								for i := 0; i < len(test.updatedPVCs); i++ {
 | 
				
			||||||
			informersObjs = append(informersObjs, test.updatedPVC)
 | 
									clientObjs = append(clientObjs, test.updatedPVCs[i])
 | 
				
			||||||
 | 
									informersObjs = append(informersObjs, test.updatedPVCs[i])
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		if test.updatedPod != nil {
 | 
							if test.updatedPod != nil {
 | 
				
			||||||
			clientObjs = append(clientObjs, test.updatedPod)
 | 
								clientObjs = append(clientObjs, test.updatedPod)
 | 
				
			||||||
@@ -388,7 +503,6 @@ func TestPVCProtectionController(t *testing.T) {
 | 
				
			|||||||
		if !test.informersAreLate {
 | 
							if !test.informersAreLate {
 | 
				
			||||||
			informersObjs = append(informersObjs, test.initialObjects...)
 | 
								informersObjs = append(informersObjs, test.initialObjects...)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					 | 
				
			||||||
		// Create client with initial data
 | 
							// Create client with initial data
 | 
				
			||||||
		client := fake.NewSimpleClientset(clientObjs...)
 | 
							client := fake.NewSimpleClientset(clientObjs...)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -423,8 +537,10 @@ func TestPVCProtectionController(t *testing.T) {
 | 
				
			|||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		// Start the test by simulating an event
 | 
							// Start the test by simulating an event
 | 
				
			||||||
		if test.updatedPVC != nil {
 | 
							if test.updatedPVCs != nil {
 | 
				
			||||||
			ctrl.pvcAddedUpdated(logger, test.updatedPVC)
 | 
								for i := 0; i < len(test.updatedPVCs); i++ {
 | 
				
			||||||
 | 
									ctrl.pvcAddedUpdated(logger, test.updatedPVCs[i])
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		switch {
 | 
							switch {
 | 
				
			||||||
		case test.deletedPod != nil && test.updatedPod != nil && test.deletedPod.Namespace == test.updatedPod.Namespace && test.deletedPod.Name == test.updatedPod.Name:
 | 
							case test.deletedPod != nil && test.updatedPod != nil && test.deletedPod.Namespace == test.updatedPod.Namespace && test.deletedPod.Name == test.updatedPod.Name:
 | 
				
			||||||
@@ -445,12 +561,18 @@ func TestPVCProtectionController(t *testing.T) {
 | 
				
			|||||||
			}
 | 
								}
 | 
				
			||||||
			if ctrl.queue.Len() > 0 {
 | 
								if ctrl.queue.Len() > 0 {
 | 
				
			||||||
				logger.V(5).Info("Non-empty queue, processing one", "test", test.name, "queueLength", ctrl.queue.Len())
 | 
									logger.V(5).Info("Non-empty queue, processing one", "test", test.name, "queueLength", ctrl.queue.Len())
 | 
				
			||||||
				ctrl.processNextWorkItem(context.TODO())
 | 
									ctx := context.TODO()
 | 
				
			||||||
 | 
									ctrl.processNextWorkItem()
 | 
				
			||||||
 | 
									for ctrl.pvcProcessingStore.namespaceQueue.Len() != 0 {
 | 
				
			||||||
 | 
										ctrl.processPVCsByNamespace(ctx)
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			if ctrl.queue.Len() > 0 {
 | 
								if ctrl.queue.Len() > 0 {
 | 
				
			||||||
				// There is still some work in the queue, process it now
 | 
									// There is still some work in the queue, process it now
 | 
				
			||||||
				continue
 | 
									continue
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			currentActionCount := len(client.Actions())
 | 
								currentActionCount := len(client.Actions())
 | 
				
			||||||
			if currentActionCount < len(test.expectedActions) {
 | 
								if currentActionCount < len(test.expectedActions) {
 | 
				
			||||||
				// Do not log every wait, only when the action count changes.
 | 
									// Do not log every wait, only when the action count changes.
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -81,6 +81,7 @@ var CSISuites = append(BaseSuites,
 | 
				
			|||||||
	InitSnapshottableTestSuite,
 | 
						InitSnapshottableTestSuite,
 | 
				
			||||||
	InitSnapshottableStressTestSuite,
 | 
						InitSnapshottableStressTestSuite,
 | 
				
			||||||
	InitVolumePerformanceTestSuite,
 | 
						InitVolumePerformanceTestSuite,
 | 
				
			||||||
 | 
						InitPvcDeletionPerformanceTestSuite,
 | 
				
			||||||
	InitReadWriteOncePodTestSuite,
 | 
						InitReadWriteOncePodTestSuite,
 | 
				
			||||||
	InitVolumeModifyTestSuite,
 | 
						InitVolumeModifyTestSuite,
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										248
									
								
								test/e2e/storage/testsuites/pvcdeletionperf.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										248
									
								
								test/e2e/storage/testsuites/pvcdeletionperf.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,248 @@
 | 
				
			|||||||
 | 
					/*
 | 
				
			||||||
 | 
					Copyright 2024 The Kubernetes Authors.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Licensed under the Apache License, Version 2.0 (the "License");
 | 
				
			||||||
 | 
					you may not use this file except in compliance with the License.
 | 
				
			||||||
 | 
					You may obtain a copy of the License at
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    http://www.apache.org/licenses/LICENSE-2.0
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Unless required by applicable law or agreed to in writing, software
 | 
				
			||||||
 | 
					distributed under the License is distributed on an "AS IS" BASIS,
 | 
				
			||||||
 | 
					WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
				
			||||||
 | 
					See the License for the specific language governing permissions and
 | 
				
			||||||
 | 
					limitations under the License.
 | 
				
			||||||
 | 
					*/
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					package testsuites
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"context"
 | 
				
			||||||
 | 
						"fmt"
 | 
				
			||||||
 | 
						"sync"
 | 
				
			||||||
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						"github.com/onsi/ginkgo/v2"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						v1 "k8s.io/api/core/v1"
 | 
				
			||||||
 | 
						storagev1 "k8s.io/api/storage/v1"
 | 
				
			||||||
 | 
						metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
				
			||||||
 | 
						clientset "k8s.io/client-go/kubernetes"
 | 
				
			||||||
 | 
						"k8s.io/kubernetes/test/e2e/framework"
 | 
				
			||||||
 | 
						e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
 | 
				
			||||||
 | 
						e2epv "k8s.io/kubernetes/test/e2e/framework/pv"
 | 
				
			||||||
 | 
						e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
 | 
				
			||||||
 | 
						storageframework "k8s.io/kubernetes/test/e2e/storage/framework"
 | 
				
			||||||
 | 
						admissionapi "k8s.io/pod-security-admission/api"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type pvcDeletionPerformanceTestSuite struct {
 | 
				
			||||||
 | 
						tsInfo storageframework.TestSuiteInfo
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					var _ storageframework.TestSuite = &pvcDeletionPerformanceTestSuite{}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					const pvcDeletionTestTimeout = 30 * time.Minute
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// InitPvcDeletionPerformanceTestSuite returns pvcDeletionPerformanceTestSuite that implements TestSuite interface
 | 
				
			||||||
 | 
					// This test suite brings up a number of pods and PVCS (configured upstream), deletes the pods, and then deletes the PVCs.
 | 
				
			||||||
 | 
					// The main goal is to record the duration for the PVC/PV deletion process for each run, and so the test doesn't set explicit expectations to match against.
 | 
				
			||||||
 | 
					func InitPvcDeletionPerformanceTestSuite() storageframework.TestSuite {
 | 
				
			||||||
 | 
						return &pvcDeletionPerformanceTestSuite{
 | 
				
			||||||
 | 
							tsInfo: storageframework.TestSuiteInfo{
 | 
				
			||||||
 | 
								Name: "pvc-deletion-performance",
 | 
				
			||||||
 | 
								TestPatterns: []storageframework.TestPattern{
 | 
				
			||||||
 | 
									storageframework.BlockVolModeDynamicPV,
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (t *pvcDeletionPerformanceTestSuite) GetTestSuiteInfo() storageframework.TestSuiteInfo {
 | 
				
			||||||
 | 
						return t.tsInfo
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (t *pvcDeletionPerformanceTestSuite) SkipUnsupportedTests(driver storageframework.TestDriver, pattern storageframework.TestPattern) {
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (t *pvcDeletionPerformanceTestSuite) DefineTests(driver storageframework.TestDriver, pattern storageframework.TestPattern) {
 | 
				
			||||||
 | 
						type local struct {
 | 
				
			||||||
 | 
							config  *storageframework.PerTestConfig
 | 
				
			||||||
 | 
							cs      clientset.Interface
 | 
				
			||||||
 | 
							ns      *v1.Namespace
 | 
				
			||||||
 | 
							scName  string
 | 
				
			||||||
 | 
							pvcs    []*v1.PersistentVolumeClaim
 | 
				
			||||||
 | 
							options *storageframework.PerformanceTestOptions
 | 
				
			||||||
 | 
							stopCh  chan struct{}
 | 
				
			||||||
 | 
							pods    []*v1.Pod
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						var (
 | 
				
			||||||
 | 
							dInfo *storageframework.DriverInfo
 | 
				
			||||||
 | 
							l     *local
 | 
				
			||||||
 | 
						)
 | 
				
			||||||
 | 
						ginkgo.BeforeEach(func() {
 | 
				
			||||||
 | 
							// Check preconditions
 | 
				
			||||||
 | 
							dDriver := driver.(storageframework.DynamicPVTestDriver)
 | 
				
			||||||
 | 
							if dDriver == nil {
 | 
				
			||||||
 | 
								e2eskipper.Skipf("Test driver does not support dynamically created volumes")
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							dInfo = dDriver.GetDriverInfo()
 | 
				
			||||||
 | 
							if dInfo == nil {
 | 
				
			||||||
 | 
								e2eskipper.Skipf("Failed to get Driver info -- skipping")
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							if dInfo.PerformanceTestOptions == nil || dInfo.PerformanceTestOptions.ProvisioningOptions == nil {
 | 
				
			||||||
 | 
								e2eskipper.Skipf("Driver %s doesn't specify performance test options -- skipping", dInfo.Name)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Set high QPS for the framework to avoid client-side throttling from the test itself,
 | 
				
			||||||
 | 
						// which can interfere with measuring deletion time
 | 
				
			||||||
 | 
						frameworkOptions := framework.Options{
 | 
				
			||||||
 | 
							ClientQPS:   500,
 | 
				
			||||||
 | 
							ClientBurst: 1000,
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						f := framework.NewFramework("pvc-deletion-performance", frameworkOptions, nil)
 | 
				
			||||||
 | 
						f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						ginkgo.AfterEach(func(ctx context.Context) {
 | 
				
			||||||
 | 
							if l != nil {
 | 
				
			||||||
 | 
								if l.stopCh != nil {
 | 
				
			||||||
 | 
									ginkgo.By("Closing informer channel")
 | 
				
			||||||
 | 
									close(l.stopCh)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								deletingStats := &performanceStats{
 | 
				
			||||||
 | 
									mutex:             &sync.Mutex{},
 | 
				
			||||||
 | 
									perObjectInterval: make(map[string]*interval),
 | 
				
			||||||
 | 
									operationMetrics:  &storageframework.Metrics{},
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								var (
 | 
				
			||||||
 | 
									errs []error
 | 
				
			||||||
 | 
									mu   sync.Mutex
 | 
				
			||||||
 | 
									wg   sync.WaitGroup
 | 
				
			||||||
 | 
								)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								wg.Add(len(l.pods))
 | 
				
			||||||
 | 
								for _, pod := range l.pods {
 | 
				
			||||||
 | 
									go func(pod *v1.Pod) {
 | 
				
			||||||
 | 
										defer ginkgo.GinkgoRecover()
 | 
				
			||||||
 | 
										defer wg.Done()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
										framework.Logf("Deleting pod %v", pod.Name)
 | 
				
			||||||
 | 
										err := e2epod.DeletePodWithWait(ctx, l.cs, pod)
 | 
				
			||||||
 | 
										mu.Lock()
 | 
				
			||||||
 | 
										defer mu.Unlock()
 | 
				
			||||||
 | 
										errs = append(errs, err)
 | 
				
			||||||
 | 
									}(pod)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								wg.Wait()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								ginkgo.By("Deleting all PVCs")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								startTime := time.Now()
 | 
				
			||||||
 | 
								wg.Add(len(l.pvcs))
 | 
				
			||||||
 | 
								for _, pvc := range l.pvcs {
 | 
				
			||||||
 | 
									go func(pvc *v1.PersistentVolumeClaim) { // Start a goroutine for each PVC
 | 
				
			||||||
 | 
										defer wg.Done() // Decrement the counter when the goroutine finishes
 | 
				
			||||||
 | 
										startDeletingPvcTime := time.Now()
 | 
				
			||||||
 | 
										framework.Logf("Start deleting PVC %v", pvc.GetName())
 | 
				
			||||||
 | 
										deletingStats.mutex.Lock()
 | 
				
			||||||
 | 
										deletingStats.perObjectInterval[pvc.Name] = &interval{
 | 
				
			||||||
 | 
											create: startDeletingPvcTime,
 | 
				
			||||||
 | 
										}
 | 
				
			||||||
 | 
										deletingStats.mutex.Unlock()
 | 
				
			||||||
 | 
										err := e2epv.DeletePersistentVolumeClaim(ctx, l.cs, pvc.Name, pvc.Namespace)
 | 
				
			||||||
 | 
										framework.ExpectNoError(err)
 | 
				
			||||||
 | 
										startDeletingPVTime := time.Now()
 | 
				
			||||||
 | 
										err = e2epv.WaitForPersistentVolumeDeleted(ctx, l.cs, pvc.Spec.VolumeName, 1*time.Second, 100*time.Minute)
 | 
				
			||||||
 | 
										framework.Logf("Deleted PV %v, PVC %v in %v", pvc.Spec.VolumeName, pvc.GetName(), time.Since(startDeletingPVTime))
 | 
				
			||||||
 | 
										framework.ExpectNoError(err)
 | 
				
			||||||
 | 
									}(pvc)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								wg.Wait()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								endTime := time.Now() // Capture overall end time
 | 
				
			||||||
 | 
								totalDuration := endTime.Sub(startTime)
 | 
				
			||||||
 | 
								framework.Logf("Deleted all PVC/PVs in %v", totalDuration) // Log total deletion time
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								ginkgo.By(fmt.Sprintf("Deleting Storage Class %s", l.scName))
 | 
				
			||||||
 | 
								err := l.cs.StorageV1().StorageClasses().Delete(ctx, l.scName, metav1.DeleteOptions{})
 | 
				
			||||||
 | 
								framework.ExpectNoError(err)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							} else {
 | 
				
			||||||
 | 
								ginkgo.By("Local l setup is nil")
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						f.It("should delete volumes at scale within performance constraints", f.WithSlow(), f.WithSerial(), func(ctx context.Context) {
 | 
				
			||||||
 | 
							l = &local{
 | 
				
			||||||
 | 
								cs:      f.ClientSet,
 | 
				
			||||||
 | 
								ns:      f.Namespace,
 | 
				
			||||||
 | 
								options: dInfo.PerformanceTestOptions,
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							l.config = driver.PrepareTest(ctx, f)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							sc := driver.(storageframework.DynamicPVTestDriver).GetDynamicProvisionStorageClass(ctx, l.config, pattern.FsType)
 | 
				
			||||||
 | 
							ginkgo.By(fmt.Sprintf("Creating Storage Class %v", sc))
 | 
				
			||||||
 | 
							if sc.VolumeBindingMode != nil && *sc.VolumeBindingMode == storagev1.VolumeBindingWaitForFirstConsumer {
 | 
				
			||||||
 | 
								e2eskipper.Skipf("WaitForFirstConsumer binding mode currently not supported for this test")
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							ginkgo.By(fmt.Sprintf("Creating Storage Class %s", sc.Name))
 | 
				
			||||||
 | 
							sc, err := l.cs.StorageV1().StorageClasses().Create(ctx, sc, metav1.CreateOptions{})
 | 
				
			||||||
 | 
							framework.ExpectNoError(err)
 | 
				
			||||||
 | 
							l.scName = sc.Name
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							// Stats for volume provisioning operation; we only need this because imported function newPVCWatch from volumeperf.go requires this as an argument
 | 
				
			||||||
 | 
							// (this test itself doesn't use these stats)
 | 
				
			||||||
 | 
							provisioningStats := &performanceStats{
 | 
				
			||||||
 | 
								mutex:             &sync.Mutex{},
 | 
				
			||||||
 | 
								perObjectInterval: make(map[string]*interval),
 | 
				
			||||||
 | 
								operationMetrics:  &storageframework.Metrics{},
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							// Create a controller to watch on PVCs
 | 
				
			||||||
 | 
							// When all PVCs provisioned by this test are in the Bound state, the controller
 | 
				
			||||||
 | 
							// sends a signal to the channel
 | 
				
			||||||
 | 
							controller := newPVCWatch(ctx, f, l.options.ProvisioningOptions.Count, provisioningStats)
 | 
				
			||||||
 | 
							l.stopCh = make(chan struct{})
 | 
				
			||||||
 | 
							go controller.Run(l.stopCh)
 | 
				
			||||||
 | 
							waitForProvisionCh = make(chan []*v1.PersistentVolumeClaim)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							ginkgo.By(fmt.Sprintf("Creating %d PVCs of size %s", l.options.ProvisioningOptions.Count, l.options.ProvisioningOptions.VolumeSize))
 | 
				
			||||||
 | 
							for i := 0; i < l.options.ProvisioningOptions.Count; i++ {
 | 
				
			||||||
 | 
								pvc := e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{
 | 
				
			||||||
 | 
									ClaimSize:        l.options.ProvisioningOptions.VolumeSize,
 | 
				
			||||||
 | 
									StorageClassName: &sc.Name,
 | 
				
			||||||
 | 
								}, l.ns.Name)
 | 
				
			||||||
 | 
								pvc, err = l.cs.CoreV1().PersistentVolumeClaims(l.ns.Name).Create(ctx, pvc, metav1.CreateOptions{})
 | 
				
			||||||
 | 
								framework.ExpectNoError(err)
 | 
				
			||||||
 | 
								// Store create time for each PVC
 | 
				
			||||||
 | 
								provisioningStats.mutex.Lock()
 | 
				
			||||||
 | 
								provisioningStats.perObjectInterval[pvc.Name] = &interval{
 | 
				
			||||||
 | 
									create: pvc.CreationTimestamp.Time,
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								provisioningStats.mutex.Unlock()
 | 
				
			||||||
 | 
								// Create pods
 | 
				
			||||||
 | 
								podConfig := e2epod.Config{
 | 
				
			||||||
 | 
									NS:           l.ns.Name,
 | 
				
			||||||
 | 
									SeLinuxLabel: e2epv.SELinuxLabel,
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								pod, _ := e2epod.MakeSecPod(&podConfig)
 | 
				
			||||||
 | 
								_, err = l.cs.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{})
 | 
				
			||||||
 | 
								if err != nil {
 | 
				
			||||||
 | 
									framework.Failf("Failed to create pod [%+v]. Error: %v", pod, err)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								framework.ExpectNoError(err)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								l.pods = append(l.pods, pod)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							ginkgo.By("Waiting for all PVCs to be Bound...")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							select {
 | 
				
			||||||
 | 
							case l.pvcs = <-waitForProvisionCh:
 | 
				
			||||||
 | 
								framework.Logf("All PVCs in Bound state")
 | 
				
			||||||
 | 
							case <-time.After(pvcDeletionTestTimeout):
 | 
				
			||||||
 | 
								ginkgo.Fail(fmt.Sprintf("expected all PVCs to be in Bound state within %v", pvcDeletionTestTimeout.Round(time.Second)))
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
		Reference in New Issue
	
	Block a user