mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-03 19:58:17 +00:00 
			
		
		
		
	Refactor persistent volume initialization
There should be only one initialization function, shared by the real controller and unit tests.
This commit is contained in:
		@@ -385,6 +385,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
 | 
			
		||||
		ProbeRecyclableVolumePlugins(s.VolumeConfiguration),
 | 
			
		||||
		cloud,
 | 
			
		||||
		s.ClusterName,
 | 
			
		||||
		nil, nil, nil,
 | 
			
		||||
	)
 | 
			
		||||
	volumeController.Run()
 | 
			
		||||
	time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 | 
			
		||||
 
 | 
			
		||||
@@ -283,6 +283,9 @@ func (s *CMServer) Run(_ []string) error {
 | 
			
		||||
		kubecontrollermanager.ProbeRecyclableVolumePlugins(s.VolumeConfiguration),
 | 
			
		||||
		cloud,
 | 
			
		||||
		s.ClusterName,
 | 
			
		||||
		nil,
 | 
			
		||||
		nil,
 | 
			
		||||
		nil,
 | 
			
		||||
	)
 | 
			
		||||
	volumeController.Run()
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -46,15 +46,20 @@ func NewPersistentVolumeController(
 | 
			
		||||
	provisioner vol.ProvisionableVolumePlugin,
 | 
			
		||||
	recyclers []vol.VolumePlugin,
 | 
			
		||||
	cloud cloudprovider.Interface,
 | 
			
		||||
	clusterName string) *PersistentVolumeController {
 | 
			
		||||
	clusterName string,
 | 
			
		||||
	volumeSource, claimSource cache.ListerWatcher,
 | 
			
		||||
	eventRecorder record.EventRecorder,
 | 
			
		||||
) *PersistentVolumeController {
 | 
			
		||||
 | 
			
		||||
	broadcaster := record.NewBroadcaster()
 | 
			
		||||
	broadcaster.StartRecordingToSink(&unversioned_core.EventSinkImpl{Interface: kubeClient.Core().Events("")})
 | 
			
		||||
	recorder := broadcaster.NewRecorder(api.EventSource{Component: "persistentvolume-controller"})
 | 
			
		||||
	if eventRecorder == nil {
 | 
			
		||||
		broadcaster := record.NewBroadcaster()
 | 
			
		||||
		broadcaster.StartRecordingToSink(&unversioned_core.EventSinkImpl{Interface: kubeClient.Core().Events("")})
 | 
			
		||||
		eventRecorder = broadcaster.NewRecorder(api.EventSource{Component: "persistentvolume-controller"})
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	controller := &PersistentVolumeController{
 | 
			
		||||
		kubeClient:                    kubeClient,
 | 
			
		||||
		eventRecorder:                 recorder,
 | 
			
		||||
		eventRecorder:                 eventRecorder,
 | 
			
		||||
		runningOperations:             make(map[string]bool),
 | 
			
		||||
		cloud:                         cloud,
 | 
			
		||||
		provisioner:                   provisioner,
 | 
			
		||||
@@ -62,6 +67,7 @@ func NewPersistentVolumeController(
 | 
			
		||||
		createProvisionedPVRetryCount: createProvisionedPVRetryCount,
 | 
			
		||||
		createProvisionedPVInterval:   createProvisionedPVInterval,
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	controller.recyclePluginMgr.InitPlugins(recyclers, controller)
 | 
			
		||||
	if controller.provisioner != nil {
 | 
			
		||||
		if err := controller.provisioner.Init(controller); err != nil {
 | 
			
		||||
@@ -69,56 +75,50 @@ func NewPersistentVolumeController(
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	volumeSource := &cache.ListWatch{
 | 
			
		||||
		ListFunc: func(options api.ListOptions) (runtime.Object, error) {
 | 
			
		||||
			return kubeClient.Core().PersistentVolumes().List(options)
 | 
			
		||||
		},
 | 
			
		||||
		WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
 | 
			
		||||
			return kubeClient.Core().PersistentVolumes().Watch(options)
 | 
			
		||||
		},
 | 
			
		||||
	if volumeSource == nil {
 | 
			
		||||
		volumeSource = &cache.ListWatch{
 | 
			
		||||
			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
 | 
			
		||||
				return kubeClient.Core().PersistentVolumes().List(options)
 | 
			
		||||
			},
 | 
			
		||||
			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
 | 
			
		||||
				return kubeClient.Core().PersistentVolumes().Watch(options)
 | 
			
		||||
			},
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	claimSource := &cache.ListWatch{
 | 
			
		||||
		ListFunc: func(options api.ListOptions) (runtime.Object, error) {
 | 
			
		||||
			return kubeClient.Core().PersistentVolumeClaims(api.NamespaceAll).List(options)
 | 
			
		||||
		},
 | 
			
		||||
		WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
 | 
			
		||||
			return kubeClient.Core().PersistentVolumeClaims(api.NamespaceAll).Watch(options)
 | 
			
		||||
		},
 | 
			
		||||
	if claimSource == nil {
 | 
			
		||||
		claimSource = &cache.ListWatch{
 | 
			
		||||
			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
 | 
			
		||||
				return kubeClient.Core().PersistentVolumeClaims(api.NamespaceAll).List(options)
 | 
			
		||||
			},
 | 
			
		||||
			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
 | 
			
		||||
				return kubeClient.Core().PersistentVolumeClaims(api.NamespaceAll).Watch(options)
 | 
			
		||||
			},
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	controller.initializeController(syncPeriod, volumeSource, claimSource)
 | 
			
		||||
 | 
			
		||||
	return controller
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// initializeController prepares watching for PersistentVolume and
 | 
			
		||||
// PersistentVolumeClaim events from given sources. This should be used to
 | 
			
		||||
// initialize the controller for real operation (with real event sources) and
 | 
			
		||||
// also during testing (with fake ones).
 | 
			
		||||
func (ctrl *PersistentVolumeController) initializeController(syncPeriod time.Duration, volumeSource, claimSource cache.ListerWatcher) {
 | 
			
		||||
	glog.V(4).Infof("initializing PersistentVolumeController, sync every %s", syncPeriod.String())
 | 
			
		||||
	ctrl.volumes.store, ctrl.volumeController = framework.NewIndexerInformer(
 | 
			
		||||
	controller.volumes.store, controller.volumeController = framework.NewIndexerInformer(
 | 
			
		||||
		volumeSource,
 | 
			
		||||
		&api.PersistentVolume{},
 | 
			
		||||
		syncPeriod,
 | 
			
		||||
		framework.ResourceEventHandlerFuncs{
 | 
			
		||||
			AddFunc:    ctrl.addVolume,
 | 
			
		||||
			UpdateFunc: ctrl.updateVolume,
 | 
			
		||||
			DeleteFunc: ctrl.deleteVolume,
 | 
			
		||||
			AddFunc:    controller.addVolume,
 | 
			
		||||
			UpdateFunc: controller.updateVolume,
 | 
			
		||||
			DeleteFunc: controller.deleteVolume,
 | 
			
		||||
		},
 | 
			
		||||
		cache.Indexers{"accessmodes": accessModesIndexFunc},
 | 
			
		||||
	)
 | 
			
		||||
	ctrl.claims, ctrl.claimController = framework.NewInformer(
 | 
			
		||||
	controller.claims, controller.claimController = framework.NewInformer(
 | 
			
		||||
		claimSource,
 | 
			
		||||
		&api.PersistentVolumeClaim{},
 | 
			
		||||
		syncPeriod,
 | 
			
		||||
		framework.ResourceEventHandlerFuncs{
 | 
			
		||||
			AddFunc:    ctrl.addClaim,
 | 
			
		||||
			UpdateFunc: ctrl.updateClaim,
 | 
			
		||||
			DeleteFunc: ctrl.deleteClaim,
 | 
			
		||||
			AddFunc:    controller.addClaim,
 | 
			
		||||
			UpdateFunc: controller.updateClaim,
 | 
			
		||||
			DeleteFunc: controller.deleteClaim,
 | 
			
		||||
		},
 | 
			
		||||
	)
 | 
			
		||||
	return controller
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// addVolume is callback from framework.Controller watching PersistentVolume
 | 
			
		||||
 
 | 
			
		||||
@@ -128,7 +128,7 @@ func TestControllerSync(t *testing.T) {
 | 
			
		||||
		client := &fake.Clientset{}
 | 
			
		||||
		volumeSource := framework.NewFakeControllerSource()
 | 
			
		||||
		claimSource := framework.NewFakeControllerSource()
 | 
			
		||||
		ctrl := newPersistentVolumeController(client, volumeSource, claimSource)
 | 
			
		||||
		ctrl := newTestController(client, volumeSource, claimSource)
 | 
			
		||||
		reactor := newVolumeReactor(client, ctrl, volumeSource, claimSource, test.errors)
 | 
			
		||||
		for _, claim := range test.initialClaims {
 | 
			
		||||
			claimSource.Add(claim)
 | 
			
		||||
 
 | 
			
		||||
@@ -484,27 +484,27 @@ func newVolumeReactor(client *fake.Clientset, ctrl *PersistentVolumeController,
 | 
			
		||||
	return reactor
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func newPersistentVolumeController(kubeClient clientset.Interface, volumeSource, claimSource cache.ListerWatcher) *PersistentVolumeController {
 | 
			
		||||
	ctrl := &PersistentVolumeController{
 | 
			
		||||
		volumes:           newPersistentVolumeOrderedIndex(),
 | 
			
		||||
		claims:            cache.NewStore(cache.MetaNamespaceKeyFunc),
 | 
			
		||||
		kubeClient:        kubeClient,
 | 
			
		||||
		eventRecorder:     record.NewFakeRecorder(1000),
 | 
			
		||||
		runningOperations: make(map[string]bool),
 | 
			
		||||
 | 
			
		||||
		// Speed up the testing
 | 
			
		||||
		createProvisionedPVRetryCount: createProvisionedPVRetryCount,
 | 
			
		||||
		createProvisionedPVInterval:   5 * time.Millisecond,
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Create dummy volume/claim sources for controller watchers when needed
 | 
			
		||||
func newTestController(kubeClient clientset.Interface, volumeSource, claimSource cache.ListerWatcher) *PersistentVolumeController {
 | 
			
		||||
	if volumeSource == nil {
 | 
			
		||||
		volumeSource = framework.NewFakeControllerSource()
 | 
			
		||||
	}
 | 
			
		||||
	if claimSource == nil {
 | 
			
		||||
		claimSource = framework.NewFakeControllerSource()
 | 
			
		||||
	}
 | 
			
		||||
	ctrl.initializeController(5*time.Second, volumeSource, claimSource)
 | 
			
		||||
	ctrl := NewPersistentVolumeController(
 | 
			
		||||
		kubeClient,
 | 
			
		||||
		5*time.Second,        // sync period
 | 
			
		||||
		nil,                  // provisioner
 | 
			
		||||
		[]vol.VolumePlugin{}, // recyclers
 | 
			
		||||
		nil,                  // cloud
 | 
			
		||||
		"",
 | 
			
		||||
		volumeSource,
 | 
			
		||||
		claimSource,
 | 
			
		||||
		record.NewFakeRecorder(1000), // event recorder
 | 
			
		||||
	)
 | 
			
		||||
 | 
			
		||||
	// Speed up the test
 | 
			
		||||
	ctrl.createProvisionedPVInterval = 5 * time.Millisecond
 | 
			
		||||
	return ctrl
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -732,7 +732,7 @@ func runSyncTests(t *testing.T, tests []controllerTest) {
 | 
			
		||||
 | 
			
		||||
		// Initialize the controller
 | 
			
		||||
		client := &fake.Clientset{}
 | 
			
		||||
		ctrl := newPersistentVolumeController(client, nil, nil)
 | 
			
		||||
		ctrl := newTestController(client, nil, nil)
 | 
			
		||||
		reactor := newVolumeReactor(client, ctrl, nil, nil, test.errors)
 | 
			
		||||
		for _, claim := range test.initialClaims {
 | 
			
		||||
			ctrl.claims.Add(claim)
 | 
			
		||||
@@ -776,7 +776,7 @@ func runMultisyncTests(t *testing.T, tests []controllerTest) {
 | 
			
		||||
 | 
			
		||||
		// Initialize the controller
 | 
			
		||||
		client := &fake.Clientset{}
 | 
			
		||||
		ctrl := newPersistentVolumeController(client, nil, nil)
 | 
			
		||||
		ctrl := newTestController(client, nil, nil)
 | 
			
		||||
		reactor := newVolumeReactor(client, ctrl, nil, nil, test.errors)
 | 
			
		||||
		for _, claim := range test.initialClaims {
 | 
			
		||||
			ctrl.claims.Add(claim)
 | 
			
		||||
 
 | 
			
		||||
@@ -55,7 +55,7 @@ func TestPersistentVolumeRecycler(t *testing.T) {
 | 
			
		||||
	plugins := []volume.VolumePlugin{&volumetest.FakeVolumePlugin{"plugin-name", host, volume.VolumeConfig{}, volume.VolumeOptions{}, 0, 0, nil, nil, nil, nil}}
 | 
			
		||||
	cloud := &fake_cloud.FakeCloud{}
 | 
			
		||||
 | 
			
		||||
	ctrl := persistentvolumecontroller.NewPersistentVolumeController(testClient, 10*time.Second, nil, plugins, cloud, "")
 | 
			
		||||
	ctrl := persistentvolumecontroller.NewPersistentVolumeController(testClient, 10*time.Second, nil, plugins, cloud, "", nil, nil, nil)
 | 
			
		||||
	ctrl.Run()
 | 
			
		||||
	defer ctrl.Stop()
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user