mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-04 04:08:16 +00:00 
			
		
		
		
	Merge pull request #110951 from 249043822/br-nestedPendingOperations
fix nestedPendingOperations mount and umount parallel bug -- minimal change
This commit is contained in:
		@@ -244,6 +244,7 @@ func (grm *nestedPendingOperations) isOperationExists(key operationKey) (bool, i
 | 
			
		||||
		return false, -1
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	opIndex := -1
 | 
			
		||||
	for previousOpIndex, previousOp := range grm.operations {
 | 
			
		||||
		volumeNameMatch := previousOp.key.volumeName == key.volumeName
 | 
			
		||||
 | 
			
		||||
@@ -251,16 +252,28 @@ func (grm *nestedPendingOperations) isOperationExists(key operationKey) (bool, i
 | 
			
		||||
			key.podName == EmptyUniquePodName ||
 | 
			
		||||
			previousOp.key.podName == key.podName
 | 
			
		||||
 | 
			
		||||
		podNameExactMatch := previousOp.key.podName == key.podName
 | 
			
		||||
 | 
			
		||||
		nodeNameMatch := previousOp.key.nodeName == EmptyNodeName ||
 | 
			
		||||
			key.nodeName == EmptyNodeName ||
 | 
			
		||||
			previousOp.key.nodeName == key.nodeName
 | 
			
		||||
 | 
			
		||||
		nodeNameExactMatch := previousOp.key.nodeName == key.nodeName
 | 
			
		||||
 | 
			
		||||
		if volumeNameMatch && podNameMatch && nodeNameMatch {
 | 
			
		||||
			// nonExactMatch pending first
 | 
			
		||||
			if previousOp.operationPending {
 | 
			
		||||
				return true, previousOpIndex
 | 
			
		||||
			}
 | 
			
		||||
			// nonExactMatch with no pending, set opIndex to the first nonExactMatch
 | 
			
		||||
			// exactMatch can override opIndex to expected
 | 
			
		||||
			if opIndex == -1 || (podNameExactMatch && nodeNameExactMatch) {
 | 
			
		||||
				opIndex = previousOpIndex
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return opIndex != -1, opIndex
 | 
			
		||||
 | 
			
		||||
	return false, -1
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (grm *nestedPendingOperations) getOperation(key operationKey) (uint, error) {
 | 
			
		||||
 
 | 
			
		||||
@@ -879,3 +879,140 @@ func waitChannelWithTimeout(ch <-chan interface{}, timeout time.Duration) error
 | 
			
		||||
		return fmt.Errorf("timeout after %v", timeout)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func Test_NestedPendingOperations_OperationExists_PendingFirst(t *testing.T) {
 | 
			
		||||
	// Arrange
 | 
			
		||||
	grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
 | 
			
		||||
	volumeName := v1.UniqueVolumeName("test-volume")
 | 
			
		||||
	podName1 := volumetypes.UniquePodName("pod1")
 | 
			
		||||
	podName2 := volumetypes.UniquePodName("pod2")
 | 
			
		||||
	podName3 := volumetypes.UniquePodName("pod3")
 | 
			
		||||
	podName4 := EmptyUniquePodName
 | 
			
		||||
	nodeName := EmptyNodeName
 | 
			
		||||
 | 
			
		||||
	// delay after an operation is signaled to finish to ensure it actually
 | 
			
		||||
	// finishes before running the next operation.
 | 
			
		||||
	delay := 50 * time.Millisecond
 | 
			
		||||
 | 
			
		||||
	// fake operation1 for pod1 failed
 | 
			
		||||
	operation1DoneCh := make(chan interface{})
 | 
			
		||||
	operation1 := generateWaitWithErrorFunc(operation1DoneCh)
 | 
			
		||||
	err1 := grm.Run(volumeName, podName1, nodeName /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1, OperationName: "umount"})
 | 
			
		||||
	if err1 != nil {
 | 
			
		||||
		t.Errorf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// fake operation2 for pod2 fails
 | 
			
		||||
	operation2DoneCh := make(chan interface{})
 | 
			
		||||
	operation2 := generateWaitWithErrorFunc(operation2DoneCh)
 | 
			
		||||
	err2 := grm.Run(volumeName, podName2, nodeName /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation2, OperationName: "umount"})
 | 
			
		||||
	if err2 != nil {
 | 
			
		||||
		t.Errorf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err2)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// fake operation3 for pod3 pending
 | 
			
		||||
	operation3DoneCh := make(chan interface{})
 | 
			
		||||
	operation3 := generateWaitFunc(operation3DoneCh)
 | 
			
		||||
	defer func() {
 | 
			
		||||
		close(operation3DoneCh)
 | 
			
		||||
	}()
 | 
			
		||||
	err3 := grm.Run(volumeName, podName3, nodeName /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation3, OperationName: "umount"})
 | 
			
		||||
	if err3 != nil {
 | 
			
		||||
		t.Errorf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err3)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	operation1DoneCh <- true
 | 
			
		||||
	operation2DoneCh <- true
 | 
			
		||||
	time.Sleep(delay)
 | 
			
		||||
 | 
			
		||||
	// fake operation4 for EmptyUniquePodName should be rejected as operation3 is still pending
 | 
			
		||||
	operation4DoneCh := make(chan interface{})
 | 
			
		||||
	operation4 := generateWaitFunc(operation4DoneCh)
 | 
			
		||||
	defer func() {
 | 
			
		||||
		close(operation4DoneCh)
 | 
			
		||||
	}()
 | 
			
		||||
	err4 := grm.Run(volumeName, podName4, nodeName /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation4, OperationName: "mount"})
 | 
			
		||||
 | 
			
		||||
	// Assert
 | 
			
		||||
	if err4 == nil {
 | 
			
		||||
		t.Errorf("NestedPendingOperations did not fail. Expected an operation to already exist")
 | 
			
		||||
	}
 | 
			
		||||
	if !IsAlreadyExists(err4) {
 | 
			
		||||
		t.Errorf("NestedPendingOperations did not return alreadyExistsError, got: %v", err4)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func Test_NestedPendingOperations_OperationExists_ExactMatchFirstNoPending(t *testing.T) {
 | 
			
		||||
	// Arrange
 | 
			
		||||
	grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
 | 
			
		||||
	volumeName := v1.UniqueVolumeName("test-volume")
 | 
			
		||||
	podName1 := volumetypes.UniquePodName("pod1")
 | 
			
		||||
	podName2 := volumetypes.UniquePodName("pod2")
 | 
			
		||||
	podName3 := volumetypes.UniquePodName("pod3")
 | 
			
		||||
	podName4 := EmptyUniquePodName
 | 
			
		||||
	nodeName := EmptyNodeName
 | 
			
		||||
 | 
			
		||||
	// delay after an operation is signaled to finish to ensure it actually
 | 
			
		||||
	// finishes before running the next operation.
 | 
			
		||||
	delay := 50 * time.Millisecond
 | 
			
		||||
	backoffDelay := 500 * time.Millisecond
 | 
			
		||||
 | 
			
		||||
	// fake operation1 for pod1 fails
 | 
			
		||||
	operation1DoneCh := make(chan interface{})
 | 
			
		||||
	operation1 := generateWaitWithErrorFunc(operation1DoneCh)
 | 
			
		||||
	err1 := grm.Run(volumeName, podName1, nodeName /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1, OperationName: "umount"})
 | 
			
		||||
	if err1 != nil {
 | 
			
		||||
		t.Errorf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// fake operation2 for pod2 fails
 | 
			
		||||
	operation2DoneCh := make(chan interface{})
 | 
			
		||||
	operation2 := generateWaitWithErrorFunc(operation2DoneCh)
 | 
			
		||||
	err2 := grm.Run(volumeName, podName2, nodeName /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation2, OperationName: "umount"})
 | 
			
		||||
	if err2 != nil {
 | 
			
		||||
		t.Errorf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err2)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// fake operation3 for pod3 fails
 | 
			
		||||
	operation3DoneCh := make(chan interface{})
 | 
			
		||||
	operation3 := generateWaitWithErrorFunc(operation3DoneCh)
 | 
			
		||||
	err3 := grm.Run(volumeName, podName3, nodeName /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation3, OperationName: "umount"})
 | 
			
		||||
	if err3 != nil {
 | 
			
		||||
		t.Errorf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err3)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	operation1DoneCh <- true
 | 
			
		||||
	operation2DoneCh <- true
 | 
			
		||||
	operation3DoneCh <- true
 | 
			
		||||
	time.Sleep(delay)
 | 
			
		||||
 | 
			
		||||
	// fake operation4 with EmptyUniquePodName fails
 | 
			
		||||
	operation4DoneCh := make(chan interface{})
 | 
			
		||||
	operation4 := generateWaitWithErrorFunc(operation4DoneCh)
 | 
			
		||||
	err4 := grm.Run(volumeName, podName4, nodeName /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation4, OperationName: "mount"})
 | 
			
		||||
	if err4 != nil {
 | 
			
		||||
		t.Errorf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err4)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	operation4DoneCh <- true
 | 
			
		||||
 | 
			
		||||
	// operation for pod2 retry
 | 
			
		||||
	time.Sleep(backoffDelay)
 | 
			
		||||
	operation5 := noopFunc
 | 
			
		||||
	err5 := grm.Run(volumeName, podName2, nodeName, volumetypes.GeneratedOperations{OperationFunc: operation5, OperationName: "umount"})
 | 
			
		||||
	if err5 != nil {
 | 
			
		||||
		t.Errorf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err5)
 | 
			
		||||
	}
 | 
			
		||||
	time.Sleep(delay)
 | 
			
		||||
 | 
			
		||||
	// Assert
 | 
			
		||||
	// Operation5 will override operation2, since we successfully finished unmount operation on pod2, it should be removed from operations array
 | 
			
		||||
	grm.(*nestedPendingOperations).lock.Lock()
 | 
			
		||||
	defer grm.(*nestedPendingOperations).lock.Unlock()
 | 
			
		||||
	for _, op := range grm.(*nestedPendingOperations).operations {
 | 
			
		||||
		if op.key.podName == podName2 {
 | 
			
		||||
			t.Errorf("NestedPendingOperations failed. Operation for pod2 should be removed")
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user