Merge pull request #133321 from yliaog/assumecache

Fix dynamicresources_test flakiness
This commit is contained in:
Kubernetes Prow Robot
2025-08-09 18:03:44 -07:00
committed by GitHub
2 changed files with 74 additions and 11 deletions

View File

@@ -55,6 +55,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
"k8s.io/kubernetes/pkg/util/slice"
"k8s.io/utils/ptr"
)
@@ -76,6 +77,10 @@ const (
// BindingTimeoutDefaultSeconds is the default timeout for waiting for
// BindingConditions to be ready.
BindingTimeoutDefaultSeconds = 600
// AssumeExtendedResourceTimeoutDefaultSeconds is the default timeout for waiting
// for the extended resource claim to be updated in assumed cache.
AssumeExtendedResourceTimeoutDefaultSeconds = 120
)
// The state is initialized in PreFilter phase. Because we save the pointer in
@@ -1446,8 +1451,28 @@ func (pl *DynamicResources) bindClaim(ctx context.Context, state *stateData, ind
if finalErr == nil {
// This can fail, but only for reasons that are okay (concurrent delete or update).
// Shouldn't happen in this case.
if isExtendedResourceClaim {
// Unlike other claims, an extended resource claim is created in the API server below.
// AssumeClaimAfterAPICall returns ErrNotFound when the informer update has not reached assumed cache yet.
// Hence we must poll and wait for it.
pollErr := wait.PollUntilContextTimeout(ctx, 1*time.Second, time.Duration(AssumeExtendedResourceTimeoutDefaultSeconds)*time.Second, true,
func(ctx context.Context) (bool, error) {
if err := pl.draManager.ResourceClaims().AssumeClaimAfterAPICall(claim); err != nil {
if errors.Is(err, assumecache.ErrNotFound) {
return false, nil
}
logger.V(5).Info("Claim not stored in assume cache", "claim", klog.KObj(claim), "err", err)
return false, err
}
return true, nil
})
if pollErr != nil {
logger.V(5).Info("Claim not stored in assume cache after retries", "claim", klog.KObj(claim), "err", pollErr)
}
}
} else {
if err := pl.draManager.ResourceClaims().AssumeClaimAfterAPICall(claim); err != nil {
logger.V(5).Info("Claim not stored in assume cache", "err", finalErr)
logger.V(5).Info("Claim not stored in assume cache", "err", err)
}
}
for _, claimUID := range claimUIDs {
@@ -1576,6 +1601,9 @@ func (pl *DynamicResources) bindClaim(ctx context.Context, state *stateData, ind
// and no binding failure conditions are true,
// which includes the case that there are no binding conditions.
func (pl *DynamicResources) isClaimReadyForBinding(claim *resourceapi.ResourceClaim) (bool, error) {
if claim.Status.Allocation == nil {
return false, nil
}
for _, deviceRequest := range claim.Status.Allocation.Devices.Results {
if len(deviceRequest.BindingConditions) == 0 {
continue

View File

@@ -37,6 +37,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apiruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
cgotesting "k8s.io/client-go/testing"
@@ -1317,7 +1318,7 @@ func TestPlugin(t *testing.T) {
},
postfilter: result{
status: fwk.NewStatus(fwk.Unschedulable, `deletion of ResourceClaim completed`),
removed: []metav1.Object{extendedResourceClaimNode2},
removed: []metav1.Object{extendedResourceClaim},
},
},
},
@@ -1326,6 +1327,7 @@ func TestPlugin(t *testing.T) {
pod: podWithExtendedResourceName,
claims: []*resourceapi.ResourceClaim{extendedResourceClaimNode2},
classes: []*resourceapi.DeviceClass{deviceClassWithExtendResourceName},
objs: []apiruntime.Object{workerNodeSlice, podWithExtendedResourceName},
want: want{
filter: perNodeResult{
workerNode.Name: {
@@ -1827,10 +1829,6 @@ func TestPlugin(t *testing.T) {
initialObjects = testCtx.listAll(t)
testCtx.p.Unreserve(testCtx.ctx, testCtx.state, tc.pod, selectedNode.Node().Name)
t.Run("unreserverAfterBindFailure", func(t *testing.T) {
// in case we delete the claim API object
// wait for assumed cache to sync with informer
// then assumed cache should be empty
time.Sleep(800 * time.Millisecond)
testCtx.verify(t, *tc.want.unreserveAfterBindFailure, initialObjects, nil, status)
})
} else if status.IsSuccess() {
@@ -1905,9 +1903,42 @@ func (tc *testContext) verify(t *testing.T, expected result, initialObjects []me
if expected.assumedClaim != nil {
expectAssumedClaims = append(expectAssumedClaims, expected.assumedClaim)
}
actualAssumedClaims := tc.listAssumedClaims()
if diff := cmp.Diff(expectAssumedClaims, actualAssumedClaims, ignoreFieldsInResourceClaims...); diff != "" {
t.Errorf("Assumed claims are different (- expected, + actual):\n%s", diff)
// actualAssumedClaims are the claims in the assumed cache whose latest object differs from the API object.
// sameAssumedClaims are the claims in the assumed cache whose latest object is the same as the API object.
actualAssumedClaims, sameAssumedClaims := tc.listAssumedClaims()
// When no assumed claims are expected, the assumed cache must not hold any claim whose latest object differs from the API object.
if len(expectAssumedClaims) == 0 && len(actualAssumedClaims) != 0 {
// In case we delete the claim API object, wait for assumed cache to sync with informer,
// then assumed cache should be empty.
err := wait.PollUntilContextTimeout(tc.ctx, 200*time.Millisecond, time.Minute, true,
func(ctx context.Context) (bool, error) {
actualAssumedClaims, sameAssumedClaims = tc.listAssumedClaims()
return len(actualAssumedClaims) == 0, nil
})
if err != nil || len(actualAssumedClaims) != 0 {
t.Errorf("Assumed claims are different, err=%v, expected: nil, actual:\n%v", err, actualAssumedClaims)
}
}
if len(expectAssumedClaims) > 0 {
// It is not an error as long as the expected claim is present in the assumed cache,
// regardless of whether its latest object and its API object differ.
for _, expected := range expectAssumedClaims {
seen := false
for _, actual := range actualAssumedClaims {
if cmp.Equal(expected, actual, ignoreFieldsInResourceClaims...) {
seen = true
}
}
for _, same := range sameAssumedClaims {
if cmp.Equal(expected, same, ignoreFieldsInResourceClaims...) {
seen = true
}
}
if !seen {
t.Errorf("Assumed claims are different, expected: %v not found", expected)
}
}
}
var expectInFlightClaims []metav1.Object
@@ -1932,18 +1963,22 @@ func (tc *testContext) listAll(t *testing.T) (objects []metav1.Object) {
return
}
func (tc *testContext) listAssumedClaims() []metav1.Object {
func (tc *testContext) listAssumedClaims() ([]metav1.Object, []metav1.Object) {
var assumedClaims []metav1.Object
var sameClaims []metav1.Object
for _, obj := range tc.draManager.resourceClaimTracker.cache.List(nil) {
claim := obj.(*resourceapi.ResourceClaim)
obj, _ := tc.draManager.resourceClaimTracker.cache.Get(claim.Namespace + "/" + claim.Name)
apiObj, _ := tc.draManager.resourceClaimTracker.cache.GetAPIObj(claim.Namespace + "/" + claim.Name)
if obj != apiObj {
assumedClaims = append(assumedClaims, claim)
} else {
sameClaims = append(sameClaims, claim)
}
}
sortObjects(assumedClaims)
return assumedClaims
sortObjects(sameClaims)
return assumedClaims, sameClaims
}
func (tc *testContext) listInFlightClaims() []metav1.Object {