mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-04 04:08:16 +00:00 
			
		
		
		
	Move CSIDriver Lister to the controller
This commit is contained in:
		@@ -223,6 +223,7 @@ func startAttachDetachController(ctx ControllerContext) (http.Handler, bool, err
 | 
			
		||||
			ctx.InformerFactory.Core().V1().PersistentVolumeClaims(),
 | 
			
		||||
			ctx.InformerFactory.Core().V1().PersistentVolumes(),
 | 
			
		||||
			ctx.InformerFactory.Storage().V1beta1().CSINodes(),
 | 
			
		||||
			ctx.InformerFactory.Storage().V1beta1().CSIDrivers(),
 | 
			
		||||
			ctx.Cloud,
 | 
			
		||||
			ProbeAttachableVolumePlugins(),
 | 
			
		||||
			GetDynamicPluginProber(ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration),
 | 
			
		||||
 
 | 
			
		||||
@@ -106,6 +106,7 @@ func NewAttachDetachController(
 | 
			
		||||
	pvcInformer coreinformers.PersistentVolumeClaimInformer,
 | 
			
		||||
	pvInformer coreinformers.PersistentVolumeInformer,
 | 
			
		||||
	csiNodeInformer storageinformers.CSINodeInformer,
 | 
			
		||||
	csiDriverInformer storageinformers.CSIDriverInformer,
 | 
			
		||||
	cloud cloudprovider.Interface,
 | 
			
		||||
	plugins []volume.VolumePlugin,
 | 
			
		||||
	prober volume.DynamicPluginProber,
 | 
			
		||||
@@ -147,6 +148,11 @@ func NewAttachDetachController(
 | 
			
		||||
		adc.csiNodeSynced = csiNodeInformer.Informer().HasSynced
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
 | 
			
		||||
		adc.csiDriverLister = csiDriverInformer.Lister()
 | 
			
		||||
		adc.csiDriversSynced = csiDriverInformer.Informer().HasSynced
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if err := adc.volumePluginMgr.InitPlugins(plugins, prober, adc); err != nil {
 | 
			
		||||
		return nil, fmt.Errorf("Could not initialize volume plugins for Attach/Detach Controller: %+v", err)
 | 
			
		||||
	}
 | 
			
		||||
@@ -271,6 +277,12 @@ type attachDetachController struct {
 | 
			
		||||
	csiNodeLister storagelisters.CSINodeLister
 | 
			
		||||
	csiNodeSynced kcache.InformerSynced
 | 
			
		||||
 | 
			
		||||
	// csiDriverLister is the shared CSIDriver lister used to fetch and store
 | 
			
		||||
	// CSIDriver objects from the API server. It is shared with other controllers
 | 
			
		||||
	// and therefore the CSIDriver objects in its store should be treated as immutable.
 | 
			
		||||
	csiDriverLister  storagelisters.CSIDriverLister
 | 
			
		||||
	csiDriversSynced kcache.InformerSynced
 | 
			
		||||
 | 
			
		||||
	// cloud provider used by volume host
 | 
			
		||||
	cloud cloudprovider.Interface
 | 
			
		||||
 | 
			
		||||
@@ -327,6 +339,9 @@ func (adc *attachDetachController) Run(stopCh <-chan struct{}) {
 | 
			
		||||
	if adc.csiNodeSynced != nil {
 | 
			
		||||
		synced = append(synced, adc.csiNodeSynced)
 | 
			
		||||
	}
 | 
			
		||||
	if adc.csiDriversSynced != nil {
 | 
			
		||||
		synced = append(synced, adc.csiDriversSynced)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if !controller.WaitForCacheSync("attach detach", stopCh, synced...) {
 | 
			
		||||
		return
 | 
			
		||||
@@ -669,6 +684,10 @@ func (adc *attachDetachController) CSINodeLister() storagelisters.CSINodeLister
 | 
			
		||||
	return adc.csiNodeLister
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (adc *attachDetachController) CSIDriverLister() storagelisters.CSIDriverLister {
 | 
			
		||||
	return adc.csiDriverLister
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (adc *attachDetachController) IsAttachDetachController() bool {
 | 
			
		||||
	return true
 | 
			
		||||
}
 | 
			
		||||
@@ -793,3 +812,7 @@ func (adc *attachDetachController) GetSubpather() subpath.Interface {
 | 
			
		||||
	// Subpaths not needed in attachdetach controller
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (adc *attachDetachController) GetCSIDriverLister() storagelisters.CSIDriverLister {
 | 
			
		||||
	return adc.csiDriverLister
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -45,6 +45,7 @@ func Test_NewAttachDetachController_Positive(t *testing.T) {
 | 
			
		||||
		informerFactory.Core().V1().PersistentVolumeClaims(),
 | 
			
		||||
		informerFactory.Core().V1().PersistentVolumes(),
 | 
			
		||||
		informerFactory.Storage().V1beta1().CSINodes(),
 | 
			
		||||
		informerFactory.Storage().V1beta1().CSIDrivers(),
 | 
			
		||||
		nil, /* cloud */
 | 
			
		||||
		nil, /* plugins */
 | 
			
		||||
		nil, /* prober */
 | 
			
		||||
@@ -220,6 +221,7 @@ func attachDetachRecoveryTestCase(t *testing.T, extraPods1 []*v1.Pod, extraPods2
 | 
			
		||||
		informerFactory.Core().V1().PersistentVolumeClaims(),
 | 
			
		||||
		informerFactory.Core().V1().PersistentVolumes(),
 | 
			
		||||
		informerFactory.Storage().V1beta1().CSINodes(),
 | 
			
		||||
		informerFactory.Storage().V1beta1().CSIDrivers(),
 | 
			
		||||
		nil, /* cloud */
 | 
			
		||||
		plugins,
 | 
			
		||||
		prober,
 | 
			
		||||
 
 | 
			
		||||
@@ -130,9 +130,11 @@ go_library(
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/util/validation:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/client-go/informers:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/client-go/listers/storage/v1beta1:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/client-go/tools/cache:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/client-go/tools/record:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/client-go/util/certificate:go_default_library",
 | 
			
		||||
 
 | 
			
		||||
@@ -26,8 +26,12 @@ import (
 | 
			
		||||
	authenticationv1 "k8s.io/api/authentication/v1"
 | 
			
		||||
	"k8s.io/api/core/v1"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/types"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/wait"
 | 
			
		||||
	utilfeature "k8s.io/apiserver/pkg/util/feature"
 | 
			
		||||
	"k8s.io/client-go/informers"
 | 
			
		||||
	clientset "k8s.io/client-go/kubernetes"
 | 
			
		||||
	storagelisters "k8s.io/client-go/listers/storage/v1beta1"
 | 
			
		||||
	"k8s.io/client-go/tools/cache"
 | 
			
		||||
	"k8s.io/client-go/tools/record"
 | 
			
		||||
	cloudprovider "k8s.io/cloud-provider"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/features"
 | 
			
		||||
@@ -56,6 +60,24 @@ func NewInitializedVolumePluginMgr(
 | 
			
		||||
	plugins []volume.VolumePlugin,
 | 
			
		||||
	prober volume.DynamicPluginProber) (*volume.VolumePluginMgr, error) {
 | 
			
		||||
 | 
			
		||||
	// Initialize csiDriverLister before calling InitPlugins
 | 
			
		||||
	var informerFactory informers.SharedInformerFactory
 | 
			
		||||
	var csiDriverLister storagelisters.CSIDriverLister
 | 
			
		||||
	var csiDriversSynced cache.InformerSynced
 | 
			
		||||
	const resyncPeriod = 0
 | 
			
		||||
	if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
 | 
			
		||||
		// Don't initialize if kubeClient is nil
 | 
			
		||||
		if kubelet.kubeClient != nil {
 | 
			
		||||
			informerFactory = informers.NewSharedInformerFactory(kubelet.kubeClient, resyncPeriod)
 | 
			
		||||
			csiDriverInformer := informerFactory.Storage().V1beta1().CSIDrivers()
 | 
			
		||||
			csiDriverLister = csiDriverInformer.Lister()
 | 
			
		||||
			csiDriversSynced = csiDriverInformer.Informer().HasSynced
 | 
			
		||||
 | 
			
		||||
		} else {
 | 
			
		||||
			klog.Warning("kubeClient is nil. Skip initialization of CSIDriverLister")
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	mountPodManager, err := mountpod.NewManager(kubelet.getRootDir(), kubelet.podManager)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
@@ -67,6 +89,9 @@ func NewInitializedVolumePluginMgr(
 | 
			
		||||
		configMapManager: configMapManager,
 | 
			
		||||
		tokenManager:     tokenManager,
 | 
			
		||||
		mountPodManager:  mountPodManager,
 | 
			
		||||
		informerFactory:  informerFactory,
 | 
			
		||||
		csiDriverLister:  csiDriverLister,
 | 
			
		||||
		csiDriversSynced: csiDriversSynced,
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if err := kvh.volumePluginMgr.InitPlugins(plugins, prober, kvh); err != nil {
 | 
			
		||||
@@ -93,6 +118,9 @@ type kubeletVolumeHost struct {
 | 
			
		||||
	tokenManager     *token.Manager
 | 
			
		||||
	configMapManager configmap.Manager
 | 
			
		||||
	mountPodManager  mountpod.Manager
 | 
			
		||||
	informerFactory  informers.SharedInformerFactory
 | 
			
		||||
	csiDriverLister  storagelisters.CSIDriverLister
 | 
			
		||||
	csiDriversSynced cache.InformerSynced
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (kvh *kubeletVolumeHost) SetKubeletError(err error) {
 | 
			
		||||
@@ -131,6 +159,34 @@ func (kvh *kubeletVolumeHost) GetSubpather() subpath.Interface {
 | 
			
		||||
	return kvh.kubelet.subpather
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (kvh *kubeletVolumeHost) GetInformerFactory() informers.SharedInformerFactory {
 | 
			
		||||
	return kvh.informerFactory
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (kvh *kubeletVolumeHost) CSIDriverLister() storagelisters.CSIDriverLister {
 | 
			
		||||
	return kvh.csiDriverLister
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (kvh *kubeletVolumeHost) CSIDriversSynced() cache.InformerSynced {
 | 
			
		||||
	return kvh.csiDriversSynced
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// WaitForCacheSync is a helper function that waits for cache sync for CSIDriverLister
 | 
			
		||||
func (kvh *kubeletVolumeHost) WaitForCacheSync() error {
 | 
			
		||||
	if kvh.csiDriversSynced == nil {
 | 
			
		||||
		klog.Error("csiDriversSynced not found on KubeletVolumeHost")
 | 
			
		||||
		return fmt.Errorf("csiDriversSynced not found on KubeletVolumeHost")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	synced := []cache.InformerSynced{kvh.csiDriversSynced}
 | 
			
		||||
	if !cache.WaitForCacheSync(wait.NeverStop, synced...) {
 | 
			
		||||
		klog.Warning("failed to wait for cache sync for CSIDriverLister")
 | 
			
		||||
		return fmt.Errorf("failed to wait for cache sync for CSIDriverLister")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (kvh *kubeletVolumeHost) NewWrapperMounter(
 | 
			
		||||
	volName string,
 | 
			
		||||
	spec volume.Spec,
 | 
			
		||||
 
 | 
			
		||||
@@ -245,6 +245,11 @@ func (vm *volumeManager) Run(sourcesReady config.SourcesReady, stopCh <-chan str
 | 
			
		||||
 | 
			
		||||
	metrics.Register(vm.actualStateOfWorld, vm.desiredStateOfWorld, vm.volumePluginMgr)
 | 
			
		||||
 | 
			
		||||
	if vm.kubeClient != nil {
 | 
			
		||||
		// start informer for CSIDriver
 | 
			
		||||
		vm.volumePluginMgr.Run(stopCh)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	<-stopCh
 | 
			
		||||
	klog.Infof("Shutting down Kubelet Volume Manager")
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -18,6 +18,7 @@ go_library(
 | 
			
		||||
    importpath = "k8s.io/kubernetes/pkg/volume",
 | 
			
		||||
    visibility = ["//visibility:public"],
 | 
			
		||||
    deps = [
 | 
			
		||||
        "//pkg/features:go_default_library",
 | 
			
		||||
        "//pkg/util/mount:go_default_library",
 | 
			
		||||
        "//pkg/volume/util/fs:go_default_library",
 | 
			
		||||
        "//pkg/volume/util/recyclerclient:go_default_library",
 | 
			
		||||
@@ -29,8 +30,11 @@ go_library(
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/util/validation:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/client-go/informers:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/client-go/listers/storage/v1beta1:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/client-go/tools/cache:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/client-go/tools/record:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/cloud-provider:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/klog:go_default_library",
 | 
			
		||||
 
 | 
			
		||||
@@ -31,8 +31,6 @@ go_library(
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/client-go/informers:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/client-go/informers/storage/v1beta1:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/client-go/listers/storage/v1beta1:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/csi-translation-lib/plugins:go_default_library",
 | 
			
		||||
@@ -73,6 +71,7 @@ go_test(
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apiserver/pkg/util/feature/testing:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/client-go/informers:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/client-go/testing:go_default_library",
 | 
			
		||||
 
 | 
			
		||||
@@ -33,6 +33,7 @@ import (
 | 
			
		||||
	"k8s.io/apimachinery/pkg/watch"
 | 
			
		||||
	utilfeature "k8s.io/apiserver/pkg/util/feature"
 | 
			
		||||
	utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing"
 | 
			
		||||
	"k8s.io/client-go/informers"
 | 
			
		||||
	clientset "k8s.io/client-go/kubernetes"
 | 
			
		||||
	fakeclient "k8s.io/client-go/kubernetes/fake"
 | 
			
		||||
	core "k8s.io/client-go/testing"
 | 
			
		||||
@@ -1433,11 +1434,19 @@ func newTestWatchPlugin(t *testing.T, fakeClient *fakeclient.Clientset) (*csiPlu
 | 
			
		||||
	}
 | 
			
		||||
	fakeWatcher := watch.NewRaceFreeFake()
 | 
			
		||||
	fakeClient.Fake.PrependWatchReactor("volumeattachments", core.DefaultWatchReactor(fakeWatcher, nil))
 | 
			
		||||
 | 
			
		||||
	// Start informer for CSIDrivers.
 | 
			
		||||
	factory := informers.NewSharedInformerFactory(fakeClient, csiResyncPeriod)
 | 
			
		||||
	csiDriverInformer := factory.Storage().V1beta1().CSIDrivers()
 | 
			
		||||
	csiDriverLister := csiDriverInformer.Lister()
 | 
			
		||||
	go factory.Start(wait.NeverStop)
 | 
			
		||||
 | 
			
		||||
	host := volumetest.NewFakeVolumeHostWithCSINodeName(
 | 
			
		||||
		tmpDir,
 | 
			
		||||
		fakeClient,
 | 
			
		||||
		nil,
 | 
			
		||||
		"node",
 | 
			
		||||
		csiDriverLister,
 | 
			
		||||
	)
 | 
			
		||||
	plugMgr := &volume.VolumePluginMgr{}
 | 
			
		||||
	plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, host)
 | 
			
		||||
@@ -1455,7 +1464,7 @@ func newTestWatchPlugin(t *testing.T, fakeClient *fakeclient.Clientset) (*csiPlu
 | 
			
		||||
	if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
 | 
			
		||||
		// Wait until the informer in CSI volume plugin has all CSIDrivers.
 | 
			
		||||
		wait.PollImmediate(testInformerSyncPeriod, testInformerSyncTimeout, func() (bool, error) {
 | 
			
		||||
			return csiPlug.csiDriverInformer.Informer().HasSynced(), nil
 | 
			
		||||
			return csiDriverInformer.Informer().HasSynced(), nil
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -216,6 +216,7 @@ func TestBlockMapperSetupDevice(t *testing.T) {
 | 
			
		||||
		fakeClient,
 | 
			
		||||
		nil,
 | 
			
		||||
		"fakeNode",
 | 
			
		||||
		nil,
 | 
			
		||||
	)
 | 
			
		||||
	plug.host = host
 | 
			
		||||
 | 
			
		||||
@@ -282,6 +283,7 @@ func TestBlockMapperMapDevice(t *testing.T) {
 | 
			
		||||
		fakeClient,
 | 
			
		||||
		nil,
 | 
			
		||||
		"fakeNode",
 | 
			
		||||
		nil,
 | 
			
		||||
	)
 | 
			
		||||
	plug.host = host
 | 
			
		||||
 | 
			
		||||
@@ -364,6 +366,7 @@ func TestBlockMapperTearDownDevice(t *testing.T) {
 | 
			
		||||
		fakeClient,
 | 
			
		||||
		nil,
 | 
			
		||||
		"fakeNode",
 | 
			
		||||
		nil,
 | 
			
		||||
	)
 | 
			
		||||
	plug.host = host
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -19,7 +19,6 @@ package csi
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"crypto/sha256"
 | 
			
		||||
	"errors"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"os"
 | 
			
		||||
	"path"
 | 
			
		||||
@@ -292,8 +291,14 @@ func (c *csiMountMgr) podAttributes() (map[string]string, error) {
 | 
			
		||||
	if !utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
 | 
			
		||||
		return nil, nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	kletHost, ok := c.plugin.host.(volume.KubeletVolumeHost)
 | 
			
		||||
	if ok {
 | 
			
		||||
		kletHost.WaitForCacheSync()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if c.plugin.csiDriverLister == nil {
 | 
			
		||||
		return nil, errors.New("CSIDriver lister does not exist")
 | 
			
		||||
		return nil, fmt.Errorf("CSIDriverLister not found")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	csiDriver, err := c.plugin.csiDriverLister.Get(string(c.driverName))
 | 
			
		||||
 
 | 
			
		||||
@@ -33,7 +33,6 @@ import (
 | 
			
		||||
	storagev1beta1 "k8s.io/api/storage/v1beta1"
 | 
			
		||||
	meta "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/types"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/wait"
 | 
			
		||||
	utilfeature "k8s.io/apiserver/pkg/util/feature"
 | 
			
		||||
	utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing"
 | 
			
		||||
	fakeclient "k8s.io/client-go/kubernetes/fake"
 | 
			
		||||
@@ -155,13 +154,6 @@ func MounterSetUpTests(t *testing.T, podInfoEnabled bool) {
 | 
			
		||||
			plug, tmpDir := newTestPlugin(t, fakeClient)
 | 
			
		||||
			defer os.RemoveAll(tmpDir)
 | 
			
		||||
 | 
			
		||||
			if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
 | 
			
		||||
				// Wait until the informer in CSI volume plugin has all CSIDrivers.
 | 
			
		||||
				wait.PollImmediate(testInformerSyncPeriod, testInformerSyncTimeout, func() (bool, error) {
 | 
			
		||||
					return plug.csiDriverInformer.Informer().HasSynced(), nil
 | 
			
		||||
				})
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			registerFakePlugin(test.driver, "endpoint", []string{"1.0.0"}, t)
 | 
			
		||||
			pv := makeTestPV("test-pv", 10, test.driver, testVol)
 | 
			
		||||
			pv.Spec.CSI.VolumeAttributes = test.volumeContext
 | 
			
		||||
@@ -391,6 +383,7 @@ func TestMounterSetUpSimple(t *testing.T) {
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestMounterSetUpWithInline(t *testing.T) {
 | 
			
		||||
	defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIInlineVolume, true)()
 | 
			
		||||
 | 
			
		||||
@@ -527,6 +520,7 @@ func TestMounterSetUpWithInline(t *testing.T) {
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestMounterSetUpWithFSGroup(t *testing.T) {
 | 
			
		||||
	fakeClient := fakeclient.NewSimpleClientset()
 | 
			
		||||
	plug, tmpDir := newTestPlugin(t, fakeClient)
 | 
			
		||||
 
 | 
			
		||||
@@ -37,10 +37,8 @@ import (
 | 
			
		||||
	utilversion "k8s.io/apimachinery/pkg/util/version"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/wait"
 | 
			
		||||
	utilfeature "k8s.io/apiserver/pkg/util/feature"
 | 
			
		||||
	csiapiinformer "k8s.io/client-go/informers"
 | 
			
		||||
	csiinformer "k8s.io/client-go/informers/storage/v1beta1"
 | 
			
		||||
	clientset "k8s.io/client-go/kubernetes"
 | 
			
		||||
	csilister "k8s.io/client-go/listers/storage/v1beta1"
 | 
			
		||||
	storagelisters "k8s.io/client-go/listers/storage/v1beta1"
 | 
			
		||||
	csitranslationplugins "k8s.io/csi-translation-lib/plugins"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/features"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/volume"
 | 
			
		||||
@@ -70,8 +68,7 @@ var deprecatedSocketDirVersions = []string{"0.1.0", "0.2.0", "0.3.0", "0.4.0"}
 | 
			
		||||
type csiPlugin struct {
 | 
			
		||||
	host            volume.VolumeHost
 | 
			
		||||
	blockEnabled    bool
 | 
			
		||||
	csiDriverLister   csilister.CSIDriverLister
 | 
			
		||||
	csiDriverInformer csiinformer.CSIDriverInformer
 | 
			
		||||
	csiDriverLister storagelisters.CSIDriverLister
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
//TODO (vladimirvivien) add this type to storage api
 | 
			
		||||
@@ -217,11 +214,21 @@ func (p *csiPlugin) Init(host volume.VolumeHost) error {
 | 
			
		||||
		if csiClient == nil {
 | 
			
		||||
			klog.Warning(log("kubeclient not set, assuming standalone kubelet"))
 | 
			
		||||
		} else {
 | 
			
		||||
			// Start informer for CSIDrivers.
 | 
			
		||||
			factory := csiapiinformer.NewSharedInformerFactory(csiClient, csiResyncPeriod)
 | 
			
		||||
			p.csiDriverInformer = factory.Storage().V1beta1().CSIDrivers()
 | 
			
		||||
			p.csiDriverLister = p.csiDriverInformer.Lister()
 | 
			
		||||
			go factory.Start(wait.NeverStop)
 | 
			
		||||
			// set CSIDriverLister
 | 
			
		||||
			adcHost, ok := host.(volume.AttachDetachVolumeHost)
 | 
			
		||||
			if ok {
 | 
			
		||||
				p.csiDriverLister = adcHost.CSIDriverLister()
 | 
			
		||||
				if p.csiDriverLister == nil {
 | 
			
		||||
					klog.Error(log("CSIDriverLister not found on AttachDetachVolumeHost"))
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
			kletHost, ok := host.(volume.KubeletVolumeHost)
 | 
			
		||||
			if ok {
 | 
			
		||||
				p.csiDriverLister = kletHost.CSIDriverLister()
 | 
			
		||||
				if p.csiDriverLister == nil {
 | 
			
		||||
					klog.Error(log("CSIDriverLister not found on KubeletVolumeHost"))
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@@ -757,6 +764,12 @@ func (p *csiPlugin) skipAttach(driver string) (bool, error) {
 | 
			
		||||
	if !utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
 | 
			
		||||
		return false, nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	kletHost, ok := p.host.(volume.KubeletVolumeHost)
 | 
			
		||||
	if ok {
 | 
			
		||||
		kletHost.WaitForCacheSync()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if p.csiDriverLister == nil {
 | 
			
		||||
		return false, errors.New("CSIDriver lister does not exist")
 | 
			
		||||
	}
 | 
			
		||||
 
 | 
			
		||||
@@ -31,6 +31,7 @@ import (
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/wait"
 | 
			
		||||
	utilfeature "k8s.io/apiserver/pkg/util/feature"
 | 
			
		||||
	utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing"
 | 
			
		||||
	"k8s.io/client-go/informers"
 | 
			
		||||
	fakeclient "k8s.io/client-go/kubernetes/fake"
 | 
			
		||||
	utiltesting "k8s.io/client-go/util/testing"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/features"
 | 
			
		||||
@@ -48,11 +49,19 @@ func newTestPlugin(t *testing.T, client *fakeclient.Clientset) (*csiPlugin, stri
 | 
			
		||||
	if client == nil {
 | 
			
		||||
		client = fakeclient.NewSimpleClientset()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Start informer for CSIDrivers.
 | 
			
		||||
	factory := informers.NewSharedInformerFactory(client, csiResyncPeriod)
 | 
			
		||||
	csiDriverInformer := factory.Storage().V1beta1().CSIDrivers()
 | 
			
		||||
	csiDriverLister := csiDriverInformer.Lister()
 | 
			
		||||
	go factory.Start(wait.NeverStop)
 | 
			
		||||
 | 
			
		||||
	host := volumetest.NewFakeVolumeHostWithCSINodeName(
 | 
			
		||||
		tmpDir,
 | 
			
		||||
		client,
 | 
			
		||||
		nil,
 | 
			
		||||
		"fakeNode",
 | 
			
		||||
		csiDriverLister,
 | 
			
		||||
	)
 | 
			
		||||
	plugMgr := &volume.VolumePluginMgr{}
 | 
			
		||||
	plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, host)
 | 
			
		||||
@@ -70,7 +79,7 @@ func newTestPlugin(t *testing.T, client *fakeclient.Clientset) (*csiPlugin, stri
 | 
			
		||||
	if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
 | 
			
		||||
		// Wait until the informer in CSI volume plugin has all CSIDrivers.
 | 
			
		||||
		wait.PollImmediate(testInformerSyncPeriod, testInformerSyncTimeout, func() (bool, error) {
 | 
			
		||||
			return csiPlug.csiDriverInformer.Informer().HasSynced(), nil
 | 
			
		||||
			return csiDriverInformer.Informer().HasSynced(), nil
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -960,6 +960,7 @@ func TestInstallCSIDriverExistingAnnotation(t *testing.T) {
 | 
			
		||||
			client,
 | 
			
		||||
			nil,
 | 
			
		||||
			nodeName,
 | 
			
		||||
			nil,
 | 
			
		||||
		)
 | 
			
		||||
 | 
			
		||||
		nim := NewNodeInfoManager(types.NodeName(nodeName), host, nil)
 | 
			
		||||
@@ -1021,6 +1022,7 @@ func test(t *testing.T, addNodeInfo bool, csiNodeInfoEnabled bool, testcases []t
 | 
			
		||||
			client,
 | 
			
		||||
			nil,
 | 
			
		||||
			nodeName,
 | 
			
		||||
			nil,
 | 
			
		||||
		)
 | 
			
		||||
		nim := NewNodeInfoManager(types.NodeName(nodeName), host, nil)
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -29,11 +29,15 @@ import (
 | 
			
		||||
	"k8s.io/apimachinery/pkg/types"
 | 
			
		||||
	utilerrors "k8s.io/apimachinery/pkg/util/errors"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/validation"
 | 
			
		||||
	utilfeature "k8s.io/apiserver/pkg/util/feature"
 | 
			
		||||
	"k8s.io/client-go/informers"
 | 
			
		||||
	clientset "k8s.io/client-go/kubernetes"
 | 
			
		||||
	storagelisters "k8s.io/client-go/listers/storage/v1beta1"
 | 
			
		||||
	"k8s.io/client-go/tools/cache"
 | 
			
		||||
	"k8s.io/client-go/tools/record"
 | 
			
		||||
	cloudprovider "k8s.io/cloud-provider"
 | 
			
		||||
	"k8s.io/klog"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/features"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/util/mount"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/volume/util/recyclerclient"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/volume/util/subpath"
 | 
			
		||||
@@ -321,6 +325,15 @@ type KubeletVolumeHost interface {
 | 
			
		||||
	// SetKubeletError lets plugins set an error on the Kubelet runtime status
 | 
			
		||||
	// that will cause the Kubelet to post NotReady status with the error message provided
 | 
			
		||||
	SetKubeletError(err error)
 | 
			
		||||
 | 
			
		||||
	// GetInformerFactory returns the informer factory for CSIDriverLister
 | 
			
		||||
	GetInformerFactory() informers.SharedInformerFactory
 | 
			
		||||
	// CSIDriverLister returns the informer lister for the CSIDriver API Object
 | 
			
		||||
	CSIDriverLister() storagelisters.CSIDriverLister
 | 
			
		||||
	// CSIDriverSynced returns the informer synced for the CSIDriver API Object
 | 
			
		||||
	CSIDriversSynced() cache.InformerSynced
 | 
			
		||||
	// WaitForCacheSync is a helper function that waits for cache sync for CSIDriverLister
 | 
			
		||||
	WaitForCacheSync() error
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// AttachDetachVolumeHost is a AttachDetach Controller specific interface that plugins can use
 | 
			
		||||
@@ -329,6 +342,9 @@ type AttachDetachVolumeHost interface {
 | 
			
		||||
	// CSINodeLister returns the informer lister for the CSINode API Object
 | 
			
		||||
	CSINodeLister() storagelisters.CSINodeLister
 | 
			
		||||
 | 
			
		||||
	// CSIDriverLister returns the informer lister for the CSIDriver API Object
 | 
			
		||||
	CSIDriverLister() storagelisters.CSIDriverLister
 | 
			
		||||
 | 
			
		||||
	// IsAttachDetachController is an interface marker to strictly tie AttachDetachVolumeHost
 | 
			
		||||
	// to the attachDetachController
 | 
			
		||||
	IsAttachDetachController() bool
 | 
			
		||||
@@ -1010,6 +1026,17 @@ func (pm *VolumePluginMgr) FindNodeExpandablePluginByName(name string) (NodeExpa
 | 
			
		||||
	return nil, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (pm *VolumePluginMgr) Run(stopCh <-chan struct{}) {
 | 
			
		||||
	kletHost, ok := pm.Host.(KubeletVolumeHost)
 | 
			
		||||
	if ok {
 | 
			
		||||
		// start informer for CSIDriver
 | 
			
		||||
		if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
 | 
			
		||||
			informerFactory := kletHost.GetInformerFactory()
 | 
			
		||||
			go informerFactory.Start(stopCh)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewPersistentVolumeRecyclerPodTemplate creates a template for a recycler
 | 
			
		||||
// pod.  By default, a recycler pod simply runs "rm -rf" on a volume and tests
 | 
			
		||||
// for emptiness.  Most attributes of the template will be correct for most
 | 
			
		||||
 
 | 
			
		||||
@@ -26,6 +26,7 @@ go_library(
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/client-go/listers/storage/v1beta1:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/client-go/tools/record:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/client-go/util/testing:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/cloud-provider:go_default_library",
 | 
			
		||||
 
 | 
			
		||||
@@ -34,6 +34,7 @@ import (
 | 
			
		||||
	"k8s.io/apimachinery/pkg/types"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/uuid"
 | 
			
		||||
	clientset "k8s.io/client-go/kubernetes"
 | 
			
		||||
	storagelisters "k8s.io/client-go/listers/storage/v1beta1"
 | 
			
		||||
	"k8s.io/client-go/tools/record"
 | 
			
		||||
	utiltesting "k8s.io/client-go/util/testing"
 | 
			
		||||
	cloudprovider "k8s.io/cloud-provider"
 | 
			
		||||
@@ -71,8 +72,12 @@ type fakeVolumeHost struct {
 | 
			
		||||
	nodeLabels      map[string]string
 | 
			
		||||
	nodeName        string
 | 
			
		||||
	subpather       subpath.Interface
 | 
			
		||||
	csiDriverLister storagelisters.CSIDriverLister
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var _ VolumeHost = &fakeVolumeHost{}
 | 
			
		||||
var _ AttachDetachVolumeHost = &fakeVolumeHost{}
 | 
			
		||||
 | 
			
		||||
func NewFakeVolumeHost(rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin) *fakeVolumeHost {
 | 
			
		||||
	return newFakeVolumeHost(rootDir, kubeClient, plugins, nil, nil)
 | 
			
		||||
}
 | 
			
		||||
@@ -87,9 +92,12 @@ func NewFakeVolumeHostWithNodeLabels(rootDir string, kubeClient clientset.Interf
 | 
			
		||||
	return volHost
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewFakeVolumeHostWithCSINodeName(rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, nodeName string) *fakeVolumeHost {
 | 
			
		||||
func NewFakeVolumeHostWithCSINodeName(rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, nodeName string, driverLister storagelisters.CSIDriverLister) *fakeVolumeHost {
 | 
			
		||||
	volHost := newFakeVolumeHost(rootDir, kubeClient, plugins, nil, nil)
 | 
			
		||||
	volHost.nodeName = nodeName
 | 
			
		||||
	if driverLister != nil {
 | 
			
		||||
		volHost.csiDriverLister = driverLister
 | 
			
		||||
	}
 | 
			
		||||
	return volHost
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -1477,3 +1485,16 @@ func ContainsAccessMode(modes []v1.PersistentVolumeAccessMode, mode v1.Persisten
 | 
			
		||||
	}
 | 
			
		||||
	return false
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (f *fakeVolumeHost) CSIDriverLister() storagelisters.CSIDriverLister {
 | 
			
		||||
	return f.csiDriverLister
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (f *fakeVolumeHost) CSINodeLister() storagelisters.CSINodeLister {
 | 
			
		||||
	// not needed for testing
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (f *fakeVolumeHost) IsAttachDetachController() bool {
 | 
			
		||||
	return true
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -21,6 +21,7 @@ go_test(
 | 
			
		||||
        "//pkg/controller/volume/attachdetach/cache:go_default_library",
 | 
			
		||||
        "//pkg/controller/volume/persistentvolume:go_default_library",
 | 
			
		||||
        "//pkg/controller/volume/persistentvolume/options:go_default_library",
 | 
			
		||||
        "//pkg/features:go_default_library",
 | 
			
		||||
        "//pkg/volume:go_default_library",
 | 
			
		||||
        "//pkg/volume/testing:go_default_library",
 | 
			
		||||
        "//pkg/volume/util:go_default_library",
 | 
			
		||||
@@ -31,6 +32,7 @@ go_test(
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/client-go/informers:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/client-go/rest:go_default_library",
 | 
			
		||||
 
 | 
			
		||||
@@ -27,7 +27,8 @@ import (
 | 
			
		||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/runtime/schema"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/wait"
 | 
			
		||||
	"k8s.io/client-go/informers"
 | 
			
		||||
	utilfeature "k8s.io/apiserver/pkg/util/feature"
 | 
			
		||||
	clientgoinformers "k8s.io/client-go/informers"
 | 
			
		||||
	clientset "k8s.io/client-go/kubernetes"
 | 
			
		||||
	restclient "k8s.io/client-go/rest"
 | 
			
		||||
	"k8s.io/client-go/tools/cache"
 | 
			
		||||
@@ -36,6 +37,7 @@ import (
 | 
			
		||||
	volumecache "k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
 | 
			
		||||
	persistentvolumeoptions "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/options"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/features"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/volume"
 | 
			
		||||
	volumetest "k8s.io/kubernetes/pkg/volume/testing"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/volume/util"
 | 
			
		||||
@@ -179,6 +181,7 @@ func TestPodDeletionWithDswp(t *testing.T) {
 | 
			
		||||
	stopCh := make(chan struct{})
 | 
			
		||||
	go informers.Core().V1().PersistentVolumeClaims().Informer().Run(stopCh)
 | 
			
		||||
	go informers.Core().V1().PersistentVolumes().Informer().Run(stopCh)
 | 
			
		||||
	initCSIObjects(stopCh, informers)
 | 
			
		||||
	go ctrl.Run(stopCh)
 | 
			
		||||
 | 
			
		||||
	waitToObservePods(t, podInformer, 1)
 | 
			
		||||
@@ -207,6 +210,16 @@ func TestPodDeletionWithDswp(t *testing.T) {
 | 
			
		||||
	close(stopCh)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func initCSIObjects(stopCh chan struct{}, informers clientgoinformers.SharedInformerFactory) {
 | 
			
		||||
	if utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) &&
 | 
			
		||||
		utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) {
 | 
			
		||||
		go informers.Storage().V1beta1().CSINodes().Informer().Run(stopCh)
 | 
			
		||||
	}
 | 
			
		||||
	if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
 | 
			
		||||
		go informers.Storage().V1beta1().CSIDrivers().Informer().Run(stopCh)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestPodUpdateWithWithADC(t *testing.T) {
 | 
			
		||||
	_, server, closeFn := framework.RunAMaster(framework.NewIntegrationTestMasterConfig())
 | 
			
		||||
	defer closeFn()
 | 
			
		||||
@@ -246,6 +259,7 @@ func TestPodUpdateWithWithADC(t *testing.T) {
 | 
			
		||||
	stopCh := make(chan struct{})
 | 
			
		||||
	go informers.Core().V1().PersistentVolumeClaims().Informer().Run(stopCh)
 | 
			
		||||
	go informers.Core().V1().PersistentVolumes().Informer().Run(stopCh)
 | 
			
		||||
	initCSIObjects(stopCh, informers)
 | 
			
		||||
	go ctrl.Run(stopCh)
 | 
			
		||||
 | 
			
		||||
	waitToObservePods(t, podInformer, 1)
 | 
			
		||||
@@ -314,6 +328,7 @@ func TestPodUpdateWithKeepTerminatedPodVolumes(t *testing.T) {
 | 
			
		||||
	stopCh := make(chan struct{})
 | 
			
		||||
	go informers.Core().V1().PersistentVolumeClaims().Informer().Run(stopCh)
 | 
			
		||||
	go informers.Core().V1().PersistentVolumes().Informer().Run(stopCh)
 | 
			
		||||
	initCSIObjects(stopCh, informers)
 | 
			
		||||
	go ctrl.Run(stopCh)
 | 
			
		||||
 | 
			
		||||
	waitToObservePods(t, podInformer, 1)
 | 
			
		||||
@@ -383,7 +398,7 @@ func waitForPodFuncInDSWP(t *testing.T, dswp volumecache.DesiredStateOfWorld, ch
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func createAdClients(ns *v1.Namespace, t *testing.T, server *httptest.Server, syncPeriod time.Duration, timers attachdetach.TimerConfig) (*clientset.Clientset, attachdetach.AttachDetachController, *persistentvolume.PersistentVolumeController, informers.SharedInformerFactory) {
 | 
			
		||||
func createAdClients(ns *v1.Namespace, t *testing.T, server *httptest.Server, syncPeriod time.Duration, timers attachdetach.TimerConfig) (*clientset.Clientset, attachdetach.AttachDetachController, *persistentvolume.PersistentVolumeController, clientgoinformers.SharedInformerFactory) {
 | 
			
		||||
	config := restclient.Config{
 | 
			
		||||
		Host:          server.URL,
 | 
			
		||||
		ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}},
 | 
			
		||||
@@ -408,7 +423,7 @@ func createAdClients(ns *v1.Namespace, t *testing.T, server *httptest.Server, sy
 | 
			
		||||
	}
 | 
			
		||||
	plugins := []volume.VolumePlugin{plugin}
 | 
			
		||||
	cloud := &fakecloud.FakeCloud{}
 | 
			
		||||
	informers := informers.NewSharedInformerFactory(testClient, resyncPeriod)
 | 
			
		||||
	informers := clientgoinformers.NewSharedInformerFactory(testClient, resyncPeriod)
 | 
			
		||||
	ctrl, err := attachdetach.NewAttachDetachController(
 | 
			
		||||
		testClient,
 | 
			
		||||
		informers.Core().V1().Pods(),
 | 
			
		||||
@@ -416,6 +431,7 @@ func createAdClients(ns *v1.Namespace, t *testing.T, server *httptest.Server, sy
 | 
			
		||||
		informers.Core().V1().PersistentVolumeClaims(),
 | 
			
		||||
		informers.Core().V1().PersistentVolumes(),
 | 
			
		||||
		informers.Storage().V1beta1().CSINodes(),
 | 
			
		||||
		informers.Storage().V1beta1().CSIDrivers(),
 | 
			
		||||
		cloud,
 | 
			
		||||
		plugins,
 | 
			
		||||
		nil, /* prober */
 | 
			
		||||
@@ -491,6 +507,7 @@ func TestPodAddedByDswp(t *testing.T) {
 | 
			
		||||
	stopCh := make(chan struct{})
 | 
			
		||||
	go informers.Core().V1().PersistentVolumeClaims().Informer().Run(stopCh)
 | 
			
		||||
	go informers.Core().V1().PersistentVolumes().Informer().Run(stopCh)
 | 
			
		||||
	initCSIObjects(stopCh, informers)
 | 
			
		||||
	go ctrl.Run(stopCh)
 | 
			
		||||
 | 
			
		||||
	waitToObservePods(t, podInformer, 1)
 | 
			
		||||
@@ -576,6 +593,7 @@ func TestPVCBoundWithADC(t *testing.T) {
 | 
			
		||||
	stopCh := make(chan struct{})
 | 
			
		||||
	informers.Start(stopCh)
 | 
			
		||||
	informers.WaitForCacheSync(stopCh)
 | 
			
		||||
	initCSIObjects(stopCh, informers)
 | 
			
		||||
	go ctrl.Run(stopCh)
 | 
			
		||||
	go pvCtrl.Run(stopCh)
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user