dra: add "named resources" structured parameter model

Like the current device plugin interface, a DRA driver using this model
announces a list of resource instances. In contrast to device plugins, this
list is made available to the scheduler together with attributes that can be
used to select suitable instances when they are not all alike.

Because this is the first structured parameter model, some checks that
previously were not possible, in particular "is one structured parameter field
set", now get enabled. Adding another structured parameter model will be
similar.

The applyconfigs code generator assumes that all types in an API are defined in
a single package. If it weren't for that, it would be possible to place the
"named resources" types in separate packages, which would make their names in
the Go code more natural and provide an indication of their stability level
because the package name could include a version.
This commit is contained in:
Patrick Ohly
2024-02-23 15:22:02 +01:00
parent 096e948905
commit d4d5ade7f5
66 changed files with 6143 additions and 274 deletions

View File

@@ -0,0 +1,150 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package namedresources
import (
"context"
"errors"
"fmt"
"slices"
resourceapi "k8s.io/api/resource/v1alpha2"
"k8s.io/apiserver/pkg/cel/environment"
"k8s.io/dynamic-resource-allocation/structured/namedresources/cel"
)
// Model tracks the named resource instances that were added via AddResources
// and, for each of them, whether AddAllocation marked it as allocated.
// The zero value is an empty model that is ready to be filled.
type Model struct {
	// instances has one entry per known resource instance, in the order
	// in which the instances were added.
	instances []instanceAllocation
}
// instanceAllocation pairs one resource instance with its allocation state.
type instanceAllocation struct {
	// allocated is true when the instance is already in use and therefore
	// gets skipped when searching for instances for a new claim.
	allocated bool
	// instance points to the instance description that was handed to
	// AddResources.
	instance *resourceapi.NamedResourcesInstance
}
// AddResources appends one entry, initially not allocated, to the model for
// every instance listed in resources. It has to run before AddAllocation
// because that function only marks entries which already exist.
//
// The model keeps pointers into resources.Instances instead of copying the
// instances. Passing nil for resources is allowed and does nothing.
func AddResources(m *Model, resources *resourceapi.NamedResourcesResources) {
	if resources == nil {
		return
	}
	for idx := range resources.Instances {
		entry := instanceAllocation{
			instance: &resources.Instances[idx],
		}
		m.instances = append(m.instances, entry)
	}
}
// AddAllocation records that the instance named in result is in use. Only
// the first entry in the model with a matching name is marked; a name that
// is not present in the model is silently ignored. Entries must have been
// created with AddResources beforehand.
//
// Passing nil for result is allowed and does nothing.
func AddAllocation(m *Model, result *resourceapi.NamedResourcesAllocationResult) {
	if result == nil {
		return
	}
	sameName := func(entry instanceAllocation) bool {
		return entry.instance.Name == result.Name
	}
	if idx := slices.IndexFunc(m.instances, sameName); idx >= 0 {
		m.instances[idx].allocated = true
	}
}
// NewClaimController compiles the CEL selectors of the optional class filter
// and of every request and returns a Controller that evaluates them against
// resource instances.
//
// filter may be nil, in which case instances are only checked against the
// request selectors. The compiled requests are stored in the same order as
// in the requests slice. An error is returned as soon as one expression
// fails to compile.
func NewClaimController(filter *resourceapi.NamedResourcesFilter, requests []*resourceapi.NamedResourcesRequest) (*Controller, error) {
	controller := new(Controller)

	if filter != nil {
		compiled := cel.Compiler.CompileCELExpression(filter.Selector, environment.StoredExpressions)
		if compiled.Error != nil {
			// Validation is expected to have rejected such an expression already.
			return nil, fmt.Errorf("compile class filter CEL expression: %w", compiled.Error)
		}
		controller.filter = &compiled
	}

	for idx := range requests {
		compiled := cel.Compiler.CompileCELExpression(requests[idx].Selector, environment.StoredExpressions)
		if compiled.Error != nil {
			// Validation is expected to have rejected such an expression already.
			return nil, fmt.Errorf("compile request CEL expression: %w", compiled.Error)
		}
		controller.requests = append(controller.requests, compiled)
	}

	return controller, nil
}
// Controller holds the compiled CEL expressions that decide which resource
// instances can be used for a claim. It is created by NewClaimController.
type Controller struct {
	// filter is the compiled class filter. It is nil when no filter was
	// provided; otherwise an instance must pass it to be considered.
	filter *cel.CompilationResult
	// requests contains one compiled selector per request, in the order
	// in which the requests were passed to NewClaimController.
	requests []cel.CompilationResult
}
// NodeIsSuitable reports whether every request of the controller can be
// matched with a distinct, currently unallocated instance from model.
// The model itself is not modified. An error is returned when evaluating
// one of the CEL expressions fails.
func (c *Controller) NodeIsSuitable(ctx context.Context, model Model) (bool, error) {
	chosen, err := c.allocate(ctx, model)
	if err != nil {
		return false, err
	}
	// Each request contributes at most one index, so equal lengths mean
	// that all requests were satisfied.
	satisfied := len(chosen) == len(c.requests)
	return satisfied, nil
}
// Allocate picks one unallocated instance from model for each request and
// returns the results in request order, each naming the chosen instance.
// The model itself is not modified.
//
// An error is returned when evaluating a CEL expression fails or when not
// all requests can be satisfied ("insufficient resources").
func (c *Controller) Allocate(ctx context.Context, model Model) ([]*resourceapi.NamedResourcesAllocationResult, error) {
	chosen, err := c.allocate(ctx, model)
	switch {
	case err != nil:
		return nil, err
	case len(chosen) != len(c.requests):
		return nil, errors.New("insufficient resources")
	}

	// Here chosen has exactly one index per request, in request order.
	results := make([]*resourceapi.NamedResourcesAllocationResult, 0, len(chosen))
	for _, idx := range chosen {
		name := model.instances[idx].instance.Name
		results = append(results, &resourceapi.NamedResourcesAllocationResult{Name: name})
	}
	return results, nil
}
// allocate walks through the requests in order and, for each of them, picks
// the first instance in model that is not allocated yet, passes the class
// filter (if there is one) and matches the request selector.
//
// It returns the indices into model.instances of the picked instances, in
// request order. A request for which nothing suitable was found adds no
// index, so the caller has to compare the length of the result with the
// number of requests. Any CEL evaluation error aborts the search.
func (c *Controller) allocate(ctx context.Context, model Model) ([]int, error) {
	// Work on a shallow copy: the allocated flags get flipped below and
	// the caller's model must stay untouched.
	candidates := slices.Clone(model.instances)
	chosen := make([]int, 0, len(c.requests))

	for _, selector := range c.requests {
		for idx := range candidates {
			candidate := &candidates[idx]
			if candidate.allocated {
				continue
			}
			attributes := candidate.instance.Attributes

			if c.filter != nil {
				passes, err := c.filter.Evaluate(ctx, attributes)
				if err != nil {
					return nil, fmt.Errorf("evaluate filter CEL expression: %w", err)
				}
				if !passes {
					continue
				}
			}

			matches, err := selector.Evaluate(ctx, attributes)
			if err != nil {
				return nil, fmt.Errorf("evaluate request CEL expression: %w", err)
			}
			if !matches {
				continue
			}

			// First fit: take the first matching, unallocated instance.
			//
			// A more thorough search would need backtracking, because
			// handing a "large" instance to a "small" request may make a
			// later "large" request impossible to satisfy when only
			// "small" instances are left.
			candidate.allocated = true
			chosen = append(chosen, idx)
			break
		}
	}

	return chosen, nil
}

