diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go index 3cd457a9ea9..df33e286ae0 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go @@ -55,6 +55,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" schedutil "k8s.io/kubernetes/pkg/scheduler/util" + "k8s.io/kubernetes/pkg/scheduler/util/assumecache" "k8s.io/kubernetes/pkg/util/slice" "k8s.io/utils/ptr" ) @@ -76,6 +77,10 @@ const ( // BindingTimeoutDefaultSeconds is the default timeout for waiting for // BindingConditions to be ready. BindingTimeoutDefaultSeconds = 600 + + // AssumeExtendedResourceTimeoutDefaultSeconds is the default timeout for waiting + // for the extended resource claim to be updated in assumed cache. + AssumeExtendedResourceTimeoutDefaultSeconds = 120 ) // The state is initialized in PreFilter phase. Because we save the pointer in @@ -1446,8 +1451,28 @@ func (pl *DynamicResources) bindClaim(ctx context.Context, state *stateData, ind if finalErr == nil { // This can fail, but only for reasons that are okay (concurrent delete or update). // Shouldn't happen in this case. + if isExtendedResourceClaim { + // Unlike other claims, extended resource claim is created in API server below. + // AssumeClaimAfterAPICall returns ErrNotFound when the informer update has not reached assumed cache yet. + // Hence we must poll and wait for it. + pollErr := wait.PollUntilContextTimeout(ctx, 1*time.Second, time.Duration(AssumeExtendedResourceTimeoutDefaultSeconds)*time.Second, true, + func(ctx context.Context) (bool, error) { + if err := pl.draManager.ResourceClaims().AssumeClaimAfterAPICall(claim); err != nil { + if errors.Is(err, assumecache.ErrNotFound) { + return false, nil + } + logger.V(5).Info("Claim not stored in assume cache", "claim", klog.KObj(claim), "err", err) + return false, err + } + return true, nil + }) + if pollErr != nil { + logger.V(5).Info("Claim not stored in assume cache after retries", "claim", klog.KObj(claim), "err", pollErr) + } + } + } else { if err := pl.draManager.ResourceClaims().AssumeClaimAfterAPICall(claim); err != nil { - logger.V(5).Info("Claim not stored in assume cache", "err", finalErr) + logger.V(5).Info("Claim not stored in assume cache", "err", err) } } for _, claimUID := range claimUIDs { @@ -1576,6 +1601,9 @@ func (pl *DynamicResources) bindClaim(ctx context.Context, state *stateData, ind // and no binding failure conditions are true, // which includes the case that there are no binding conditions. func (pl *DynamicResources) isClaimReadyForBinding(claim *resourceapi.ResourceClaim) (bool, error) { + if claim.Status.Allocation == nil { + return false, nil + } for _, deviceRequest := range claim.Status.Allocation.Devices.Results { if len(deviceRequest.BindingConditions) == 0 { continue diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go index a86bbcdcef6..6dbf98e40ce 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go @@ -37,6 +37,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" apiruntime "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" cgotesting "k8s.io/client-go/testing" @@ -1317,7 +1318,7 @@ func TestPlugin(t *testing.T) { }, postfilter: result{ status: fwk.NewStatus(fwk.Unschedulable, `deletion of ResourceClaim completed`), - removed: []metav1.Object{extendedResourceClaimNode2}, + removed: []metav1.Object{extendedResourceClaim}, }, }, }, @@ -1326,6 +1327,7 @@ func TestPlugin(t *testing.T) { pod: podWithExtendedResourceName, claims: []*resourceapi.ResourceClaim{extendedResourceClaimNode2}, classes: []*resourceapi.DeviceClass{deviceClassWithExtendResourceName}, + objs: []apiruntime.Object{workerNodeSlice, podWithExtendedResourceName}, want: want{ filter: perNodeResult{ workerNode.Name: { @@ -1827,10 +1829,6 @@ func TestPlugin(t *testing.T) { initialObjects = testCtx.listAll(t) testCtx.p.Unreserve(testCtx.ctx, testCtx.state, tc.pod, selectedNode.Node().Name) t.Run("unreserverAfterBindFailure", func(t *testing.T) { - // in case we delete the claim API object - // wait for assumed cache to sync with informer - // then assumed cache should be empty - time.Sleep(800 * time.Millisecond) testCtx.verify(t, *tc.want.unreserveAfterBindFailure, initialObjects, nil, status) }) } else if status.IsSuccess() { @@ -1905,9 +1903,42 @@ func (tc *testContext) verify(t *testing.T, expected result, initialObjects []me if expected.assumedClaim != nil { expectAssumedClaims = append(expectAssumedClaims, expected.assumedClaim) } - actualAssumedClaims := tc.listAssumedClaims() - if diff := cmp.Diff(expectAssumedClaims, actualAssumedClaims, ignoreFieldsInResourceClaims...); diff != "" { - t.Errorf("Assumed claims are different (- expected, + actual):\n%s", diff) + // actualAssumedClaims are claims in assumed cache with different latest and api object + // sameAssumedClaims are claims in assumed cache with same latest and api object + actualAssumedClaims, sameAssumedClaims := tc.listAssumedClaims() + + // error when expecting no claims in assumed cache with different latest and api object + if len(expectAssumedClaims) == 0 && len(actualAssumedClaims) != 0 { + // In case we delete the claim API object, wait for assumed cache to sync with informer, + // then assumed cache should be empty. + err := wait.PollUntilContextTimeout(tc.ctx, 200*time.Millisecond, time.Minute, true, + func(ctx context.Context) (bool, error) { + actualAssumedClaims, sameAssumedClaims = tc.listAssumedClaims() + return len(actualAssumedClaims) == 0, nil + }) + if err != nil || len(actualAssumedClaims) != 0 { + t.Errorf("Assumed claims are different, err=%v, expected: nil, actual:\n%v", err, actualAssumedClaims) + } + } + if len(expectAssumedClaims) > 0 { + // it is not an error as long as the expected claim is present in the assumed cache, no + // matter its latest and api object are different or not. + for _, expected := range expectAssumedClaims { + seen := false + for _, actual := range actualAssumedClaims { + if cmp.Equal(expected, actual, ignoreFieldsInResourceClaims...) { + seen = true + } + } + for _, same := range sameAssumedClaims { + if cmp.Equal(expected, same, ignoreFieldsInResourceClaims...) { + seen = true + } + } + if !seen { + t.Errorf("Assumed claims are different, expected: %v not found", expected) + } + } } var expectInFlightClaims []metav1.Object @@ -1932,18 +1963,22 @@ func (tc *testContext) listAll(t *testing.T) (objects []metav1.Object) { return } -func (tc *testContext) listAssumedClaims() []metav1.Object { +func (tc *testContext) listAssumedClaims() ([]metav1.Object, []metav1.Object) { var assumedClaims []metav1.Object + var sameClaims []metav1.Object for _, obj := range tc.draManager.resourceClaimTracker.cache.List(nil) { claim := obj.(*resourceapi.ResourceClaim) obj, _ := tc.draManager.resourceClaimTracker.cache.Get(claim.Namespace + "/" + claim.Name) apiObj, _ := tc.draManager.resourceClaimTracker.cache.GetAPIObj(claim.Namespace + "/" + claim.Name) if obj != apiObj { assumedClaims = append(assumedClaims, claim) + } else { + sameClaims = append(sameClaims, claim) } } sortObjects(assumedClaims) - return assumedClaims + sortObjects(sameClaims) + return assumedClaims, sameClaims } func (tc *testContext) listInFlightClaims() []metav1.Object {