Merge pull request #126807 from pohly/dra-resourceslice-update

DRA scheduler: ResourceSlice update
Kubernetes Prow Robot
2024-08-22 15:18:09 +01:00
committed by GitHub
7 changed files with 215 additions and 92 deletions

View File

@@ -77,69 +77,81 @@ type Nodes struct {
var pluginPermissions string
// NewNodes selects nodes to run the test on.
//
// Call this outside of ginkgo.It, then use the instance inside ginkgo.It.
func NewNodes(f *framework.Framework, minNodes, maxNodes int) *Nodes {
nodes := &Nodes{}
ginkgo.BeforeEach(func(ctx context.Context) {
ginkgo.By("selecting nodes")
// The kubelet plugin is harder. We deploy the builtin manifest
// after patching in the driver name and all nodes on which we
// want the plugin to run.
//
// Only a subset of the nodes are picked to avoid causing
// unnecessary load on a big cluster.
nodeList, err := e2enode.GetBoundedReadySchedulableNodes(ctx, f.ClientSet, maxNodes)
framework.ExpectNoError(err, "get nodes")
numNodes := int32(len(nodeList.Items))
if int(numNodes) < minNodes {
e2eskipper.Skipf("%d ready nodes required, only have %d", minNodes, numNodes)
}
nodes.NodeNames = nil
for _, node := range nodeList.Items {
nodes.NodeNames = append(nodes.NodeNames, node.Name)
}
sort.Strings(nodes.NodeNames)
framework.Logf("testing on nodes %v", nodes.NodeNames)
// Watch claims in the namespace. This is useful for monitoring a test
// and enables additional sanity checks.
claimInformer := resourceapiinformer.NewResourceClaimInformer(f.ClientSet, f.Namespace.Name, 100*time.Hour /* resync */, nil)
cancelCtx, cancel := context.WithCancelCause(context.Background())
var wg sync.WaitGroup
ginkgo.DeferCleanup(func() {
cancel(errors.New("test has completed"))
wg.Wait()
})
_, err = claimInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj any) {
defer ginkgo.GinkgoRecover()
claim := obj.(*resourceapi.ResourceClaim)
framework.Logf("New claim:\n%s", format.Object(claim, 1))
validateClaim(claim)
},
UpdateFunc: func(oldObj, newObj any) {
defer ginkgo.GinkgoRecover()
oldClaim := oldObj.(*resourceapi.ResourceClaim)
newClaim := newObj.(*resourceapi.ResourceClaim)
framework.Logf("Updated claim:\n%s\nDiff:\n%s", format.Object(newClaim, 1), cmp.Diff(oldClaim, newClaim))
validateClaim(newClaim)
},
DeleteFunc: func(obj any) {
defer ginkgo.GinkgoRecover()
claim := obj.(*resourceapi.ResourceClaim)
framework.Logf("Deleted claim:\n%s", format.Object(claim, 1))
},
})
framework.ExpectNoError(err, "AddEventHandler")
wg.Add(1)
go func() {
defer wg.Done()
claimInformer.Run(cancelCtx.Done())
}()
nodes.init(ctx, f, minNodes, maxNodes)
})
return nodes
}
// NewNodesNow is a variant of NewNodes which can be used inside a ginkgo.It.
func NewNodesNow(ctx context.Context, f *framework.Framework, minNodes, maxNodes int) *Nodes {
nodes := &Nodes{}
nodes.init(ctx, f, minNodes, maxNodes)
return nodes
}
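For orientation, a minimal usage sketch of the two constructors; the suite name and framework handle below are illustrative and not part of this change:

var _ = ginkgo.Describe("illustrative suite", func() {
	f := framework.NewDefaultFramework("dra-example")

	// Outside ginkgo.It: NewNodes only registers a BeforeEach, so the node
	// list is filled in right before each spec runs.
	nodes := NewNodes(f, 1, 4)

	ginkgo.It("uses nodes selected up front", func(ctx context.Context) {
		framework.Logf("nodes: %v", nodes.NodeNames)
	})

	ginkgo.It("selects nodes on demand", func(ctx context.Context) {
		// Inside ginkgo.It: NewNodesNow runs the same selection immediately.
		lateNodes := NewNodesNow(ctx, f, 1, 4)
		framework.Logf("nodes: %v", lateNodes.NodeNames)
	})
})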
func (nodes *Nodes) init(ctx context.Context, f *framework.Framework, minNodes, maxNodes int) {
ginkgo.By("selecting nodes")
// The kubelet plugin is harder. We deploy the builtin manifest
// after patching in the driver name and all nodes on which we
// want the plugin to run.
//
// Only a subset of the nodes are picked to avoid causing
// unnecessary load on a big cluster.
nodeList, err := e2enode.GetBoundedReadySchedulableNodes(ctx, f.ClientSet, maxNodes)
framework.ExpectNoError(err, "get nodes")
numNodes := int32(len(nodeList.Items))
if int(numNodes) < minNodes {
e2eskipper.Skipf("%d ready nodes required, only have %d", minNodes, numNodes)
}
nodes.NodeNames = nil
for _, node := range nodeList.Items {
nodes.NodeNames = append(nodes.NodeNames, node.Name)
}
sort.Strings(nodes.NodeNames)
framework.Logf("testing on nodes %v", nodes.NodeNames)
// Watch claims in the namespace. This is useful for monitoring a test
// and enables additional sanity checks.
claimInformer := resourceapiinformer.NewResourceClaimInformer(f.ClientSet, f.Namespace.Name, 100*time.Hour /* resync */, nil)
cancelCtx, cancel := context.WithCancelCause(context.Background())
var wg sync.WaitGroup
ginkgo.DeferCleanup(func() {
cancel(errors.New("test has completed"))
wg.Wait()
})
_, err = claimInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj any) {
defer ginkgo.GinkgoRecover()
claim := obj.(*resourceapi.ResourceClaim)
framework.Logf("New claim:\n%s", format.Object(claim, 1))
validateClaim(claim)
},
UpdateFunc: func(oldObj, newObj any) {
defer ginkgo.GinkgoRecover()
oldClaim := oldObj.(*resourceapi.ResourceClaim)
newClaim := newObj.(*resourceapi.ResourceClaim)
framework.Logf("Updated claim:\n%s\nDiff:\n%s", format.Object(newClaim, 1), cmp.Diff(oldClaim, newClaim))
validateClaim(newClaim)
},
DeleteFunc: func(obj any) {
defer ginkgo.GinkgoRecover()
claim := obj.(*resourceapi.ResourceClaim)
framework.Logf("Deleted claim:\n%s", format.Object(claim, 1))
},
})
framework.ExpectNoError(err, "AddEventHandler")
wg.Add(1)
go func() {
defer wg.Done()
claimInformer.Run(cancelCtx.Done())
}()
}
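The deferred ginkgo.GinkgoRecover calls above are needed because the informer invokes these handlers on its own goroutine rather than inside a spec; a minimal sketch of that pattern, with a hypothetical check:

go func() {
	// Without GinkgoRecover, a failing assertion on a non-spec goroutine
	// would panic that goroutine instead of failing the current spec.
	defer ginkgo.GinkgoRecover()
	framework.ExpectNoError(doSomeCheck()) // doSomeCheck is hypothetical
}()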
func validateClaim(claim *resourceapi.ResourceClaim) {
// The apiserver doesn't enforce that a claim always has a finalizer
// while being allocated. This is a convention that whoever allocates a
@@ -153,28 +165,43 @@ func validateClaim(claim *resourceapi.ResourceClaim) {
// NewDriver sets up controller (as client of the cluster) and
// kubelet plugin (via proxy) before the test runs. It cleans
// up after the test.
//
// Call this outside of ginkgo.It, then use the instance inside ginkgo.It.
func NewDriver(f *framework.Framework, nodes *Nodes, configureResources func() app.Resources, devicesPerNode ...map[string]map[resourceapi.QualifiedName]resourceapi.DeviceAttribute) *Driver {
d := &Driver{
f: f,
fail: map[MethodInstance]bool{},
callCounts: map[MethodInstance]int64{},
NodeV1alpha3: true,
}
d := NewDriverInstance(f)
ginkgo.BeforeEach(func() {
resources := configureResources()
if len(resources.Nodes) == 0 {
// This always has to be set because the driver might
// not run on all nodes.
resources.Nodes = nodes.NodeNames
}
ginkgo.DeferCleanup(d.IsGone) // Register first so it gets called last.
d.SetUp(nodes, resources, devicesPerNode...)
ginkgo.DeferCleanup(d.TearDown)
d.Run(nodes, configureResources, devicesPerNode...)
})
return d
}
// NewDriverInstance is a variant of NewDriver where the driver is inactive and must
// be started explicitly with Run. May be used inside ginkgo.It.
func NewDriverInstance(f *framework.Framework) *Driver {
d := &Driver{
f: f,
fail: map[MethodInstance]bool{},
callCounts: map[MethodInstance]int64{},
NodeV1alpha3: true,
parameterMode: parameterModeStructured,
}
d.initName()
return d
}
func (d *Driver) Run(nodes *Nodes, configureResources func() app.Resources, devicesPerNode ...map[string]map[resourceapi.QualifiedName]resourceapi.DeviceAttribute) {
resources := configureResources()
if len(resources.Nodes) == 0 {
// This always has to be set because the driver might
// not run on all nodes.
resources.Nodes = nodes.NodeNames
}
ginkgo.DeferCleanup(d.IsGone) // Register first so it gets called last.
d.SetUp(nodes, resources, devicesPerNode...)
ginkgo.DeferCleanup(d.TearDown)
}
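The cleanup comment in Run relies on ginkgo.DeferCleanup running callbacks last-in, first-out, so IsGone (registered before SetUp) runs after TearDown; a tiny illustration with made-up log messages:

ginkgo.It("cleanup ordering", func() {
	ginkgo.DeferCleanup(func() { framework.Logf("registered first, runs last (like d.IsGone)") })
	ginkgo.DeferCleanup(func() { framework.Logf("registered last, runs first (like d.TearDown)") })
})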
type MethodInstance struct {
Nodename string
FullMethod string
@@ -215,25 +242,23 @@ const (
parameterModeStructured parameterMode = "structured" // allocation through scheduler
)
func (d *Driver) SetUp(nodes *Nodes, resources app.Resources, devicesPerNode ...map[string]map[resourceapi.QualifiedName]resourceapi.DeviceAttribute) {
ginkgo.By(fmt.Sprintf("deploying driver on nodes %v", nodes.NodeNames))
d.Nodes = make(map[string]KubeletPlugin)
func (d *Driver) initName() {
d.Name = d.f.UniqueName + d.NameSuffix + ".k8s.io"
}
func (d *Driver) SetUp(nodes *Nodes, resources app.Resources, devicesPerNode ...map[string]map[resourceapi.QualifiedName]resourceapi.DeviceAttribute) {
d.initName()
ginkgo.By(fmt.Sprintf("deploying driver %s on nodes %v", d.Name, nodes.NodeNames))
d.Nodes = make(map[string]KubeletPlugin)
resources.DriverName = d.Name
ctx, cancel := context.WithCancel(context.Background())
if d.NameSuffix != "" {
logger := klog.FromContext(ctx)
logger = klog.LoggerWithName(logger, "instance"+d.NameSuffix)
ctx = klog.NewContext(ctx, logger)
}
logger := klog.FromContext(ctx)
logger = klog.LoggerWithValues(logger, "driverName", d.Name)
ctx = klog.NewContext(ctx, logger)
d.ctx = ctx
d.cleanup = append(d.cleanup, cancel)
if d.parameterMode == "" {
d.parameterMode = parameterModeStructured
}
switch d.parameterMode {
case parameterModeClassicDRA:
// The controller is easy: we simply connect to the API server.
@@ -387,7 +412,7 @@ func (d *Driver) SetUp(nodes *Nodes, resources app.Resources, devicesPerNode ...
// Here we merely use impersonation, which is faster.
driverClient := d.impersonateKubeletPlugin(&pod)
logger := klog.LoggerWithValues(klog.LoggerWithName(klog.Background(), "kubelet plugin"), "node", pod.Spec.NodeName, "pod", klog.KObj(&pod))
logger := klog.LoggerWithValues(klog.LoggerWithName(logger, "kubelet-plugin"), "node", pod.Spec.NodeName, "pod", klog.KObj(&pod))
loggerCtx := klog.NewContext(ctx, logger)
fileOps := app.FileOperations{
Create: func(name string, content []byte) error {

View File

@@ -1390,6 +1390,31 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
ginkgo.Context("multiple drivers", func() {
multipleDriversContext("using only drapbv1alpha3", true)
})
ginkgo.It("runs pod after driver starts", func(ctx context.Context) {
nodes := NewNodesNow(ctx, f, 1, 4)
driver := NewDriverInstance(f)
b := newBuilderNow(ctx, f, driver)
claim := b.externalClaim()
pod := b.podExternal()
b.create(ctx, claim, pod)
// Cannot run pod, no devices.
framework.ExpectNoError(e2epod.WaitForPodNameUnschedulableInNamespace(ctx, f.ClientSet, pod.Name, pod.Namespace))
// Set up driver, which makes devices available.
driver.Run(nodes, perNode(1, nodes))
// Now it should run.
b.testPod(ctx, f.ClientSet, pod)
// We need to clean up explicitly because the normal
// cleanup doesn't work (driver shuts down first).
// framework.ExpectNoError(f.ClientSet.ResourceV1alpha3().ResourceClaims(claim.Namespace).Delete(ctx, claim.Name, metav1.DeleteOptions{}))
framework.ExpectNoError(f.ClientSet.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}))
framework.ExpectNoError(e2epod.WaitForPodNotFoundInNamespace(ctx, f.ClientSet, pod.Name, pod.Namespace, f.Timeouts.PodDelete))
})
})
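The perNode helper used above is defined elsewhere in this file and is not part of the diff; going by the Run signature it must return a configureResources callback. A plausible sketch, with the Resources field names other than Nodes assumed:

func perNodeSketch(maxAllocations int, nodes *Nodes) func() app.Resources {
	return func() app.Resources {
		return app.Resources{
			NodeLocal:      true,           // assumed field: devices are node-local
			MaxAllocations: maxAllocations, // assumed field: per-node allocation limit
			Nodes:          nodes.NodeNames,
		}
	}
}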
// builder contains a running counter to make objects unique within their
@@ -1676,16 +1701,20 @@ func testContainerEnv(ctx context.Context, clientSet kubernetes.Interface, pod *
func newBuilder(f *framework.Framework, driver *Driver) *builder {
b := &builder{f: f, driver: driver}
ginkgo.BeforeEach(b.setUp)
return b
}
func (b *builder) setUp() {
func newBuilderNow(ctx context.Context, f *framework.Framework, driver *Driver) *builder {
b := &builder{f: f, driver: driver}
b.setUp(ctx)
return b
}
func (b *builder) setUp(ctx context.Context) {
b.podCounter = 0
b.claimCounter = 0
b.create(context.Background(), b.class())
b.create(ctx, b.class())
ginkgo.DeferCleanup(b.tearDown)
}
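A minimal sketch of the immediate variant in use, mirroring the new test above and reusing the builder's existing object helpers:

ginkgo.It("builds objects without a BeforeEach", func(ctx context.Context) {
	driver := NewDriverInstance(f)
	b := newBuilderNow(ctx, f, driver) // setUp runs right here, creating the class with the spec's ctx
	b.create(ctx, b.externalClaim(), b.podExternal())
})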