View File

@@ -23,8 +23,10 @@ import (
v1 "k8s.io/api/core/v1"
resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
resourcev1alpha2listers "k8s.io/client-go/listers/resource/v1alpha2"
"k8s.io/klog/v2"
namedresourcesmodel "k8s.io/kubernetes/pkg/scheduler/framework/plugins/dynamicresources/structured/namedresources"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding"
)
@@ -35,7 +37,7 @@ type resources map[string]map[string]resourceModels
// resourceModels may have more than one entry because it is valid for a driver to
// use more than one structured parameter model.
type resourceModels struct {
	// namedresources is the state for the "named resources" structured
	// parameter model.
	namedresources namedresourcesmodel.Model
}
// newResourceModel parses the available information about resources. Objects
@@ -54,7 +56,7 @@ func newResourceModel(logger klog.Logger, nodeResourceSliceLister resourcev1alph
model[slice.NodeName] = make(map[string]resourceModels)
}
resource := model[slice.NodeName][slice.DriverName]
// TODO: add some structured parameter model
namedresourcesmodel.AddResources(&resource.namedresources, slice.NamedResources)
model[slice.NodeName][slice.DriverName] = resource
}
@@ -75,11 +77,11 @@ func newResourceModel(logger klog.Logger, nodeResourceSliceLister resourcev1alph
if model[structured.NodeName] == nil {
model[structured.NodeName] = make(map[string]resourceModels)
}
// resource := model[structured.NodeName][handle.DriverName]
// TODO: add some structured parameter model
// for _, result := range structured.Results {
// // Call AddAllocation for each known model. Each call itself needs to check for nil.
// }
resource := model[structured.NodeName][handle.DriverName]
for _, result := range structured.Results {
// Call AddAllocation for each known model. Each call itself needs to check for nil.
namedresourcesmodel.AddAllocation(&resource.namedresources, result.NamedResources)
}
}
}
@@ -90,12 +92,52 @@ func newClaimController(logger klog.Logger, class *resourcev1alpha2.ResourceClas
// Each node driver is separate from the others. Each driver may have
// multiple requests which need to be allocated together, so here
// we have to collect them per model.
// TODO: implement some structured parameters model
type perDriverRequests struct {
parameters []runtime.RawExtension
requests []*resourcev1alpha2.NamedResourcesRequest
}
namedresourcesRequests := make(map[string]perDriverRequests)
for i, request := range claimParameters.DriverRequests {
driverName := request.DriverName
p := namedresourcesRequests[driverName]
for e, request := range request.Requests {
switch {
case request.ResourceRequestModel.NamedResources != nil:
p.parameters = append(p.parameters, request.VendorParameters)
p.requests = append(p.requests, request.ResourceRequestModel.NamedResources)
default:
return nil, fmt.Errorf("claim parameters %s: driverRequersts[%d].requests[%d]: no supported structured parameters found", klog.KObj(claimParameters), i, e)
}
}
if len(p.requests) > 0 {
namedresourcesRequests[driverName] = p
}
}
c := &claimController{
class: class,
classParameters: classParameters,
claimParameters: claimParameters,
namedresources: make(map[string]perDriverController, len(namedresourcesRequests)),
}
for driverName, perDriver := range namedresourcesRequests {
var filter *resourcev1alpha2.NamedResourcesFilter
if classParameters != nil {
for _, f := range classParameters.Filters {
if f.DriverName == driverName && f.ResourceFilterModel.NamedResources != nil {
filter = f.ResourceFilterModel.NamedResources
break
}
}
}
controller, err := namedresourcesmodel.NewClaimController(filter, perDriver.requests)
if err != nil {
return nil, fmt.Errorf("creating claim controller for named resources structured model: %w", err)
}
c.namedresources[driverName] = perDriverController{
parameters: perDriver.parameters,
controller: controller,
}
}
return c, nil
}
@@ -106,11 +148,28 @@ type claimController struct {
class *resourcev1alpha2.ResourceClass
classParameters *resourcev1alpha2.ResourceClassParameters
claimParameters *resourcev1alpha2.ResourceClaimParameters
// TODO: implement some structured parameters model
namedresources map[string]perDriverController
}
// perDriverController bundles what is needed to handle the named resources
// requests of one driver.
type perDriverController struct {
	// parameters holds the vendor parameters of the requests, one entry
	// per request and in the same order as the requests that were passed
	// to the controller.
	parameters []runtime.RawExtension
	// controller evaluates the requests of the driver against the
	// named resources model.
	controller *namedresourcesmodel.Controller
}
// nodeIsSuitable reports whether the named resources requests of all drivers
// in the claim can be satisfied with the resources of the given node. It
// stops at the first driver whose requests cannot be satisfied.
func (c claimController) nodeIsSuitable(ctx context.Context, nodeName string, resources resources) (bool, error) {
	perNode := resources[nodeName]
	for driver, entry := range c.namedresources {
		suitable, err := entry.controller.NodeIsSuitable(ctx, perNode[driver].namedresources)
		switch {
		case err != nil:
			// A failing CEL expression has to be fixed. Reporting the
			// error is more visible than quietly skipping the node.
			return false, fmt.Errorf("checking node %q and resources of driver %q: %w", nodeName, driver, err)
		case !suitable:
			return false, nil
		}
	}
	return true, nil
}
@@ -128,7 +187,49 @@ func (c claimController) allocate(ctx context.Context, nodeName string, resource
},
}
// TODO: implement some structured parameters model
nodeResources := resources[nodeName]
for driverName, perDriver := range c.namedresources {
// Must return one entry for each request. The entry may be nil. This way,
// the result can be correlated with the per-request parameters.
results, err := perDriver.controller.Allocate(ctx, nodeResources[driverName].namedresources)
if err != nil {
return "", nil, fmt.Errorf("allocating via named resources structured model: %w", err)
}
handle := resourcev1alpha2.ResourceHandle{
DriverName: driverName,
StructuredData: &resourcev1alpha2.StructuredResourceHandle{
NodeName: nodeName,
},
}
for i, result := range results {
if result == nil {
continue
}
handle.StructuredData.Results = append(handle.StructuredData.Results,
resourcev1alpha2.DriverAllocationResult{
VendorRequestParameters: perDriver.parameters[i],
AllocationResultModel: resourcev1alpha2.AllocationResultModel{
NamedResources: result,
},
},
)
}
if c.classParameters != nil {
for _, p := range c.classParameters.VendorParameters {
if p.DriverName == driverName {
handle.StructuredData.VendorClassParameters = p.Parameters
break
}
}
}
for _, request := range c.claimParameters.DriverRequests {
if request.DriverName == driverName {
handle.StructuredData.VendorClaimParameters = request.VendorParameters
break
}
}
allocation.ResourceHandles = append(allocation.ResourceHandles, handle)
}
return c.class.DriverName, allocation, nil
}