mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-10-31 18:28:13 +00:00 
			
		
		
		
	Revert "Merge pull request #87258 from verult/slow-rxm-attach"
This reverts commit15c3f1b119, reversing changes made to52d7614a8c.
This commit is contained in:
		| @@ -16,7 +16,7 @@ go_library( | |||||||
|         "//pkg/controller/volume/attachdetach/statusupdater:go_default_library", |         "//pkg/controller/volume/attachdetach/statusupdater:go_default_library", | ||||||
|         "//pkg/kubelet/events:go_default_library", |         "//pkg/kubelet/events:go_default_library", | ||||||
|         "//pkg/util/goroutinemap/exponentialbackoff:go_default_library", |         "//pkg/util/goroutinemap/exponentialbackoff:go_default_library", | ||||||
|         "//pkg/volume/util:go_default_library", |         "//pkg/volume:go_default_library", | ||||||
|         "//pkg/volume/util/operationexecutor:go_default_library", |         "//pkg/volume/util/operationexecutor:go_default_library", | ||||||
|         "//staging/src/k8s.io/api/core/v1:go_default_library", |         "//staging/src/k8s.io/api/core/v1:go_default_library", | ||||||
|         "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", |         "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", | ||||||
|   | |||||||
| @@ -24,7 +24,7 @@ import ( | |||||||
| 	"strings" | 	"strings" | ||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
| 	v1 "k8s.io/api/core/v1" | 	"k8s.io/api/core/v1" | ||||||
| 	"k8s.io/apimachinery/pkg/types" | 	"k8s.io/apimachinery/pkg/types" | ||||||
| 	"k8s.io/apimachinery/pkg/util/wait" | 	"k8s.io/apimachinery/pkg/util/wait" | ||||||
| 	"k8s.io/client-go/tools/record" | 	"k8s.io/client-go/tools/record" | ||||||
| @@ -34,7 +34,7 @@ import ( | |||||||
| 	"k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater" | 	"k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater" | ||||||
| 	kevents "k8s.io/kubernetes/pkg/kubelet/events" | 	kevents "k8s.io/kubernetes/pkg/kubelet/events" | ||||||
| 	"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff" | 	"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff" | ||||||
| 	"k8s.io/kubernetes/pkg/volume/util" | 	"k8s.io/kubernetes/pkg/volume" | ||||||
| 	"k8s.io/kubernetes/pkg/volume/util/operationexecutor" | 	"k8s.io/kubernetes/pkg/volume/util/operationexecutor" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| @@ -134,6 +134,42 @@ func (rc *reconciler) syncStates() { | |||||||
| 	rc.attacherDetacher.VerifyVolumesAreAttached(volumesPerNode, rc.actualStateOfWorld) | 	rc.attacherDetacher.VerifyVolumesAreAttached(volumesPerNode, rc.actualStateOfWorld) | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // isMultiAttachForbidden checks if attaching this volume to multiple nodes is definitely not allowed/possible. | ||||||
|  | // In its current form, this function can only reliably say for which volumes it's definitely forbidden. If it returns | ||||||
|  | // false, it is not guaranteed that multi-attach is actually supported by the volume type and we must rely on the | ||||||
|  | // attacher to fail fast in such cases. | ||||||
|  | // Please see https://github.com/kubernetes/kubernetes/issues/40669 and https://github.com/kubernetes/kubernetes/pull/40148#discussion_r98055047 | ||||||
|  | func (rc *reconciler) isMultiAttachForbidden(volumeSpec *volume.Spec) bool { | ||||||
|  | 	if volumeSpec.Volume != nil { | ||||||
|  | 		// Check for volume types which are known to fail slow or cause trouble when trying to multi-attach | ||||||
|  | 		if volumeSpec.Volume.AzureDisk != nil || | ||||||
|  | 			volumeSpec.Volume.Cinder != nil { | ||||||
|  | 			return true | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// Only if this volume is a persistent volume, we have reliable information on whether it's allowed or not to | ||||||
|  | 	// multi-attach. We trust in the individual volume implementations to not allow unsupported access modes | ||||||
|  | 	if volumeSpec.PersistentVolume != nil { | ||||||
|  | 		// Check for persistent volume types which do not fail when trying to multi-attach | ||||||
|  | 		if len(volumeSpec.PersistentVolume.Spec.AccessModes) == 0 { | ||||||
|  | 			// No access mode specified so we don't know for sure. Let the attacher fail if needed | ||||||
|  | 			return false | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		// check if this volume is allowed to be attached to multiple PODs/nodes, if yes, return false | ||||||
|  | 		for _, accessMode := range volumeSpec.PersistentVolume.Spec.AccessModes { | ||||||
|  | 			if accessMode == v1.ReadWriteMany || accessMode == v1.ReadOnlyMany { | ||||||
|  | 				return false | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 		return true | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// we don't know if it's supported or not and let the attacher fail later in cases it's not supported | ||||||
|  | 	return false | ||||||
|  | } | ||||||
|  |  | ||||||
| func (rc *reconciler) reconcile() { | func (rc *reconciler) reconcile() { | ||||||
| 	// Detaches are triggered before attaches so that volumes referenced by | 	// Detaches are triggered before attaches so that volumes referenced by | ||||||
| 	// pods that are rescheduled to a different node are detached first. | 	// pods that are rescheduled to a different node are detached first. | ||||||
| @@ -146,17 +182,10 @@ func (rc *reconciler) reconcile() { | |||||||
| 			// This check must be done before we do any other checks, as otherwise the other checks | 			// This check must be done before we do any other checks, as otherwise the other checks | ||||||
| 			// may pass while at the same time the volume leaves the pending state, resulting in | 			// may pass while at the same time the volume leaves the pending state, resulting in | ||||||
| 			// double detach attempts | 			// double detach attempts | ||||||
| 			if util.IsMultiAttachForbidden(attachedVolume.VolumeSpec) { | 			if rc.attacherDetacher.IsOperationPending(attachedVolume.VolumeName, "") { | ||||||
| 				if rc.attacherDetacher.IsOperationPending(attachedVolume.VolumeName, "" /* podName */, "" /* nodeName */) { | 				klog.V(10).Infof("Operation for volume %q is already running. Can't start detach for %q", attachedVolume.VolumeName, attachedVolume.NodeName) | ||||||
| 					klog.V(10).Infof("Operation for volume %q is already running in the cluster. Can't start detach for %q", attachedVolume.VolumeName, attachedVolume.NodeName) |  | ||||||
| 				continue | 				continue | ||||||
| 			} | 			} | ||||||
| 			} else { |  | ||||||
| 				if rc.attacherDetacher.IsOperationPending(attachedVolume.VolumeName, "" /* podName */, attachedVolume.NodeName) { |  | ||||||
| 					klog.V(10).Infof("Operation for volume %q is already running for node %q. Can't start detach", attachedVolume.VolumeName, attachedVolume.NodeName) |  | ||||||
| 					continue |  | ||||||
| 				} |  | ||||||
| 			} |  | ||||||
|  |  | ||||||
| 			// Set the detach request time | 			// Set the detach request time | ||||||
| 			elapsedTime, err := rc.actualStateOfWorld.SetDetachRequestTime(attachedVolume.VolumeName, attachedVolume.NodeName) | 			elapsedTime, err := rc.actualStateOfWorld.SetDetachRequestTime(attachedVolume.VolumeName, attachedVolume.NodeName) | ||||||
| @@ -231,17 +260,15 @@ func (rc *reconciler) attachDesiredVolumes() { | |||||||
| 			rc.actualStateOfWorld.ResetDetachRequestTime(volumeToAttach.VolumeName, volumeToAttach.NodeName) | 			rc.actualStateOfWorld.ResetDetachRequestTime(volumeToAttach.VolumeName, volumeToAttach.NodeName) | ||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
|  | 		// Don't even try to start an operation if there is already one running | ||||||
| 		if util.IsMultiAttachForbidden(volumeToAttach.VolumeSpec) { | 		if rc.attacherDetacher.IsOperationPending(volumeToAttach.VolumeName, "") { | ||||||
|  |  | ||||||
| 			// Don't even try to start an operation if there is already one running for the given volume |  | ||||||
| 			if rc.attacherDetacher.IsOperationPending(volumeToAttach.VolumeName, "" /* podName */, "" /* nodeName */) { |  | ||||||
| 			if klog.V(10) { | 			if klog.V(10) { | ||||||
| 				klog.Infof("Operation for volume %q is already running. Can't start attach for %q", volumeToAttach.VolumeName, volumeToAttach.NodeName) | 				klog.Infof("Operation for volume %q is already running. Can't start attach for %q", volumeToAttach.VolumeName, volumeToAttach.NodeName) | ||||||
| 			} | 			} | ||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
|  | 		if rc.isMultiAttachForbidden(volumeToAttach.VolumeSpec) { | ||||||
| 			nodes := rc.actualStateOfWorld.GetNodesForAttachedVolume(volumeToAttach.VolumeName) | 			nodes := rc.actualStateOfWorld.GetNodesForAttachedVolume(volumeToAttach.VolumeName) | ||||||
| 			if len(nodes) > 0 { | 			if len(nodes) > 0 { | ||||||
| 				if !volumeToAttach.MultiAttachErrorReported { | 				if !volumeToAttach.MultiAttachErrorReported { | ||||||
| @@ -250,17 +277,6 @@ func (rc *reconciler) attachDesiredVolumes() { | |||||||
| 				} | 				} | ||||||
| 				continue | 				continue | ||||||
| 			} | 			} | ||||||
|  |  | ||||||
| 		} else { |  | ||||||
|  |  | ||||||
| 			// Don't even try to start an operation if there is already one running for the given volume and node. |  | ||||||
| 			if rc.attacherDetacher.IsOperationPending(volumeToAttach.VolumeName, "" /* podName */, volumeToAttach.NodeName) { |  | ||||||
| 				if klog.V(10) { |  | ||||||
| 					klog.Infof("Operation for volume %q is already running for node %q. Can't start attach", volumeToAttach.VolumeName, volumeToAttach.NodeName) |  | ||||||
| 				} |  | ||||||
| 				continue |  | ||||||
| 			} |  | ||||||
|  |  | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		// Volume/Node doesn't exist, spawn a goroutine to attach it | 		// Volume/Node doesn't exist, spawn a goroutine to attach it | ||||||
|   | |||||||
| @@ -294,7 +294,7 @@ func (rc *reconciler) unmountDetachDevices() { | |||||||
| 	for _, attachedVolume := range rc.actualStateOfWorld.GetUnmountedVolumes() { | 	for _, attachedVolume := range rc.actualStateOfWorld.GetUnmountedVolumes() { | ||||||
| 		// Check IsOperationPending to avoid marking a volume as detached if it's in the process of mounting. | 		// Check IsOperationPending to avoid marking a volume as detached if it's in the process of mounting. | ||||||
| 		if !rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName) && | 		if !rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName) && | ||||||
| 			!rc.operationExecutor.IsOperationPending(attachedVolume.VolumeName, nestedpendingoperations.EmptyUniquePodName, nestedpendingoperations.EmptyNodeName) { | 			!rc.operationExecutor.IsOperationPending(attachedVolume.VolumeName, nestedpendingoperations.EmptyUniquePodName) { | ||||||
| 			if attachedVolume.DeviceMayBeMounted() { | 			if attachedVolume.DeviceMayBeMounted() { | ||||||
| 				// Volume is globally mounted to device, unmount it | 				// Volume is globally mounted to device, unmount it | ||||||
| 				klog.V(5).Infof(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountDevice", "")) | 				klog.V(5).Infof(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountDevice", "")) | ||||||
| @@ -422,7 +422,7 @@ func (rc *reconciler) syncStates() { | |||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
| 		// There is no pod that uses the volume. | 		// There is no pod that uses the volume. | ||||||
| 		if rc.operationExecutor.IsOperationPending(reconstructedVolume.volumeName, nestedpendingoperations.EmptyUniquePodName, nestedpendingoperations.EmptyNodeName) { | 		if rc.operationExecutor.IsOperationPending(reconstructedVolume.volumeName, nestedpendingoperations.EmptyUniquePodName) { | ||||||
| 			klog.Warning("Volume is in pending operation, skip cleaning up mounts") | 			klog.Warning("Volume is in pending operation, skip cleaning up mounts") | ||||||
| 		} | 		} | ||||||
| 		klog.V(2).Infof( | 		klog.V(2).Infof( | ||||||
|   | |||||||
| @@ -14,7 +14,6 @@ go_library( | |||||||
|         "//pkg/util/goroutinemap/exponentialbackoff:go_default_library", |         "//pkg/util/goroutinemap/exponentialbackoff:go_default_library", | ||||||
|         "//pkg/volume/util/types:go_default_library", |         "//pkg/volume/util/types:go_default_library", | ||||||
|         "//staging/src/k8s.io/api/core/v1:go_default_library", |         "//staging/src/k8s.io/api/core/v1:go_default_library", | ||||||
|         "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", |  | ||||||
|         "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", |         "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", | ||||||
|         "//vendor/k8s.io/klog:go_default_library", |         "//vendor/k8s.io/klog:go_default_library", | ||||||
|     ], |     ], | ||||||
| @@ -28,7 +27,6 @@ go_test( | |||||||
|         "//pkg/util/goroutinemap/exponentialbackoff:go_default_library", |         "//pkg/util/goroutinemap/exponentialbackoff:go_default_library", | ||||||
|         "//pkg/volume/util/types:go_default_library", |         "//pkg/volume/util/types:go_default_library", | ||||||
|         "//staging/src/k8s.io/api/core/v1:go_default_library", |         "//staging/src/k8s.io/api/core/v1:go_default_library", | ||||||
|         "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", |  | ||||||
|         "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", |         "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", | ||||||
|     ], |     ], | ||||||
| ) | ) | ||||||
|   | |||||||
| @@ -29,73 +29,45 @@ import ( | |||||||
| 	"sync" | 	"sync" | ||||||
|  |  | ||||||
| 	"k8s.io/api/core/v1" | 	"k8s.io/api/core/v1" | ||||||
| 	"k8s.io/apimachinery/pkg/types" |  | ||||||
| 	k8sRuntime "k8s.io/apimachinery/pkg/util/runtime" | 	k8sRuntime "k8s.io/apimachinery/pkg/util/runtime" | ||||||
| 	"k8s.io/klog" | 	"k8s.io/klog" | ||||||
| 	"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff" | 	"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff" | ||||||
| 	volumetypes "k8s.io/kubernetes/pkg/volume/util/types" | 	"k8s.io/kubernetes/pkg/volume/util/types" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| const ( | const ( | ||||||
| 	// EmptyUniquePodName is a UniquePodName for empty string. | 	// EmptyUniquePodName is a UniquePodName for empty string. | ||||||
| 	EmptyUniquePodName volumetypes.UniquePodName = volumetypes.UniquePodName("") | 	EmptyUniquePodName types.UniquePodName = types.UniquePodName("") | ||||||
|  |  | ||||||
| 	// EmptyUniqueVolumeName is a UniqueVolumeName for empty string | 	// EmptyUniqueVolumeName is a UniqueVolumeName for empty string | ||||||
| 	EmptyUniqueVolumeName v1.UniqueVolumeName = v1.UniqueVolumeName("") | 	EmptyUniqueVolumeName v1.UniqueVolumeName = v1.UniqueVolumeName("") | ||||||
|  |  | ||||||
| 	// EmptyNodeName is a NodeName for empty string |  | ||||||
| 	EmptyNodeName types.NodeName = types.NodeName("") |  | ||||||
| ) | ) | ||||||
|  |  | ||||||
| // NestedPendingOperations defines the supported set of operations. | // NestedPendingOperations defines the supported set of operations. | ||||||
| type NestedPendingOperations interface { | type NestedPendingOperations interface { | ||||||
|  | 	// Run adds the concatenation of volumeName and podName to the list of | ||||||
| 	// Run adds the concatenation of volumeName and one of podName or nodeName to | 	// running operations and spawns a new go routine to execute operationFunc. | ||||||
| 	// the list of running operations and spawns a new go routine to execute | 	// If an operation with the same volumeName, same or empty podName | ||||||
| 	// OperationFunc inside generatedOperations. | 	// and same operationName exits, an AlreadyExists or ExponentialBackoff | ||||||
|  | 	// error is returned. If an operation with same volumeName and podName | ||||||
| 	// volumeName, podName, and nodeName collectively form the operation key. | 	// has ExponentialBackoff error but operationName is different, exponential | ||||||
| 	// The following forms of operation keys are supported: | 	// backoff is reset and operation is allowed to proceed. | ||||||
| 	// - volumeName empty, podName empty, nodeName empty | 	// This enables multiple operations to execute in parallel for the same | ||||||
| 	//   This key does not have any conflicting keys. | 	// volumeName as long as they have different podName. | ||||||
| 	// - volumeName exists, podName empty, nodeName empty |  | ||||||
| 	//   This key conflicts with all other keys with the same volumeName. |  | ||||||
| 	// - volumeName exists, podName exists, nodeName empty |  | ||||||
| 	//   This key conflicts with: |  | ||||||
| 	//   - the same volumeName and podName |  | ||||||
| 	//   - the same volumeName, but no podName |  | ||||||
| 	// - volumeName exists, podName empty, nodeName exists |  | ||||||
| 	//   This key conflicts with: |  | ||||||
| 	//   - the same volumeName and nodeName |  | ||||||
| 	//   - the same volumeName but no nodeName |  | ||||||
|  |  | ||||||
| 	// If an operation with the same operationName and a conflicting key exists, |  | ||||||
| 	// an AlreadyExists or ExponentialBackoff error is returned. |  | ||||||
| 	// If an operation with a conflicting key has ExponentialBackoff error but |  | ||||||
| 	// operationName is different, exponential backoff is reset and operation is |  | ||||||
| 	// allowed to proceed. |  | ||||||
|  |  | ||||||
| 	// Once the operation is complete, the go routine is terminated and the | 	// Once the operation is complete, the go routine is terminated and the | ||||||
| 	// concatenation of volumeName and (podName or nodeName) is removed from the | 	// concatenation of volumeName and podName is removed from the list of | ||||||
| 	// list of executing operations allowing a new operation to be started with | 	// executing operations allowing a new operation to be started with the | ||||||
| 	// the volumeName without error. | 	// volumeName without error. | ||||||
| 	Run( | 	Run(volumeName v1.UniqueVolumeName, podName types.UniquePodName, generatedOperations types.GeneratedOperations) error | ||||||
| 		volumeName v1.UniqueVolumeName, |  | ||||||
| 		podName volumetypes.UniquePodName, |  | ||||||
| 		nodeName types.NodeName, |  | ||||||
| 		generatedOperations volumetypes.GeneratedOperations) error |  | ||||||
|  |  | ||||||
| 	// Wait blocks until all operations are completed. This is typically | 	// Wait blocks until all operations are completed. This is typically | ||||||
| 	// necessary during tests - the test should wait until all operations finish | 	// necessary during tests - the test should wait until all operations finish | ||||||
| 	// and evaluate results after that. | 	// and evaluate results after that. | ||||||
| 	Wait() | 	Wait() | ||||||
|  |  | ||||||
| 	// IsOperationPending returns true if an operation for the given volumeName | 	// IsOperationPending returns true if an operation for the given volumeName and podName is pending, | ||||||
| 	// and one of podName or nodeName is pending, otherwise it returns false | 	// otherwise it returns false | ||||||
| 	IsOperationPending( | 	IsOperationPending(volumeName v1.UniqueVolumeName, podName types.UniquePodName) bool | ||||||
| 		volumeName v1.UniqueVolumeName, |  | ||||||
| 		podName volumetypes.UniquePodName, |  | ||||||
| 		nodeName types.NodeName) bool |  | ||||||
| } | } | ||||||
|  |  | ||||||
| // NewNestedPendingOperations returns a new instance of NestedPendingOperations. | // NewNestedPendingOperations returns a new instance of NestedPendingOperations. | ||||||
| @@ -116,7 +88,8 @@ type nestedPendingOperations struct { | |||||||
| } | } | ||||||
|  |  | ||||||
| type operation struct { | type operation struct { | ||||||
| 	key              operationKey | 	volumeName       v1.UniqueVolumeName | ||||||
|  | 	podName          types.UniquePodName | ||||||
| 	operationName    string | 	operationName    string | ||||||
| 	operationPending bool | 	operationPending bool | ||||||
| 	expBackoff       exponentialbackoff.ExponentialBackoff | 	expBackoff       exponentialbackoff.ExponentialBackoff | ||||||
| @@ -124,24 +97,22 @@ type operation struct { | |||||||
|  |  | ||||||
| func (grm *nestedPendingOperations) Run( | func (grm *nestedPendingOperations) Run( | ||||||
| 	volumeName v1.UniqueVolumeName, | 	volumeName v1.UniqueVolumeName, | ||||||
| 	podName volumetypes.UniquePodName, | 	podName types.UniquePodName, | ||||||
| 	nodeName types.NodeName, | 	generatedOperations types.GeneratedOperations) error { | ||||||
| 	generatedOperations volumetypes.GeneratedOperations) error { |  | ||||||
| 	grm.lock.Lock() | 	grm.lock.Lock() | ||||||
| 	defer grm.lock.Unlock() | 	defer grm.lock.Unlock() | ||||||
|  | 	opExists, previousOpIndex := grm.isOperationExists(volumeName, podName) | ||||||
| 	opKey := operationKey{volumeName, podName, nodeName} |  | ||||||
|  |  | ||||||
| 	opExists, previousOpIndex := grm.isOperationExists(opKey) |  | ||||||
| 	if opExists { | 	if opExists { | ||||||
| 		previousOp := grm.operations[previousOpIndex] | 		previousOp := grm.operations[previousOpIndex] | ||||||
| 		// Operation already exists | 		// Operation already exists | ||||||
| 		if previousOp.operationPending { | 		if previousOp.operationPending { | ||||||
| 			// Operation is pending | 			// Operation is pending | ||||||
| 			return NewAlreadyExistsError(opKey) | 			operationKey := getOperationKey(volumeName, podName) | ||||||
|  | 			return NewAlreadyExistsError(operationKey) | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		backOffErr := previousOp.expBackoff.SafeToRetry(opKey.String()) | 		operationKey := getOperationKey(volumeName, podName) | ||||||
|  | 		backOffErr := previousOp.expBackoff.SafeToRetry(operationKey) | ||||||
| 		if backOffErr != nil { | 		if backOffErr != nil { | ||||||
| 			if previousOp.operationName == generatedOperations.OperationName { | 			if previousOp.operationName == generatedOperations.OperationName { | ||||||
| 				return backOffErr | 				return backOffErr | ||||||
| @@ -153,13 +124,15 @@ func (grm *nestedPendingOperations) Run( | |||||||
|  |  | ||||||
| 		// Update existing operation to mark as pending. | 		// Update existing operation to mark as pending. | ||||||
| 		grm.operations[previousOpIndex].operationPending = true | 		grm.operations[previousOpIndex].operationPending = true | ||||||
| 		grm.operations[previousOpIndex].key = opKey | 		grm.operations[previousOpIndex].volumeName = volumeName | ||||||
|  | 		grm.operations[previousOpIndex].podName = podName | ||||||
| 	} else { | 	} else { | ||||||
| 		// Create a new operation | 		// Create a new operation | ||||||
| 		grm.operations = append(grm.operations, | 		grm.operations = append(grm.operations, | ||||||
| 			operation{ | 			operation{ | ||||||
| 				key:              opKey, |  | ||||||
| 				operationPending: true, | 				operationPending: true, | ||||||
|  | 				volumeName:       volumeName, | ||||||
|  | 				podName:          podName, | ||||||
| 				operationName:    generatedOperations.OperationName, | 				operationName:    generatedOperations.OperationName, | ||||||
| 				expBackoff:       exponentialbackoff.ExponentialBackoff{}, | 				expBackoff:       exponentialbackoff.ExponentialBackoff{}, | ||||||
| 			}) | 			}) | ||||||
| @@ -169,7 +142,7 @@ func (grm *nestedPendingOperations) Run( | |||||||
| 		// Handle unhandled panics (very unlikely) | 		// Handle unhandled panics (very unlikely) | ||||||
| 		defer k8sRuntime.HandleCrash() | 		defer k8sRuntime.HandleCrash() | ||||||
| 		// Handle completion of and error, if any, from operationFunc() | 		// Handle completion of and error, if any, from operationFunc() | ||||||
| 		defer grm.operationComplete(opKey, &detailedErr) | 		defer grm.operationComplete(volumeName, podName, &detailedErr) | ||||||
| 		return generatedOperations.Run() | 		return generatedOperations.Run() | ||||||
| 	}() | 	}() | ||||||
|  |  | ||||||
| @@ -178,14 +151,12 @@ func (grm *nestedPendingOperations) Run( | |||||||
|  |  | ||||||
| func (grm *nestedPendingOperations) IsOperationPending( | func (grm *nestedPendingOperations) IsOperationPending( | ||||||
| 	volumeName v1.UniqueVolumeName, | 	volumeName v1.UniqueVolumeName, | ||||||
| 	podName volumetypes.UniquePodName, | 	podName types.UniquePodName) bool { | ||||||
| 	nodeName types.NodeName) bool { |  | ||||||
|  |  | ||||||
| 	grm.lock.RLock() | 	grm.lock.RLock() | ||||||
| 	defer grm.lock.RUnlock() | 	defer grm.lock.RUnlock() | ||||||
|  |  | ||||||
| 	opKey := operationKey{volumeName, podName, nodeName} | 	exist, previousOpIndex := grm.isOperationExists(volumeName, podName) | ||||||
| 	exist, previousOpIndex := grm.isOperationExists(opKey) |  | ||||||
| 	if exist && grm.operations[previousOpIndex].operationPending { | 	if exist && grm.operations[previousOpIndex].operationPending { | ||||||
| 		return true | 		return true | ||||||
| 	} | 	} | ||||||
| @@ -193,52 +164,59 @@ func (grm *nestedPendingOperations) IsOperationPending( | |||||||
| } | } | ||||||
|  |  | ||||||
| // This is an internal function and caller should acquire and release the lock | // This is an internal function and caller should acquire and release the lock | ||||||
| func (grm *nestedPendingOperations) isOperationExists(key operationKey) (bool, int) { | func (grm *nestedPendingOperations) isOperationExists( | ||||||
|  | 	volumeName v1.UniqueVolumeName, | ||||||
|  | 	podName types.UniquePodName) (bool, int) { | ||||||
|  |  | ||||||
| 	// If volumeName is empty, operation can be executed concurrently | 	// If volumeName is empty, operation can be executed concurrently | ||||||
| 	if key.volumeName == EmptyUniqueVolumeName { | 	if volumeName == EmptyUniqueVolumeName { | ||||||
| 		return false, -1 | 		return false, -1 | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	for previousOpIndex, previousOp := range grm.operations { | 	for previousOpIndex, previousOp := range grm.operations { | ||||||
| 		volumeNameMatch := previousOp.key.volumeName == key.volumeName | 		if previousOp.volumeName != volumeName { | ||||||
|  | 			// No match, keep searching | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  |  | ||||||
| 		podNameMatch := previousOp.key.podName == EmptyUniquePodName || | 		if previousOp.podName != EmptyUniquePodName && | ||||||
| 			key.podName == EmptyUniquePodName || | 			podName != EmptyUniquePodName && | ||||||
| 			previousOp.key.podName == key.podName | 			previousOp.podName != podName { | ||||||
|  | 			// No match, keep searching | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  |  | ||||||
| 		nodeNameMatch := previousOp.key.nodeName == EmptyNodeName || | 		// Match | ||||||
| 			key.nodeName == EmptyNodeName || |  | ||||||
| 			previousOp.key.nodeName == key.nodeName |  | ||||||
|  |  | ||||||
| 		if volumeNameMatch && podNameMatch && nodeNameMatch { |  | ||||||
| 		return true, previousOpIndex | 		return true, previousOpIndex | ||||||
| 	} | 	} | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	return false, -1 | 	return false, -1 | ||||||
| } | } | ||||||
|  |  | ||||||
| func (grm *nestedPendingOperations) getOperation(key operationKey) (uint, error) { | func (grm *nestedPendingOperations) getOperation( | ||||||
|  | 	volumeName v1.UniqueVolumeName, | ||||||
|  | 	podName types.UniquePodName) (uint, error) { | ||||||
| 	// Assumes lock has been acquired by caller. | 	// Assumes lock has been acquired by caller. | ||||||
|  |  | ||||||
| 	for i, op := range grm.operations { | 	for i, op := range grm.operations { | ||||||
| 		if op.key.volumeName == key.volumeName && | 		if op.volumeName == volumeName && | ||||||
| 			op.key.podName == key.podName { | 			op.podName == podName { | ||||||
| 			return uint(i), nil | 			return uint(i), nil | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	return 0, fmt.Errorf("Operation %q not found", key) | 	logOperationKey := getOperationKey(volumeName, podName) | ||||||
|  | 	return 0, fmt.Errorf("Operation %q not found", logOperationKey) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (grm *nestedPendingOperations) deleteOperation(key operationKey) { | func (grm *nestedPendingOperations) deleteOperation( | ||||||
| 	// Assumes lock has been acquired by caller. | 	// Assumes lock has been acquired by caller. | ||||||
|  | 	volumeName v1.UniqueVolumeName, | ||||||
|  | 	podName types.UniquePodName) { | ||||||
|  |  | ||||||
| 	opIndex := -1 | 	opIndex := -1 | ||||||
| 	for i, op := range grm.operations { | 	for i, op := range grm.operations { | ||||||
| 		if op.key.volumeName == key.volumeName && | 		if op.volumeName == volumeName && | ||||||
| 			op.key.podName == key.podName { | 			op.podName == podName { | ||||||
| 			opIndex = i | 			opIndex = i | ||||||
| 			break | 			break | ||||||
| 		} | 		} | ||||||
| @@ -249,7 +227,8 @@ func (grm *nestedPendingOperations) deleteOperation(key operationKey) { | |||||||
| 	grm.operations = grm.operations[:len(grm.operations)-1] | 	grm.operations = grm.operations[:len(grm.operations)-1] | ||||||
| } | } | ||||||
|  |  | ||||||
| func (grm *nestedPendingOperations) operationComplete(key operationKey, err *error) { | func (grm *nestedPendingOperations) operationComplete( | ||||||
|  | 	volumeName v1.UniqueVolumeName, podName types.UniquePodName, err *error) { | ||||||
| 	// Defer operations are executed in Last-In is First-Out order. In this case | 	// Defer operations are executed in Last-In is First-Out order. In this case | ||||||
| 	// the lock is acquired first when operationCompletes begins, and is | 	// the lock is acquired first when operationCompletes begins, and is | ||||||
| 	// released when the method finishes, after the lock is released cond is | 	// released when the method finishes, after the lock is released cond is | ||||||
| @@ -260,20 +239,24 @@ func (grm *nestedPendingOperations) operationComplete(key operationKey, err *err | |||||||
|  |  | ||||||
| 	if *err == nil || !grm.exponentialBackOffOnError { | 	if *err == nil || !grm.exponentialBackOffOnError { | ||||||
| 		// Operation completed without error, or exponentialBackOffOnError disabled | 		// Operation completed without error, or exponentialBackOffOnError disabled | ||||||
| 		grm.deleteOperation(key) | 		grm.deleteOperation(volumeName, podName) | ||||||
| 		if *err != nil { | 		if *err != nil { | ||||||
| 			// Log error | 			// Log error | ||||||
| 			klog.Errorf("operation %s failed with: %v", key, *err) | 			logOperationKey := getOperationKey(volumeName, podName) | ||||||
|  | 			klog.Errorf("operation %s failed with: %v", | ||||||
|  | 				logOperationKey, | ||||||
|  | 				*err) | ||||||
| 		} | 		} | ||||||
| 		return | 		return | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// Operation completed with error and exponentialBackOffOnError Enabled | 	// Operation completed with error and exponentialBackOffOnError Enabled | ||||||
| 	existingOpIndex, getOpErr := grm.getOperation(key) | 	existingOpIndex, getOpErr := grm.getOperation(volumeName, podName) | ||||||
| 	if getOpErr != nil { | 	if getOpErr != nil { | ||||||
| 		// Failed to find existing operation | 		// Failed to find existing operation | ||||||
|  | 		logOperationKey := getOperationKey(volumeName, podName) | ||||||
| 		klog.Errorf("Operation %s completed. error: %v. exponentialBackOffOnError is enabled, but failed to get operation to update.", | 		klog.Errorf("Operation %s completed. error: %v. exponentialBackOffOnError is enabled, but failed to get operation to update.", | ||||||
| 			key, | 			logOperationKey, | ||||||
| 			*err) | 			*err) | ||||||
| 		return | 		return | ||||||
| 	} | 	} | ||||||
| @@ -282,8 +265,10 @@ func (grm *nestedPendingOperations) operationComplete(key operationKey, err *err | |||||||
| 	grm.operations[existingOpIndex].operationPending = false | 	grm.operations[existingOpIndex].operationPending = false | ||||||
|  |  | ||||||
| 	// Log error | 	// Log error | ||||||
|  | 	operationKey := | ||||||
|  | 		getOperationKey(volumeName, podName) | ||||||
| 	klog.Errorf("%v", grm.operations[existingOpIndex].expBackoff. | 	klog.Errorf("%v", grm.operations[existingOpIndex].expBackoff. | ||||||
| 		GenerateNoRetriesPermittedMsg(key.String())) | 		GenerateNoRetriesPermittedMsg(operationKey)) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (grm *nestedPendingOperations) Wait() { | func (grm *nestedPendingOperations) Wait() { | ||||||
| @@ -295,22 +280,21 @@ func (grm *nestedPendingOperations) Wait() { | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| type operationKey struct { | func getOperationKey( | ||||||
| 	volumeName v1.UniqueVolumeName | 	volumeName v1.UniqueVolumeName, podName types.UniquePodName) string { | ||||||
| 	podName    volumetypes.UniquePodName | 	podNameStr := "" | ||||||
| 	nodeName   types.NodeName | 	if podName != EmptyUniquePodName { | ||||||
| } | 		podNameStr = fmt.Sprintf(" (%q)", podName) | ||||||
|  | 	} | ||||||
|  |  | ||||||
| func (key operationKey) String() string { | 	return fmt.Sprintf("%q%s", | ||||||
| 	return fmt.Sprintf("{volumeName=%q, podName=%q, nodeName=%q}", | 		volumeName, | ||||||
| 		key.volumeName, | 		podNameStr) | ||||||
| 		key.podName, |  | ||||||
| 		key.nodeName) |  | ||||||
| } | } | ||||||
|  |  | ||||||
| // NewAlreadyExistsError returns a new instance of AlreadyExists error. | // NewAlreadyExistsError returns a new instance of AlreadyExists error. | ||||||
| func NewAlreadyExistsError(key operationKey) error { | func NewAlreadyExistsError(operationKey string) error { | ||||||
| 	return alreadyExistsError{key} | 	return alreadyExistsError{operationKey} | ||||||
| } | } | ||||||
|  |  | ||||||
| // IsAlreadyExists returns true if an error returned from | // IsAlreadyExists returns true if an error returned from | ||||||
| @@ -329,7 +313,7 @@ func IsAlreadyExists(err error) bool { | |||||||
| // new operation can not be started because an operation with the same operation | // new operation can not be started because an operation with the same operation | ||||||
| // name is already executing. | // name is already executing. | ||||||
| type alreadyExistsError struct { | type alreadyExistsError struct { | ||||||
| 	operationKey operationKey | 	operationKey string | ||||||
| } | } | ||||||
|  |  | ||||||
| var _ error = alreadyExistsError{} | var _ error = alreadyExistsError{} | ||||||
|   | |||||||
| @@ -22,10 +22,9 @@ import ( | |||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
| 	"k8s.io/api/core/v1" | 	"k8s.io/api/core/v1" | ||||||
| 	"k8s.io/apimachinery/pkg/types" |  | ||||||
| 	"k8s.io/apimachinery/pkg/util/wait" | 	"k8s.io/apimachinery/pkg/util/wait" | ||||||
| 	"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff" | 	"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff" | ||||||
| 	volumetypes "k8s.io/kubernetes/pkg/volume/util/types" | 	"k8s.io/kubernetes/pkg/volume/util/types" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| const ( | const ( | ||||||
| @@ -45,22 +44,22 @@ const ( | |||||||
| 	initialOperationWaitTimeLong time.Duration = 500 * time.Millisecond | 	initialOperationWaitTimeLong time.Duration = 500 * time.Millisecond | ||||||
| ) | ) | ||||||
|  |  | ||||||
| func Test_NestedPendingOperations_Positive_SingleOp(t *testing.T) { | func Test_NewGoRoutineMap_Positive_SingleOp(t *testing.T) { | ||||||
| 	// Arrange | 	// Arrange | ||||||
| 	grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) | 	grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) | ||||||
| 	volumeName := v1.UniqueVolumeName("volume-name") | 	volumeName := v1.UniqueVolumeName("volume-name") | ||||||
| 	operation := func() (error, error) { return nil, nil } | 	operation := func() (error, error) { return nil, nil } | ||||||
|  |  | ||||||
| 	// Act | 	// Act | ||||||
| 	err := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation}) | 	err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation}) | ||||||
|  |  | ||||||
| 	// Assert | 	// Assert | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err) | 		t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func Test_NestedPendingOperations_Positive_TwoOps(t *testing.T) { | func Test_NewGoRoutineMap_Positive_TwoOps(t *testing.T) { | ||||||
| 	// Arrange | 	// Arrange | ||||||
| 	grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) | 	grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) | ||||||
| 	volume1Name := v1.UniqueVolumeName("volume1-name") | 	volume1Name := v1.UniqueVolumeName("volume1-name") | ||||||
| @@ -68,65 +67,65 @@ func Test_NestedPendingOperations_Positive_TwoOps(t *testing.T) { | |||||||
| 	operation := func() (error, error) { return nil, nil } | 	operation := func() (error, error) { return nil, nil } | ||||||
|  |  | ||||||
| 	// Act | 	// Act | ||||||
| 	err1 := grm.Run(volume1Name, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation}) | 	err1 := grm.Run(volume1Name, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation}) | ||||||
| 	err2 := grm.Run(volume2Name, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation}) | 	err2 := grm.Run(volume2Name, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation}) | ||||||
|  |  | ||||||
| 	// Assert | 	// Assert | ||||||
| 	if err1 != nil { | 	if err1 != nil { | ||||||
| 		t.Fatalf("NestedPendingOperations %q failed. Expected: <no error> Actual: <%v>", volume1Name, err1) | 		t.Fatalf("NewGoRoutine %q failed. Expected: <no error> Actual: <%v>", volume1Name, err1) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if err2 != nil { | 	if err2 != nil { | ||||||
| 		t.Fatalf("NestedPendingOperations %q failed. Expected: <no error> Actual: <%v>", volume2Name, err2) | 		t.Fatalf("NewGoRoutine %q failed. Expected: <no error> Actual: <%v>", volume2Name, err2) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func Test_NestedPendingOperations_Positive_TwoSubOps(t *testing.T) { | func Test_NewGoRoutineMap_Positive_TwoSubOps(t *testing.T) { | ||||||
| 	// Arrange | 	// Arrange | ||||||
| 	grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) | 	grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) | ||||||
| 	volumeName := v1.UniqueVolumeName("volume-name") | 	volumeName := v1.UniqueVolumeName("volume-name") | ||||||
| 	operation1PodName := volumetypes.UniquePodName("operation1-podname") | 	operation1PodName := types.UniquePodName("operation1-podname") | ||||||
| 	operation2PodName := volumetypes.UniquePodName("operation2-podname") | 	operation2PodName := types.UniquePodName("operation2-podname") | ||||||
| 	operation := func() (error, error) { return nil, nil } | 	operation := func() (error, error) { return nil, nil } | ||||||
|  |  | ||||||
| 	// Act | 	// Act | ||||||
| 	err1 := grm.Run(volumeName, operation1PodName, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation}) | 	err1 := grm.Run(volumeName, operation1PodName, types.GeneratedOperations{OperationFunc: operation}) | ||||||
| 	err2 := grm.Run(volumeName, operation2PodName, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation}) | 	err2 := grm.Run(volumeName, operation2PodName, types.GeneratedOperations{OperationFunc: operation}) | ||||||
|  |  | ||||||
| 	// Assert | 	// Assert | ||||||
| 	if err1 != nil { | 	if err1 != nil { | ||||||
| 		t.Fatalf("NestedPendingOperations %q failed. Expected: <no error> Actual: <%v>", operation1PodName, err1) | 		t.Fatalf("NewGoRoutine %q failed. Expected: <no error> Actual: <%v>", operation1PodName, err1) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if err2 != nil { | 	if err2 != nil { | ||||||
| 		t.Fatalf("NestedPendingOperations %q failed. Expected: <no error> Actual: <%v>", operation2PodName, err2) | 		t.Fatalf("NewGoRoutine %q failed. Expected: <no error> Actual: <%v>", operation2PodName, err2) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func Test_NestedPendingOperations_Positive_SingleOpWithExpBackoff(t *testing.T) { | func Test_NewGoRoutineMap_Positive_SingleOpWithExpBackoff(t *testing.T) { | ||||||
| 	// Arrange | 	// Arrange | ||||||
| 	grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) | 	grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) | ||||||
| 	volumeName := v1.UniqueVolumeName("volume-name") | 	volumeName := v1.UniqueVolumeName("volume-name") | ||||||
| 	operation := func() (error, error) { return nil, nil } | 	operation := func() (error, error) { return nil, nil } | ||||||
|  |  | ||||||
| 	// Act | 	// Act | ||||||
| 	err := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation}) | 	err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation}) | ||||||
|  |  | ||||||
| 	// Assert | 	// Assert | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err) | 		t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func Test_NestedPendingOperations_Positive_SecondOpAfterFirstCompletes(t *testing.T) { | func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletes(t *testing.T) { | ||||||
| 	// Arrange | 	// Arrange | ||||||
| 	grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) | 	grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) | ||||||
| 	volumeName := v1.UniqueVolumeName("volume-name") | 	volumeName := v1.UniqueVolumeName("volume-name") | ||||||
| 	operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) | 	operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) | ||||||
| 	operation1 := generateCallbackFunc(operation1DoneCh) | 	operation1 := generateCallbackFunc(operation1DoneCh) | ||||||
| 	err1 := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1}) | 	err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) | ||||||
| 	if err1 != nil { | 	if err1 != nil { | ||||||
| 		t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1) | 		t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1) | ||||||
| 	} | 	} | ||||||
| 	operation2 := generateNoopFunc() | 	operation2 := generateNoopFunc() | ||||||
| 	<-operation1DoneCh // Force operation1 to complete | 	<-operation1DoneCh // Force operation1 to complete | ||||||
| @@ -135,9 +134,9 @@ func Test_NestedPendingOperations_Positive_SecondOpAfterFirstCompletes(t *testin | |||||||
| 	err2 := retryWithExponentialBackOff( | 	err2 := retryWithExponentialBackOff( | ||||||
| 		time.Duration(initialOperationWaitTimeShort), | 		time.Duration(initialOperationWaitTimeShort), | ||||||
| 		func() (bool, error) { | 		func() (bool, error) { | ||||||
| 			err := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation2}) | 			err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) | ||||||
| 			if err != nil { | 			if err != nil { | ||||||
| 				t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err) | 				t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) | ||||||
| 				return false, nil | 				return false, nil | ||||||
| 			} | 			} | ||||||
| 			return true, nil | 			return true, nil | ||||||
| @@ -146,19 +145,19 @@ func Test_NestedPendingOperations_Positive_SecondOpAfterFirstCompletes(t *testin | |||||||
|  |  | ||||||
| 	// Assert | 	// Assert | ||||||
| 	if err2 != nil { | 	if err2 != nil { | ||||||
| 		t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err2) | 		t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err2) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func Test_NestedPendingOperations_Positive_SecondOpAfterFirstCompletesWithExpBackoff(t *testing.T) { | func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletesWithExpBackoff(t *testing.T) { | ||||||
| 	// Arrange | 	// Arrange | ||||||
| 	grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) | 	grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) | ||||||
| 	volumeName := v1.UniqueVolumeName("volume-name") | 	volumeName := v1.UniqueVolumeName("volume-name") | ||||||
| 	operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) | 	operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) | ||||||
| 	operation1 := generateCallbackFunc(operation1DoneCh) | 	operation1 := generateCallbackFunc(operation1DoneCh) | ||||||
| 	err1 := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1}) | 	err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) | ||||||
| 	if err1 != nil { | 	if err1 != nil { | ||||||
| 		t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1) | 		t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1) | ||||||
| 	} | 	} | ||||||
| 	operation2 := generateNoopFunc() | 	operation2 := generateNoopFunc() | ||||||
| 	<-operation1DoneCh // Force operation1 to complete | 	<-operation1DoneCh // Force operation1 to complete | ||||||
| @@ -167,9 +166,9 @@ func Test_NestedPendingOperations_Positive_SecondOpAfterFirstCompletesWithExpBac | |||||||
| 	err2 := retryWithExponentialBackOff( | 	err2 := retryWithExponentialBackOff( | ||||||
| 		time.Duration(initialOperationWaitTimeShort), | 		time.Duration(initialOperationWaitTimeShort), | ||||||
| 		func() (bool, error) { | 		func() (bool, error) { | ||||||
| 			err := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation2}) | 			err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) | ||||||
| 			if err != nil { | 			if err != nil { | ||||||
| 				t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err) | 				t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) | ||||||
| 				return false, nil | 				return false, nil | ||||||
| 			} | 			} | ||||||
| 			return true, nil | 			return true, nil | ||||||
| @@ -178,18 +177,18 @@ func Test_NestedPendingOperations_Positive_SecondOpAfterFirstCompletesWithExpBac | |||||||
|  |  | ||||||
| 	// Assert | 	// Assert | ||||||
| 	if err2 != nil { | 	if err2 != nil { | ||||||
| 		t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err2) | 		t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err2) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func Test_NestedPendingOperations_Positive_SecondOpAfterFirstPanics(t *testing.T) { | func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanics(t *testing.T) { | ||||||
| 	// Arrange | 	// Arrange | ||||||
| 	grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) | 	grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) | ||||||
| 	volumeName := v1.UniqueVolumeName("volume-name") | 	volumeName := v1.UniqueVolumeName("volume-name") | ||||||
| 	operation1 := generatePanicFunc() | 	operation1 := generatePanicFunc() | ||||||
| 	err1 := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1}) | 	err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) | ||||||
| 	if err1 != nil { | 	if err1 != nil { | ||||||
| 		t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1) | 		t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1) | ||||||
| 	} | 	} | ||||||
| 	operation2 := generateNoopFunc() | 	operation2 := generateNoopFunc() | ||||||
|  |  | ||||||
| @@ -197,9 +196,9 @@ func Test_NestedPendingOperations_Positive_SecondOpAfterFirstPanics(t *testing.T | |||||||
| 	err2 := retryWithExponentialBackOff( | 	err2 := retryWithExponentialBackOff( | ||||||
| 		time.Duration(initialOperationWaitTimeShort), | 		time.Duration(initialOperationWaitTimeShort), | ||||||
| 		func() (bool, error) { | 		func() (bool, error) { | ||||||
| 			err := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation2}) | 			err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) | ||||||
| 			if err != nil { | 			if err != nil { | ||||||
| 				t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err) | 				t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) | ||||||
| 				return false, nil | 				return false, nil | ||||||
| 			} | 			} | ||||||
| 			return true, nil | 			return true, nil | ||||||
| @@ -208,18 +207,18 @@ func Test_NestedPendingOperations_Positive_SecondOpAfterFirstPanics(t *testing.T | |||||||
|  |  | ||||||
| 	// Assert | 	// Assert | ||||||
| 	if err2 != nil { | 	if err2 != nil { | ||||||
| 		t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err2) | 		t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err2) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func Test_NestedPendingOperations_Positive_SecondOpAfterFirstPanicsWithExpBackoff(t *testing.T) { | func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanicsWithExpBackoff(t *testing.T) { | ||||||
| 	// Arrange | 	// Arrange | ||||||
| 	grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) | 	grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) | ||||||
| 	volumeName := v1.UniqueVolumeName("volume-name") | 	volumeName := v1.UniqueVolumeName("volume-name") | ||||||
| 	operation1 := generatePanicFunc() | 	operation1 := generatePanicFunc() | ||||||
| 	err1 := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1}) | 	err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) | ||||||
| 	if err1 != nil { | 	if err1 != nil { | ||||||
| 		t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1) | 		t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1) | ||||||
| 	} | 	} | ||||||
| 	operation2 := generateNoopFunc() | 	operation2 := generateNoopFunc() | ||||||
|  |  | ||||||
| @@ -227,9 +226,9 @@ func Test_NestedPendingOperations_Positive_SecondOpAfterFirstPanicsWithExpBackof | |||||||
| 	err2 := retryWithExponentialBackOff( | 	err2 := retryWithExponentialBackOff( | ||||||
| 		time.Duration(initialOperationWaitTimeLong), // Longer duration to accommodate for backoff | 		time.Duration(initialOperationWaitTimeLong), // Longer duration to accommodate for backoff | ||||||
| 		func() (bool, error) { | 		func() (bool, error) { | ||||||
| 			err := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation2}) | 			err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) | ||||||
| 			if err != nil { | 			if err != nil { | ||||||
| 				t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err) | 				t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) | ||||||
| 				return false, nil | 				return false, nil | ||||||
| 			} | 			} | ||||||
| 			return true, nil | 			return true, nil | ||||||
| @@ -238,43 +237,43 @@ func Test_NestedPendingOperations_Positive_SecondOpAfterFirstPanicsWithExpBackof | |||||||
|  |  | ||||||
| 	// Assert | 	// Assert | ||||||
| 	if err2 != nil { | 	if err2 != nil { | ||||||
| 		t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err2) | 		t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err2) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func Test_NestedPendingOperations_Negative_SecondOpBeforeFirstCompletes(t *testing.T) { | func Test_NewGoRoutineMap_Negative_SecondOpBeforeFirstCompletes(t *testing.T) { | ||||||
| 	// Arrange | 	// Arrange | ||||||
| 	grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) | 	grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) | ||||||
| 	volumeName := v1.UniqueVolumeName("volume-name") | 	volumeName := v1.UniqueVolumeName("volume-name") | ||||||
| 	operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) | 	operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) | ||||||
| 	operation1 := generateWaitFunc(operation1DoneCh) | 	operation1 := generateWaitFunc(operation1DoneCh) | ||||||
| 	err1 := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1}) | 	err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) | ||||||
| 	if err1 != nil { | 	if err1 != nil { | ||||||
| 		t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1) | 		t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1) | ||||||
| 	} | 	} | ||||||
| 	operation2 := generateNoopFunc() | 	operation2 := generateNoopFunc() | ||||||
|  |  | ||||||
| 	// Act | 	// Act | ||||||
| 	err2 := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation2}) | 	err2 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) | ||||||
|  |  | ||||||
| 	// Assert | 	// Assert | ||||||
| 	if err2 == nil { | 	if err2 == nil { | ||||||
| 		t.Fatalf("NestedPendingOperations did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName) | 		t.Fatalf("NewGoRoutine did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName) | ||||||
| 	} | 	} | ||||||
| 	if !IsAlreadyExists(err2) { | 	if !IsAlreadyExists(err2) { | ||||||
| 		t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2) | 		t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func Test_NestedPendingOperations_Negative_SecondThirdOpWithDifferentNames(t *testing.T) { | func Test_NewGoRoutineMap_Negative_SecondThirdOpWithDifferentNames(t *testing.T) { | ||||||
| 	// Arrange | 	// Arrange | ||||||
| 	grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) | 	grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) | ||||||
| 	volumeName := v1.UniqueVolumeName("volume-name") | 	volumeName := v1.UniqueVolumeName("volume-name") | ||||||
| 	op1Name := "mount_volume" | 	op1Name := "mount_volume" | ||||||
| 	operation1 := generateErrorFunc() | 	operation1 := generateErrorFunc() | ||||||
| 	err1 := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1, OperationName: op1Name}) | 	err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1, OperationName: op1Name}) | ||||||
| 	if err1 != nil { | 	if err1 != nil { | ||||||
| 		t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1) | 		t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1) | ||||||
| 	} | 	} | ||||||
| 	// Shorter than exponential backoff period, so as to trigger exponential backoff error on second | 	// Shorter than exponential backoff period, so as to trigger exponential backoff error on second | ||||||
| 	// operation. | 	// operation. | ||||||
| @@ -284,8 +283,7 @@ func Test_NestedPendingOperations_Negative_SecondThirdOpWithDifferentNames(t *te | |||||||
| 		func() (bool, error) { | 		func() (bool, error) { | ||||||
| 			err := grm.Run(volumeName, | 			err := grm.Run(volumeName, | ||||||
| 				"", /* operationSubName */ | 				"", /* operationSubName */ | ||||||
| 				"", /* nodeName */ | 				types.GeneratedOperations{OperationFunc: operation2, OperationName: op1Name}) | ||||||
| 				volumetypes.GeneratedOperations{OperationFunc: operation2, OperationName: op1Name}) |  | ||||||
|  |  | ||||||
| 			if exponentialbackoff.IsExponentialBackoff(err) { | 			if exponentialbackoff.IsExponentialBackoff(err) { | ||||||
| 				return true, nil | 				return true, nil | ||||||
| @@ -296,114 +294,114 @@ func Test_NestedPendingOperations_Negative_SecondThirdOpWithDifferentNames(t *te | |||||||
|  |  | ||||||
| 	// Assert | 	// Assert | ||||||
| 	if err2 != nil { | 	if err2 != nil { | ||||||
| 		t.Fatalf("Expected NestedPendingOperations to fail with exponential backoff for operationKey : %s and operationName : %s", volumeName, op1Name) | 		t.Fatalf("Expected NewGoRoutine to fail with exponential backoff for operationKey : %s and operationName : %s", volumeName, op1Name) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	operation3 := generateNoopFunc() | 	operation3 := generateNoopFunc() | ||||||
| 	op3Name := "unmount_volume" | 	op3Name := "unmount_volume" | ||||||
| 	// Act | 	// Act | ||||||
| 	err3 := grm.Run(volumeName, "" /*pod name*/, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation3, OperationName: op3Name}) | 	err3 := grm.Run(volumeName, "" /*pod name*/, types.GeneratedOperations{OperationFunc: operation3, OperationName: op3Name}) | ||||||
| 	if err3 != nil { | 	if err3 != nil { | ||||||
| 		t.Fatalf("NestedPendingOperations failed. Expected <no error> Actual: <%v>", err3) | 		t.Fatalf("NewGoRoutine failed. Expected <no error> Actual: <%v>", err3) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func Test_NestedPendingOperations_Negative_SecondSubOpBeforeFirstCompletes2(t *testing.T) { | func Test_NewGoRoutineMap_Negative_SecondSubOpBeforeFirstCompletes2(t *testing.T) { | ||||||
| 	// Arrange | 	// Arrange | ||||||
| 	grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) | 	grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) | ||||||
| 	volumeName := v1.UniqueVolumeName("volume-name") | 	volumeName := v1.UniqueVolumeName("volume-name") | ||||||
| 	operationPodName := volumetypes.UniquePodName("operation-podname") | 	operationPodName := types.UniquePodName("operation-podname") | ||||||
| 	operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) | 	operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) | ||||||
| 	operation1 := generateWaitFunc(operation1DoneCh) | 	operation1 := generateWaitFunc(operation1DoneCh) | ||||||
| 	err1 := grm.Run(volumeName, operationPodName, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1}) | 	err1 := grm.Run(volumeName, operationPodName, types.GeneratedOperations{OperationFunc: operation1}) | ||||||
| 	if err1 != nil { | 	if err1 != nil { | ||||||
| 		t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1) | 		t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1) | ||||||
| 	} | 	} | ||||||
| 	operation2 := generateNoopFunc() | 	operation2 := generateNoopFunc() | ||||||
|  |  | ||||||
| 	// Act | 	// Act | ||||||
| 	err2 := grm.Run(volumeName, operationPodName, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation2}) | 	err2 := grm.Run(volumeName, operationPodName, types.GeneratedOperations{OperationFunc: operation2}) | ||||||
|  |  | ||||||
| 	// Assert | 	// Assert | ||||||
| 	if err2 == nil { | 	if err2 == nil { | ||||||
| 		t.Fatalf("NestedPendingOperations did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName) | 		t.Fatalf("NewGoRoutine did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName) | ||||||
| 	} | 	} | ||||||
| 	if !IsAlreadyExists(err2) { | 	if !IsAlreadyExists(err2) { | ||||||
| 		t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2) | 		t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func Test_NestedPendingOperations_Negative_SecondSubOpBeforeFirstCompletes(t *testing.T) { | func Test_NewGoRoutineMap_Negative_SecondSubOpBeforeFirstCompletes(t *testing.T) { | ||||||
| 	// Arrange | 	// Arrange | ||||||
| 	grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) | 	grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) | ||||||
| 	volumeName := v1.UniqueVolumeName("volume-name") | 	volumeName := v1.UniqueVolumeName("volume-name") | ||||||
| 	operationPodName := volumetypes.UniquePodName("operation-podname") | 	operationPodName := types.UniquePodName("operation-podname") | ||||||
| 	operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) | 	operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) | ||||||
| 	operation1 := generateWaitFunc(operation1DoneCh) | 	operation1 := generateWaitFunc(operation1DoneCh) | ||||||
| 	err1 := grm.Run(volumeName, operationPodName, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1}) | 	err1 := grm.Run(volumeName, operationPodName, types.GeneratedOperations{OperationFunc: operation1}) | ||||||
| 	if err1 != nil { | 	if err1 != nil { | ||||||
| 		t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1) | 		t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1) | ||||||
| 	} | 	} | ||||||
| 	operation2 := generateNoopFunc() | 	operation2 := generateNoopFunc() | ||||||
|  |  | ||||||
| 	// Act | 	// Act | ||||||
| 	err2 := grm.Run(volumeName, operationPodName, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation2}) | 	err2 := grm.Run(volumeName, operationPodName, types.GeneratedOperations{OperationFunc: operation2}) | ||||||
|  |  | ||||||
| 	// Assert | 	// Assert | ||||||
| 	if err2 == nil { | 	if err2 == nil { | ||||||
| 		t.Fatalf("NestedPendingOperations did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName) | 		t.Fatalf("NewGoRoutine did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName) | ||||||
| 	} | 	} | ||||||
| 	if !IsAlreadyExists(err2) { | 	if !IsAlreadyExists(err2) { | ||||||
| 		t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2) | 		t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func Test_NestedPendingOperations_Negative_SecondOpBeforeFirstCompletesWithExpBackoff(t *testing.T) { | func Test_NewGoRoutineMap_Negative_SecondOpBeforeFirstCompletesWithExpBackoff(t *testing.T) { | ||||||
| 	// Arrange | 	// Arrange | ||||||
| 	grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) | 	grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) | ||||||
| 	volumeName := v1.UniqueVolumeName("volume-name") | 	volumeName := v1.UniqueVolumeName("volume-name") | ||||||
| 	operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) | 	operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) | ||||||
| 	operation1 := generateWaitFunc(operation1DoneCh) | 	operation1 := generateWaitFunc(operation1DoneCh) | ||||||
| 	err1 := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1}) | 	err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) | ||||||
| 	if err1 != nil { | 	if err1 != nil { | ||||||
| 		t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1) | 		t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1) | ||||||
| 	} | 	} | ||||||
| 	operation2 := generateNoopFunc() | 	operation2 := generateNoopFunc() | ||||||
|  |  | ||||||
| 	// Act | 	// Act | ||||||
| 	err2 := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation2}) | 	err2 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) | ||||||
|  |  | ||||||
| 	// Assert | 	// Assert | ||||||
| 	if err2 == nil { | 	if err2 == nil { | ||||||
| 		t.Fatalf("NestedPendingOperations did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName) | 		t.Fatalf("NewGoRoutine did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName) | ||||||
| 	} | 	} | ||||||
| 	if !IsAlreadyExists(err2) { | 	if !IsAlreadyExists(err2) { | ||||||
| 		t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2) | 		t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func Test_NestedPendingOperations_Positive_ThirdOpAfterFirstCompletes(t *testing.T) { | func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletes(t *testing.T) { | ||||||
| 	// Arrange | 	// Arrange | ||||||
| 	grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) | 	grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) | ||||||
| 	volumeName := v1.UniqueVolumeName("volume-name") | 	volumeName := v1.UniqueVolumeName("volume-name") | ||||||
| 	operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) | 	operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) | ||||||
| 	operation1 := generateWaitFunc(operation1DoneCh) | 	operation1 := generateWaitFunc(operation1DoneCh) | ||||||
| 	err1 := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1}) | 	err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) | ||||||
| 	if err1 != nil { | 	if err1 != nil { | ||||||
| 		t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1) | 		t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1) | ||||||
| 	} | 	} | ||||||
| 	operation2 := generateNoopFunc() | 	operation2 := generateNoopFunc() | ||||||
| 	operation3 := generateNoopFunc() | 	operation3 := generateNoopFunc() | ||||||
|  |  | ||||||
| 	// Act | 	// Act | ||||||
| 	err2 := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation2}) | 	err2 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) | ||||||
|  |  | ||||||
| 	// Assert | 	// Assert | ||||||
| 	if err2 == nil { | 	if err2 == nil { | ||||||
| 		t.Fatalf("NestedPendingOperations did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName) | 		t.Fatalf("NewGoRoutine did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName) | ||||||
| 	} | 	} | ||||||
| 	if !IsAlreadyExists(err2) { | 	if !IsAlreadyExists(err2) { | ||||||
| 		t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2) | 		t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// Act | 	// Act | ||||||
| @@ -411,9 +409,9 @@ func Test_NestedPendingOperations_Positive_ThirdOpAfterFirstCompletes(t *testing | |||||||
| 	err3 := retryWithExponentialBackOff( | 	err3 := retryWithExponentialBackOff( | ||||||
| 		time.Duration(initialOperationWaitTimeShort), | 		time.Duration(initialOperationWaitTimeShort), | ||||||
| 		func() (bool, error) { | 		func() (bool, error) { | ||||||
| 			err := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation3}) | 			err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation3}) | ||||||
| 			if err != nil { | 			if err != nil { | ||||||
| 				t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err) | 				t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) | ||||||
| 				return false, nil | 				return false, nil | ||||||
| 			} | 			} | ||||||
| 			return true, nil | 			return true, nil | ||||||
| @@ -422,32 +420,32 @@ func Test_NestedPendingOperations_Positive_ThirdOpAfterFirstCompletes(t *testing | |||||||
|  |  | ||||||
| 	// Assert | 	// Assert | ||||||
| 	if err3 != nil { | 	if err3 != nil { | ||||||
| 		t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err3) | 		t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err3) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func Test_NestedPendingOperations_Positive_ThirdOpAfterFirstCompletesWithExpBackoff(t *testing.T) { | func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletesWithExpBackoff(t *testing.T) { | ||||||
| 	// Arrange | 	// Arrange | ||||||
| 	grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) | 	grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) | ||||||
| 	volumeName := v1.UniqueVolumeName("volume-name") | 	volumeName := v1.UniqueVolumeName("volume-name") | ||||||
| 	operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) | 	operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) | ||||||
| 	operation1 := generateWaitFunc(operation1DoneCh) | 	operation1 := generateWaitFunc(operation1DoneCh) | ||||||
| 	err1 := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1}) | 	err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) | ||||||
| 	if err1 != nil { | 	if err1 != nil { | ||||||
| 		t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1) | 		t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1) | ||||||
| 	} | 	} | ||||||
| 	operation2 := generateNoopFunc() | 	operation2 := generateNoopFunc() | ||||||
| 	operation3 := generateNoopFunc() | 	operation3 := generateNoopFunc() | ||||||
|  |  | ||||||
| 	// Act | 	// Act | ||||||
| 	err2 := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation2}) | 	err2 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) | ||||||
|  |  | ||||||
| 	// Assert | 	// Assert | ||||||
| 	if err2 == nil { | 	if err2 == nil { | ||||||
| 		t.Fatalf("NestedPendingOperations did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName) | 		t.Fatalf("NewGoRoutine did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName) | ||||||
| 	} | 	} | ||||||
| 	if !IsAlreadyExists(err2) { | 	if !IsAlreadyExists(err2) { | ||||||
| 		t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2) | 		t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// Act | 	// Act | ||||||
| @@ -455,9 +453,9 @@ func Test_NestedPendingOperations_Positive_ThirdOpAfterFirstCompletesWithExpBack | |||||||
| 	err3 := retryWithExponentialBackOff( | 	err3 := retryWithExponentialBackOff( | ||||||
| 		time.Duration(initialOperationWaitTimeShort), | 		time.Duration(initialOperationWaitTimeShort), | ||||||
| 		func() (bool, error) { | 		func() (bool, error) { | ||||||
| 			err := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation3}) | 			err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation3}) | ||||||
| 			if err != nil { | 			if err != nil { | ||||||
| 				t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err) | 				t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) | ||||||
| 				return false, nil | 				return false, nil | ||||||
| 			} | 			} | ||||||
| 			return true, nil | 			return true, nil | ||||||
| @@ -466,11 +464,11 @@ func Test_NestedPendingOperations_Positive_ThirdOpAfterFirstCompletesWithExpBack | |||||||
|  |  | ||||||
| 	// Assert | 	// Assert | ||||||
| 	if err3 != nil { | 	if err3 != nil { | ||||||
| 		t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err3) | 		t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err3) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func Test_NestedPendingOperations_Positive_WaitEmpty(t *testing.T) { | func Test_NewGoRoutineMap_Positive_WaitEmpty(t *testing.T) { | ||||||
| 	// Test than Wait() on empty GoRoutineMap always succeeds without blocking | 	// Test than Wait() on empty GoRoutineMap always succeeds without blocking | ||||||
| 	// Arrange | 	// Arrange | ||||||
| 	grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) | 	grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) | ||||||
| @@ -489,7 +487,7 @@ func Test_NestedPendingOperations_Positive_WaitEmpty(t *testing.T) { | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func Test_NestedPendingOperations_Positive_WaitEmptyWithExpBackoff(t *testing.T) { | func Test_NewGoRoutineMap_Positive_WaitEmptyWithExpBackoff(t *testing.T) { | ||||||
| 	// Test than Wait() on empty GoRoutineMap always succeeds without blocking | 	// Test than Wait() on empty GoRoutineMap always succeeds without blocking | ||||||
| 	// Arrange | 	// Arrange | ||||||
| 	grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) | 	grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) | ||||||
| @@ -508,16 +506,16 @@ func Test_NestedPendingOperations_Positive_WaitEmptyWithExpBackoff(t *testing.T) | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func Test_NestedPendingOperations_Positive_Wait(t *testing.T) { | func Test_NewGoRoutineMap_Positive_Wait(t *testing.T) { | ||||||
| 	// Test that Wait() really blocks until the last operation succeeds | 	// Test that Wait() really blocks until the last operation succeeds | ||||||
| 	// Arrange | 	// Arrange | ||||||
| 	grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) | 	grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) | ||||||
| 	volumeName := v1.UniqueVolumeName("volume-name") | 	volumeName := v1.UniqueVolumeName("volume-name") | ||||||
| 	operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) | 	operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) | ||||||
| 	operation1 := generateWaitFunc(operation1DoneCh) | 	operation1 := generateWaitFunc(operation1DoneCh) | ||||||
| 	err := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1}) | 	err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err) | 		t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// Act | 	// Act | ||||||
| @@ -537,16 +535,16 @@ func Test_NestedPendingOperations_Positive_Wait(t *testing.T) { | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func Test_NestedPendingOperations_Positive_WaitWithExpBackoff(t *testing.T) { | func Test_NewGoRoutineMap_Positive_WaitWithExpBackoff(t *testing.T) { | ||||||
| 	// Test that Wait() really blocks until the last operation succeeds | 	// Test that Wait() really blocks until the last operation succeeds | ||||||
| 	// Arrange | 	// Arrange | ||||||
| 	grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) | 	grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) | ||||||
| 	volumeName := v1.UniqueVolumeName("volume-name") | 	volumeName := v1.UniqueVolumeName("volume-name") | ||||||
| 	operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) | 	operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) | ||||||
| 	operation1 := generateWaitFunc(operation1DoneCh) | 	operation1 := generateWaitFunc(operation1DoneCh) | ||||||
| 	err := grm.Run(volumeName, "" /* operationSubName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1}) | 	err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err) | 		t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// Act | 	// Act | ||||||
| @@ -566,215 +564,6 @@ func Test_NestedPendingOperations_Positive_WaitWithExpBackoff(t *testing.T) { | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| /* Concurrent operations tests */ |  | ||||||
|  |  | ||||||
| // Covered cases: |  | ||||||
| // FIRST OP    | SECOND OP   | RESULT |  | ||||||
| // None        | None        | Positive |  | ||||||
| // None        | Volume      | Positive |  | ||||||
| // None        | Volume Pod  | Positive |  | ||||||
| // None        | Volume Node | Positive |  | ||||||
| // Volume      | None        | Positive |  | ||||||
| // Volume      | Volume      | Negative (covered in Test_NestedPendingOperations_Negative_SecondOpBeforeFirstCompletes above) |  | ||||||
| // Volume      | Volume Pod  | Negative |  | ||||||
| // Volume      | Volume Node | Negative |  | ||||||
| // Volume Pod  | None        | Positive |  | ||||||
| // Volume Pod  | Volume      | Negative |  | ||||||
| // Volume Pod  | Volume Pod  | Negative (covered in Test_NestedPendingOperations_Negative_SecondSubOpBeforeFirstCompletes above) |  | ||||||
| // Volume Node | None        | Positive |  | ||||||
| // Volume Node | Volume      | Negative |  | ||||||
| // Volume Node | Volume Node | Negative |  | ||||||
|  |  | ||||||
| // These cases are not covered because they will never occur within the same |  | ||||||
| // binary, so either result works. |  | ||||||
| // Volume Pod  | Volume Node |  | ||||||
| // Volume Node | Volume Pod |  | ||||||
|  |  | ||||||
| func Test_NestedPendingOperations_Positive_SecondOpBeforeFirstCompletes_FirstNone_SecondNone(t *testing.T) { |  | ||||||
| 	testConcurrentOperationsPositive(t, |  | ||||||
| 		"", /* volumeName1 */ |  | ||||||
| 		"", /* podName1 */ |  | ||||||
| 		"", /* nodeName1 */ |  | ||||||
| 		"", /* volumeName1 */ |  | ||||||
| 		"", /* podName2 */ |  | ||||||
| 		"" /* nodeName2 */) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func Test_NestedPendingOperations_Positive_SecondOpBeforeFirstCompletes_FirstNone_SecondVolume(t *testing.T) { |  | ||||||
| 	testConcurrentOperationsPositive(t, |  | ||||||
| 		"", /* volumeName1 */ |  | ||||||
| 		"", /* podName1 */ |  | ||||||
| 		"", /* nodeName1 */ |  | ||||||
| 		v1.UniqueVolumeName("volume-name"), |  | ||||||
| 		"", /* podName2 */ |  | ||||||
| 		"" /* nodeName2 */) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func Test_NestedPendingOperations_Positive_SecondOpBeforeFirstCompletes_FirstNone_SecondVolumePod(t *testing.T) { |  | ||||||
| 	testConcurrentOperationsPositive(t, |  | ||||||
| 		"", /* volumeName1 */ |  | ||||||
| 		"", /* podName1 */ |  | ||||||
| 		"", /* nodeName1 */ |  | ||||||
| 		v1.UniqueVolumeName("volume-name"), |  | ||||||
| 		volumetypes.UniquePodName("operation-podname"), |  | ||||||
| 		"" /* nodeName2 */) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func Test_NestedPendingOperations_Positive_SecondOpBeforeFirstCompletes_FirstNone_SecondVolumeNode(t *testing.T) { |  | ||||||
| 	testConcurrentOperationsPositive(t, |  | ||||||
| 		"", /* volumeName1 */ |  | ||||||
| 		"", /* podName1 */ |  | ||||||
| 		"", /* nodeName1 */ |  | ||||||
| 		v1.UniqueVolumeName("volume-name"), |  | ||||||
| 		"", /* podName2 */ |  | ||||||
| 		types.NodeName("operation-nodename")) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func Test_NestedPendingOperations_Positive_SecondOpBeforeFirstCompletes_FirstVolume_SecondNone(t *testing.T) { |  | ||||||
| 	testConcurrentOperationsPositive(t, |  | ||||||
| 		v1.UniqueVolumeName("volume-name"), |  | ||||||
| 		"", /* podName1 */ |  | ||||||
| 		"", /* nodeName1 */ |  | ||||||
| 		"", /* volumeName1 */ |  | ||||||
| 		"", /* podName2 */ |  | ||||||
| 		"" /* nodeName2 */) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func Test_NestedPendingOperations_Negative_SecondOpBeforeFirstCompletes_FirstVolume_SecondVolumePod(t *testing.T) { |  | ||||||
| 	testConcurrentOperationsNegative(t, |  | ||||||
| 		v1.UniqueVolumeName("volume-name"), |  | ||||||
| 		"", /* podName1 */ |  | ||||||
| 		"", /* nodeName1 */ |  | ||||||
| 		v1.UniqueVolumeName("volume-name"), |  | ||||||
| 		volumetypes.UniquePodName("operation-podname"), |  | ||||||
| 		"" /* nodeName2 */) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func Test_NestedPendingOperations_Negative_SecondOpBeforeFirstCompletes_FirstVolume_SecondVolumeNode(t *testing.T) { |  | ||||||
| 	testConcurrentOperationsNegative(t, |  | ||||||
| 		v1.UniqueVolumeName("volume-name"), |  | ||||||
| 		"", /* podName1 */ |  | ||||||
| 		"", /* nodeName1 */ |  | ||||||
| 		v1.UniqueVolumeName("volume-name"), |  | ||||||
| 		"", /* podName2 */ |  | ||||||
| 		types.NodeName("operation-nodename")) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func Test_NestedPendingOperations_Positive_SecondOpBeforeFirstCompletes_FirstVolumePod_SecondNone(t *testing.T) { |  | ||||||
| 	testConcurrentOperationsPositive(t, |  | ||||||
| 		v1.UniqueVolumeName("volume-name"), |  | ||||||
| 		volumetypes.UniquePodName("operation-podname"), |  | ||||||
| 		"", /* nodeName1 */ |  | ||||||
| 		"", /* volumeName1 */ |  | ||||||
| 		"", /* podName2 */ |  | ||||||
| 		"" /* nodeName2 */) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func Test_NestedPendingOperations_Negative_SecondOpBeforeFirstCompletes_FirstVolumePod_SecondVolume(t *testing.T) { |  | ||||||
| 	testConcurrentOperationsNegative(t, |  | ||||||
| 		v1.UniqueVolumeName("volume-name"), |  | ||||||
| 		volumetypes.UniquePodName("operation-podname"), |  | ||||||
| 		"", /* nodeName1 */ |  | ||||||
| 		v1.UniqueVolumeName("volume-name"), |  | ||||||
| 		"", /* podName2 */ |  | ||||||
| 		"" /* nodeName2 */) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func Test_NestedPendingOperations_Positive_SecondOpBeforeFirstCompletes_FirstVolumeNode_SecondNone(t *testing.T) { |  | ||||||
| 	testConcurrentOperationsPositive(t, |  | ||||||
| 		v1.UniqueVolumeName("volume-name"), |  | ||||||
| 		"", /* podName1 */ |  | ||||||
| 		types.NodeName("operation-nodename"), |  | ||||||
| 		"", /* volumeName1 */ |  | ||||||
| 		"", /* podName2 */ |  | ||||||
| 		"" /* nodeName2 */) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func Test_NestedPendingOperations_Negative_SecondOpBeforeFirstCompletes_FirstVolumeNode_SecondVolume(t *testing.T) { |  | ||||||
| 	testConcurrentOperationsNegative(t, |  | ||||||
| 		v1.UniqueVolumeName("volume-name"), |  | ||||||
| 		"", /* podName1 */ |  | ||||||
| 		types.NodeName("operation-nodename"), |  | ||||||
| 		v1.UniqueVolumeName("volume-name"), |  | ||||||
| 		"", /* podName2 */ |  | ||||||
| 		"" /* nodeName2 */) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func Test_NestedPendingOperations_Negative_SecondOpBeforeFirstCompletes_FirstVolumeNode_SecondVolumeNode(t *testing.T) { |  | ||||||
| 	testConcurrentOperationsNegative(t, |  | ||||||
| 		v1.UniqueVolumeName("volume-name"), |  | ||||||
| 		"", /* podName1 */ |  | ||||||
| 		types.NodeName("operation-nodename"), |  | ||||||
| 		v1.UniqueVolumeName("volume-name"), |  | ||||||
| 		"", /* podName2 */ |  | ||||||
| 		types.NodeName("operation-nodename")) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // testConcurrentOperationsPositive passes if the two operations keyed by the |  | ||||||
| // provided parameters are executed in parallel, and fails otherwise. |  | ||||||
| func testConcurrentOperationsPositive( |  | ||||||
| 	t *testing.T, |  | ||||||
| 	volumeName1 v1.UniqueVolumeName, |  | ||||||
| 	podName1 volumetypes.UniquePodName, |  | ||||||
| 	nodeName1 types.NodeName, |  | ||||||
| 	volumeName2 v1.UniqueVolumeName, |  | ||||||
| 	podName2 volumetypes.UniquePodName, |  | ||||||
| 	nodeName2 types.NodeName) { |  | ||||||
|  |  | ||||||
| 	// Arrange |  | ||||||
| 	grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) |  | ||||||
| 	operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) |  | ||||||
| 	operation1 := generateWaitFunc(operation1DoneCh) |  | ||||||
| 	err1 := grm.Run(volumeName1, podName1, nodeName1 /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1}) |  | ||||||
| 	if err1 != nil { |  | ||||||
| 		t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1) |  | ||||||
| 	} |  | ||||||
| 	operation2 := generateNoopFunc() |  | ||||||
|  |  | ||||||
| 	// Act |  | ||||||
| 	err2 := grm.Run(volumeName2, podName2, nodeName2, volumetypes.GeneratedOperations{OperationFunc: operation2}) |  | ||||||
|  |  | ||||||
| 	// Assert |  | ||||||
| 	if err2 != nil { |  | ||||||
| 		t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err2) |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // testConcurrentOperationsNegative passes if the creation of the second |  | ||||||
| // operation returns an alreadyExists error, and fails otherwise. |  | ||||||
| func testConcurrentOperationsNegative( |  | ||||||
| 	t *testing.T, |  | ||||||
| 	volumeName1 v1.UniqueVolumeName, |  | ||||||
| 	podName1 volumetypes.UniquePodName, |  | ||||||
| 	nodeName1 types.NodeName, |  | ||||||
| 	volumeName2 v1.UniqueVolumeName, |  | ||||||
| 	podName2 volumetypes.UniquePodName, |  | ||||||
| 	nodeName2 types.NodeName) { |  | ||||||
|  |  | ||||||
| 	// Arrange |  | ||||||
| 	grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) |  | ||||||
| 	operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) |  | ||||||
| 	operation1 := generateWaitFunc(operation1DoneCh) |  | ||||||
| 	err1 := grm.Run(volumeName1, podName1, nodeName1 /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1}) |  | ||||||
| 	if err1 != nil { |  | ||||||
| 		t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1) |  | ||||||
| 	} |  | ||||||
| 	operation2 := generateNoopFunc() |  | ||||||
|  |  | ||||||
| 	// Act |  | ||||||
| 	err2 := grm.Run(volumeName2, podName2, nodeName2, volumetypes.GeneratedOperations{OperationFunc: operation2}) |  | ||||||
|  |  | ||||||
| 	// Assert |  | ||||||
| 	if err2 == nil { |  | ||||||
| 		t.Fatalf("NestedPendingOperations did not fail. Expected an operation to already exist") |  | ||||||
| 	} |  | ||||||
| 	if !IsAlreadyExists(err2) { |  | ||||||
| 		t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2) |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
|  |  | ||||||
| /* END concurrent operations tests */ |  | ||||||
|  |  | ||||||
| func generateCallbackFunc(done chan<- interface{}) func() (error, error) { | func generateCallbackFunc(done chan<- interface{}) func() (error, error) { | ||||||
| 	return func() (error, error) { | 	return func() (error, error) { | ||||||
| 		done <- true | 		done <- true | ||||||
|   | |||||||
| @@ -138,9 +138,9 @@ type OperationExecutor interface { | |||||||
| 	// back off on retries. | 	// back off on retries. | ||||||
| 	VerifyControllerAttachedVolume(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error | 	VerifyControllerAttachedVolume(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error | ||||||
|  |  | ||||||
| 	// IsOperationPending returns true if an operation for the given volumeName | 	// IsOperationPending returns true if an operation for the given volumeName and podName is pending, | ||||||
| 	// and one of podName or nodeName is pending, otherwise it returns false | 	// otherwise it returns false | ||||||
| 	IsOperationPending(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName, nodeName types.NodeName) bool | 	IsOperationPending(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) bool | ||||||
| 	// ExpandInUseVolume will resize volume's file system to expected size without unmounting the volume. | 	// ExpandInUseVolume will resize volume's file system to expected size without unmounting the volume. | ||||||
| 	ExpandInUseVolume(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) error | 	ExpandInUseVolume(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) error | ||||||
| 	// ReconstructVolumeOperation construct a new volumeSpec and returns it created by plugin | 	// ReconstructVolumeOperation construct a new volumeSpec and returns it created by plugin | ||||||
| @@ -650,11 +650,8 @@ type operationExecutor struct { | |||||||
| 	operationGenerator OperationGenerator | 	operationGenerator OperationGenerator | ||||||
| } | } | ||||||
|  |  | ||||||
| func (oe *operationExecutor) IsOperationPending( | func (oe *operationExecutor) IsOperationPending(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) bool { | ||||||
| 	volumeName v1.UniqueVolumeName, | 	return oe.pendingOperations.IsOperationPending(volumeName, podName) | ||||||
| 	podName volumetypes.UniquePodName, |  | ||||||
| 	nodeName types.NodeName) bool { |  | ||||||
| 	return oe.pendingOperations.IsOperationPending(volumeName, podName, nodeName) |  | ||||||
| } | } | ||||||
|  |  | ||||||
| func (oe *operationExecutor) AttachVolume( | func (oe *operationExecutor) AttachVolume( | ||||||
| @@ -663,13 +660,8 @@ func (oe *operationExecutor) AttachVolume( | |||||||
| 	generatedOperations := | 	generatedOperations := | ||||||
| 		oe.operationGenerator.GenerateAttachVolumeFunc(volumeToAttach, actualStateOfWorld) | 		oe.operationGenerator.GenerateAttachVolumeFunc(volumeToAttach, actualStateOfWorld) | ||||||
|  |  | ||||||
| 	if util.IsMultiAttachForbidden(volumeToAttach.VolumeSpec) { |  | ||||||
| 	return oe.pendingOperations.Run( | 	return oe.pendingOperations.Run( | ||||||
| 			volumeToAttach.VolumeName, "" /* podName */, "" /* nodeName */, generatedOperations) | 		volumeToAttach.VolumeName, "" /* podName */, generatedOperations) | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	return oe.pendingOperations.Run( |  | ||||||
| 		volumeToAttach.VolumeName, "" /* podName */, volumeToAttach.NodeName, generatedOperations) |  | ||||||
| } | } | ||||||
|  |  | ||||||
| func (oe *operationExecutor) DetachVolume( | func (oe *operationExecutor) DetachVolume( | ||||||
| @@ -682,13 +674,8 @@ func (oe *operationExecutor) DetachVolume( | |||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if util.IsMultiAttachForbidden(volumeToDetach.VolumeSpec) { |  | ||||||
| 	return oe.pendingOperations.Run( | 	return oe.pendingOperations.Run( | ||||||
| 			volumeToDetach.VolumeName, "" /* podName */, "" /* nodeName */, generatedOperations) | 		volumeToDetach.VolumeName, "" /* podName */, generatedOperations) | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	return oe.pendingOperations.Run( |  | ||||||
| 		volumeToDetach.VolumeName, "" /* podName */, volumeToDetach.NodeName, generatedOperations) |  | ||||||
| } | } | ||||||
|  |  | ||||||
| func (oe *operationExecutor) VerifyVolumesAreAttached( | func (oe *operationExecutor) VerifyVolumesAreAttached( | ||||||
| @@ -774,7 +761,7 @@ func (oe *operationExecutor) VerifyVolumesAreAttached( | |||||||
|  |  | ||||||
| 		// Ugly hack to ensure - we don't do parallel bulk polling of same volume plugin | 		// Ugly hack to ensure - we don't do parallel bulk polling of same volume plugin | ||||||
| 		uniquePluginName := v1.UniqueVolumeName(pluginName) | 		uniquePluginName := v1.UniqueVolumeName(pluginName) | ||||||
| 		err = oe.pendingOperations.Run(uniquePluginName, "" /* Pod Name */, "" /* nodeName */, generatedOperations) | 		err = oe.pendingOperations.Run(uniquePluginName, "" /* Pod Name */, generatedOperations) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			klog.Errorf("BulkVerifyVolumes.Run Error bulk volume verification for plugin %q  with %v", pluginName, err) | 			klog.Errorf("BulkVerifyVolumes.Run Error bulk volume verification for plugin %q  with %v", pluginName, err) | ||||||
| 		} | 		} | ||||||
| @@ -792,7 +779,7 @@ func (oe *operationExecutor) VerifyVolumesAreAttachedPerNode( | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// Give an empty UniqueVolumeName so that this operation could be executed concurrently. | 	// Give an empty UniqueVolumeName so that this operation could be executed concurrently. | ||||||
| 	return oe.pendingOperations.Run("" /* volumeName */, "" /* podName */, "" /* nodeName */, generatedOperations) | 	return oe.pendingOperations.Run("" /* volumeName */, "" /* podName */, generatedOperations) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (oe *operationExecutor) MountVolume( | func (oe *operationExecutor) MountVolume( | ||||||
| @@ -833,7 +820,7 @@ func (oe *operationExecutor) MountVolume( | |||||||
|  |  | ||||||
| 	// TODO mount_device | 	// TODO mount_device | ||||||
| 	return oe.pendingOperations.Run( | 	return oe.pendingOperations.Run( | ||||||
| 		volumeToMount.VolumeName, podName, "" /* nodeName */, generatedOperations) | 		volumeToMount.VolumeName, podName, generatedOperations) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (oe *operationExecutor) UnmountVolume( | func (oe *operationExecutor) UnmountVolume( | ||||||
| @@ -864,7 +851,7 @@ func (oe *operationExecutor) UnmountVolume( | |||||||
| 	podName := volumetypes.UniquePodName(volumeToUnmount.PodUID) | 	podName := volumetypes.UniquePodName(volumeToUnmount.PodUID) | ||||||
|  |  | ||||||
| 	return oe.pendingOperations.Run( | 	return oe.pendingOperations.Run( | ||||||
| 		volumeToUnmount.VolumeName, podName, "" /* nodeName */, generatedOperations) | 		volumeToUnmount.VolumeName, podName, generatedOperations) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (oe *operationExecutor) UnmountDevice( | func (oe *operationExecutor) UnmountDevice( | ||||||
| @@ -895,7 +882,7 @@ func (oe *operationExecutor) UnmountDevice( | |||||||
| 	podName := nestedpendingoperations.EmptyUniquePodName | 	podName := nestedpendingoperations.EmptyUniquePodName | ||||||
|  |  | ||||||
| 	return oe.pendingOperations.Run( | 	return oe.pendingOperations.Run( | ||||||
| 		deviceToDetach.VolumeName, podName, "" /* nodeName */, generatedOperations) | 		deviceToDetach.VolumeName, podName, generatedOperations) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (oe *operationExecutor) ExpandInUseVolume(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) error { | func (oe *operationExecutor) ExpandInUseVolume(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) error { | ||||||
| @@ -903,7 +890,7 @@ func (oe *operationExecutor) ExpandInUseVolume(volumeToMount VolumeToMount, actu | |||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| 	return oe.pendingOperations.Run(volumeToMount.VolumeName, "", "" /* nodeName */, generatedOperations) | 	return oe.pendingOperations.Run(volumeToMount.VolumeName, "", generatedOperations) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (oe *operationExecutor) VerifyControllerAttachedVolume( | func (oe *operationExecutor) VerifyControllerAttachedVolume( | ||||||
| @@ -917,7 +904,7 @@ func (oe *operationExecutor) VerifyControllerAttachedVolume( | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	return oe.pendingOperations.Run( | 	return oe.pendingOperations.Run( | ||||||
| 		volumeToMount.VolumeName, "" /* podName */, "" /* nodeName */, generatedOperations) | 		volumeToMount.VolumeName, "" /* podName */, generatedOperations) | ||||||
| } | } | ||||||
|  |  | ||||||
| // ReconstructVolumeOperation return a func to create volumeSpec from mount path | // ReconstructVolumeOperation return a func to create volumeSpec from mount path | ||||||
|   | |||||||
| @@ -17,7 +17,6 @@ limitations under the License. | |||||||
| package operationexecutor | package operationexecutor | ||||||
|  |  | ||||||
| import ( | import ( | ||||||
| 	"fmt" |  | ||||||
| 	"strconv" | 	"strconv" | ||||||
| 	"testing" | 	"testing" | ||||||
| 	"time" | 	"time" | ||||||
| @@ -181,7 +180,7 @@ func TestOperationExecutor_UnmountDeviceConcurrently(t *testing.T) { | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func TestOperationExecutor_AttachSingleNodeVolumeConcurrentlyToSameNode(t *testing.T) { | func TestOperationExecutor_AttachVolumeConcurrently(t *testing.T) { | ||||||
| 	// Arrange | 	// Arrange | ||||||
| 	ch, quit, oe := setup() | 	ch, quit, oe := setup() | ||||||
| 	volumesToAttach := make([]VolumeToAttach, numVolumesToAttach) | 	volumesToAttach := make([]VolumeToAttach, numVolumesToAttach) | ||||||
| @@ -192,13 +191,6 @@ func TestOperationExecutor_AttachSingleNodeVolumeConcurrentlyToSameNode(t *testi | |||||||
| 		volumesToAttach[i] = VolumeToAttach{ | 		volumesToAttach[i] = VolumeToAttach{ | ||||||
| 			VolumeName: v1.UniqueVolumeName(pdName), | 			VolumeName: v1.UniqueVolumeName(pdName), | ||||||
| 			NodeName:   "node", | 			NodeName:   "node", | ||||||
| 			VolumeSpec: &volume.Spec{ |  | ||||||
| 				PersistentVolume: &v1.PersistentVolume{ |  | ||||||
| 					Spec: v1.PersistentVolumeSpec{ |  | ||||||
| 						AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, |  | ||||||
| 					}, |  | ||||||
| 				}, |  | ||||||
| 			}, |  | ||||||
| 		} | 		} | ||||||
| 		oe.AttachVolume(volumesToAttach[i], nil /* actualStateOfWorldAttacherUpdater */) | 		oe.AttachVolume(volumesToAttach[i], nil /* actualStateOfWorldAttacherUpdater */) | ||||||
| 	} | 	} | ||||||
| @@ -209,91 +201,7 @@ func TestOperationExecutor_AttachSingleNodeVolumeConcurrentlyToSameNode(t *testi | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func TestOperationExecutor_AttachMultiNodeVolumeConcurrentlyToSameNode(t *testing.T) { | func TestOperationExecutor_DetachVolumeConcurrently(t *testing.T) { | ||||||
| 	// Arrange |  | ||||||
| 	ch, quit, oe := setup() |  | ||||||
| 	volumesToAttach := make([]VolumeToAttach, numVolumesToAttach) |  | ||||||
| 	pdName := "pd-volume" |  | ||||||
|  |  | ||||||
| 	// Act |  | ||||||
| 	for i := range volumesToAttach { |  | ||||||
| 		volumesToAttach[i] = VolumeToAttach{ |  | ||||||
| 			VolumeName: v1.UniqueVolumeName(pdName), |  | ||||||
| 			NodeName:   "node", |  | ||||||
| 			VolumeSpec: &volume.Spec{ |  | ||||||
| 				PersistentVolume: &v1.PersistentVolume{ |  | ||||||
| 					Spec: v1.PersistentVolumeSpec{ |  | ||||||
| 						AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany}, |  | ||||||
| 					}, |  | ||||||
| 				}, |  | ||||||
| 			}, |  | ||||||
| 		} |  | ||||||
| 		oe.AttachVolume(volumesToAttach[i], nil /* actualStateOfWorldAttacherUpdater */) |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	// Assert |  | ||||||
| 	if !isOperationRunSerially(ch, quit) { |  | ||||||
| 		t.Fatalf("Attach volume operations should not start concurrently") |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func TestOperationExecutor_AttachSingleNodeVolumeConcurrentlyToDifferentNodes(t *testing.T) { |  | ||||||
| 	// Arrange |  | ||||||
| 	ch, quit, oe := setup() |  | ||||||
| 	volumesToAttach := make([]VolumeToAttach, numVolumesToAttach) |  | ||||||
| 	pdName := "pd-volume" |  | ||||||
|  |  | ||||||
| 	// Act |  | ||||||
| 	for i := range volumesToAttach { |  | ||||||
| 		volumesToAttach[i] = VolumeToAttach{ |  | ||||||
| 			VolumeName: v1.UniqueVolumeName(pdName), |  | ||||||
| 			NodeName:   types.NodeName(fmt.Sprintf("node%d", i)), |  | ||||||
| 			VolumeSpec: &volume.Spec{ |  | ||||||
| 				PersistentVolume: &v1.PersistentVolume{ |  | ||||||
| 					Spec: v1.PersistentVolumeSpec{ |  | ||||||
| 						AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, |  | ||||||
| 					}, |  | ||||||
| 				}, |  | ||||||
| 			}, |  | ||||||
| 		} |  | ||||||
| 		oe.AttachVolume(volumesToAttach[i], nil /* actualStateOfWorldAttacherUpdater */) |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	// Assert |  | ||||||
| 	if !isOperationRunSerially(ch, quit) { |  | ||||||
| 		t.Fatalf("Attach volume operations should not start concurrently") |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func TestOperationExecutor_AttachMultiNodeVolumeConcurrentlyToDifferentNodes(t *testing.T) { |  | ||||||
| 	// Arrange |  | ||||||
| 	ch, quit, oe := setup() |  | ||||||
| 	volumesToAttach := make([]VolumeToAttach, numVolumesToAttach) |  | ||||||
| 	pdName := "pd-volume" |  | ||||||
|  |  | ||||||
| 	// Act |  | ||||||
| 	for i := range volumesToAttach { |  | ||||||
| 		volumesToAttach[i] = VolumeToAttach{ |  | ||||||
| 			VolumeName: v1.UniqueVolumeName(pdName), |  | ||||||
| 			NodeName:   types.NodeName(fmt.Sprintf("node%d", i)), |  | ||||||
| 			VolumeSpec: &volume.Spec{ |  | ||||||
| 				PersistentVolume: &v1.PersistentVolume{ |  | ||||||
| 					Spec: v1.PersistentVolumeSpec{ |  | ||||||
| 						AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany}, |  | ||||||
| 					}, |  | ||||||
| 				}, |  | ||||||
| 			}, |  | ||||||
| 		} |  | ||||||
| 		oe.AttachVolume(volumesToAttach[i], nil /* actualStateOfWorldAttacherUpdater */) |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	// Assert |  | ||||||
| 	if !isOperationRunConcurrently(ch, quit, numVolumesToAttach) { |  | ||||||
| 		t.Fatalf("Attach volume operations should not execute serially") |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func TestOperationExecutor_DetachSingleNodeVolumeConcurrentlyFromSameNode(t *testing.T) { |  | ||||||
| 	// Arrange | 	// Arrange | ||||||
| 	ch, quit, oe := setup() | 	ch, quit, oe := setup() | ||||||
| 	attachedVolumes := make([]AttachedVolume, numVolumesToDetach) | 	attachedVolumes := make([]AttachedVolume, numVolumesToDetach) | ||||||
| @@ -304,13 +212,6 @@ func TestOperationExecutor_DetachSingleNodeVolumeConcurrentlyFromSameNode(t *tes | |||||||
| 		attachedVolumes[i] = AttachedVolume{ | 		attachedVolumes[i] = AttachedVolume{ | ||||||
| 			VolumeName: v1.UniqueVolumeName(pdName), | 			VolumeName: v1.UniqueVolumeName(pdName), | ||||||
| 			NodeName:   "node", | 			NodeName:   "node", | ||||||
| 			VolumeSpec: &volume.Spec{ |  | ||||||
| 				PersistentVolume: &v1.PersistentVolume{ |  | ||||||
| 					Spec: v1.PersistentVolumeSpec{ |  | ||||||
| 						AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, |  | ||||||
| 					}, |  | ||||||
| 				}, |  | ||||||
| 			}, |  | ||||||
| 		} | 		} | ||||||
| 		oe.DetachVolume(attachedVolumes[i], true /* verifySafeToDetach */, nil /* actualStateOfWorldAttacherUpdater */) | 		oe.DetachVolume(attachedVolumes[i], true /* verifySafeToDetach */, nil /* actualStateOfWorldAttacherUpdater */) | ||||||
| 	} | 	} | ||||||
| @@ -321,91 +222,7 @@ func TestOperationExecutor_DetachSingleNodeVolumeConcurrentlyFromSameNode(t *tes | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func TestOperationExecutor_DetachMultiNodeVolumeConcurrentlyFromSameNode(t *testing.T) { | func TestOperationExecutor_VerifyVolumesAreAttachedConcurrently(t *testing.T) { | ||||||
| 	// Arrange |  | ||||||
| 	ch, quit, oe := setup() |  | ||||||
| 	attachedVolumes := make([]AttachedVolume, numVolumesToDetach) |  | ||||||
| 	pdName := "pd-volume" |  | ||||||
|  |  | ||||||
| 	// Act |  | ||||||
| 	for i := range attachedVolumes { |  | ||||||
| 		attachedVolumes[i] = AttachedVolume{ |  | ||||||
| 			VolumeName: v1.UniqueVolumeName(pdName), |  | ||||||
| 			NodeName:   "node", |  | ||||||
| 			VolumeSpec: &volume.Spec{ |  | ||||||
| 				PersistentVolume: &v1.PersistentVolume{ |  | ||||||
| 					Spec: v1.PersistentVolumeSpec{ |  | ||||||
| 						AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany}, |  | ||||||
| 					}, |  | ||||||
| 				}, |  | ||||||
| 			}, |  | ||||||
| 		} |  | ||||||
| 		oe.DetachVolume(attachedVolumes[i], true /* verifySafeToDetach */, nil /* actualStateOfWorldAttacherUpdater */) |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	// Assert |  | ||||||
| 	if !isOperationRunSerially(ch, quit) { |  | ||||||
| 		t.Fatalf("DetachVolume operations should not run concurrently") |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func TestOperationExecutor_DetachSingleNodeVolumeConcurrentlyFromDifferentNodes(t *testing.T) { |  | ||||||
| 	// Arrange |  | ||||||
| 	ch, quit, oe := setup() |  | ||||||
| 	attachedVolumes := make([]AttachedVolume, numVolumesToDetach) |  | ||||||
| 	pdName := "pd-volume" |  | ||||||
|  |  | ||||||
| 	// Act |  | ||||||
| 	for i := range attachedVolumes { |  | ||||||
| 		attachedVolumes[i] = AttachedVolume{ |  | ||||||
| 			VolumeName: v1.UniqueVolumeName(pdName), |  | ||||||
| 			NodeName:   types.NodeName(fmt.Sprintf("node%d", i)), |  | ||||||
| 			VolumeSpec: &volume.Spec{ |  | ||||||
| 				PersistentVolume: &v1.PersistentVolume{ |  | ||||||
| 					Spec: v1.PersistentVolumeSpec{ |  | ||||||
| 						AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, |  | ||||||
| 					}, |  | ||||||
| 				}, |  | ||||||
| 			}, |  | ||||||
| 		} |  | ||||||
| 		oe.DetachVolume(attachedVolumes[i], true /* verifySafeToDetach */, nil /* actualStateOfWorldAttacherUpdater */) |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	// Assert |  | ||||||
| 	if !isOperationRunSerially(ch, quit) { |  | ||||||
| 		t.Fatalf("DetachVolume operations should not run concurrently") |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func TestOperationExecutor_DetachMultiNodeVolumeConcurrentlyFromDifferentNodes(t *testing.T) { |  | ||||||
| 	// Arrange |  | ||||||
| 	ch, quit, oe := setup() |  | ||||||
| 	attachedVolumes := make([]AttachedVolume, numVolumesToDetach) |  | ||||||
| 	pdName := "pd-volume" |  | ||||||
|  |  | ||||||
| 	// Act |  | ||||||
| 	for i := range attachedVolumes { |  | ||||||
| 		attachedVolumes[i] = AttachedVolume{ |  | ||||||
| 			VolumeName: v1.UniqueVolumeName(pdName), |  | ||||||
| 			NodeName:   types.NodeName(fmt.Sprintf("node%d", i)), |  | ||||||
| 			VolumeSpec: &volume.Spec{ |  | ||||||
| 				PersistentVolume: &v1.PersistentVolume{ |  | ||||||
| 					Spec: v1.PersistentVolumeSpec{ |  | ||||||
| 						AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany}, |  | ||||||
| 					}, |  | ||||||
| 				}, |  | ||||||
| 			}, |  | ||||||
| 		} |  | ||||||
| 		oe.DetachVolume(attachedVolumes[i], true /* verifySafeToDetach */, nil /* actualStateOfWorldAttacherUpdater */) |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	// Assert |  | ||||||
| 	if !isOperationRunConcurrently(ch, quit, numVolumesToDetach) { |  | ||||||
| 		t.Fatalf("Attach volume operations should not execute serially") |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func TestOperationExecutor_VerifyVolumesAreAttachedConcurrentlyOnSameNode(t *testing.T) { |  | ||||||
| 	// Arrange | 	// Arrange | ||||||
| 	ch, quit, oe := setup() | 	ch, quit, oe := setup() | ||||||
|  |  | ||||||
| @@ -420,24 +237,6 @@ func TestOperationExecutor_VerifyVolumesAreAttachedConcurrentlyOnSameNode(t *tes | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func TestOperationExecutor_VerifyVolumesAreAttachedConcurrentlyOnDifferentNodes(t *testing.T) { |  | ||||||
| 	// Arrange |  | ||||||
| 	ch, quit, oe := setup() |  | ||||||
|  |  | ||||||
| 	// Act |  | ||||||
| 	for i := 0; i < numVolumesToVerifyAttached; i++ { |  | ||||||
| 		oe.VerifyVolumesAreAttachedPerNode( |  | ||||||
| 			nil, /* attachedVolumes */ |  | ||||||
| 			types.NodeName(fmt.Sprintf("node-name-%d", i)), |  | ||||||
| 			nil /* actualStateOfWorldAttacherUpdater */) |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	// Assert |  | ||||||
| 	if !isOperationRunConcurrently(ch, quit, numVolumesToVerifyAttached) { |  | ||||||
| 		t.Fatalf("VerifyVolumesAreAttached operation is not being run concurrently") |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func TestOperationExecutor_VerifyControllerAttachedVolumeConcurrently(t *testing.T) { | func TestOperationExecutor_VerifyControllerAttachedVolumeConcurrently(t *testing.T) { | ||||||
| 	// Arrange | 	// Arrange | ||||||
| 	ch, quit, oe := setup() | 	ch, quit, oe := setup() | ||||||
|   | |||||||
| @@ -644,44 +644,3 @@ func WriteVolumeCache(deviceMountPath string, exec utilexec.Interface) error { | |||||||
| 	// For linux runtime, it skips because unmount will automatically flush disk data | 	// For linux runtime, it skips because unmount will automatically flush disk data | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| // IsMultiAttachForbidden checks if attaching this volume to multiple nodes is definitely not allowed/possible. |  | ||||||
| // In its current form, this function can only reliably say for which volumes it's definitely forbidden. If it returns |  | ||||||
| // false, it is not guaranteed that multi-attach is actually supported by the volume type and we must rely on the |  | ||||||
| // attacher to fail fast in such cases. |  | ||||||
| // Please see https://github.com/kubernetes/kubernetes/issues/40669 and https://github.com/kubernetes/kubernetes/pull/40148#discussion_r98055047 |  | ||||||
| func IsMultiAttachForbidden(volumeSpec *volume.Spec) bool { |  | ||||||
| 	if volumeSpec == nil { |  | ||||||
| 		// we don't know if it's supported or not and let the attacher fail later in cases it's not supported |  | ||||||
| 		return false |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	if volumeSpec.Volume != nil { |  | ||||||
| 		// Check for volume types which are known to fail slow or cause trouble when trying to multi-attach |  | ||||||
| 		if volumeSpec.Volume.AzureDisk != nil || |  | ||||||
| 			volumeSpec.Volume.Cinder != nil { |  | ||||||
| 			return true |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	// Only if this volume is a persistent volume, we have reliable information on whether it's allowed or not to |  | ||||||
| 	// multi-attach. We trust in the individual volume implementations to not allow unsupported access modes |  | ||||||
| 	if volumeSpec.PersistentVolume != nil { |  | ||||||
| 		// Check for persistent volume types which do not fail when trying to multi-attach |  | ||||||
| 		if len(volumeSpec.PersistentVolume.Spec.AccessModes) == 0 { |  | ||||||
| 			// No access mode specified so we don't know for sure. Let the attacher fail if needed |  | ||||||
| 			return false |  | ||||||
| 		} |  | ||||||
|  |  | ||||||
| 		// check if this volume is allowed to be attached to multiple PODs/nodes, if yes, return false |  | ||||||
| 		for _, accessMode := range volumeSpec.PersistentVolume.Spec.AccessModes { |  | ||||||
| 			if accessMode == v1.ReadWriteMany || accessMode == v1.ReadOnlyMany { |  | ||||||
| 				return false |  | ||||||
| 			} |  | ||||||
| 		} |  | ||||||
| 		return true |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	// we don't know if it's supported or not and let the attacher fail later in cases it's not supported |  | ||||||
| 	return false |  | ||||||
| } |  | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 Jordan Liggitt
					Jordan Liggitt