KEP 5075: implement scheduler

Signed-off-by: Sunyanan Choochotkaew <sunyanan.choochotkaew1@ibm.com>
Sunyanan Choochotkaew
2025-07-30 09:27:01 +09:00
parent 59bba92717
commit 7f052afaef
34 changed files with 2765 additions and 148 deletions

View File

@@ -36,6 +36,7 @@ import (
"k8s.io/apimachinery/pkg/util/diff"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
coreinformers "k8s.io/client-go/informers/core/v1"
resourceinformers "k8s.io/client-go/informers/resource/v1"
resourcealphainformers "k8s.io/client-go/informers/resource/v1alpha3"
@@ -51,6 +52,7 @@ import (
apipod "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller/devicetainteviction/metrics"
"k8s.io/kubernetes/pkg/controller/tainteviction"
"k8s.io/kubernetes/pkg/features"
utilpod "k8s.io/kubernetes/pkg/util/pod"
)
@@ -458,11 +460,12 @@ func (tc *Controller) Run(ctx context.Context) error {
tc.haveSynced = append(tc.haveSynced, podHandler.HasSynced)
opts := resourceslicetracker.Options{
EnableDeviceTaints: true,
SliceInformer: tc.sliceInformer,
TaintInformer: tc.taintInformer,
ClassInformer: tc.classInformer,
KubeClient: tc.client,
EnableDeviceTaints: true,
EnableConsumableCapacity: utilfeature.DefaultFeatureGate.Enabled(features.DRAConsumableCapacity),
SliceInformer: tc.sliceInformer,
TaintInformer: tc.taintInformer,
ClassInformer: tc.classInformer,
KubeClient: tc.client,
}
sliceTracker, err := resourceslicetracker.StartTracker(ctx, opts)
if err != nil {

View File

@@ -101,6 +101,10 @@ func (r *resourceClaimTrackerContract) ListAllAllocatedDevices() (sets.Set[struc
return nil, nil
}
func (r *resourceClaimTrackerContract) GatherAllocatedState() (*structured.AllocatedState, error) {
return nil, nil
}
func (r *resourceClaimTrackerContract) SignalClaimPendingAllocation(_ types.UID, _ *resourceapi.ResourceClaim) error {
return nil
}

View File

@@ -84,6 +84,9 @@ type ResourceClaimTracker interface {
// ListAllAllocatedDevices lists all allocated Devices from allocated ResourceClaims. The result is guaranteed to immediately include
// any changes made via AssumeClaimAfterAPICall(), and SignalClaimPendingAllocation().
ListAllAllocatedDevices() (sets.Set[structured.DeviceID], error)
// GatherAllocatedState gathers information about allocated devices from allocated ResourceClaims. The result is guaranteed to immediately include
// any changes made via AssumeClaimAfterAPICall(), and SignalClaimPendingAllocation().
GatherAllocatedState() (*structured.AllocatedState, error)
// SignalClaimPendingAllocation signals to the tracker that the given ResourceClaim will be allocated via an API call in the
// binding phase. This change is immediately reflected in the result of List() and the other accessors.

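To make the new accessor concrete, here is a minimal usage sketch, assuming only the interface extension above; the local interface, function name, and log output are illustrative and not part of this commit:

package example

import (
	"fmt"

	"k8s.io/dynamic-resource-allocation/structured"
)

// allocatedStateReader captures just the method added above; real callers hold a
// full ResourceClaimTracker.
type allocatedStateReader interface {
	GatherAllocatedState() (*structured.AllocatedState, error)
}

func logAllocatedState(tracker allocatedStateReader) error {
	state, err := tracker.GatherAllocatedState()
	if err != nil {
		return err
	}
	// AllocatedDevices covers exclusive allocations; shared allocations show up in
	// AllocatedSharedDeviceIDs and their usage in AggregatedCapacity.
	fmt.Printf("dedicated=%d shared=%d\n",
		state.AllocatedDevices.Len(), state.AllocatedSharedDeviceIDs.Len())
	return nil
}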
View File

@@ -21,9 +21,11 @@ import (
resourceapi "k8s.io/api/resource/v1"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/cache"
"k8s.io/dynamic-resource-allocation/structured"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/utils/ptr"
)
@@ -36,7 +38,11 @@ import (
// claims and are skipped without invoking the callback.
//
// foreachAllocatedDevice does nothing if the claim is not allocated.
func foreachAllocatedDevice(claim *resourceapi.ResourceClaim, cb func(deviceID structured.DeviceID)) {
func foreachAllocatedDevice(claim *resourceapi.ResourceClaim,
dedicatedDeviceCallback func(deviceID structured.DeviceID),
enabledConsumableCapacity bool,
sharedDeviceCallback func(structured.SharedDeviceID),
consumedCapacityCallback func(structured.DeviceConsumedCapacity)) {
if claim.Status.Allocation == nil {
return
}
@@ -54,7 +60,24 @@ func foreachAllocatedDevice(claim *resourceapi.ResourceClaim, cb func(deviceID s
// None of the users of this helper need to abort iterating,
// therefore it's not supported as it only would add overhead.
cb(deviceID)
// Invoke sharedDeviceCallback and consumedCapacityCallback as appropriate
// when the DRAConsumableCapacity feature is enabled.
if enabledConsumableCapacity {
shared := result.ShareID != nil
if shared {
sharedDeviceID := structured.MakeSharedDeviceID(deviceID, result.ShareID)
sharedDeviceCallback(sharedDeviceID)
if result.ConsumedCapacity != nil {
deviceConsumedCapacity := structured.NewDeviceConsumedCapacity(deviceID, result.ConsumedCapacity)
consumedCapacityCallback(deviceConsumedCapacity)
}
continue
}
}
// Otherwise, execute dedicatedDeviceCallback
dedicatedDeviceCallback(deviceID)
}
}
@@ -66,14 +89,20 @@ func foreachAllocatedDevice(claim *resourceapi.ResourceClaim, cb func(deviceID s
type allocatedDevices struct {
logger klog.Logger
mutex sync.RWMutex
ids sets.Set[structured.DeviceID]
mutex sync.RWMutex
ids sets.Set[structured.DeviceID]
shareIDs sets.Set[structured.SharedDeviceID]
capacities structured.ConsumedCapacityCollection
enabledConsumableCapacity bool
}
func newAllocatedDevices(logger klog.Logger) *allocatedDevices {
return &allocatedDevices{
logger: logger,
ids: sets.New[structured.DeviceID](),
logger: logger,
ids: sets.New[structured.DeviceID](),
shareIDs: sets.New[structured.SharedDeviceID](),
capacities: structured.NewConsumedCapacityCollection(),
enabledConsumableCapacity: utilfeature.DefaultFeatureGate.Enabled(features.DRAConsumableCapacity),
}
}
@@ -84,6 +113,13 @@ func (a *allocatedDevices) Get() sets.Set[structured.DeviceID] {
return a.ids.Clone()
}
func (a *allocatedDevices) Capacities() structured.ConsumedCapacityCollection {
a.mutex.RLock()
defer a.mutex.RUnlock()
return a.capacities.Clone()
}
func (a *allocatedDevices) handlers() cache.ResourceEventHandler {
return cache.ResourceEventHandlerFuncs{
AddFunc: a.onAdd,
@@ -142,16 +178,39 @@ func (a *allocatedDevices) addDevices(claim *resourceapi.ResourceClaim) {
// Locking of the mutex gets minimized by pre-computing what needs to be done
// without holding the lock.
deviceIDs := make([]structured.DeviceID, 0, 20)
foreachAllocatedDevice(claim, func(deviceID structured.DeviceID) {
a.logger.V(6).Info("Observed device allocation", "device", deviceID, "claim", klog.KObj(claim))
deviceIDs = append(deviceIDs, deviceID)
})
var shareIDs []structured.SharedDeviceID
var deviceCapacities []structured.DeviceConsumedCapacity
if a.enabledConsumableCapacity {
shareIDs = make([]structured.SharedDeviceID, 0, 20)
deviceCapacities = make([]structured.DeviceConsumedCapacity, 0, 20)
}
foreachAllocatedDevice(claim,
func(deviceID structured.DeviceID) {
a.logger.V(6).Info("Observed device allocation", "device", deviceID, "claim", klog.KObj(claim))
deviceIDs = append(deviceIDs, deviceID)
},
a.enabledConsumableCapacity,
func(sharedDeviceID structured.SharedDeviceID) {
a.logger.V(6).Info("Observed shared device allocation", "shared device", sharedDeviceID, "claim", klog.KObj(claim))
shareIDs = append(shareIDs, sharedDeviceID)
},
func(capacity structured.DeviceConsumedCapacity) {
a.logger.V(6).Info("Observed consumed capacity", "device", capacity.DeviceID, "consumed capacity", capacity.ConsumedCapacity, "claim", klog.KObj(claim))
deviceCapacities = append(deviceCapacities, capacity)
},
)
a.mutex.Lock()
defer a.mutex.Unlock()
for _, deviceID := range deviceIDs {
a.ids.Insert(deviceID)
}
for _, shareID := range shareIDs {
a.shareIDs.Insert(shareID)
}
for _, capacity := range deviceCapacities {
a.capacities.Insert(capacity)
}
}
func (a *allocatedDevices) removeDevices(claim *resourceapi.ResourceClaim) {
@@ -162,14 +221,35 @@ func (a *allocatedDevices) removeDevices(claim *resourceapi.ResourceClaim) {
// Locking of the mutex gets minimized by pre-computing what needs to be done
// without holding the lock.
deviceIDs := make([]structured.DeviceID, 0, 20)
foreachAllocatedDevice(claim, func(deviceID structured.DeviceID) {
a.logger.V(6).Info("Observed device deallocation", "device", deviceID, "claim", klog.KObj(claim))
deviceIDs = append(deviceIDs, deviceID)
})
var shareIDs []structured.SharedDeviceID
var deviceCapacities []structured.DeviceConsumedCapacity
if a.enabledConsumableCapacity {
shareIDs = make([]structured.SharedDeviceID, 0, 20)
deviceCapacities = make([]structured.DeviceConsumedCapacity, 0, 20)
}
foreachAllocatedDevice(claim,
func(deviceID structured.DeviceID) {
a.logger.V(6).Info("Observed device deallocation", "device", deviceID, "claim", klog.KObj(claim))
deviceIDs = append(deviceIDs, deviceID)
},
a.enabledConsumableCapacity,
func(sharedDeviceID structured.SharedDeviceID) {
a.logger.V(6).Info("Observed shared device deallocation", "shared device", sharedDeviceID, "claim", klog.KObj(claim))
shareIDs = append(shareIDs, sharedDeviceID)
},
func(capacity structured.DeviceConsumedCapacity) {
a.logger.V(6).Info("Observed consumed capacity release", "device id", capacity.DeviceID, "consumed capacity", capacity.ConsumedCapacity, "claim", klog.KObj(claim))
deviceCapacities = append(deviceCapacities, capacity)
})
a.mutex.Lock()
defer a.mutex.Unlock()
for _, deviceID := range deviceIDs {
a.ids.Delete(deviceID)
}
for _, shareID := range shareIDs {
a.shareIDs.Delete(shareID)
}
for _, capacity := range deviceCapacities {
a.capacities.Remove(capacity)
}
}

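The three callbacks mirror the per-result branching added above. The following is a self-contained restatement of that branching for illustration only; the function name is hypothetical, while the structured helpers are the ones referenced by this commit:

package example

import (
	resourceapi "k8s.io/api/resource/v1"
	"k8s.io/dynamic-resource-allocation/structured"
)

// classifyResult shows how a single allocation result is routed to the three callbacks:
// shared results (ShareID set) go to the shared and consumed-capacity callbacks,
// everything else is treated as a dedicated allocation.
func classifyResult(deviceID structured.DeviceID, result resourceapi.DeviceRequestAllocationResult,
	consumableCapacityEnabled bool,
	onDedicated func(structured.DeviceID),
	onShared func(structured.SharedDeviceID),
	onConsumed func(structured.DeviceConsumedCapacity)) {
	if consumableCapacityEnabled && result.ShareID != nil {
		onShared(structured.MakeSharedDeviceID(deviceID, result.ShareID))
		if result.ConsumedCapacity != nil {
			onConsumed(structured.NewDeviceConsumedCapacity(deviceID, result.ConsumedCapacity))
		}
		return
	}
	onDedicated(deviceID)
}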
View File

@@ -25,11 +25,13 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
resourcelisters "k8s.io/client-go/listers/resource/v1"
resourceslicetracker "k8s.io/dynamic-resource-allocation/resourceslice/tracker"
"k8s.io/dynamic-resource-allocation/structured"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
)
@@ -212,13 +214,48 @@ func (c *claimTracker) ListAllAllocatedDevices() (sets.Set[structured.DeviceID],
foreachAllocatedDevice(claim, func(deviceID structured.DeviceID) {
c.logger.V(6).Info("Device is in flight for allocation", "device", deviceID, "claim", klog.KObj(claim))
allocated.Insert(deviceID)
})
}, false, func(structured.SharedDeviceID) {}, func(structured.DeviceConsumedCapacity) {})
return true
})
// There's no reason to return an error in this implementation, but the error might be helpful for other implementations.
return allocated, nil
}
func (c *claimTracker) GatherAllocatedState() (*structured.AllocatedState, error) {
// Start with a fresh set that matches the current known state of the
// world according to the informers.
allocated := c.allocatedDevices.Get()
allocatedSharedDeviceIDs := sets.New[structured.SharedDeviceID]()
aggregatedCapacity := c.allocatedDevices.Capacities()
enabledConsumableCapacity := utilfeature.DefaultFeatureGate.Enabled(features.DRAConsumableCapacity)
// Whatever is in flight also has to be checked.
c.inFlightAllocations.Range(func(key, value any) bool {
claim := value.(*resourceapi.ResourceClaim)
foreachAllocatedDevice(claim, func(deviceID structured.DeviceID) {
c.logger.V(6).Info("Device is in flight for allocation", "device", deviceID, "claim", klog.KObj(claim))
allocated.Insert(deviceID)
},
enabledConsumableCapacity,
func(sharedDeviceID structured.SharedDeviceID) {
c.logger.V(6).Info("Device is in flight for allocation", "shared device", sharedDeviceID, "claim", klog.KObj(claim))
allocatedSharedDeviceIDs.Insert(sharedDeviceID)
}, func(capacity structured.DeviceConsumedCapacity) {
c.logger.V(6).Info("Device is in flight for allocation", "consumed capacity", capacity, "claim", klog.KObj(claim))
aggregatedCapacity.Insert(capacity)
})
return true
})
// There's no reason to return an error in this implementation, but the error might be helpful for other implementations.
return &structured.AllocatedState{
AllocatedDevices: allocated,
AllocatedSharedDeviceIDs: allocatedSharedDeviceIDs,
AggregatedCapacity: aggregatedCapacity,
}, nil
}
func (c *claimTracker) AssumeClaimAfterAPICall(claim *resourceapi.ResourceClaim) error {
return c.cache.Assume(claim)
}

View File

@@ -170,6 +170,7 @@ type DynamicResources struct {
enableExtendedResource bool
enableFilterTimeout bool
filterTimeout time.Duration
enableConsumableCapacity bool
fh framework.Handle
clientset kubernetes.Interface
@@ -201,6 +202,7 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe
enableSchedulingQueueHint: fts.EnableSchedulingQueueHint,
enablePartitionableDevices: fts.EnablePartitionableDevices,
enableExtendedResource: fts.EnableDRAExtendedResource,
enableConsumableCapacity: fts.EnableConsumableCapacity,
filterTimeout: ptr.Deref(args.FilterTimeout, metav1.Duration{}).Duration,
enableDeviceBindingConditions: fts.EnableDRADeviceBindingConditions,
enableDeviceStatus: fts.EnableDRAResourceClaimDeviceStatus,
@@ -210,7 +212,7 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe
// This is a LRU cache for compiled CEL expressions. The most
// recent 10 of them get reused across different scheduling
// cycles.
celCache: cel.NewCache(10),
celCache: cel.NewCache(10, cel.Features{EnableConsumableCapacity: fts.EnableConsumableCapacity}),
draManager: fh.SharedDRAManager(),
}
@@ -658,9 +660,25 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state fwk.CycleState,
// Claims (and thus their devices) are treated as "allocated" if they are in the assume cache
// or currently their allocation is in-flight. This does not change
// during filtering, so we can determine that once.
allAllocatedDevices, err := pl.draManager.ResourceClaims().ListAllAllocatedDevices()
if err != nil {
return nil, statusError(logger, err)
var allocatedState *structured.AllocatedState
if pl.enableConsumableCapacity {
allocatedState, err = pl.draManager.ResourceClaims().GatherAllocatedState()
if err != nil {
return nil, statusError(logger, err)
}
if allocatedState == nil {
return nil, statusError(logger, errors.New("nil allocated state"))
}
} else {
allocatedDevices, err := pl.draManager.ResourceClaims().ListAllAllocatedDevices()
if err != nil {
return nil, statusError(logger, err)
}
allocatedState = &structured.AllocatedState{
AllocatedDevices: allocatedDevices,
AllocatedSharedDeviceIDs: sets.New[structured.SharedDeviceID](),
AggregatedCapacity: structured.NewConsumedCapacityCollection(),
}
}
slices, err := pl.draManager.ResourceSlices().ListWithDeviceTaintRules()
if err != nil {
@@ -673,8 +691,9 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state fwk.CycleState,
DeviceTaints: pl.enableDeviceTaints,
DeviceBinding: pl.enableDeviceBindingConditions,
DeviceStatus: pl.enableDeviceStatus,
ConsumableCapacity: pl.enableConsumableCapacity,
}
allocator, err := structured.NewAllocator(ctx, features, allAllocatedDevices, pl.draManager.DeviceClasses(), slices, pl.celCache)
allocator, err := structured.NewAllocator(ctx, features, *allocatedState, pl.draManager.DeviceClasses(), slices, pl.celCache)
if err != nil {
return nil, statusError(logger, err)
}

View File

@@ -326,9 +326,10 @@ func New(ctx context.Context,
resourceClaimInformer := informerFactory.Resource().V1().ResourceClaims().Informer()
resourceClaimCache = assumecache.NewAssumeCache(logger, resourceClaimInformer, "ResourceClaim", "", nil)
resourceSliceTrackerOpts := resourceslicetracker.Options{
EnableDeviceTaints: feature.DefaultFeatureGate.Enabled(features.DRADeviceTaints),
SliceInformer: informerFactory.Resource().V1().ResourceSlices(),
KubeClient: client,
EnableDeviceTaints: feature.DefaultFeatureGate.Enabled(features.DRADeviceTaints),
EnableConsumableCapacity: feature.DefaultFeatureGate.Enabled(features.DRAConsumableCapacity),
SliceInformer: informerFactory.Resource().V1().ResourceSlices(),
KubeClient: client,
}
// If device taints are disabled, the additional informers are not needed and
// the tracker turns into a simple wrapper around the slice informer.

View File

@@ -28,15 +28,19 @@ type Cache struct {
compileMutex keymutex.KeyMutex
cacheMutex sync.RWMutex
cache *lru.Cache
compiler *compiler
}
// NewCache creates a cache. The maximum number of entries determines
// how many entries are cached at most before dropping the oldest
// entry.
func NewCache(maxCacheEntries int) *Cache {
//
// The features are used to get a suitable compiler.
func NewCache(maxCacheEntries int, features Features) *Cache {
return &Cache{
compileMutex: keymutex.NewHashed(0),
cache: lru.New(maxCacheEntries),
compiler: GetCompiler(features),
}
}
@@ -57,7 +61,7 @@ func (c *Cache) GetOrCompile(expression string) CompilationResult {
return *cached
}
expr := GetCompiler().CompileCELExpression(expression, Options{DisableCostEstimation: true})
expr := c.compiler.CompileCELExpression(expression, Options{DisableCostEstimation: true})
if expr.Error == nil {
c.add(expression, &expr)
}
@@ -79,3 +83,22 @@ func (c *Cache) get(expression string) *CompilationResult {
}
return expr.(*CompilationResult)
}
func (c *Cache) Check(expression string) CompilationResult {
// Compiling a CEL expression is expensive enough that it is cheaper
// to lock a mutex than doing it several times in parallel.
c.compileMutex.LockKey(expression)
//nolint:errcheck // Only returns an error for unknown keys, which isn't the case here.
defer c.compileMutex.UnlockKey(expression)
cached := c.get(expression)
if cached != nil {
return *cached
}
expr := c.compiler.CompileCELExpression(expression, Options{DisableCostEstimation: true})
if expr.Error == nil {
c.add(expression, &expr)
}
return expr
}

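A minimal sketch of constructing and using the feature-aware cache, assuming only the API shown above; the expression handling is illustrative:

package example

import (
	"fmt"

	"k8s.io/dynamic-resource-allocation/cel"
)

func compileSelector(enableConsumableCapacity bool, expression string) error {
	// Ten entries matches what the scheduler plugin and the resource slice tracker use.
	cache := cel.NewCache(10, cel.Features{EnableConsumableCapacity: enableConsumableCapacity})
	result := cache.GetOrCompile(expression)
	if result.Error != nil {
		return fmt.Errorf("compile %q: %w", expression, result.Error)
	}
	return nil
}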
View File

@@ -33,7 +33,7 @@ func TestCacheSemantic(t *testing.T) {
// compilation leads to different pointers, so the entries can be
// compared by value to figure out whether an entry was cached or
// compiled anew.
cache := NewCache(2)
cache := NewCache(2, Features{})
// Successful compilations get cached.
resultTrue := cache.GetOrCompile("true")
@@ -90,7 +90,7 @@ func TestCacheConcurrency(t *testing.T) {
// without benchmarking.
numWorkers := 10
cache := NewCache(2)
cache := NewCache(2, Features{})
var wg sync.WaitGroup
wg.Add(numWorkers)
for i := 0; i < numWorkers; i++ {

View File

@@ -46,13 +46,15 @@ import (
const (
deviceVar = "device"
driverVar = "driver"
multiAllocVar = "allowMultipleAllocations"
attributesVar = "attributes"
capacityVar = "capacity"
)
var (
lazyCompilerInit sync.Once
lazyCompiler *compiler
lazyCompilerMutex sync.Mutex
lazyCompiler *compiler
lazyFeatures Features
// A variant of AnyType = https://github.com/kubernetes/kubernetes/blob/ec2e0de35a298363872897e5904501b029817af3/staging/src/k8s.io/apiserver/pkg/cel/types.go#L550:
// unknown actual type (could be bool, int, string, etc.) but with a known maximum size.
@@ -63,6 +65,11 @@ var (
idType = withMaxElements(apiservercel.StringType, resourceapi.DeviceMaxIDLength)
driverType = withMaxElements(apiservercel.StringType, resourceapi.DriverNameMaxLength)
// A variant of BoolType with a known cost. Usage of apiservercel.BoolType
// is underestimated without this (found when comparing estimated against
// actual cost in compile_test.go).
multiAllocType = withMaxElements(apiservercel.BoolType, 1)
// Each map is bound by the maximum number of different attributes.
innerAttributesMapType = apiservercel.NewMapType(idType, attributeType, resourceapi.ResourceSliceMaxAttributesAndCapacitiesPerDevice)
outerAttributesMapType = apiservercel.NewMapType(domainType, innerAttributesMapType, resourceapi.ResourceSliceMaxAttributesAndCapacitiesPerDevice)
@@ -72,10 +79,21 @@ var (
outerCapacityMapType = apiservercel.NewMapType(domainType, innerCapacityMapType, resourceapi.ResourceSliceMaxAttributesAndCapacitiesPerDevice)
)
func GetCompiler() *compiler {
lazyCompilerInit.Do(func() {
lazyCompiler = newCompiler()
})
// Features contains feature gates supported by the package.
type Features struct {
EnableConsumableCapacity bool
}
func GetCompiler(features Features) *compiler {
lazyCompilerMutex.Lock()
defer lazyCompilerMutex.Unlock()
// In practice, features should not change back and forth between calls,
// so only one compiler gets cached.
if lazyCompiler == nil || lazyFeatures != features {
lazyCompiler = newCompiler(features)
lazyFeatures = features
}
return lazyCompiler
}
@@ -99,15 +117,17 @@ type Device struct {
// Driver gets used as domain for any attribute which does not already
// have a domain prefix. If set, then it is also made available as a
// string attribute.
Driver string
Attributes map[resourceapi.QualifiedName]resourceapi.DeviceAttribute
Capacity map[resourceapi.QualifiedName]resourceapi.DeviceCapacity
Driver string
AllowMultipleAllocations *bool
Attributes map[resourceapi.QualifiedName]resourceapi.DeviceAttribute
Capacity map[resourceapi.QualifiedName]resourceapi.DeviceCapacity
}
type compiler struct {
// deviceType is a definition for the type of the `device` variable.
// This is needed for the cost estimator. Both are currently version-independent.
// If that ever changes, some additional logic might be needed to make
// deviceType is a definition for the latest type of the `device` variable.
// This is needed for the cost estimator.
// If that ever changes such as involving type-checking expressions,
// some additional logic might be needed to make
// cost estimates version-dependent.
deviceType *apiservercel.DeclType
envset *environment.EnvSet
@@ -253,6 +273,7 @@ func (c CompilationResult) DeviceMatches(ctx context.Context, input Device) (boo
variables := map[string]any{
deviceVar: map[string]any{
driverVar: input.Driver,
multiAllocVar: ptr.Deref(input.AllowMultipleAllocations, false),
attributesVar: newStringInterfaceMapWithDefault(c.Environment.CELTypeAdapter(), attributes, c.emptyMapVal),
capacityVar: newStringInterfaceMapWithDefault(c.Environment.CELTypeAdapter(), capacity, c.emptyMapVal),
},
@@ -278,7 +299,7 @@ func (c CompilationResult) DeviceMatches(ctx context.Context, input Device) (boo
return resultBool, details, nil
}
func newCompiler() *compiler {
func newCompiler(features Features) *compiler {
envset := environment.MustBaseEnvSet(environment.DefaultCompatibilityVersion(), true /* strictCost */)
field := func(name string, declType *apiservercel.DeclType, required bool) *apiservercel.DeclField {
return apiservercel.NewDeclField(name, declType, required, nil, nil)
@@ -291,18 +312,22 @@ func newCompiler() *compiler {
return result
}
deviceType := apiservercel.NewObjectType("kubernetes.DRADevice", fields(
fieldsV131 := []*apiservercel.DeclField{
field(driverVar, driverType, true),
field(attributesVar, outerAttributesMapType, true),
field(capacityVar, outerCapacityMapType, true),
))
}
deviceTypeV131 := apiservercel.NewObjectType("kubernetes.DRADevice", fields(fieldsV131...))
// One additional field, feature-gated below.
fieldsV134ConsumableCapacity := []*apiservercel.DeclField{field(multiAllocVar, multiAllocType, true)}
fieldsV134ConsumableCapacity = append(fieldsV134ConsumableCapacity, fieldsV131...)
deviceTypeV134ConsumableCapacity := apiservercel.NewObjectType("kubernetes.DRADevice", fields(fieldsV134ConsumableCapacity...))
versioned := []environment.VersionedOptions{
{
IntroducedVersion: version.MajorMinor(1, 31),
EnvOptions: []cel.EnvOption{
cel.Variable(deviceVar, deviceType.CelType()),
// https://pkg.go.dev/github.com/google/cel-go/ext#Bindings
//
// This is useful to simplify attribute lookups because the
@@ -311,24 +336,31 @@ func newCompiler() *compiler {
// cel.bind(dra, device.attributes["dra.example.com"], dra.oneBool && dra.anotherBool)
ext.Bindings(ext.BindingsVersion(0)),
},
},
// deviceTypeV131 and deviceTypeV134ConsumableCapacity are complementary and picked
// based on the feature gate.
{
IntroducedVersion: version.MajorMinor(1, 31),
FeatureEnabled: func() bool {
return !features.EnableConsumableCapacity
},
EnvOptions: []cel.EnvOption{
cel.Variable(deviceVar, deviceTypeV131.CelType()),
},
DeclTypes: []*apiservercel.DeclType{
deviceType,
deviceTypeV131,
},
},
{
IntroducedVersion: version.MajorMinor(1, 31),
// This library was added to the base environment of Kubernetes
// in 1.33 at version 1. It will continue to be available for
// use in this environment, but does not need to be included
// directly since it becomes available indirectly via the base
// environment shared across Kubernetes.
// In Kubernetes 1.34, version 1 feature of this library will
// become available, and will be rollback safe to 1.33.
// TODO: In Kubernetes 1.34: Add compile tests that demonstrate that
// `isSemver("v1.0.0", true)` and `semver("v1.0.0", true)` are supported.
RemovedVersion: version.MajorMinor(1, 33),
IntroducedVersion: version.MajorMinor(1, 34),
FeatureEnabled: func() bool {
return features.EnableConsumableCapacity
},
EnvOptions: []cel.EnvOption{
library.SemverLib(library.SemverVersion(0)),
cel.Variable(deviceVar, deviceTypeV134ConsumableCapacity.CelType()),
},
DeclTypes: []*apiservercel.DeclType{
deviceTypeV134ConsumableCapacity,
},
},
}
@@ -336,7 +368,8 @@ func newCompiler() *compiler {
if err != nil {
panic(fmt.Errorf("internal error building CEL environment: %w", err))
}
return &compiler{envset: envset, deviceType: deviceType}
// Return the compiler with the newest deviceType.
return &compiler{envset: envset, deviceType: deviceTypeV134ConsumableCapacity}
}
func withMaxElements(in *apiservercel.DeclType, maxElements uint64) *apiservercel.DeclType {

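With the feature gate enabled, selectors can reference the new device.allowMultipleAllocations field. A hedged, self-contained sketch, assuming only the API shown above; the driver name and expression are illustrative:

package example

import (
	"context"
	"fmt"

	"k8s.io/dynamic-resource-allocation/cel"
	"k8s.io/utils/ptr"
)

func matchesMultiAllocDevice(ctx context.Context) (bool, error) {
	compiler := cel.GetCompiler(cel.Features{EnableConsumableCapacity: true})
	result := compiler.CompileCELExpression(`device.allowMultipleAllocations == true`, cel.Options{})
	if result.Error != nil {
		return false, result.Error
	}
	match, _, err := result.DeviceMatches(ctx, cel.Device{
		Driver:                   "dra.example.com", // illustrative driver name
		AllowMultipleAllocations: ptr.To(true),
	})
	if err != nil {
		return false, fmt.Errorf("evaluate selector: %w", err)
	}
	return match, nil
}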
View File

@@ -26,18 +26,25 @@ import (
resourceapi "k8s.io/api/resource/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apiserver/pkg/cel/environment"
"k8s.io/klog/v2/ktesting"
"k8s.io/utils/ptr"
)
var testcases = map[string]struct {
expression string
driver string
attributes map[resourceapi.QualifiedName]resourceapi.DeviceAttribute
capacity map[resourceapi.QualifiedName]resourceapi.DeviceCapacity
expectCompileError string
expectMatchError string
expectMatch bool
// environment.StoredExpressions is the default (= all CEL fields and features from the current version available).
// environment.NewExpressions can be used to enforce that only fields and features from the previous version are available.
envType *environment.Type
// The feature gate only has an effect in combination with environment.NewExpressions.
enableConsumableCapacity bool
expression string
driver string
allowMultipleAllocations *bool
attributes map[resourceapi.QualifiedName]resourceapi.DeviceAttribute
capacity map[resourceapi.QualifiedName]resourceapi.DeviceCapacity
expectCompileError string
expectMatchError string
expectMatch bool
// There's no good way to verify that the cost of an expression
// really is what it should be other than eye-balling it. The
@@ -150,6 +157,14 @@ var testcases = map[string]struct {
expectMatch: true,
expectCost: 6,
},
"version_v": {
// Relaxed parsing with v prefix.
expression: `device.attributes["dra.example.com"].name.isGreaterThan(semver("v0.0.1", true)) && isSemver("v1.0.0", true)`,
attributes: map[resourceapi.QualifiedName]resourceapi.DeviceAttribute{"name": {VersionValue: ptr.To("1.0.0")}},
driver: "dra.example.com",
expectMatch: true,
expectCost: 7,
},
"quantity": {
expression: `device.capacity["dra.example.com"].name.isGreaterThan(quantity("1Ki"))`,
capacity: map[resourceapi.QualifiedName]resourceapi.DeviceCapacity{"name": {Value: resource.MustParse("1Mi")}},
@@ -272,13 +287,62 @@ device.attributes["dra.example.com"]["version"].isGreaterThan(semver("0.0.1"))
expectMatchError: "actual cost limit exceeded",
expectCost: 85555551, // Exceed limit!
},
"allow_multiple_allocations": {
enableConsumableCapacity: true,
expression: `device.allowMultipleAllocations == true`,
allowMultipleAllocations: ptr.To(true),
driver: "dra.example.com",
expectMatch: true,
expectCost: 3,
},
"allow_multiple_allocations_default": {
enableConsumableCapacity: true,
expression: `device.allowMultipleAllocations == false`,
allowMultipleAllocations: nil,
driver: "dra.example.com",
expectMatch: true,
expectCost: 3,
},
"allow_multiple_allocations_false": {
enableConsumableCapacity: true,
expression: `device.allowMultipleAllocations == false`,
allowMultipleAllocations: ptr.To(false),
driver: "dra.example.com",
expectMatch: true,
expectCost: 3,
},
"allow_multiple_allocations_new": {
enableConsumableCapacity: true,
envType: ptr.To(environment.NewExpressions),
expression: `device.allowMultipleAllocations == false`,
allowMultipleAllocations: ptr.To(false),
driver: "dra.example.com",
expectMatch: true,
expectCost: 3,
},
"allow_multiple_allocations_enabled": {
envType: ptr.To(environment.NewExpressions),
enableConsumableCapacity: true,
expression: `device.allowMultipleAllocations == false`,
allowMultipleAllocations: ptr.To(false),
driver: "dra.example.com",
expectMatch: true,
expectCost: 3,
},
"allow_multiple_allocations_disabled": {
envType: ptr.To(environment.NewExpressions),
enableConsumableCapacity: false,
expression: `device.allowMultipleAllocations == false`,
driver: "dra.example.com",
expectCompileError: `undefined field 'allowMultipleAllocations'`,
},
}
func TestCEL(t *testing.T) {
for name, scenario := range testcases {
t.Run(name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
result := GetCompiler().CompileCELExpression(scenario.expression, Options{})
result := GetCompiler(Features{EnableConsumableCapacity: scenario.enableConsumableCapacity}).CompileCELExpression(scenario.expression, Options{EnvType: scenario.envType})
if scenario.expectCompileError != "" && result.Error == nil {
t.Fatalf("FAILURE: expected compile error %q, got none", scenario.expectCompileError)
}
@@ -298,7 +362,9 @@ func TestCEL(t *testing.T) {
t.Errorf("ERROR: expected CEL cost %d, got %d instead (%.0f%% of limit %d)", expect, actual, float64(actual)*100.0/float64(resourceapi.CELSelectorExpressionMaxCost), resourceapi.CELSelectorExpressionMaxCost)
}
match, details, err := result.DeviceMatches(ctx, Device{Attributes: scenario.attributes, Capacity: scenario.capacity, Driver: scenario.driver})
match, details, err := result.DeviceMatches(ctx, Device{
AllowMultipleAllocations: scenario.allowMultipleAllocations, Attributes: scenario.attributes, Capacity: scenario.capacity, Driver: scenario.driver,
})
// details.ActualCost can be called for nil details, no need to check.
actualCost := ptr.Deref(details.ActualCost(), 0)
if scenario.expectCost > 0 {
@@ -335,7 +401,7 @@ func TestInterrupt(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
// Adapted from https://github.com/kubernetes/kubernetes/blob/e0859f91b7d269bb7e2f43e23d202ccccaf34c0c/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/schema/cel/validation_test.go#L3006
expression := `device.attributes["dra.example.com"].map(key, device.attributes["dra.example.com"][key] * 20).filter(e, e > 50).exists(e, e == 60)`
result := GetCompiler().CompileCELExpression(expression, Options{})
result := GetCompiler(Features{}).CompileCELExpression(expression, Options{})
if result.Error != nil {
t.Fatalf("unexpected compile error: %v", result.Error)
}
@@ -382,7 +448,7 @@ func BenchmarkDeviceMatches(b *testing.B) {
}
b.Run(name, func(b *testing.B) {
_, ctx := ktesting.NewTestContext(b)
result := GetCompiler().CompileCELExpression(scenario.expression, Options{})
result := GetCompiler(Features{}).CompileCELExpression(scenario.expression, Options{})
if result.Error != nil {
b.Fatalf("unexpected compile error: %s", result.Error.Error())
}
@@ -393,7 +459,9 @@ func BenchmarkDeviceMatches(b *testing.B) {
// here also includes additional preparations
// in result.DeviceMatches and thus cannot be
// used.
match, _, err := result.DeviceMatches(ctx, Device{Attributes: scenario.attributes, Capacity: scenario.capacity, Driver: scenario.driver})
match, _, err := result.DeviceMatches(ctx, Device{
AllowMultipleAllocations: scenario.allowMultipleAllocations, Attributes: scenario.attributes, Capacity: scenario.capacity, Driver: scenario.driver,
})
if err != nil {
if scenario.expectMatchError == "" {
b.Fatalf("unexpected evaluation error: %v", err)

View File

@@ -0,0 +1,42 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cel
// This file defines functions and a global variable used to set the FeatureEnabled field
// of VersionedOptions within this package (compile.go).
import "sync/atomic"
var (
consumableCapacity atomic.Value
)
func SetDRAConsumableCapacity() {
consumableCapacity.Store(true)
}
func UnsetDRAConsumableCapacity() {
consumableCapacity.Store(false)
}
func DRAConsumableCapacity() bool {
value := consumableCapacity.Load()
if value == nil {
return false
}
return value.(bool)
}

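The comment above says these helpers exist to drive the FeatureEnabled field of VersionedOptions. A hypothetical sketch of that wiring, not shown in this commit:

// Hypothetical wiring (not part of this commit): using the emulated gate as the
// FeatureEnabled hook of a VersionedOptions entry inside this package.
package cel

import (
	"k8s.io/apimachinery/pkg/util/version"
	"k8s.io/apiserver/pkg/cel/environment"
)

func consumableCapacityOption() environment.VersionedOptions {
	return environment.VersionedOptions{
		IntroducedVersion: version.MajorMinor(1, 34),
		// DRAConsumableCapacity() reads the atomic value toggled by
		// SetDRAConsumableCapacity()/UnsetDRAConsumableCapacity().
		FeatureEnabled: DRAConsumableCapacity,
	}
}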
View File

@@ -0,0 +1,35 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cel
import (
"testing"
)
func TestCelEmulateFeatureGate(t *testing.T) {
if DRAConsumableCapacity() {
t.Fatal("emulated value must start as false")
}
SetDRAConsumableCapacity()
if !DRAConsumableCapacity() {
t.Fatal("emulated value must be set")
}
UnsetDRAConsumableCapacity()
if DRAConsumableCapacity() {
t.Fatal("emulated value must be unset")
}
}

View File

@@ -295,6 +295,14 @@ func (err *DroppedFieldsError) DisabledFeatures() []string {
}
}
// Dropped fields for consumable capacity can be detected via the allowMultipleAllocations flag without looking at individual device capacities.
for i := 0; i < len(err.DesiredSlice.Spec.Devices) && i < len(err.ActualSlice.Spec.Devices); i++ {
if err.DesiredSlice.Spec.Devices[i].AllowMultipleAllocations != nil && err.ActualSlice.Spec.Devices[i].AllowMultipleAllocations == nil {
disabled = append(disabled, "DRAConsumableCapacity")
break
}
}
return disabled
}

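A hedged sketch of how a caller could surface the new feature name; only DroppedFieldsError and DisabledFeatures() come from this package (assumed here to be k8s.io/dynamic-resource-allocation/resourceslice), the rest is illustrative:

package example

import (
	"errors"
	"fmt"

	"k8s.io/dynamic-resource-allocation/resourceslice"
)

func explainDroppedFields(err error) error {
	var dropped *resourceslice.DroppedFieldsError
	if errors.As(err, &dropped) {
		// DisabledFeatures() now also reports DRAConsumableCapacity when the
		// apiserver dropped AllowMultipleAllocations.
		return fmt.Errorf("apiserver dropped fields, likely disabled feature gates %v: %w",
			dropped.DisabledFeatures(), err)
	}
	return err
}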
View File

@@ -318,6 +318,44 @@ func TestControllerSyncPool(t *testing.T) {
},
expectedError: `update ResourceSlice: pool "pool", slice #0: some fields were dropped by the apiserver, probably because these features are disabled: DRADeviceTaints`,
},
"drop-consumable-capacity-field": {
features: features{disableConsumableCapacity: true},
nodeUID: nodeUID,
initialObjects: []runtime.Object{
MakeResourceSlice().Name(generatedName1).GenerateName(generateName).
NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName).
Driver(driverName).
Devices([]resourceapi.Device{newDevice(deviceName)}).
Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}).
Obj(),
},
inputDriverResources: &DriverResources{
Pools: map[string]Pool{
poolName: {
Generation: 1,
Slices: []Slice{{Devices: []resourceapi.Device{
newDevice(
deviceName,
allowMultipleAllocationsField(true),
),
}}},
},
},
},
expectedStats: Stats{
NumUpdates: 1,
},
expectedResourceSlices: []resourceapi.ResourceSlice{
*MakeResourceSlice().Name(generatedName1).GenerateName(generateName).
ResourceVersion("1").
NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName).
Driver(driverName).
Devices([]resourceapi.Device{newDevice(deviceName)}).
Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}).
Obj(),
},
expectedError: `update ResourceSlice: pool "pool", slice #0: some fields were dropped by the apiserver, probably because these features are disabled: DRAConsumableCapacity`,
},
"remove-pool": {
nodeUID: nodeUID,
syncDelay: ptr.To(time.Duration(0)), // Ensure that the initial object causes an immediate sync of the pool.
@@ -1120,6 +1158,7 @@ type features struct {
disableBindingConditions bool
disableDeviceTaints bool
disablePartitionableDevices bool
disableConsumableCapacity bool
}
func createTestClient(features features, timeAdded metav1.Time, objects ...runtime.Object) *fake.Clientset {
@@ -1191,6 +1230,11 @@ func dropDisabledFields(features features, resourceslice *resourceapi.ResourceSl
resourceslice.Spec.Devices[i].BindsToNode = nil
}
}
if features.disableConsumableCapacity {
for i := range resourceslice.Spec.Devices {
resourceslice.Spec.Devices[i].AllowMultipleAllocations = nil
}
}
}
func addTimeAdded(timeAdded metav1.Time, resourceslice *resourceapi.ResourceSlice) {
@@ -1333,6 +1377,7 @@ func (r *ResourceSliceWrapper) SharedCounters(counters []resourceapi.CounterSet)
}
type nodeNameField string
type allowMultipleAllocationsField bool
func newDevice(name string, fields ...any) resourceapi.Device {
device := resourceapi.Device{
@@ -1352,6 +1397,8 @@ func newDevice(name string, fields ...any) resourceapi.Device {
device.ConsumesCounters = append(device.ConsumesCounters, f...)
case nodeNameField:
device.NodeName = ptr.To(string(f))
case allowMultipleAllocationsField:
device.AllowMultipleAllocations = ptr.To(bool(f))
default:
panic(fmt.Sprintf("unsupported resourceapi.Device field type %T", field))
}

View File

@@ -109,6 +109,8 @@ type Options struct {
// a thin wrapper around the underlying
// SliceInformer, with no processing of its own.
EnableDeviceTaints bool
// EnableConsumableCapacity defines whether the CEL compiler supports the DRAConsumableCapacity feature.
EnableConsumableCapacity bool
SliceInformer resourceinformers.ResourceSliceInformer
TaintInformer resourcealphainformers.DeviceTaintRuleInformer
@@ -153,7 +155,7 @@ func newTracker(ctx context.Context, opts Options) (finalT *Tracker, finalErr er
resourceSlices: opts.SliceInformer.Informer(),
deviceTaints: opts.TaintInformer.Informer(),
deviceClasses: opts.ClassInformer.Informer(),
celCache: cel.NewCache(10),
celCache: cel.NewCache(10, cel.Features{EnableConsumableCapacity: opts.EnableConsumableCapacity}),
patchedResourceSlices: cache.NewStore(cache.MetaNamespaceKeyFunc),
handleError: utilruntime.HandleErrorWithContext,
eventQueue: *buffer.NewRing[func()](buffer.RingOptions{InitialSize: 0, NormalSize: 4}),

View File

@@ -22,7 +22,8 @@ import (
v1 "k8s.io/api/core/v1"
resourceapi "k8s.io/api/resource/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/types"
"k8s.io/dynamic-resource-allocation/cel"
"k8s.io/dynamic-resource-allocation/structured/internal"
"k8s.io/dynamic-resource-allocation/structured/internal/experimental"
@@ -43,6 +44,26 @@ func MakeDeviceID(driver, pool, device string) DeviceID {
return internal.MakeDeviceID(driver, pool, device)
}
// types_experimental
type AllocatedState = internal.AllocatedState
type SharedDeviceID = internal.SharedDeviceID
type DeviceConsumedCapacity = internal.DeviceConsumedCapacity
type ConsumedCapacityCollection = internal.ConsumedCapacityCollection
type ConsumedCapacity = internal.ConsumedCapacity
func MakeSharedDeviceID(deviceID DeviceID, shareID *types.UID) SharedDeviceID {
return internal.MakeSharedDeviceID(deviceID, shareID)
}
func NewConsumedCapacityCollection() ConsumedCapacityCollection {
return internal.NewConsumedCapacityCollection()
}
func NewDeviceConsumedCapacity(deviceID DeviceID,
consumedCapacity map[resourceapi.QualifiedName]resource.Quantity) DeviceConsumedCapacity {
return internal.NewDeviceConsumedCapacity(deviceID, consumedCapacity)
}
// Allocator calculates how to allocate a set of unallocated claims which use
// structured parameters.
//
@@ -86,7 +107,7 @@ type Allocator interface {
// The returned Allocator can be used multiple times and is thread-safe.
func NewAllocator(ctx context.Context,
features Features,
allocatedDevices sets.Set[DeviceID],
allocatedState AllocatedState,
classLister DeviceClassLister,
slices []*resourceapi.ResourceSlice,
celCache *cel.Cache,
@@ -118,7 +139,7 @@ func NewAllocator(ctx context.Context,
// All required features supported?
if allocator.supportedFeatures.Set().IsSuperset(features.Set()) {
// Use it!
return allocator.newAllocator(ctx, features, allocatedDevices, classLister, slices, celCache)
return allocator.newAllocator(ctx, features, allocatedState, classLister, slices, celCache)
}
}
return nil, fmt.Errorf("internal error: no allocator available for feature set %v", features)
@@ -128,7 +149,7 @@ var availableAllocators = []struct {
supportedFeatures Features
newAllocator func(ctx context.Context,
features Features,
allocatedDevices sets.Set[DeviceID],
allocatedState AllocatedState,
classLister DeviceClassLister,
slices []*resourceapi.ResourceSlice,
celCache *cel.Cache,
@@ -139,36 +160,36 @@ var availableAllocators = []struct {
supportedFeatures: stable.SupportedFeatures,
newAllocator: func(ctx context.Context,
features Features,
allocatedDevices sets.Set[DeviceID],
allocatedState AllocatedState,
classLister DeviceClassLister,
slices []*resourceapi.ResourceSlice,
celCache *cel.Cache,
) (Allocator, error) {
return stable.NewAllocator(ctx, features, allocatedDevices, classLister, slices, celCache)
return stable.NewAllocator(ctx, features, allocatedState.AllocatedDevices, classLister, slices, celCache)
},
},
{
supportedFeatures: incubating.SupportedFeatures,
newAllocator: func(ctx context.Context,
features Features,
allocatedDevices sets.Set[DeviceID],
allocatedState AllocatedState,
classLister DeviceClassLister,
slices []*resourceapi.ResourceSlice,
celCache *cel.Cache,
) (Allocator, error) {
return incubating.NewAllocator(ctx, features, allocatedDevices, classLister, slices, celCache)
return incubating.NewAllocator(ctx, features, allocatedState.AllocatedDevices, classLister, slices, celCache)
},
},
{
supportedFeatures: experimental.SupportedFeatures,
newAllocator: func(ctx context.Context,
features Features,
allocatedDevices sets.Set[DeviceID],
allocateState AllocatedState,
classLister DeviceClassLister,
slices []*resourceapi.ResourceSlice,
celCache *cel.Cache,
) (Allocator, error) {
return experimental.NewAllocator(ctx, features, allocatedDevices, classLister, slices, celCache)
return experimental.NewAllocator(ctx, features, allocateState, classLister, slices, celCache)
},
},
}

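A minimal sketch of feeding an AllocatedState into the dispatcher above; the empty state mirrors the fallback the scheduler plugin uses when DRAConsumableCapacity is disabled, and all parameter values are placeholders:

package example

import (
	"context"

	resourceapi "k8s.io/api/resource/v1"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/dynamic-resource-allocation/cel"
	"k8s.io/dynamic-resource-allocation/structured"
)

func newAllocatorForNode(ctx context.Context, features structured.Features,
	classLister structured.DeviceClassLister, slices []*resourceapi.ResourceSlice,
	celCache *cel.Cache) (structured.Allocator, error) {
	// An empty AllocatedState; real callers populate it from the claim tracker
	// (GatherAllocatedState) or from ListAllAllocatedDevices as shown earlier.
	state := structured.AllocatedState{
		AllocatedDevices:         sets.New[structured.DeviceID](),
		AllocatedSharedDeviceIDs: sets.New[structured.SharedDeviceID](),
		AggregatedCapacity:       structured.NewConsumedCapacityCollection(),
	}
	return structured.NewAllocator(ctx, features, state, classLister, slices, celCache)
}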
View File

@@ -21,7 +21,6 @@ import (
"testing"
resourceapi "k8s.io/api/resource/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/dynamic-resource-allocation/cel"
"k8s.io/dynamic-resource-allocation/structured/internal"
"k8s.io/dynamic-resource-allocation/structured/internal/allocatortesting"
@@ -33,11 +32,11 @@ func TestAllocator(t *testing.T) {
func(
ctx context.Context,
features Features,
allocatedDevices sets.Set[DeviceID],
allocatedState AllocatedState,
classLister DeviceClassLister,
slices []*resourceapi.ResourceSlice,
celCache *cel.Cache,
) (allocatortesting.Allocator, error) {
return NewAllocator(ctx, features, allocatedDevices, classLister, slices, celCache)
return NewAllocator(ctx, features, allocatedState, classLister, slices, celCache)
})
}

View File

@@ -28,6 +28,8 @@ import (
v1 "k8s.io/api/core/v1"
resourceapi "k8s.io/api/resource/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
draapi "k8s.io/dynamic-resource-allocation/api"
"k8s.io/dynamic-resource-allocation/cel"
@@ -46,6 +48,30 @@ func MakeDeviceID(driver, pool, device string) DeviceID {
return internal.MakeDeviceID(driver, pool, device)
}
// types_experimental
type SharedDeviceID = internal.SharedDeviceID
type DeviceConsumedCapacity = internal.DeviceConsumedCapacity
type ConsumedCapacityCollection = internal.ConsumedCapacityCollection
type ConsumedCapacity = internal.ConsumedCapacity
type AllocatedState = internal.AllocatedState
func GenerateNewShareID() *types.UID {
return internal.GenerateShareID()
}
func NewConsumedCapacity() ConsumedCapacity {
return internal.NewConsumedCapacity()
}
func NewDeviceConsumedCapacity(deviceID DeviceID,
consumedCapacity map[resourceapi.QualifiedName]resource.Quantity) DeviceConsumedCapacity {
return internal.NewDeviceConsumedCapacity(deviceID, consumedCapacity)
}
func NewConsumedCapacityCollection() ConsumedCapacityCollection {
return internal.NewConsumedCapacityCollection()
}
// SupportedFeatures includes all additional features,
// making this the variant that is used when any of those
// are enabled.
@@ -56,12 +82,13 @@ var SupportedFeatures = internal.Features{
DeviceTaints: true,
DeviceBinding: true,
DeviceStatus: true,
ConsumableCapacity: true,
}
type Allocator struct {
features Features
claimsToAllocate []*resourceapi.ResourceClaim
allocatedDevices sets.Set[DeviceID]
allocatedState AllocatedState
classLister DeviceClassLister
slices []*resourceapi.ResourceSlice
celCache *cel.Cache
@@ -91,14 +118,14 @@ var _ internal.AllocatorExtended = &Allocator{}
// The returned Allocator can be used multiple times and is thread-safe.
func NewAllocator(ctx context.Context,
features Features,
allocatedDevices sets.Set[DeviceID],
allocatedState AllocatedState,
classLister DeviceClassLister,
slices []*resourceapi.ResourceSlice,
celCache *cel.Cache,
) (*Allocator, error) {
return &Allocator{
features: features,
allocatedDevices: allocatedDevices,
allocatedState: allocatedState,
classLister: classLister,
slices: slices,
celCache: celCache,
@@ -117,11 +144,13 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node, claims []*resou
consumedCounters: make(map[string]counterSets),
requestData: make(map[requestIndices]requestData),
result: make([]internalAllocationResult, len(claims)),
allocatingCapacity: NewConsumedCapacityCollection(),
}
alloc.claimsToAllocate = claims
alloc.logger.V(5).Info("Starting allocation", "numClaims", len(alloc.claimsToAllocate))
defer alloc.logger.V(5).Info("Done with allocation", "success", len(finalResult) == len(alloc.claimsToAllocate), "err", finalErr)
alloc.logger.V(5).Info("Gathering pools", "slices", alloc.slices)
// First determine all eligible pools.
pools, err := GatherPools(ctx, alloc.slices, node, a.features)
if err != nil {
@@ -175,6 +204,25 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node, claims []*resou
return nil, fmt.Errorf("claim %s, request %s: has subrequests, but the DRAPrioritizedList feature is disabled", klog.KObj(claim), request.Name)
}
// Error out if the consumableCapacity feature is not enabled
// and the request contains capacity requests.
if !a.features.ConsumableCapacity {
containsCapacityRequest := false
if request.Exactly != nil && request.Exactly.Capacity != nil {
containsCapacityRequest = true
}
for _, request := range request.FirstAvailable {
if request.Capacity != nil {
containsCapacityRequest = true
break
}
}
if containsCapacityRequest {
return nil, fmt.Errorf("claim %s, request %s: has capacity requests, but the DRAConsumableCapacity feature is disabled",
klog.KObj(claim), request.Name)
}
}
if hasSubRequests {
// We need to find the minimum number of devices that can be allocated
// for the request, so setting this to a high number so we can do the
@@ -185,6 +233,12 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node, claims []*resou
// We can only predict a lower number of devices because it depends on which
// subrequest gets chosen.
for i, subReq := range request.FirstAvailable {
// Error out if the consumableCapacity feature is not enabled
// and the subrequest contains capacity requests.
if !a.features.ConsumableCapacity && subReq.Capacity != nil {
return nil, fmt.Errorf("claim %s, subrequest %s: has capacity requests, but the DRAConsumableCapacity feature is disabled",
klog.KObj(claim), subReq.Name)
}
reqData, err := alloc.validateDeviceRequest(&deviceSubRequestAccessor{subRequest: &subReq},
&exactDeviceRequestAccessor{request: request}, requestKey, pools)
if err != nil {
@@ -237,6 +291,20 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node, claims []*resou
attributeName: matchAttribute,
}
constraints[i] = m
case constraint.DistinctAttribute != nil:
distinctAttribute := draapi.FullyQualifiedName(*constraint.DistinctAttribute)
logger := alloc.logger
if loggerV := alloc.logger.V(6); loggerV.Enabled() {
logger = klog.LoggerWithName(logger, "distinctAttributeConstraint")
logger = klog.LoggerWithValues(logger, "distinctAttribute", distinctAttribute)
}
m := &distinctAttributeConstraint{
logger: logger,
requestNames: sets.New(constraint.Requests...),
attributeName: distinctAttribute,
attributes: make(map[string]draapi.DeviceAttribute),
}
constraints[i] = m
default:
// Unknown constraint type!
return nil, fmt.Errorf("claim %s, constraint #%d: empty constraint (unsupported constraint type?)", klog.KObj(claim), i)
@@ -256,7 +324,7 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node, claims []*resou
// We can estimate the size based on what we need to allocate.
alloc.allocatingDevices = make(map[DeviceID]sets.Set[int], minDevicesTotal)
alloc.logger.V(6).Info("Gathered information about devices", "numAllocated", len(alloc.allocatedDevices), "minDevicesToBeAllocated", minDevicesTotal)
alloc.logger.V(6).Info("Gathered information about devices", "numAllocated", len(alloc.allocatedState.AllocatedDevices), "minDevicesToBeAllocated", minDevicesTotal)
// In practice, there aren't going to be many different CEL
// expressions. Most likely, there is going to be handful of different
@@ -288,12 +356,28 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node, claims []*resou
allocationResult := &result[claimIndex]
allocationResult.Devices.Results = make([]resourceapi.DeviceRequestAllocationResult, len(internalResult.devices))
for i, internal := range internalResult.devices {
var consumedCapacity map[resourceapi.QualifiedName]resource.Quantity
if internal.consumedCapacity != nil {
consumedCapacity = make(map[resourceapi.QualifiedName]resource.Quantity, len(internal.consumedCapacity))
for key, val := range internal.consumedCapacity {
consumedCapacity[key] = val.DeepCopy()
}
}
allocationResult.Devices.Results[i] = resourceapi.DeviceRequestAllocationResult{
Request: internal.requestName(),
Driver: internal.id.Driver.String(),
Pool: internal.id.Pool.String(),
Device: internal.id.Device.String(),
AdminAccess: internal.adminAccess,
Request: internal.requestName(),
Driver: internal.id.Driver.String(),
Pool: internal.id.Pool.String(),
Device: internal.id.Device.String(),
AdminAccess: internal.adminAccess,
ShareID: internal.shareID,
ConsumedCapacity: consumedCapacity,
}
// Performance optimization: skip the for loop if the feature is off.
// Not needed for correctness because if the feature is off, the selected
// device should not have binding conditions.
if a.features.DeviceBinding {
allocationResult.Devices.Results[i].BindingConditions = internal.BindingConditions
allocationResult.Devices.Results[i].BindingFailureConditions = internal.BindingFailureConditions
}
// Performance optimization: skip the for loop if the feature is off.
// Not needed for correctness because if the feature is off, the selected
@@ -449,6 +533,19 @@ func (alloc *allocator) validateDeviceRequest(request requestAccessor, parentReq
Device: &slice.Spec.Devices[deviceIndex],
slice: slice,
}
if alloc.features.ConsumableCapacity {
// Next, validate that the capacity request does not exceed the device's remaining capacity.
success, err := alloc.CmpRequestOverCapacity(requestData.request, slice, deviceIndex)
if err != nil {
alloc.logger.V(7).Info("Skip comparing device capacity request",
"device", device, "request", requestData.request.name(), "err", err)
continue
}
if !success {
alloc.logger.V(7).Info("Device capacity not enough", "device", device)
continue
}
}
requestData.allDevices = append(requestData.allDevices, device)
}
}
@@ -494,7 +591,12 @@ type allocator struct {
// be allocated.
// Claims are identified by their index in claimsToAllocate.
allocatingDevices map[DeviceID]sets.Set[int]
result []internalAllocationResult
// allocatingCapacity tracks the amount of device capacity that will be newly allocated
// for a particular attempt to find a solution.
// The map is indexed by device ID, and each value represents the accumulated capacity
// requested by all allocations targeting that device.
allocatingCapacity ConsumedCapacityCollection
result []internalAllocationResult
}
// counterSets is a map with the name of counter sets to the counters in
@@ -556,11 +658,13 @@ type internalAllocationResult struct {
type internalDeviceResult struct {
*draapi.Device
request string // name of the request (if no subrequests) or the subrequest
parentRequest string // name of the request which contains the subrequest, empty otherwise
id DeviceID
slice *draapi.ResourceSlice
adminAccess *bool
request string // name of the request (if no subrequests) or the subrequest
parentRequest string // name of the request which contains the subrequest, empty otherwise
id DeviceID
shareID *types.UID
slice *draapi.ResourceSlice
consumedCapacity map[resourceapi.QualifiedName]resource.Quantity
adminAccess *bool
}
func (i internalDeviceResult) requestName() string {
@@ -831,7 +935,11 @@ func (alloc *allocator) allocateOne(r deviceIndices, allocateSubRequest bool) (b
return false, errAllocationResultMaxSizeExceeded
}
alloc.logger.V(6).Info("Allocating one device", "currentClaim", r.claimIndex, "totalClaims", len(alloc.claimsToAllocate), "currentRequest", r.requestIndex, "currentSubRequest", r.subRequestIndex, "totalRequestsPerClaim", len(claim.Spec.Devices.Requests), "currentDevice", r.deviceIndex, "devicesPerRequest", requestData.numDevices, "allDevices", doAllDevices, "adminAccess", request.adminAccess())
alloc.logger.V(6).Info("Allocating one device", "currentClaim", r.claimIndex,
"totalClaims", len(alloc.claimsToAllocate), "currentRequest", r.requestIndex,
"currentSubRequest", r.subRequestIndex, "totalRequestsPerClaim", len(claim.Spec.Devices.Requests),
"currentDevice", r.deviceIndex, "devicesPerRequest", requestData.numDevices, "allDevices", doAllDevices, "adminAccess", request.adminAccess(), "capacities", request.capacities())
if doAllDevices {
// For "all" devices we already know which ones we need. We
// just need to check whether we can use them.
@@ -889,6 +997,19 @@ func (alloc *allocator) allocateOne(r deviceIndices, allocateSubRequest bool) (b
alloc.logger.V(7).Info("Device not selectable", "device", deviceID)
continue
}
if alloc.features.ConsumableCapacity {
// Next, validate that the capacity request does not exceed the device's remaining capacity.
success, err := alloc.CmpRequestOverCapacity(requestData.request, slice, deviceIndex)
if err != nil {
alloc.logger.V(7).Info("Skip comparing device capacity request",
"device", deviceID, "request", requestData.request.name(), "err", err)
continue
}
if !success {
alloc.logger.V(7).Info("Device capacity not enough", "device", deviceID)
continue
}
}
// Finally treat as allocated and move on to the next device.
device := deviceWithID{
@@ -989,6 +1110,19 @@ func (alloc *allocator) isSelectable(r requestIndices, requestData requestData,
}
// CmpRequestOverCapacity checks whether the device's remaining capacity can satisfy the request.
// It returns true on success.
func (alloc *allocator) CmpRequestOverCapacity(request requestAccessor, slice *draapi.ResourceSlice, deviceIndex int) (bool, error) {
deviceID := DeviceID{Driver: slice.Spec.Driver, Pool: slice.Spec.Pool.Name, Device: slice.Spec.Devices[deviceIndex].Name}
allocatingCapacity := alloc.allocatingCapacity[deviceID]
allowMultipleAllocations := slice.Spec.Devices[deviceIndex].AllowMultipleAllocations
capacities := slice.Spec.Devices[deviceIndex].Capacity
if allocatedCapacity, found := alloc.allocatedState.AggregatedCapacity[deviceID]; found {
return CmpRequestOverCapacity(allocatedCapacity, request.capacities(), allowMultipleAllocations, capacities, allocatingCapacity)
}
return CmpRequestOverCapacity(NewConsumedCapacity(), request.capacities(), allowMultipleAllocations, capacities, allocatingCapacity)
}
func (alloc *allocator) selectorsMatch(r requestIndices, device *draapi.Device, deviceID DeviceID, class *resourceapi.DeviceClass, selectors []resourceapi.DeviceSelector) (bool, error) {
for i, selector := range selectors {
expr := alloc.celCache.GetOrCompile(selector.CEL.Expression)
@@ -1010,7 +1144,7 @@ func (alloc *allocator) selectorsMatch(r requestIndices, device *draapi.Device,
if err := draapi.Convert_api_Device_To_v1_Device(device, &d, nil); err != nil {
return false, fmt.Errorf("convert Device: %w", err)
}
matches, details, err := expr.DeviceMatches(alloc.ctx, cel.Device{Driver: deviceID.Driver.String(), Attributes: d.Attributes, Capacity: d.Capacity})
matches, details, err := expr.DeviceMatches(alloc.ctx, cel.Device{Driver: deviceID.Driver.String(), AllowMultipleAllocations: d.AllowMultipleAllocations, Attributes: d.Attributes, Capacity: d.Capacity})
if class != nil {
alloc.logger.V(7).Info("CEL result", "device", deviceID, "class", klog.KObj(class), "selector", i, "expression", selector.CEL.Expression, "matches", matches, "actualCost", ptr.Deref(details.ActualCost(), 0), "err", err)
} else {
@@ -1048,7 +1182,11 @@ func (alloc *allocator) allocateDevice(r deviceIndices, device deviceWithID, mus
requestKey := requestIndices{claimIndex: r.claimIndex, requestIndex: r.requestIndex, subRequestIndex: r.subRequestIndex}
requestData := alloc.requestData[requestKey]
request := requestData.request
if request.adminAccess() && alloc.allocatingDeviceForClaim(device.id, r.claimIndex) {
allowMultipleAllocations := false
if alloc.features.ConsumableCapacity {
allowMultipleAllocations = device.AllowMultipleAllocations != nil && *device.AllowMultipleAllocations
}
if !allowMultipleAllocations && request.adminAccess() && alloc.allocatingDeviceForClaim(device.id, r.claimIndex) {
alloc.logger.V(7).Info("Device in use in same claim", "device", device.id)
return false, nil, nil
}
@@ -1113,14 +1251,50 @@ func (alloc *allocator) allocateDevice(r deviceIndices, device deviceWithID, mus
}
}
// All constraints satisfied. Mark as in use (unless we do admin access)
// All constraints satisfied. Mark as in use (unless this is admin access or the device allows multiple allocations)
// and record the result.
alloc.logger.V(7).Info("Device allocated", "device", device.id)
if alloc.allocatingDevices[device.id] == nil {
alloc.allocatingDevices[device.id] = make(sets.Set[int])
}
alloc.allocatingDevices[device.id].Insert(r.claimIndex)
if !allowMultipleAllocations {
alloc.allocatingDevices[device.id].Insert(r.claimIndex)
}
consumedCapacity := make(map[resourceapi.QualifiedName]resource.Quantity, 0)
var shareID *types.UID
if alloc.features.ConsumableCapacity {
// Validate that the requested capacity fits within the device's remaining capacity.
success, err := alloc.CmpRequestOverCapacity(requestData.request, device.slice, r.deviceIndex)
if err != nil {
alloc.logger.V(7).Info("Failed to compare device capacity request",
"device", device, "request", requestData.request.name(), "err", err)
return false, nil, nil
}
if !success {
alloc.logger.V(7).Info("Device capacity not enough", "device", device)
return false, nil, nil
}
if allowMultipleAllocations {
convertedCapacities := make(map[resourceapi.QualifiedName]resourceapi.DeviceCapacity)
for key, value := range device.Capacity {
var convertedCapacity resourceapi.DeviceCapacity
err := draapi.Convert_api_DeviceCapacity_To_v1_DeviceCapacity(&value, &convertedCapacity, nil)
if err != nil {
return false, nil, fmt.Errorf("convert DeviceCapacity: %w", err)
}
convertedCapacities[resourceapi.QualifiedName(key)] = convertedCapacity
}
consumedCapacity = GetConsumedCapacityFromRequest(request.capacities(), convertedCapacities)
shareID = GenerateNewShareID()
alloc.logger.V(7).Info("Device capacity allocated", "device", device.id,
"converted capacity", klog.Format(convertedCapacities),
"consumed capacity", klog.Format(consumedCapacity))
alloc.allocatingCapacity.Insert(NewDeviceConsumedCapacity(device.id, consumedCapacity))
}
}
result := internalDeviceResult{
request: request.name(),
@@ -1128,10 +1302,14 @@ func (alloc *allocator) allocateDevice(r deviceIndices, device deviceWithID, mus
id: device.id,
Device: device.Device,
slice: device.slice,
shareID: shareID,
}
if request.adminAccess() {
result.adminAccess = ptr.To(request.adminAccess())
}
if len(consumedCapacity) > 0 {
result.consumedCapacity = consumedCapacity
}
previousNumResults := len(alloc.result[r.claimIndex].devices)
alloc.result[r.claimIndex].devices = append(alloc.result[r.claimIndex].devices, result)
@@ -1140,8 +1318,16 @@ func (alloc *allocator) allocateDevice(r deviceIndices, device deviceWithID, mus
constraint.remove(baseRequestName, subRequestName, device.Device, device.id)
}
alloc.allocatingDevices[device.id].Delete(r.claimIndex)
if alloc.features.PartitionableDevices && len(device.ConsumesCounters) > 0 {
alloc.deallocateCountersForDevice(device)
if allowMultipleAllocations {
requestedResource := alloc.result[r.claimIndex].devices[previousNumResults].consumedCapacity
if requestedResource != nil {
alloc.allocatingCapacity.Remove(NewDeviceConsumedCapacity(device.id, requestedResource))
}
} else {
alloc.allocatingDevices[device.id].Delete(r.claimIndex)
if alloc.features.PartitionableDevices && len(device.ConsumesCounters) > 0 {
alloc.deallocateCountersForDevice(device)
}
}
// Truncate, but keep the underlying slice.
alloc.result[r.claimIndex].devices = alloc.result[r.claimIndex].devices[:previousNumResults]
@@ -1207,7 +1393,7 @@ func (alloc *allocator) checkAvailableCounters(device deviceWithID) (bool, error
}
// Devices that aren't allocated don't consume any counters, so we don't
// need to consider them.
if !alloc.allocatedDevices.Has(deviceID) {
if !alloc.allocatedState.AllocatedDevices.Has(deviceID) {
continue
}
for _, deviceCounterConsumption := range device.ConsumesCounters {
@@ -1280,7 +1466,7 @@ func (alloc *allocator) checkAvailableCounters(device deviceWithID) (bool, error
}
func (alloc *allocator) deviceInUse(deviceID DeviceID) bool {
return alloc.allocatedDevices.Has(deviceID) || alloc.allocatingDeviceForAnyClaim(deviceID)
return alloc.allocatedState.AllocatedDevices.Has(deviceID) || alloc.allocatingDeviceForAnyClaim(deviceID)
}
func (alloc *allocator) allocatingDeviceForAnyClaim(deviceID DeviceID) bool {
@@ -1379,6 +1565,7 @@ type requestAccessor interface {
hasAdminAccess() bool
selectors() []resourceapi.DeviceSelector
tolerations() []resourceapi.DeviceToleration
capacities() *resourceapi.CapacityRequirements
}
// exactDeviceRequestAccessor is an implementation of the
@@ -1419,6 +1606,10 @@ func (d *exactDeviceRequestAccessor) tolerations() []resourceapi.DeviceToleratio
return d.request.Exactly.Tolerations
}
func (d *exactDeviceRequestAccessor) capacities() *resourceapi.CapacityRequirements {
return d.request.Exactly.Capacity
}
// deviceSubRequestAccessor is an implementation of the
// requestAccessor interface for DeviceSubRequests.
type deviceSubRequestAccessor struct {
@@ -1457,6 +1648,10 @@ func (d *deviceSubRequestAccessor) tolerations() []resourceapi.DeviceToleration
return d.subRequest.Tolerations
}
func (d *deviceSubRequestAccessor) capacities() *resourceapi.CapacityRequirements {
return d.subRequest.Capacity
}
func addNewNodeSelectorRequirements(from []v1.NodeSelectorRequirement, to *[]v1.NodeSelectorRequirement) {
for _, requirement := range from {
if !containsNodeSelectorRequirement(*to, requirement) {

View File

@@ -21,7 +21,6 @@ import (
"testing"
resourceapi "k8s.io/api/resource/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/dynamic-resource-allocation/cel"
"k8s.io/dynamic-resource-allocation/structured/internal"
"k8s.io/dynamic-resource-allocation/structured/internal/allocatortesting"
@@ -33,12 +32,12 @@ func TestAllocator(t *testing.T) {
func(
ctx context.Context,
features Features,
allocatedDevices sets.Set[DeviceID],
allocatedState AllocatedState,
classLister DeviceClassLister,
slices []*resourceapi.ResourceSlice,
celCache *cel.Cache,
) (internal.Allocator, error) {
return NewAllocator(ctx, features, allocatedDevices, classLister, slices, celCache)
return NewAllocator(ctx, features, allocatedState, classLister, slices, celCache)
},
)
}

View File

@@ -0,0 +1,127 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package experimental
import (
"fmt"
"k8s.io/apimachinery/pkg/util/sets"
draapi "k8s.io/dynamic-resource-allocation/api"
"k8s.io/klog/v2"
)
// distinctAttributeConstraint compares an attribute value across devices.
// All devices must have distinct values for that attribute. When the set of
// devices is empty, any device that has the attribute can be added. After
// that, only devices whose attribute value differs from all devices already
// in the set can be added.
//
// We track the attribute value contributed by each request so that it can be
// removed again when the allocator backtracks.
type distinctAttributeConstraint struct {
logger klog.Logger // Includes name and attribute name, so no need to repeat in log messages.
requestNames sets.Set[string]
attributeName draapi.FullyQualifiedName
attributes map[string]draapi.DeviceAttribute
numDevices int
}
func (m *distinctAttributeConstraint) add(requestName, subRequestName string, device *draapi.Device, deviceID DeviceID) bool {
if m.requestNames.Len() > 0 && !m.matches(requestName, subRequestName) {
// Device not affected by constraint.
return true
}
attribute := lookupAttribute(device, deviceID, m.attributeName)
if attribute == nil {
// Doesn't have the attribute.
m.logger.V(7).Info("Constraint not satisfied, attribute not set")
return false
}
if m.numDevices == 0 {
// The first device can always get picked.
m.attributes[requestName] = *attribute
m.numDevices = 1
m.logger.V(7).Info("First attribute added")
return true
}
if !m.matchesAttribute(*attribute) {
m.logger.V(7).Info("Constraint not satisfied, duplicated attribute")
return false
}
m.attributes[requestName] = *attribute
m.numDevices++
m.logger.V(7).Info("Constraint satisfied by device", "device", deviceID, "numDevices", m.numDevices)
return true
}
func (m *distinctAttributeConstraint) remove(requestName, subRequestName string, device *draapi.Device, deviceID DeviceID) {
if m.requestNames.Len() > 0 && !m.matches(requestName, subRequestName) {
// Device not affected by constraint.
return
}
delete(m.attributes, requestName)
m.numDevices--
m.logger.V(7).Info("Device removed from constraint set", "device", deviceID, "numDevices", m.numDevices)
}
func (m *distinctAttributeConstraint) matches(requestName, subRequestName string) bool {
if subRequestName == "" {
return m.requestNames.Has(requestName)
} else {
fullSubRequestName := fmt.Sprintf("%s/%s", requestName, subRequestName)
return m.requestNames.Has(requestName) || m.requestNames.Has(fullSubRequestName)
}
}
func (m *distinctAttributeConstraint) matchesAttribute(attribute draapi.DeviceAttribute) bool {
for _, attr := range m.attributes {
switch {
case attribute.StringValue != nil:
if attr.StringValue != nil && *attribute.StringValue == *attr.StringValue {
m.logger.V(7).Info("String values duplicated")
return false
}
case attribute.IntValue != nil:
if attr.IntValue != nil && *attribute.IntValue == *attr.IntValue {
m.logger.V(7).Info("Int values duplicated")
return false
}
case attribute.BoolValue != nil:
if attr.BoolValue != nil && *attribute.BoolValue == *attr.BoolValue {
m.logger.V(7).Info("Bool values duplicated")
return false
}
case attribute.VersionValue != nil:
// semver 2.0.0 requires that version strings are in their
// minimal form (in particular, no leading zeros). Therefore a
// strict "exact equal" check can do a string comparison.
if attr.VersionValue != nil && *attribute.VersionValue == *attr.VersionValue {
m.logger.V(7).Info("Version values duplicated")
return false
}
default:
// Unknown value type, cannot match.
m.logger.V(7).Info("Distinct attribute type unknown")
return false
}
}
return true
}
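The duplicate check above is easiest to see with a concrete value. The following is a minimal, illustrative sketch, not part of the change; exampleDistinctAttribute is a hypothetical helper that relies only on this file's existing imports and types.
func exampleDistinctAttribute() (bool, bool) {
	// One device holding the attribute value 1 is already part of the constraint set.
	one, two := int64(1), int64(2)
	c := distinctAttributeConstraint{
		logger:     klog.Background(),
		attributes: map[string]draapi.DeviceAttribute{"req-0": {IntValue: &one}},
		numDevices: 1,
	}
	// A second device with the same value is rejected, a device with a new value is accepted.
	duplicate := c.matchesAttribute(draapi.DeviceAttribute{IntValue: &one})
	distinct := c.matchesAttribute(draapi.DeviceAttribute{IntValue: &two})
	return duplicate, distinct // false, true
}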

View File

@@ -0,0 +1,220 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package experimental
import (
"errors"
"fmt"
resourceapi "k8s.io/api/resource/v1"
"k8s.io/apimachinery/pkg/api/resource"
draapi "k8s.io/dynamic-resource-allocation/api"
"k8s.io/utils/ptr"
)
// CmpRequestOverCapacity checks whether the new capacity request can be added within the given capacity,
// and whether the requested value complies with the capacity's requestPolicy.
func CmpRequestOverCapacity(currentConsumedCapacity ConsumedCapacity, deviceRequestCapacity *resourceapi.CapacityRequirements,
allowMultipleAllocations *bool, capacity map[draapi.QualifiedName]draapi.DeviceCapacity, allocatingCapacity ConsumedCapacity) (bool, error) {
if requestsContainNonExistCapacity(deviceRequestCapacity, capacity) {
return false, errors.New("some requested capacity has not been defined")
}
clone := currentConsumedCapacity.Clone()
for name, cap := range capacity {
convertedName := resourceapi.QualifiedName(name)
var convertedCapacity resourceapi.DeviceCapacity
err := draapi.Convert_api_DeviceCapacity_To_v1_DeviceCapacity(&cap, &convertedCapacity, nil)
if err != nil {
return false, fmt.Errorf("failed to convert DeviceCapacity %w", err)
}
var requestedValPtr *resource.Quantity
if deviceRequestCapacity != nil && deviceRequestCapacity.Requests != nil {
if requestedVal, requestedFound := deviceRequestCapacity.Requests[convertedName]; requestedFound {
requestedValPtr = &requestedVal
}
}
consumedCapacity := calculateConsumedCapacity(requestedValPtr, convertedCapacity)
if violatesPolicy(consumedCapacity, convertedCapacity.RequestPolicy) {
return false, nil
}
// If the current clone already contains an entry for this capacity, add the consumedCapacity to it.
// Otherwise, initialize it with calculated consumedCapacity.
if _, allocatedFound := clone[convertedName]; allocatedFound {
clone[convertedName].Add(consumedCapacity)
} else {
clone[convertedName] = ptr.To(consumedCapacity)
}
// If allocatingCapacity contains an entry for this capacity, add its value to clone as well.
if allocatingVal, allocatingFound := allocatingCapacity[convertedName]; allocatingFound {
clone[convertedName].Add(*allocatingVal)
}
if clone[convertedName].Cmp(cap.Value) > 0 {
return false, nil
}
}
return true, nil
}
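To make the check concrete, here is a minimal, illustrative sketch, not part of the change; exampleCmpRequestOverCapacity is a hypothetical helper that relies only on this file's existing imports. It assumes a device with 8Gi of consumable memory of which 4Gi is already consumed: a further 4Gi request fits, an 8Gi request does not.
func exampleCmpRequestOverCapacity() (bool, bool) {
	// Hypothetical device: 8Gi of consumable memory, no request policy.
	capacity := map[draapi.QualifiedName]draapi.DeviceCapacity{
		"memory": {Value: resource.MustParse("8Gi")},
	}
	// 4Gi is already consumed by earlier allocations.
	consumed := ConsumedCapacity{"memory": ptr.To(resource.MustParse("4Gi"))}
	request := func(quantity string) *resourceapi.CapacityRequirements {
		return &resourceapi.CapacityRequirements{
			Requests: map[resourceapi.QualifiedName]resource.Quantity{"memory": resource.MustParse(quantity)},
		}
	}
	fits4, _ := CmpRequestOverCapacity(consumed, request("4Gi"), ptr.To(true), capacity, NewConsumedCapacity())
	fits8, _ := CmpRequestOverCapacity(consumed, request("8Gi"), ptr.To(true), capacity, NewConsumedCapacity())
	return fits4, fits8 // true, false
}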
// requestsContainNonExistCapacity returns true if the request refers to a capacity that is not defined for the device.
func requestsContainNonExistCapacity(deviceRequestCapacity *resourceapi.CapacityRequirements,
capacity map[draapi.QualifiedName]draapi.DeviceCapacity) bool {
if deviceRequestCapacity == nil || deviceRequestCapacity.Requests == nil {
return false
}
for name := range deviceRequestCapacity.Requests {
convertedName := draapi.QualifiedName(name)
if _, found := capacity[convertedName]; !found {
return true
}
}
return false
}
// calculateConsumedCapacity returns the capacity to consume, based on the requested capacity and the device's capacity policy.
//
// If requestedVal is nil, the quantity is determined by fillEmptyRequest.
// If there is no requestPolicy, the requested value is returned as-is.
// Otherwise, the requestPolicy is applied to derive the consumed capacity from the request, where applicable.
func calculateConsumedCapacity(requestedVal *resource.Quantity, capacity resourceapi.DeviceCapacity) resource.Quantity {
if requestedVal == nil {
return fillEmptyRequest(capacity)
}
if capacity.RequestPolicy == nil {
return requestedVal.DeepCopy()
}
switch {
case capacity.RequestPolicy.ValidRange != nil && capacity.RequestPolicy.ValidRange.Min != nil:
return roundUpRange(requestedVal, capacity.RequestPolicy.ValidRange)
case capacity.RequestPolicy.ValidValues != nil:
return roundUpValidValues(requestedVal, capacity.RequestPolicy.ValidValues)
}
return *requestedVal
}
// fillEmptyRequest returns requestPolicy.Default if defined.
// Otherwise, it returns the capacity value.
func fillEmptyRequest(capacity resourceapi.DeviceCapacity) resource.Quantity {
if capacity.RequestPolicy != nil && capacity.RequestPolicy.Default != nil {
return capacity.RequestPolicy.Default.DeepCopy()
}
return capacity.Value.DeepCopy()
}
// roundUpRange rounds the requestedVal up to fit within the specified validRange.
// - If requestedVal is less than Min, it returns Min.
// - If Step is specified, it rounds requestedVal up to the nearest multiple of Step
// starting from Min.
// - If no Step is specified and requestedVal >= Min, it returns requestedVal as is.
func roundUpRange(requestedVal *resource.Quantity, validRange *resourceapi.CapacityRequestPolicyRange) resource.Quantity {
if requestedVal.Cmp(*validRange.Min) < 0 {
return validRange.Min.DeepCopy()
}
if validRange.Step == nil {
return *requestedVal
}
requestedInt64 := requestedVal.Value()
step := validRange.Step.Value()
min := validRange.Min.Value()
added := (requestedInt64 - min)
n := added / step
mod := added % step
if mod != 0 {
n += 1
}
val := min + step*n
return *resource.NewQuantity(val, resource.BinarySI)
}
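As a worked example of the rounding rules above (illustrative only, not part of the change; exampleRoundUpRange is a hypothetical helper that relies only on this file's existing imports): with Min=1Gi and Step=2Gi, a 4Gi request lies 1.5 steps above the minimum and is therefore rounded up to 1Gi + 2*2Gi = 5Gi.
func exampleRoundUpRange() resource.Quantity {
	requested := resource.MustParse("4Gi")
	validRange := &resourceapi.CapacityRequestPolicyRange{
		Min:  ptr.To(resource.MustParse("1Gi")),
		Step: ptr.To(resource.MustParse("2Gi")),
	}
	// 4Gi - 1Gi = 3Gi is not a multiple of 2Gi, so the request is rounded up to 5Gi.
	return roundUpRange(&requested, validRange) // 5Gi
}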
// roundUpValidValues returns the first value in validValues that is greater than or equal to requestedVal.
// If no such value exists, it returns requestedVal itself.
func roundUpValidValues(requestedVal *resource.Quantity, validValues []resource.Quantity) resource.Quantity {
// Simple sequential search is used as the number of entries in validValues is small (at most 10),
// and the list must already be sorted in ascending order, ensured by API validation.
// Note: A binary search could alternatively be used for better efficiency if the list grows larger.
for _, validValue := range validValues {
if requestedVal.Cmp(validValue) <= 0 {
return validValue.DeepCopy()
}
}
return *requestedVal
}
// GetConsumedCapacityFromRequest returns the capacity consumed by the claim request,
// derived from the requested values and the device's defined capacity.
func GetConsumedCapacityFromRequest(requestedCapacity *resourceapi.CapacityRequirements,
consumableCapacity map[resourceapi.QualifiedName]resourceapi.DeviceCapacity) map[resourceapi.QualifiedName]resource.Quantity {
consumedCapacity := make(map[resourceapi.QualifiedName]resource.Quantity)
for name, cap := range consumableCapacity {
var requestedValPtr *resource.Quantity
if requestedCapacity != nil && requestedCapacity.Requests != nil {
if requestedVal, requestedFound := requestedCapacity.Requests[name]; requestedFound {
requestedValPtr = &requestedVal
}
}
capacity := calculateConsumedCapacity(requestedValPtr, cap)
consumedCapacity[name] = capacity
}
return consumedCapacity
}
// violatesPolicy checks whether the request violates the requestPolicy.
func violatesPolicy(requestedVal resource.Quantity, policy *resourceapi.CapacityRequestPolicy) bool {
if policy == nil {
// no policy to check
return false
}
if policy.Default != nil && requestedVal.Cmp(*policy.Default) == 0 {
return false
}
switch {
case policy.ValidRange != nil:
return violateValidRange(requestedVal, *policy.ValidRange)
case len(policy.ValidValues) > 0:
return violateValidValues(requestedVal, policy.ValidValues)
}
// no policy rule was violated.
return false
}
func violateValidRange(requestedVal resource.Quantity, validRange resourceapi.CapacityRequestPolicyRange) bool {
if validRange.Max != nil &&
requestedVal.Cmp(*validRange.Max) > 0 {
return true
}
if validRange.Step != nil {
requestedInt64 := requestedVal.Value()
step := validRange.Step.Value()
min := validRange.Min.Value()
added := (requestedInt64 - min)
mod := added % step
// must be a multiple of step
if mod != 0 {
return true
}
}
return false
}
func violateValidValues(requestedVal resource.Quantity, validValues []resource.Quantity) bool {
for _, validVal := range validValues {
if requestedVal.Cmp(validVal) == 0 {
return false
}
}
return true
}

View File

@@ -0,0 +1,243 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package experimental
import (
"testing"
. "github.com/onsi/gomega"
resourceapi "k8s.io/api/resource/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/utils/ptr"
)
const (
driverA = "driver-a"
pool1 = "pool-1"
device1 = "device-1"
capacity0 = "capacity-0"
capacity1 = "capacity-1"
)
var (
one = resource.MustParse("1")
two = resource.MustParse("2")
three = resource.MustParse("3")
)
func deviceConsumedCapacity(deviceID DeviceID) DeviceConsumedCapacity {
capacity := map[resourceapi.QualifiedName]resource.Quantity{
capacity0: one,
}
return NewDeviceConsumedCapacity(deviceID, capacity)
}
func TestConsumableCapacity(t *testing.T) {
t.Run("add-sub-allocating-consumed-capacity", func(t *testing.T) {
g := NewWithT(t)
allocatedCapacity := NewConsumedCapacity()
g.Expect(allocatedCapacity.Empty()).To(BeTrueBecause("allocated capacity should start from zero"))
oneAllocated := ConsumedCapacity{
capacity0: &one,
}
allocatedCapacity.Add(oneAllocated)
g.Expect(allocatedCapacity.Empty()).To(BeFalseBecause("capacity is added"))
allocatedCapacity.Sub(oneAllocated)
g.Expect(allocatedCapacity.Empty()).To(BeTrueBecause("capacity is subtracted to zero"))
})
t.Run("insert-remove-allocating-consumed-capacity-collection", func(t *testing.T) {
g := NewWithT(t)
deviceID := MakeDeviceID(driverA, pool1, device1)
aggregatedCapacity := NewConsumedCapacityCollection()
aggregatedCapacity.Insert(deviceConsumedCapacity(deviceID))
aggregatedCapacity.Insert(deviceConsumedCapacity(deviceID))
allocatedCapacity, found := aggregatedCapacity[deviceID]
g.Expect(found).To(BeTrueBecause("expected deviceID to be found"))
g.Expect(allocatedCapacity[capacity0].Cmp(two)).To(BeZero())
aggregatedCapacity.Remove(deviceConsumedCapacity(deviceID))
g.Expect(allocatedCapacity[capacity0].Cmp(one)).To(BeZero())
})
t.Run("get-consumed-capacity-from-request", func(t *testing.T) {
requestedCapacity := &resourceapi.CapacityRequirements{
Requests: map[resourceapi.QualifiedName]resource.Quantity{
capacity0: one,
"dummy": one,
},
}
consumableCapacity := map[resourceapi.QualifiedName]resourceapi.DeviceCapacity{
capacity0: { // with request and with default, expect requested value
Value: two,
RequestPolicy: &resourceapi.CapacityRequestPolicy{
Default: ptr.To(two),
ValidRange: &resourceapi.CapacityRequestPolicyRange{Min: ptr.To(one)},
},
},
capacity1: { // no request but with default, expect default
Value: two,
RequestPolicy: &resourceapi.CapacityRequestPolicy{
Default: ptr.To(one),
ValidRange: &resourceapi.CapacityRequestPolicyRange{Min: ptr.To(one)},
},
},
"dummy": {
Value: one, // no request and no policy (no default), expect capacity value
},
}
consumedCapacity := GetConsumedCapacityFromRequest(requestedCapacity, consumableCapacity)
g := NewWithT(t)
g.Expect(consumedCapacity).To(HaveLen(3))
for name, val := range consumedCapacity {
g.Expect(string(name)).Should(BeElementOf([]string{capacity0, capacity1, "dummy"}))
g.Expect(val.Cmp(one)).To(BeZero())
}
})
t.Run("violate-capacity-sharing", testViolateCapacityRequestPolicy)
t.Run("calculate-consumed-capacity", testCalculateConsumedCapacity)
}
func testViolateCapacityRequestPolicy(t *testing.T) {
testcases := map[string]struct {
requestedVal resource.Quantity
requestPolicy *resourceapi.CapacityRequestPolicy
expectResult bool
}{
"no constraint": {one, nil, false},
"less than maximum": {
one,
&resourceapi.CapacityRequestPolicy{
Default: ptr.To(one),
ValidRange: &resourceapi.CapacityRequestPolicyRange{Min: ptr.To(one), Max: &two},
},
false,
},
"more than maximum": {
two,
&resourceapi.CapacityRequestPolicy{
Default: ptr.To(one),
ValidRange: &resourceapi.CapacityRequestPolicyRange{Min: ptr.To(one), Max: &one},
},
true,
},
"in set": {
one,
&resourceapi.CapacityRequestPolicy{
Default: ptr.To(one),
ValidValues: []resource.Quantity{one},
},
false,
},
"not in set": {
two,
&resourceapi.CapacityRequestPolicy{
Default: ptr.To(one),
ValidValues: []resource.Quantity{one},
},
true,
},
}
for name, tc := range testcases {
t.Run(name, func(t *testing.T) {
g := NewWithT(t)
violate := violatesPolicy(tc.requestedVal, tc.requestPolicy)
g.Expect(violate).To(BeEquivalentTo(tc.expectResult))
})
}
}
func testCalculateConsumedCapacity(t *testing.T) {
testcases := map[string]struct {
requestedVal *resource.Quantity
capacityValue resource.Quantity
requestPolicy *resourceapi.CapacityRequestPolicy
expectResult resource.Quantity
}{
"empty": {nil, one, &resourceapi.CapacityRequestPolicy{}, one},
"min in range": {
nil,
two,
&resourceapi.CapacityRequestPolicy{Default: ptr.To(one), ValidRange: &resourceapi.CapacityRequestPolicyRange{Min: ptr.To(one)}},
one,
},
"default in set": {
nil,
two,
&resourceapi.CapacityRequestPolicy{Default: ptr.To(one), ValidValues: []resource.Quantity{one}},
one,
},
"more than min in range": {
&two,
two,
&resourceapi.CapacityRequestPolicy{Default: ptr.To(one), ValidRange: &resourceapi.CapacityRequestPolicyRange{Min: ptr.To(one)}},
two,
},
"less than min in range": {
&one,
two,
&resourceapi.CapacityRequestPolicy{Default: ptr.To(one), ValidRange: &resourceapi.CapacityRequestPolicyRange{Min: ptr.To(two)}},
two,
},
"with step (round up)": {
&two,
three,
&resourceapi.CapacityRequestPolicy{Default: ptr.To(one), ValidRange: &resourceapi.CapacityRequestPolicyRange{Min: ptr.To(one), Step: ptr.To(two.DeepCopy())}},
three,
},
"with step (no remaining)": {
&two,
two,
&resourceapi.CapacityRequestPolicy{Default: ptr.To(one), ValidRange: &resourceapi.CapacityRequestPolicyRange{Min: ptr.To(one), Step: ptr.To(one.DeepCopy())}},
two,
},
"valid value in set": {
&two,
three,
&resourceapi.CapacityRequestPolicy{Default: ptr.To(one), ValidValues: []resource.Quantity{one, two, three}},
two,
},
"set (round up)": {
&two,
three,
&resourceapi.CapacityRequestPolicy{Default: ptr.To(one), ValidValues: []resource.Quantity{one, three}},
three,
},
"larger than set": {
&three,
three,
&resourceapi.CapacityRequestPolicy{Default: ptr.To(one), ValidValues: []resource.Quantity{one, two}},
three,
},
}
for name, tc := range testcases {
t.Run(name, func(t *testing.T) {
g := NewWithT(t)
capacity := resourceapi.DeviceCapacity{
Value: tc.capacityValue,
RequestPolicy: tc.requestPolicy,
}
consumedCapacity := calculateConsumedCapacity(tc.requestedVal, capacity)
g.Expect(consumedCapacity.Cmp(tc.expectResult)).To(BeZero())
})
}
}

View File

@@ -21,7 +21,6 @@ import (
"testing"
resourceapi "k8s.io/api/resource/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/dynamic-resource-allocation/cel"
"k8s.io/dynamic-resource-allocation/structured/internal"
"k8s.io/dynamic-resource-allocation/structured/internal/allocatortesting"
@@ -33,12 +32,12 @@ func TestAllocator(t *testing.T) {
func(
ctx context.Context,
features Features,
allocatedDevices sets.Set[DeviceID],
allocatedState internal.AllocatedState,
classLister DeviceClassLister,
slices []*resourceapi.ResourceSlice,
celCache *cel.Cache,
) (internal.Allocator, error) {
return NewAllocator(ctx, features, allocatedDevices, classLister, slices, celCache)
return NewAllocator(ctx, features, allocatedState.AllocatedDevices, classLister, slices, celCache)
},
)
}

View File

@@ -21,7 +21,6 @@ import (
"testing"
resourceapi "k8s.io/api/resource/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/dynamic-resource-allocation/cel"
"k8s.io/dynamic-resource-allocation/structured/internal"
"k8s.io/dynamic-resource-allocation/structured/internal/allocatortesting"
@@ -33,12 +32,12 @@ func TestAllocator(t *testing.T) {
func(
ctx context.Context,
features Features,
allocatedDevices sets.Set[DeviceID],
allocatedState internal.AllocatedState,
classLister DeviceClassLister,
slices []*resourceapi.ResourceSlice,
celCache *cel.Cache,
) (internal.Allocator, error) {
return NewAllocator(ctx, features, allocatedDevices, classLister, slices, celCache)
return NewAllocator(ctx, features, allocatedState.AllocatedDevices, classLister, slices, celCache)
},
)
}

View File

@@ -1715,6 +1715,87 @@ var _ = framework.SIGDescribe("node")(framework.WithLabel("DRA"), func() {
})
}
consumableCapacityTests := func() {
nodes := drautils.NewNodes(f, 1, 1)
// a single device that allows multiple allocations and has 8Gi of consumable memory.
driver := drautils.NewDriver(f, nodes, drautils.ToDriverResources(
[]resourceapi.CounterSet{},
[]resourceapi.Device{
{
Name: "consumable-device-1",
AllowMultipleAllocations: ptr.To(true),
Capacity: map[resourceapi.QualifiedName]resourceapi.DeviceCapacity{
"memory": {
Value: resource.MustParse("8Gi"),
RequestPolicy: &resourceapi.CapacityRequestPolicy{
Default: ptr.To(resource.MustParse("1Gi")),
ValidRange: &resourceapi.CapacityRequestPolicyRange{
Min: ptr.To(resource.MustParse("1Gi")),
},
},
},
},
},
}...,
))
b := drautils.NewBuilder(f, driver)
f.It("must allow multiple allocations and consume capacity", f.WithLabel("KubeletMinVersion:1.34"), func(ctx context.Context) {
// The first pod will use 4Gi of the device.
claim := b.ExternalClaim()
claim.Spec.Devices.Requests[0].Exactly.Capacity = &resourceapi.CapacityRequirements{
Requests: map[resourceapi.QualifiedName]resource.Quantity{
"memory": resource.MustParse("4Gi"),
},
}
pod := b.PodExternal()
pod.Spec.ResourceClaims[0].ResourceClaimName = &claim.Name
b.Create(ctx, claim, pod)
b.TestPod(ctx, f, pod)
// The second pod's 8Gi request cannot be satisfied because only 4Gi remains.
claim2 := b.ExternalClaim()
claim2.Spec.Devices.Requests[0].Exactly.Capacity = &resourceapi.CapacityRequirements{
Requests: map[resourceapi.QualifiedName]resource.Quantity{
"memory": resource.MustParse("8Gi"),
},
}
pod2 := b.PodExternal()
pod2.Spec.ResourceClaims[0].ResourceClaimName = &claim2.Name
b.Create(ctx, claim2, pod2)
// The third pod should be able to use the remaining 4Gi of the device.
claim3 := b.ExternalClaim()
claim3.Spec.Devices.Requests[0].Exactly.Capacity = &resourceapi.CapacityRequirements{
Requests: map[resourceapi.QualifiedName]resource.Quantity{
"memory": resource.MustParse("4Gi"),
},
}
pod3 := b.PodExternal()
pod3.Spec.ResourceClaims[0].ResourceClaimName = &claim3.Name
b.Create(ctx, claim3, pod3)
b.TestPod(ctx, f, pod3)
gomega.Consistently(ctx, func(ctx context.Context) error {
testPod2, err := f.ClientSet.CoreV1().Pods(pod2.Namespace).Get(ctx, pod2.Name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("expected the test pod %s to exist: %w", pod2.Name, err)
}
if testPod2.Status.Phase != v1.PodPending {
return fmt.Errorf("pod %s: unexpected status %s, expected status: %s", pod2.Name, testPod2.Status.Phase, v1.PodPending)
}
return nil
}, 20*time.Second, 200*time.Millisecond).Should(gomega.Succeed())
// Delete the first and third pod
b.DeletePodAndWaitForNotFound(ctx, pod)
b.DeletePodAndWaitForNotFound(ctx, pod3)
// There should be available capacity for pod2 now.
b.TestPod(ctx, f, pod2)
})
}
// It is okay to use the same context multiple times (like "control plane"),
// as long as the test names still remain unique overall.
@@ -1726,6 +1807,10 @@ var _ = framework.SIGDescribe("node")(framework.WithLabel("DRA"), func() {
framework.Context("kubelet", feature.DynamicResourceAllocation, f.WithFeatureGate(features.DRAPrioritizedList), prioritizedListTests)
framework.Context("kubelet", feature.DynamicResourceAllocation, f.WithFeatureGate(features.DRAConsumableCapacity), consumableCapacityTests)
framework.Context("kubelet", feature.DynamicResourceAllocation, f.WithFeatureGate(features.DRAConsumableCapacity), consumableCapacityTests)
framework.Context("kubelet", feature.DynamicResourceAllocation, "with v1beta1 API", v1beta1Tests)
framework.Context("kubelet", feature.DynamicResourceAllocation, "with v1beta2 API", v1beta2Tests)
@@ -2214,12 +2299,18 @@ var _ = framework.SIGDescribe("node")(framework.WithLabel("DRA"), func() {
framework.ExpectNoError(err)
gomega.Expect(scheduledPod).ToNot(gomega.BeNil())
var shareIDStr *string
if shareID := allocatedResourceClaim.Status.Allocation.Devices.Results[0].ShareID; shareID != nil {
shareIDStr = ptr.To(string(*shareID))
}
ginkgo.By("Setting the device status a first time")
allocatedResourceClaim.Status.Devices = append(allocatedResourceClaim.Status.Devices,
resourceapi.AllocatedDeviceStatus{
Driver: allocatedResourceClaim.Status.Allocation.Devices.Results[0].Driver,
Pool: allocatedResourceClaim.Status.Allocation.Devices.Results[0].Pool,
Device: allocatedResourceClaim.Status.Allocation.Devices.Results[0].Device,
ShareID: shareIDStr,
Conditions: []metav1.Condition{{Type: "a", Status: "True", Message: "c", Reason: "d", LastTransitionTime: metav1.NewTime(time.Now().Truncate(time.Second))}},
Data: &runtime.RawExtension{Raw: []byte(`{"foo":"bar"}`)},
NetworkData: &resourceapi.NetworkDeviceData{
@@ -2244,6 +2335,7 @@ var _ = framework.SIGDescribe("node")(framework.WithLabel("DRA"), func() {
Driver: allocatedResourceClaim.Status.Allocation.Devices.Results[0].Driver,
Pool: allocatedResourceClaim.Status.Allocation.Devices.Results[0].Pool,
Device: allocatedResourceClaim.Status.Allocation.Devices.Results[0].Device,
ShareID: shareIDStr,
Conditions: []metav1.Condition{{Type: "e", Status: "True", Message: "g", Reason: "h", LastTransitionTime: metav1.NewTime(time.Now().Truncate(time.Second))}},
Data: &runtime.RawExtension{Raw: []byte(`{"bar":"foo"}`)},
NetworkData: &resourceapi.NetworkDeviceData{

View File

@@ -18,9 +18,11 @@ package e2edra
import (
"archive/tar"
"bytes"
"compress/gzip"
"context"
_ "embed"
"encoding/json"
"flag"
"fmt"
"io"
@@ -36,12 +38,19 @@ import (
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
resourceapi "k8s.io/api/resource/v1"
resourceapiv1beta2 "k8s.io/api/resource/v1beta2"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apiruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/version"
resourceapiac "k8s.io/client-go/applyconfigurations/resource/v1"
resourceapiacv1beta2 "k8s.io/client-go/applyconfigurations/resource/v1beta2"
restclient "k8s.io/client-go/rest"
draapiv1beta2 "k8s.io/dynamic-resource-allocation/api/v1beta2"
"k8s.io/kubernetes/cmd/kubeadm/app/util/errors"
drautils "k8s.io/kubernetes/test/e2e/dra/utils"
"k8s.io/kubernetes/test/e2e/framework"
@@ -196,6 +205,7 @@ var _ = ginkgo.Describe("DRA upgrade/downgrade", func() {
tCtx.ExpectNoError(e2enode.WaitForAllNodesSchedulable(tCtx, tCtx.Client(), f.Timeouts.NodeSchedulable), "wait for all nodes to be schedulable")
nodes := drautils.NewNodesNow(tCtx, f, 1, 1)
testResourceClaimDeviceStatusAfterUpgrade, testResourceClaimDeviceStatusAfterDowngrade := testResourceClaimDeviceStatus(tCtx, namespace.Name)
// Opening sockets locally avoids intermittent errors and delays caused by proxying through the restarted apiserver.
// We could speed up testing by shortening the sync delay in the ResourceSlice controller, but let's better
@@ -218,6 +228,7 @@ var _ = ginkgo.Describe("DRA upgrade/downgrade", func() {
// TODO
restoreOptions := cluster.Modify(tCtx, localupcluster.ModifyOptions{Upgrade: true, BinDir: dir})
tCtx = ktesting.End(tCtx)
testResourceClaimDeviceStatusAfterUpgrade()
// The kubelet wipes all ResourceSlices on a restart because it doesn't know which drivers were running.
// Wait for the ResourceSlice controller in the driver to notice and recreate the ResourceSlices.
@@ -251,6 +262,7 @@ var _ = ginkgo.Describe("DRA upgrade/downgrade", func() {
return output
}).Should(gomega.ContainSubstring(`"Caches are synced" controller="resource_claim"`))
tCtx = ktesting.End(tCtx)
testResourceClaimDeviceStatusAfterDowngrade()
// We need to clean up explicitly because the normal
// cleanup doesn't work (driver shuts down first).
@@ -367,3 +379,200 @@ func serverDownloadURL(tCtx ktesting.TContext, major, minor uint) (string, strin
tCtx.ExpectNoError(err, "read response body for %s", url)
return fmt.Sprintf("https://dl.k8s.io/release/%s/kubernetes-server-%s-%s.tar.gz", string(version), runtime.GOOS, runtime.GOARCH), string(version)
}
// testResourceClaimDeviceStatus corresponds to testResourceClaimDeviceStatus in test/integration/dra
// and was copied from there, therefore the unit-test style with tCtx and require.
func testResourceClaimDeviceStatus(tCtx ktesting.TContext, namespace string) (afterUpgrade, afterDowngrade func()) {
claimName := "claim-with-device-status"
claim := &resourceapiv1beta2.ResourceClaim{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
Name: claimName,
},
Spec: resourceapiv1beta2.ResourceClaimSpec{
Devices: resourceapiv1beta2.DeviceClaim{
Requests: []resourceapiv1beta2.DeviceRequest{
{
Name: "foo",
Exactly: &resourceapiv1beta2.ExactDeviceRequest{
DeviceClassName: "foo",
},
},
},
},
},
}
claim, err := tCtx.Client().ResourceV1beta2().ResourceClaims(namespace).Create(tCtx, claim, metav1.CreateOptions{})
tCtx.ExpectNoError(err, "create ResourceClaim")
// Add an allocation result.
// A finalizer is required for that.
finalizer := "test.example.com/my-test-finalizer"
claim.Finalizers = append(claim.Finalizers, finalizer)
claim, err = tCtx.Client().ResourceV1beta2().ResourceClaims(namespace).Update(tCtx, claim, metav1.UpdateOptions{})
claim.Status.Allocation = &resourceapiv1beta2.AllocationResult{
Devices: resourceapiv1beta2.DeviceAllocationResult{
Results: []resourceapiv1beta2.DeviceRequestAllocationResult{
{
Request: "foo",
Driver: "one",
Pool: "global",
Device: "my-device",
},
{
Request: "foo",
Driver: "two",
Pool: "global",
Device: "another-device",
},
{
Request: "foo",
Driver: "three",
Pool: "global",
Device: "my-device",
},
},
},
}
tCtx.ExpectNoError(err, "add finalizer")
removeClaim := func(tCtx ktesting.TContext) {
client := tCtx.Client().ResourceV1beta2()
claim, err := client.ResourceClaims(namespace).Get(tCtx, claimName, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
return
}
tCtx.ExpectNoError(err, "get claim to remove finalizer")
if claim.Status.Allocation != nil {
claim.Status.Allocation = nil
claim, err = client.ResourceClaims(namespace).UpdateStatus(tCtx, claim, metav1.UpdateOptions{})
tCtx.ExpectNoError(err, "remove allocation")
}
claim.Finalizers = nil
claim, err = client.ResourceClaims(namespace).Update(tCtx, claim, metav1.UpdateOptions{})
tCtx.ExpectNoError(err, "remove finalizer")
err = client.ResourceClaims(namespace).Delete(tCtx, claim.Name, metav1.DeleteOptions{})
tCtx.ExpectNoError(err, "delete claim")
}
tCtx.CleanupCtx(removeClaim)
claim, err = tCtx.Client().ResourceV1beta2().ResourceClaims(namespace).UpdateStatus(tCtx, claim, metav1.UpdateOptions{})
tCtx.ExpectNoError(err, "add allocation result")
// Now adding the device status should work.
deviceStatus := []resourceapiv1beta2.AllocatedDeviceStatus{{
Driver: "one",
Pool: "global",
Device: "my-device",
Data: &apiruntime.RawExtension{
Raw: []byte(`{"kind": "foo", "apiVersion": "dra.example.com/v1"}`),
},
NetworkData: &resourceapiv1beta2.NetworkDeviceData{
InterfaceName: "net-1",
IPs: []string{
"10.9.8.0/24",
"2001:db8::/64",
},
HardwareAddress: "ea:9f:cb:40:b1:7b",
},
}}
claim.Status.Devices = deviceStatus
tCtx.ExpectNoError(err, "add device status")
require.Equal(tCtx, deviceStatus, claim.Status.Devices, "after adding device status")
// Strip the RawExtension. SSA re-encodes it, which causes negligible differences that nonetheless break assert.Equal.
claim.Status.Devices[0].Data = nil
deviceStatus[0].Data = nil
claim, err = tCtx.Client().ResourceV1beta2().ResourceClaims(namespace).UpdateStatus(tCtx, claim, metav1.UpdateOptions{})
tCtx.ExpectNoError(err, "add device status")
require.Equal(tCtx, deviceStatus, claim.Status.Devices, "after stripping RawExtension")
// Exercise SSA.
deviceStatusAC := resourceapiacv1beta2.AllocatedDeviceStatus().
WithDriver("two").
WithPool("global").
WithDevice("another-device").
WithNetworkData(resourceapiacv1beta2.NetworkDeviceData().WithInterfaceName("net-2"))
deviceStatus = append(deviceStatus, resourceapiv1beta2.AllocatedDeviceStatus{
Driver: "two",
Pool: "global",
Device: "another-device",
NetworkData: &resourceapiv1beta2.NetworkDeviceData{
InterfaceName: "net-2",
},
})
claimAC := resourceapiacv1beta2.ResourceClaim(claim.Name, claim.Namespace).
WithStatus(resourceapiacv1beta2.ResourceClaimStatus().WithDevices(deviceStatusAC))
claim, err = tCtx.Client().ResourceV1beta2().ResourceClaims(namespace).ApplyStatus(tCtx, claimAC, metav1.ApplyOptions{
Force: true,
FieldManager: "manager-1",
})
tCtx.ExpectNoError(err, "apply device status two")
require.Equal(tCtx, deviceStatus, claim.Status.Devices, "after applying device status two")
deviceStatusAC = resourceapiacv1beta2.AllocatedDeviceStatus().
WithDriver("three").
WithPool("global").
WithDevice("my-device").
WithNetworkData(resourceapiacv1beta2.NetworkDeviceData().WithInterfaceName("net-3"))
deviceStatus = append(deviceStatus, resourceapiv1beta2.AllocatedDeviceStatus{
Driver: "three",
Pool: "global",
Device: "my-device",
NetworkData: &resourceapiv1beta2.NetworkDeviceData{
InterfaceName: "net-3",
},
})
claimAC = resourceapiacv1beta2.ResourceClaim(claim.Name, claim.Namespace).
WithStatus(resourceapiacv1beta2.ResourceClaimStatus().WithDevices(deviceStatusAC))
claim, err = tCtx.Client().ResourceV1beta2().ResourceClaims(namespace).ApplyStatus(tCtx, claimAC, metav1.ApplyOptions{
Force: true,
FieldManager: "manager-2",
})
tCtx.ExpectNoError(err, "apply device status three")
require.Equal(tCtx, deviceStatus, claim.Status.Devices, "after applying device status three")
var buffer bytes.Buffer
encoder := json.NewEncoder(&buffer)
encoder.SetIndent(" ", " ")
tCtx.ExpectNoError(encoder.Encode(claim))
tCtx.Logf("Final ResourceClaim:\n%s", buffer.String())
afterUpgrade = func() {
// Update one entry, remove the other.
deviceStatusAC := resourceapiac.AllocatedDeviceStatus().
WithDriver("two").
WithPool("global").
WithDevice("another-device").
WithNetworkData(resourceapiac.NetworkDeviceData().WithInterfaceName("yet-another-net"))
deviceStatus[1].NetworkData.InterfaceName = "yet-another-net"
claimAC := resourceapiac.ResourceClaim(claim.Name, claim.Namespace).
WithStatus(resourceapiac.ResourceClaimStatus().WithDevices(deviceStatusAC))
claim, err := tCtx.Client().ResourceV1().ResourceClaims(namespace).ApplyStatus(tCtx, claimAC, metav1.ApplyOptions{
Force: true,
FieldManager: "manager-1",
})
tCtx.ExpectNoError(err, "update device status two")
var deviceStatusV1 []resourceapi.AllocatedDeviceStatus
for _, status := range deviceStatus {
var statusV1 resourceapi.AllocatedDeviceStatus
tCtx.ExpectNoError(draapiv1beta2.Convert_v1beta2_AllocatedDeviceStatus_To_v1_AllocatedDeviceStatus(&status, &statusV1, nil))
deviceStatusV1 = append(deviceStatusV1, statusV1)
}
require.Equal(tCtx, deviceStatusV1, claim.Status.Devices, "after updating device status two")
}
afterDowngrade = func() {
claimAC := resourceapiacv1beta2.ResourceClaim(claim.Name, claim.Namespace)
deviceStatus = deviceStatus[0:2]
claim, err := tCtx.Client().ResourceV1beta2().ResourceClaims(namespace).ApplyStatus(tCtx, claimAC, metav1.ApplyOptions{
Force: true,
FieldManager: "manager-2",
})
tCtx.ExpectNoError(err, "remove device status three")
require.Equal(tCtx, deviceStatus, claim.Status.Devices, "after removing device status three")
// The cleanup order is such that we have to run this explicitly now.
// The tCtx.CleanupCtx is more for the sake of completeness.
removeClaim(tCtx)
}
return
}

View File

@@ -303,6 +303,7 @@ func TestDRA(t *testing.T) {
// as needed by tests for them.
features.DRAAdminAccess: true,
features.DRADeviceBindingConditions: true,
features.DRAConsumableCapacity: true,
features.DRADeviceTaints: true,
features.DRAPartitionableDevices: true,
features.DRAPrioritizedList: true,
@@ -872,13 +873,14 @@ func testPublishResourceSlices(tCtx ktesting.TContext, haveLatestAPI bool, disab
var expected []any
for _, device := range spec.Devices {
expected = append(expected, gstruct.MatchAllFields(gstruct.Fields{
"Name": gomega.Equal(device.Name),
"Attributes": gomega.Equal(device.Attributes),
"Capacity": gomega.Equal(device.Capacity),
"ConsumesCounters": gomega.Equal(device.ConsumesCounters),
"NodeName": matchPointer(device.NodeName),
"NodeSelector": matchPointer(device.NodeSelector),
"AllNodes": matchPointer(device.AllNodes),
"Name": gomega.Equal(device.Name),
"AllowMultipleAllocations": gomega.Equal(device.AllowMultipleAllocations),
"Attributes": gomega.Equal(device.Attributes),
"Capacity": gomega.Equal(device.Capacity),
"ConsumesCounters": gomega.Equal(device.ConsumesCounters),
"NodeName": matchPointer(device.NodeName),
"NodeSelector": matchPointer(device.NodeSelector),
"AllNodes": matchPointer(device.AllNodes),
"Taints": gomega.HaveExactElements(func() []any {
var expected []any
for _, taint := range device.Taints {

View File

@@ -278,9 +278,10 @@ func (op *allocResourceClaimsOp) run(tCtx ktesting.TContext) {
claimInformer := informerFactory.Resource().V1().ResourceClaims().Informer()
nodeLister := informerFactory.Core().V1().Nodes().Lister()
resourceSliceTrackerOpts := resourceslicetracker.Options{
EnableDeviceTaints: utilfeature.DefaultFeatureGate.Enabled(features.DRADeviceTaints),
SliceInformer: informerFactory.Resource().V1().ResourceSlices(),
KubeClient: tCtx.Client(),
EnableDeviceTaints: utilfeature.DefaultFeatureGate.Enabled(features.DRADeviceTaints),
EnableConsumableCapacity: utilfeature.DefaultFeatureGate.Enabled(features.DRAConsumableCapacity),
SliceInformer: informerFactory.Resource().V1().ResourceSlices(),
KubeClient: tCtx.Client(),
}
if resourceSliceTrackerOpts.EnableDeviceTaints {
resourceSliceTrackerOpts.TaintInformer = informerFactory.Resource().V1alpha3().DeviceTaintRules()
@@ -306,7 +307,7 @@ func (op *allocResourceClaimsOp) run(tCtx ktesting.TContext) {
}
require.Equal(tCtx, expectSyncedInformers, syncedInformers, "synced informers")
celCache := cel.NewCache(10)
celCache := cel.NewCache(10, cel.Features{EnableConsumableCapacity: utilfeature.DefaultFeatureGate.Enabled(features.DRAConsumableCapacity)})
// The set of nodes is assumed to be fixed at this point.
nodes, err := nodeLister.List(labels.Everything())
@@ -327,21 +328,40 @@ claims:
claims, err := draManager.ResourceClaims().List()
tCtx.ExpectNoError(err, "list claims")
allocatedDevices := sets.New[structured.DeviceID]()
allocatedSharedDeviceIDs := sets.New[structured.SharedDeviceID]()
aggregatedCapacity := structured.NewConsumedCapacityCollection()
for _, claim := range claims {
if claim.Status.Allocation == nil {
continue
}
for _, result := range claim.Status.Allocation.Devices.Results {
allocatedDevices.Insert(structured.MakeDeviceID(result.Driver, result.Pool, result.Device))
deviceID := structured.MakeDeviceID(result.Driver, result.Pool, result.Device)
allocatedDevices.Insert(deviceID)
if result.ShareID == nil {
allocatedDevices.Insert(deviceID)
continue
}
sharedDeviceID := structured.MakeSharedDeviceID(deviceID, result.ShareID)
allocatedSharedDeviceIDs.Insert(sharedDeviceID)
claimedCapacity := result.ConsumedCapacity
if claimedCapacity != nil {
allocatedCapacity := structured.NewDeviceConsumedCapacity(deviceID, claimedCapacity)
aggregatedCapacity.Insert(allocatedCapacity)
}
}
}
allocatedState := structured.AllocatedState{
AllocatedDevices: allocatedDevices,
AllocatedSharedDeviceIDs: allocatedSharedDeviceIDs,
AggregatedCapacity: aggregatedCapacity,
}
allocator, err := structured.NewAllocator(tCtx, structured.Features{
PrioritizedList: utilfeature.DefaultFeatureGate.Enabled(features.DRAPrioritizedList),
AdminAccess: utilfeature.DefaultFeatureGate.Enabled(features.DRAAdminAccess),
DeviceTaints: utilfeature.DefaultFeatureGate.Enabled(features.DRADeviceTaints),
PartitionableDevices: utilfeature.DefaultFeatureGate.Enabled(features.DRAPartitionableDevices),
}, allocatedDevices, draManager.DeviceClasses(), slices, celCache)
ConsumableCapacity: utilfeature.DefaultFeatureGate.Enabled(features.DRAConsumableCapacity),
}, allocatedState, draManager.DeviceClasses(), slices, celCache)
tCtx.ExpectNoError(err, "create allocator")
rand.Shuffle(len(nodes), func(i, j int) {

View File

@@ -693,3 +693,69 @@
initPods: 2500
measurePods: 2500
maxClaimsPerNode: 10
# SteadyStateClusterResourceClaimTemplateConsumableCapacity is a variant of
# SchedulingWithResourceClaimTemplate. It creates a single ResourceSlice that has two devices:
# one preallocated device that consumes shared counters and one basic device. Both allow
# multiple allocations and provide consumable capacity. It also creates a ResourceClaimTemplate
# with two requests; each requests half of the capacity with a count of two.
# The first request uses a distinctAttribute constraint while the other uses matchAttribute.
- name: SteadyStateClusterResourceClaimTemplateConsumableCapacity
featureGates:
DynamicResourceAllocation: true
DRAConsumableCapacity: true
workloadTemplate:
- opcode: createNodes
countParam: $nodesWithoutDRA
- opcode: createNodes
nodeTemplatePath: templates/node-with-dra-test-driver.yaml
countParam: $nodesWithDRA
- opcode: createResourceDriver
driverName: test-driver.cdi.k8s.io
nodes: scheduler-perf-dra-*
maxClaimsPerNodeParam: $maxClaimsPerNode
- opcode: createAny
templatePath: templates/resourceslice-consumablecapacity.yaml
countParam: $resourceSlices
- opcode: createAny
templatePath: templates/deviceclass-consumablecapacity.yaml
- opcode: createAny
templatePath: templates/resourceclaimtemplate-consumablecapacity.yaml
namespace: test
- opcode: createPods
namespace: test
countParam: $measurePods
steadyState: true
durationParam: $duration
podTemplatePath: templates/pod-with-claim-template.yaml
collectMetrics: true
workloads:
- name: fast
labels: [integration-test, short]
params:
nodesWithDRA: 1
nodesWithoutDRA: 1
resourceSlices: 1
measurePods: 1
duration: 2s
maxClaimsPerNode: 2
- name: fast_with_DRAPartitionableDevices
featureGates:
DRAPartitionableDevices: true
DRAResourceClaimDeviceStatus: true
labels: [integration-test, short]
params:
nodesWithDRA: 1
nodesWithoutDRA: 1
resourceSlices: 1
measurePods: 1
duration: 2s
maxClaimsPerNode: 2
- name: 2000pods_100nodes
params:
nodesWithDRA: 100
nodesWithoutDRA: 0
resourceSlices: 2000
measurePods: 2000
duration: 10s
maxClaimsPerNode: 20

View File

@@ -0,0 +1,10 @@
apiVersion: resource.k8s.io/v1beta2
kind: DeviceClass
metadata:
name: test-class
spec:
selectors:
- cel:
expression: |-
device.driver == "test-driver.cdi.k8s.io" &&
device.allowMultipleAllocations == true

View File

@@ -0,0 +1,29 @@
apiVersion: resource.k8s.io/v1beta2
kind: ResourceClaimTemplate
metadata:
name: test-claim-template
spec:
spec:
devices:
requests:
- name: req-0
exactly:
deviceClassName: test-class
count: 2
capacity:
requests:
memory: 40Gi
- name: req-1
exactly:
deviceClassName: test-class
count: 2
capacity:
requests:
memory: 40Gi
constraints:
- requests:
- req-0
distinctAttribute: dra.example.com/slice
- requests:
- req-1
matchAttribute: dra.example.com/slice

View File

@@ -0,0 +1,63 @@
kind: ResourceSlice
apiVersion: resource.k8s.io/v1beta2
metadata:
name: resourceslice-{{.Index}}
spec:
pool:
name: resourceslice-{{.Index}}
generation: 1
resourceSliceCount: 1
driver: test-driver.cdi.k8s.io
nodeSelector:
nodeSelectorTerms:
- matchExpressions:
- key: node-with-dra
operator: In
values:
- "true"
sharedCounters:
- name: counter-set
counters:
counter1:
value: "1"
counter2:
value: "1"
devices:
- name: shareable-device
allowMultipleAllocations: true
attributes:
preallocate:
bool: false
dra.example.com/slice:
int: {{.Index}}
capacity:
memory:
value: 80Gi
requestPolicy:
default: 1Gi
validRange:
min: 1Gi
# a device that consumes 2 counters
- name: device-2-counters-1
allowMultipleAllocations: true
attributes:
preallocate:
bool: true
dra.example.com/slice:
int: {{.Index}}
capacity:
counters:
value: "2"
memory:
value: 80Gi
requestPolicy:
default: 1Gi
validRange:
min: 1Gi
consumesCounters:
- counterSet: counter-set
counters:
counter1:
value: "1"
counter2:
value: "1"