mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-10-31 18:28:13 +00:00 
			
		
		
		
	Add CSINode initialization for CSIMigration on node startup before pod ready
This commit is contained in:
		| @@ -544,7 +544,7 @@ func (kl *Kubelet) defaultNodeStatusFuncs() []func(*v1.Node) error { | ||||
| 		nodestatus.MemoryPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderMemoryPressure, kl.recordNodeStatusEvent), | ||||
| 		nodestatus.DiskPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderDiskPressure, kl.recordNodeStatusEvent), | ||||
| 		nodestatus.PIDPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderPIDPressure, kl.recordNodeStatusEvent), | ||||
| 		nodestatus.ReadyCondition(kl.clock.Now, kl.runtimeState.runtimeErrors, kl.runtimeState.networkErrors, validateHostFunc, kl.containerManager.Status, kl.recordNodeStatusEvent), | ||||
| 		nodestatus.ReadyCondition(kl.clock.Now, kl.runtimeState.runtimeErrors, kl.runtimeState.networkErrors, kl.runtimeState.storageErrors, validateHostFunc, kl.containerManager.Status, kl.recordNodeStatusEvent), | ||||
| 		nodestatus.VolumesInUse(kl.volumeManager.ReconcilerStatesHasBeenSynced, kl.volumeManager.GetVolumesInUse), | ||||
| 		nodestatus.RemoveOutOfDiskCondition(), | ||||
| 		// TODO(mtaufen): I decided not to move this setter for now, since all it does is send an event | ||||
|   | ||||
| @@ -440,6 +440,7 @@ func ReadyCondition( | ||||
| 	nowFunc func() time.Time, // typically Kubelet.clock.Now | ||||
| 	runtimeErrorsFunc func() error, // typically Kubelet.runtimeState.runtimeErrors | ||||
| 	networkErrorsFunc func() error, // typically Kubelet.runtimeState.networkErrors | ||||
| 	storageErrorsFunc func() error, // typically Kubelet.runtimeState.storageErrors | ||||
| 	appArmorValidateHostFunc func() error, // typically Kubelet.appArmorValidator.ValidateHost, might be nil depending on whether there was an appArmorValidator | ||||
| 	cmStatusFunc func() cm.Status, // typically Kubelet.containerManager.Status | ||||
| 	recordEventFunc func(eventType, event string), // typically Kubelet.recordNodeStatusEvent | ||||
| @@ -456,7 +457,7 @@ func ReadyCondition( | ||||
| 			Message:           "kubelet is posting ready status", | ||||
| 			LastHeartbeatTime: currentTime, | ||||
| 		} | ||||
| 		errs := []error{runtimeErrorsFunc(), networkErrorsFunc()} | ||||
| 		errs := []error{runtimeErrorsFunc(), networkErrorsFunc(), storageErrorsFunc()} | ||||
| 		requiredCapacities := []v1.ResourceName{v1.ResourceCPU, v1.ResourceMemory, v1.ResourcePods} | ||||
| 		if utilfeature.DefaultFeatureGate.Enabled(features.LocalStorageCapacityIsolation) { | ||||
| 			requiredCapacities = append(requiredCapacities, v1.ResourceEphemeralStorage) | ||||
|   | ||||
| @@ -895,6 +895,7 @@ func TestReadyCondition(t *testing.T) { | ||||
| 		node                     *v1.Node | ||||
| 		runtimeErrors            error | ||||
| 		networkErrors            error | ||||
| 		storageErrors            error | ||||
| 		appArmorValidateHostFunc func() error | ||||
| 		cmStatus                 cm.Status | ||||
| 		expectConditions         []v1.NodeCondition | ||||
| @@ -929,6 +930,12 @@ func TestReadyCondition(t *testing.T) { | ||||
| 			}, | ||||
| 			expectConditions: []v1.NodeCondition{*makeReadyCondition(true, "kubelet is posting ready status. WARNING: foo", now, now)}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			desc:             "new, not ready: storage errors", | ||||
| 			node:             withCapacity.DeepCopy(), | ||||
| 			storageErrors:    errors.New("some storage error"), | ||||
| 			expectConditions: []v1.NodeCondition{*makeReadyCondition(false, "some storage error", now, now)}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			desc:             "new, not ready: runtime and network errors", | ||||
| 			node:             withCapacity.DeepCopy(), | ||||
| @@ -1003,6 +1010,9 @@ func TestReadyCondition(t *testing.T) { | ||||
| 			networkErrorsFunc := func() error { | ||||
| 				return tc.networkErrors | ||||
| 			} | ||||
| 			storageErrorsFunc := func() error { | ||||
| 				return tc.storageErrors | ||||
| 			} | ||||
| 			cmStatusFunc := func() cm.Status { | ||||
| 				return tc.cmStatus | ||||
| 			} | ||||
| @@ -1014,7 +1024,7 @@ func TestReadyCondition(t *testing.T) { | ||||
| 				}) | ||||
| 			} | ||||
| 			// construct setter | ||||
| 			setter := ReadyCondition(nowFunc, runtimeErrorsFunc, networkErrorsFunc, tc.appArmorValidateHostFunc, cmStatusFunc, recordEventFunc) | ||||
| 			setter := ReadyCondition(nowFunc, runtimeErrorsFunc, networkErrorsFunc, storageErrorsFunc, tc.appArmorValidateHostFunc, cmStatusFunc, recordEventFunc) | ||||
| 			// call setter on node | ||||
| 			if err := setter(tc.node); err != nil { | ||||
| 				t.Fatalf("unexpected error: %v", err) | ||||
|   | ||||
| @@ -30,6 +30,7 @@ type runtimeState struct { | ||||
| 	lastBaseRuntimeSync      time.Time | ||||
| 	baseRuntimeSyncThreshold time.Duration | ||||
| 	networkError             error | ||||
| 	storageError             error | ||||
| 	cidr                     string | ||||
| 	healthChecks             []*healthCheck | ||||
| } | ||||
| @@ -61,6 +62,12 @@ func (s *runtimeState) setNetworkState(err error) { | ||||
| 	s.networkError = err | ||||
| } | ||||
|  | ||||
| func (s *runtimeState) setStorageState(err error) { | ||||
| 	s.Lock() | ||||
| 	defer s.Unlock() | ||||
| 	s.storageError = err | ||||
| } | ||||
|  | ||||
| func (s *runtimeState) setPodCIDR(cidr string) { | ||||
| 	s.Lock() | ||||
| 	defer s.Unlock() | ||||
| @@ -101,6 +108,16 @@ func (s *runtimeState) networkErrors() error { | ||||
| 	return utilerrors.NewAggregate(errs) | ||||
| } | ||||
|  | ||||
| func (s *runtimeState) storageErrors() error { | ||||
| 	s.RLock() | ||||
| 	defer s.RUnlock() | ||||
| 	errs := []error{} | ||||
| 	if s.storageError != nil { | ||||
| 		errs = append(errs, s.storageError) | ||||
| 	} | ||||
| 	return utilerrors.NewAggregate(errs) | ||||
| } | ||||
|  | ||||
| func newRuntimeState( | ||||
| 	runtimeSyncThreshold time.Duration, | ||||
| ) *runtimeState { | ||||
|   | ||||
| @@ -80,6 +80,7 @@ func NewInitializedVolumePluginMgr( | ||||
|  | ||||
| // Compile-time check to ensure kubeletVolumeHost implements the VolumeHost interface | ||||
| var _ volume.VolumeHost = &kubeletVolumeHost{} | ||||
| var _ volume.KubeletVolumeHost = &kubeletVolumeHost{} | ||||
|  | ||||
| func (kvh *kubeletVolumeHost) GetPluginDir(pluginName string) string { | ||||
| 	return kvh.kubelet.getPluginDir(pluginName) | ||||
| @@ -94,6 +95,11 @@ type kubeletVolumeHost struct { | ||||
| 	mountPodManager  mountpod.Manager | ||||
| } | ||||
|  | ||||
| func (kvh *kubeletVolumeHost) SetKubeletError(err error) { | ||||
| 	kvh.kubelet.runtimeState.setStorageState(err) | ||||
| 	return | ||||
| } | ||||
|  | ||||
| func (kvh *kubeletVolumeHost) GetVolumeDevicePluginDir(pluginName string) string { | ||||
| 	return kvh.kubelet.getVolumeDevicePluginDir(pluginName) | ||||
| } | ||||
|   | ||||
| @@ -24,6 +24,7 @@ go_library( | ||||
|         "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", | ||||
|         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", | ||||
|         "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", | ||||
|         "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", | ||||
|         "//staging/src/k8s.io/apimachinery/pkg/util/version:go_default_library", | ||||
|         "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", | ||||
|         "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", | ||||
| @@ -32,6 +33,7 @@ go_library( | ||||
|         "//staging/src/k8s.io/client-go/informers/storage/v1beta1:go_default_library", | ||||
|         "//staging/src/k8s.io/client-go/kubernetes:go_default_library", | ||||
|         "//staging/src/k8s.io/client-go/listers/storage/v1beta1:go_default_library", | ||||
|         "//staging/src/k8s.io/csi-translation-lib/plugins:go_default_library", | ||||
|         "//vendor/github.com/container-storage-interface/spec/lib/go/csi:go_default_library", | ||||
|         "//vendor/google.golang.org/grpc:go_default_library", | ||||
|         "//vendor/k8s.io/klog:go_default_library", | ||||
|   | ||||
| @@ -33,6 +33,7 @@ import ( | ||||
| 	apierrs "k8s.io/apimachinery/pkg/api/errors" | ||||
| 	meta "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||
| 	"k8s.io/apimachinery/pkg/types" | ||||
| 	utilruntime "k8s.io/apimachinery/pkg/util/runtime" | ||||
| 	utilversion "k8s.io/apimachinery/pkg/util/version" | ||||
| 	"k8s.io/apimachinery/pkg/util/wait" | ||||
| 	utilfeature "k8s.io/apiserver/pkg/util/feature" | ||||
| @@ -40,6 +41,7 @@ import ( | ||||
| 	csiinformer "k8s.io/client-go/informers/storage/v1beta1" | ||||
| 	clientset "k8s.io/client-go/kubernetes" | ||||
| 	csilister "k8s.io/client-go/listers/storage/v1beta1" | ||||
| 	csitranslationplugins "k8s.io/csi-translation-lib/plugins" | ||||
| 	"k8s.io/kubernetes/pkg/features" | ||||
| 	"k8s.io/kubernetes/pkg/volume" | ||||
| 	"k8s.io/kubernetes/pkg/volume/csi/nodeinfomanager" | ||||
| @@ -216,15 +218,77 @@ func (p *csiPlugin) Init(host volume.VolumeHost) error { | ||||
| 		go factory.Start(wait.NeverStop) | ||||
| 	} | ||||
|  | ||||
| 	var migratedPlugins = map[string](func() bool){ | ||||
| 		csitranslationplugins.GCEPDInTreePluginName: func() bool { | ||||
| 			return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) && utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationGCE) | ||||
| 		}, | ||||
| 		// TODO(leakingtapan): Add AWS migration feature gates and place them here | ||||
| 	} | ||||
|  | ||||
| 	// Initializing the label management channels | ||||
| 	nim = nodeinfomanager.NewNodeInfoManager(host.GetNodeName(), host) | ||||
| 	nim = nodeinfomanager.NewNodeInfoManager(host.GetNodeName(), host, migratedPlugins) | ||||
|  | ||||
| 	// TODO(#70514) Init CSINodeInfo object if the CRD exists and create Driver | ||||
| 	// objects for migrated drivers. | ||||
| 	if utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) && | ||||
| 		utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) { | ||||
| 		// This function prevents Kubelet from posting Ready status until CSINodeInfo | ||||
| 		// is both installed and initialized | ||||
| 		err := initializeCSINodeInfo(host) | ||||
| 		if err != nil { | ||||
| 			return fmt.Errorf("failed to initialize CSINodeInfo: %v", err) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func initializeCSINodeInfo(host volume.VolumeHost) error { | ||||
| 	kvh, ok := host.(volume.KubeletVolumeHost) | ||||
| 	if !ok { | ||||
| 		klog.V(4).Infof("Skipping CSINodeInfo initialization, not running on Kubelet") | ||||
| 		return nil | ||||
| 	} | ||||
| 	kubeClient := host.GetKubeClient() | ||||
| 	if kubeClient == nil { | ||||
| 		// Kubelet running in standalone mode. Skip CSINodeInfo initialization | ||||
| 		klog.Warningf("Skipping CSINodeInfo initialization, Kubelet running in standalone mode") | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	kvh.SetKubeletError(fmt.Errorf("CSINodeInfo is not yet initialized")) | ||||
|  | ||||
| 	go func() { | ||||
| 		defer utilruntime.HandleCrash() | ||||
|  | ||||
| 		// Backoff parameters tuned to retry over 140 seconds. Will fail and restart the Kubelet | ||||
| 		// after max retry steps. | ||||
| 		initBackoff := wait.Backoff{ | ||||
| 			Steps:    6, | ||||
| 			Duration: 15 * time.Millisecond, | ||||
| 			Factor:   6.0, | ||||
| 			Jitter:   0.1, | ||||
| 		} | ||||
| 		err := wait.ExponentialBackoff(initBackoff, func() (bool, error) { | ||||
| 			klog.V(4).Infof("Initializing migrated drivers on CSINodeInfo") | ||||
| 			// TODO(dyzz): Just augment CreateCSINodeInfo to create the annotation on itself. Also update all updating functions to double check that the annotation is correct (yes) | ||||
| 			_, err := nim.CreateCSINode() | ||||
| 			if err != nil { | ||||
| 				kvh.SetKubeletError(fmt.Errorf("Failed to initialize CSINodeInfo: %v", err)) | ||||
| 				klog.Errorf("Failed to initialize CSINodeInfo: %v", err) | ||||
| 				return false, nil | ||||
| 			} | ||||
|  | ||||
| 			// Successfully initialized drivers, allow Kubelet to post Ready | ||||
| 			kvh.SetKubeletError(nil) | ||||
| 			return true, nil | ||||
| 		}) | ||||
| 		if err != nil { | ||||
| 			// Kill the Kubelet process and allow it to restart to retry initialization | ||||
| 			klog.Fatalf("Failed to initialize CSINodeInfo after retrying") | ||||
| 		} | ||||
| 	}() | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (p *csiPlugin) GetPluginName() string { | ||||
| 	return CSIPluginName | ||||
| } | ||||
|   | ||||
| @@ -21,6 +21,7 @@ package nodeinfomanager // import "k8s.io/kubernetes/pkg/volume/csi/nodeinfomana | ||||
| import ( | ||||
| 	"encoding/json" | ||||
| 	"fmt" | ||||
| 	"strings" | ||||
|  | ||||
| 	"time" | ||||
|  | ||||
| @@ -62,6 +63,7 @@ var ( | ||||
| type nodeInfoManager struct { | ||||
| 	nodeName        types.NodeName | ||||
| 	volumeHost      volume.VolumeHost | ||||
| 	migratedPlugins map[string](func() bool) | ||||
| } | ||||
|  | ||||
| // If no updates is needed, the function must return the same Node object as the input. | ||||
| @@ -85,10 +87,12 @@ type Interface interface { | ||||
| // NewNodeInfoManager initializes nodeInfoManager | ||||
| func NewNodeInfoManager( | ||||
| 	nodeName types.NodeName, | ||||
| 	volumeHost volume.VolumeHost) Interface { | ||||
| 	volumeHost volume.VolumeHost, | ||||
| 	migratedPlugins map[string](func() bool)) Interface { | ||||
| 	return &nodeInfoManager{ | ||||
| 		nodeName:        nodeName, | ||||
| 		volumeHost:      volumeHost, | ||||
| 		migratedPlugins: migratedPlugins, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| @@ -418,9 +422,51 @@ func (nim *nodeInfoManager) CreateCSINode() (*storage.CSINode, error) { | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
| 	setMigrationAnnotation(nim.migratedPlugins, nodeInfo) | ||||
|  | ||||
| 	return csiKubeClient.StorageV1beta1().CSINodes().Create(nodeInfo) | ||||
| } | ||||
|  | ||||
| const ( | ||||
| 	migratedPluginsAnnotationKey = "storage.alpha.kubernetes.io/migrated-plugins" | ||||
| ) | ||||
|  | ||||
| func setMigrationAnnotation(migratedPlugins map[string](func() bool), nodeInfo *storage.CSINode) (modified bool) { | ||||
| 	if migratedPlugins == nil { | ||||
| 		return false | ||||
| 	} | ||||
|  | ||||
| 	nodeInfoAnnotations := nodeInfo.GetAnnotations() | ||||
| 	if nodeInfoAnnotations == nil { | ||||
| 		nodeInfoAnnotations = map[string]string{} | ||||
| 	} | ||||
|  | ||||
| 	mpa := nodeInfoAnnotations[migratedPluginsAnnotationKey] | ||||
| 	tok := strings.Split(mpa, ",") | ||||
| 	oldAnnotationSet := sets.NewString(tok...) | ||||
|  | ||||
| 	newAnnotationSet := sets.NewString() | ||||
| 	for pluginName, migratedFunc := range migratedPlugins { | ||||
| 		if migratedFunc() { | ||||
| 			newAnnotationSet.Insert(pluginName) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	if oldAnnotationSet.Equal(newAnnotationSet) { | ||||
| 		return false | ||||
| 	} | ||||
|  | ||||
| 	nas := strings.Join(newAnnotationSet.List(), ",") | ||||
| 	if len(nas) != 0 { | ||||
| 		nodeInfoAnnotations[migratedPluginsAnnotationKey] = nas | ||||
| 	} else { | ||||
| 		delete(nodeInfoAnnotations, migratedPluginsAnnotationKey) | ||||
| 	} | ||||
|  | ||||
| 	nodeInfo.Annotations = nodeInfoAnnotations | ||||
| 	return true | ||||
| } | ||||
|  | ||||
| func (nim *nodeInfoManager) installDriverToCSINode( | ||||
| 	nodeInfo *storage.CSINode, | ||||
| 	driverName string, | ||||
| @@ -453,7 +499,9 @@ func (nim *nodeInfoManager) installDriverToCSINode( | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	if !specModified && !statusModified { | ||||
| 	annotationModified := setMigrationAnnotation(nim.migratedPlugins, nodeInfo) | ||||
|  | ||||
| 	if !specModified && !statusModified && !annotationModified { | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| @@ -517,7 +565,10 @@ func (nim *nodeInfoManager) tryUninstallDriverFromCSINode( | ||||
| 			hasModified = true | ||||
| 		} | ||||
| 	} | ||||
| 	if !hasModified { | ||||
|  | ||||
| 	annotationModified := setMigrationAnnotation(nim.migratedPlugins, nodeInfo) | ||||
|  | ||||
| 	if !hasModified && !annotationModified { | ||||
| 		// No changes, don't update | ||||
| 		return nil | ||||
| 	} | ||||
|   | ||||
| @@ -19,6 +19,7 @@ package nodeinfomanager | ||||
| import ( | ||||
| 	"encoding/json" | ||||
| 	"fmt" | ||||
| 	"reflect" | ||||
| 	"testing" | ||||
|  | ||||
| 	"github.com/stretchr/testify/assert" | ||||
| @@ -545,6 +546,156 @@ func TestUninstallCSIDriverCSINodeInfoDisabled(t *testing.T) { | ||||
| 	test(t, false /* addNodeInfo */, false /* csiNodeInfoEnabled */, testcases) | ||||
| } | ||||
|  | ||||
| func TestSetMigrationAnnotation(t *testing.T) { | ||||
| 	testcases := []struct { | ||||
| 		name            string | ||||
| 		migratedPlugins map[string](func() bool) | ||||
| 		existingNode    *storage.CSINode | ||||
| 		expectedNode    *storage.CSINode | ||||
| 		expectModified  bool | ||||
| 	}{ | ||||
| 		{ | ||||
| 			name: "nil migrated plugins", | ||||
| 			existingNode: &storage.CSINode{ | ||||
| 				ObjectMeta: metav1.ObjectMeta{ | ||||
| 					Name: "node1", | ||||
| 				}, | ||||
| 			}, | ||||
| 			expectedNode: &storage.CSINode{ | ||||
| 				ObjectMeta: metav1.ObjectMeta{ | ||||
| 					Name: "node1", | ||||
| 				}, | ||||
| 			}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name: "one modified plugin", | ||||
| 			migratedPlugins: map[string](func() bool){ | ||||
| 				"test": func() bool { return true }, | ||||
| 			}, | ||||
| 			existingNode: &storage.CSINode{ | ||||
| 				ObjectMeta: metav1.ObjectMeta{ | ||||
| 					Name: "node1", | ||||
| 				}, | ||||
| 			}, | ||||
| 			expectedNode: &storage.CSINode{ | ||||
| 				ObjectMeta: metav1.ObjectMeta{ | ||||
| 					Name:        "node1", | ||||
| 					Annotations: map[string]string{migratedPluginsAnnotationKey: "test"}, | ||||
| 				}, | ||||
| 			}, | ||||
| 			expectModified: true, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name: "existing plugin", | ||||
| 			migratedPlugins: map[string](func() bool){ | ||||
| 				"test": func() bool { return true }, | ||||
| 			}, | ||||
| 			existingNode: &storage.CSINode{ | ||||
| 				ObjectMeta: metav1.ObjectMeta{ | ||||
| 					Name:        "node1", | ||||
| 					Annotations: map[string]string{migratedPluginsAnnotationKey: "test"}, | ||||
| 				}, | ||||
| 			}, | ||||
| 			expectedNode: &storage.CSINode{ | ||||
| 				ObjectMeta: metav1.ObjectMeta{ | ||||
| 					Name:        "node1", | ||||
| 					Annotations: map[string]string{migratedPluginsAnnotationKey: "test"}, | ||||
| 				}, | ||||
| 			}, | ||||
| 			expectModified: false, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:            "remove plugin", | ||||
| 			migratedPlugins: map[string](func() bool){}, | ||||
| 			existingNode: &storage.CSINode{ | ||||
| 				ObjectMeta: metav1.ObjectMeta{ | ||||
| 					Name:        "node1", | ||||
| 					Annotations: map[string]string{migratedPluginsAnnotationKey: "test"}, | ||||
| 				}, | ||||
| 			}, | ||||
| 			expectedNode: &storage.CSINode{ | ||||
| 				ObjectMeta: metav1.ObjectMeta{ | ||||
| 					Name:        "node1", | ||||
| 					Annotations: map[string]string{}, | ||||
| 				}, | ||||
| 			}, | ||||
| 			expectModified: true, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name: "one modified plugin, other annotations stable", | ||||
| 			migratedPlugins: map[string](func() bool){ | ||||
| 				"test": func() bool { return true }, | ||||
| 			}, | ||||
| 			existingNode: &storage.CSINode{ | ||||
| 				ObjectMeta: metav1.ObjectMeta{ | ||||
| 					Name:        "node1", | ||||
| 					Annotations: map[string]string{"other": "annotation"}, | ||||
| 				}, | ||||
| 			}, | ||||
| 			expectedNode: &storage.CSINode{ | ||||
| 				ObjectMeta: metav1.ObjectMeta{ | ||||
| 					Name:        "node1", | ||||
| 					Annotations: map[string]string{migratedPluginsAnnotationKey: "test", "other": "annotation"}, | ||||
| 				}, | ||||
| 			}, | ||||
| 			expectModified: true, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name: "multiple plugins modified, other annotations stable", | ||||
| 			migratedPlugins: map[string](func() bool){ | ||||
| 				"test": func() bool { return true }, | ||||
| 				"foo":  func() bool { return false }, | ||||
| 			}, | ||||
| 			existingNode: &storage.CSINode{ | ||||
| 				ObjectMeta: metav1.ObjectMeta{ | ||||
| 					Name:        "node1", | ||||
| 					Annotations: map[string]string{"other": "annotation", migratedPluginsAnnotationKey: "foo"}, | ||||
| 				}, | ||||
| 			}, | ||||
| 			expectedNode: &storage.CSINode{ | ||||
| 				ObjectMeta: metav1.ObjectMeta{ | ||||
| 					Name:        "node1", | ||||
| 					Annotations: map[string]string{migratedPluginsAnnotationKey: "test", "other": "annotation"}, | ||||
| 				}, | ||||
| 			}, | ||||
| 			expectModified: true, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name: "multiple plugins added, other annotations stable", | ||||
| 			migratedPlugins: map[string](func() bool){ | ||||
| 				"test": func() bool { return true }, | ||||
| 				"foo":  func() bool { return true }, | ||||
| 			}, | ||||
| 			existingNode: &storage.CSINode{ | ||||
| 				ObjectMeta: metav1.ObjectMeta{ | ||||
| 					Name:        "node1", | ||||
| 					Annotations: map[string]string{"other": "annotation"}, | ||||
| 				}, | ||||
| 			}, | ||||
| 			expectedNode: &storage.CSINode{ | ||||
| 				ObjectMeta: metav1.ObjectMeta{ | ||||
| 					Name:        "node1", | ||||
| 					Annotations: map[string]string{migratedPluginsAnnotationKey: "foo,test", "other": "annotation"}, | ||||
| 				}, | ||||
| 			}, | ||||
| 			expectModified: true, | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
| 	for _, tc := range testcases { | ||||
| 		t.Logf("test case: %s", tc.name) | ||||
|  | ||||
| 		modified := setMigrationAnnotation(tc.migratedPlugins, tc.existingNode) | ||||
| 		if modified != tc.expectModified { | ||||
| 			t.Errorf("Expected modified to be %v but got %v instead", tc.expectModified, modified) | ||||
| 		} | ||||
|  | ||||
| 		if !reflect.DeepEqual(tc.expectedNode, tc.existingNode) { | ||||
| 			t.Errorf("Expected CSINode: %v, but got: %v", tc.expectedNode, tc.existingNode) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestInstallCSIDriverExistingAnnotation(t *testing.T) { | ||||
| 	defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSINodeInfo, true)() | ||||
|  | ||||
| @@ -591,7 +742,7 @@ func TestInstallCSIDriverExistingAnnotation(t *testing.T) { | ||||
| 			nodeName, | ||||
| 		) | ||||
|  | ||||
| 		nim := NewNodeInfoManager(types.NodeName(nodeName), host) | ||||
| 		nim := NewNodeInfoManager(types.NodeName(nodeName), host, nil) | ||||
|  | ||||
| 		// Act | ||||
| 		_, err = nim.CreateCSINode() | ||||
| @@ -649,7 +800,7 @@ func test(t *testing.T, addNodeInfo bool, csiNodeInfoEnabled bool, testcases []t | ||||
| 			nil, | ||||
| 			nodeName, | ||||
| 		) | ||||
| 		nim := NewNodeInfoManager(types.NodeName(nodeName), host) | ||||
| 		nim := NewNodeInfoManager(types.NodeName(nodeName), host, nil) | ||||
|  | ||||
| 		//// Act | ||||
| 		nim.CreateCSINode() | ||||
|   | ||||
| @@ -278,6 +278,13 @@ type BlockVolumePlugin interface { | ||||
| 	ConstructBlockVolumeSpec(podUID types.UID, volumeName, volumePath string) (*Spec, error) | ||||
| } | ||||
|  | ||||
| // KubeletVolumeHost is a Kubelet specific interface that plugins can use to access the kubelet. | ||||
| type KubeletVolumeHost interface { | ||||
| 	// SetKubeletError lets plugins set an error on the Kubelet runtime status | ||||
| 	// that will cause the Kubelet to post NotReady status with the error message provided | ||||
| 	SetKubeletError(err error) | ||||
| } | ||||
|  | ||||
| // VolumeHost is an interface that plugins can use to access the kubelet. | ||||
| type VolumeHost interface { | ||||
| 	// GetPluginDir returns the absolute path to a directory under which | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 David Zhu
					David Zhu