Merge pull request #132598 from bart0sh/PR182-DRA-handle-serving-failures

DRA plugin: handle gRPC serving failures
This commit is contained in:
Kubernetes Prow Robot
2025-07-16 14:18:24 -07:00
committed by GitHub
7 changed files with 316 additions and 86 deletions

View File

@@ -119,20 +119,43 @@ type DRAPlugin interface {
// must have an entry in the response, even if that entry is nil.
UnprepareResourceClaims(ctx context.Context, claims []NamespacedObject) (result map[types.UID]error, err error)
// ErrorHandler gets called for each error encountered while publishing
// ResourceSlices. See [resourceslice.Options.ErrorHandler] for details.
// HandleError gets called for errors encountered in the background,
// for example while publishing ResourceSlices.
// See [resourceslice.Options.ErrorHandler] for details on that.
//
// A simple implementation is to only log with k8s.io/apimachinery/pkg/util/runtime.HandleErrorWithContext:
// runtime.HandleErrorWithContext(ctx, err, msg)
// The recommended implementation is to log with
// runtime.HandleErrorWithContext(ctx, err, msg) (from
// k8s.io/apimachinery/pkg/util/runtime.HandleErrorWithContext)
// and then to exit the process if the error is fatal.
// Ideally the process should shut down gracefully, which can be
// achieved by canceling the main context of the DRA driver.
//
// Fatal errors can be distinguished from recoverable errors via
// errors.Is(err, kubeletplugin.ErrRecoverable)
//
// This is a mandatory method because drivers should check for errors
// which won't get resolved by retrying and then fail or change the
// slices that they are trying to publish:
// - dropped fields (see [resourceslice.DroppedFieldsError])
// - validation errors (see [apierrors.IsInvalid])
ErrorHandler(ctx context.Context, err error, msg string)
HandleError(ctx context.Context, err error, msg string)
}
// ErrRecoverable distinguishes recoverable errors from those errors which are fatal
// and should cause the process to exit. Use with:
//
// errors.Is(err, ErrRecoverable)
var ErrRecoverable = errors.New("recoverable error")
type recoverableError struct {
error
}
var _ error = recoverableError{}
func (err recoverableError) Is(other error) bool { return other == ErrRecoverable }
func (err recoverableError) Unwrap() error { return err.error }
// PrepareResult contains the result of preparing one particular ResourceClaim.
type PrepareResult struct {
// Err, if non-nil, describes a problem that occurred while preparing
@@ -275,7 +298,7 @@ func PluginSocket(name string) Option {
}
}
// PluginListener configures how to create the registrar socket.
// PluginListener configures how to create the DRA service socket.
// The default is to remove the file if it exists and to then
// create a socket.
//
@@ -589,12 +612,22 @@ func Start(ctx context.Context, plugin DRAPlugin, opts ...Option) (result *Helpe
if o.draService {
// Run the node plugin gRPC server first to ensure that it is ready.
pluginServer, err := startGRPCServer(klog.LoggerWithName(logger, "dra"), o.grpcVerbosity, o.unaryInterceptors, o.streamInterceptors, draEndpoint, func(grpcServer *grpc.Server) {
if o.nodeV1beta1 {
logger.V(5).Info("registering v1beta1.DRAPlugin gRPC service")
drapb.RegisterDRAPluginServer(grpcServer, &nodePluginImplementation{Helper: d})
}
})
pluginServer, err := startGRPCServer(
klog.LoggerWithName(logger, "dra"),
o.grpcVerbosity,
o.unaryInterceptors,
o.streamInterceptors,
draEndpoint,
func(ctx context.Context, err error) {
plugin.HandleError(ctx, err, "DRA gRPC server failed")
},
func(grpcServer *grpc.Server) {
if o.nodeV1beta1 {
logger.V(5).Info("registering v1beta1.DRAPlugin gRPC service")
drapb.RegisterDRAPluginServer(grpcServer, &nodePluginImplementation{Helper: d})
}
},
)
if err != nil {
return nil, fmt.Errorf("start DRA service: %w", err)
}
@@ -603,7 +636,19 @@ func Start(ctx context.Context, plugin DRAPlugin, opts ...Option) (result *Helpe
if o.registrationService {
// Now make it available to kubelet.
registrar, err := startRegistrar(klog.LoggerWithName(logger, "registrar"), o.grpcVerbosity, o.unaryInterceptors, o.streamInterceptors, o.driverName, supportedServices, draEndpoint.path(), o.pluginRegistrationEndpoint)
registrar, err := startRegistrar(
klog.LoggerWithName(logger, "registrar"),
o.grpcVerbosity,
o.unaryInterceptors,
o.streamInterceptors,
o.driverName,
supportedServices,
draEndpoint.path(),
o.pluginRegistrationEndpoint,
func(ctx context.Context, err error) {
plugin.HandleError(ctx, err, "registrar gRPC server failed")
},
)
if err != nil {
return nil, fmt.Errorf("start registrar: %w", err)
}
@@ -690,11 +735,17 @@ func (d *Helper) PublishResources(_ context.Context, resources resourceslice.Dri
var err error
if d.resourceSliceController, err = resourceslice.StartController(controllerCtx,
resourceslice.Options{
DriverName: d.driverName,
KubeClient: d.kubeClient,
Owner: &owner,
Resources: driverResources,
ErrorHandler: d.plugin.ErrorHandler,
DriverName: d.driverName,
KubeClient: d.kubeClient,
Owner: &owner,
Resources: driverResources,
ErrorHandler: func(ctx context.Context, err error, msg string) {
// ResourceSlice publishing errors like dropped fields or
// invalid spec are not going to get resolved by retrying,
// but neither is restarting the process going to help
// -> all errors are recoverable.
d.plugin.HandleError(ctx, recoverableError{error: err}, msg)
},
}); err != nil {
return fmt.Errorf("start ResourceSlice controller: %w", err)
}

View File

@@ -17,6 +17,7 @@ limitations under the License.
package kubeletplugin
import (
"context"
"fmt"
"google.golang.org/grpc"
@@ -30,7 +31,7 @@ type nodeRegistrar struct {
}
// startRegistrar returns a running instance.
func startRegistrar(logger klog.Logger, grpcVerbosity int, interceptors []grpc.UnaryServerInterceptor, streamInterceptors []grpc.StreamServerInterceptor, driverName string, supportedServices []string, draEndpointPath string, pluginRegistrationEndpoint endpoint) (*nodeRegistrar, error) {
func startRegistrar(logger klog.Logger, grpcVerbosity int, interceptors []grpc.UnaryServerInterceptor, streamInterceptors []grpc.StreamServerInterceptor, driverName string, supportedServices []string, draEndpointPath string, pluginRegistrationEndpoint endpoint, errHandler func(ctx context.Context, err error)) (*nodeRegistrar, error) {
n := &nodeRegistrar{
registrationServer: registrationServer{
driverName: driverName,
@@ -38,7 +39,7 @@ func startRegistrar(logger klog.Logger, grpcVerbosity int, interceptors []grpc.U
supportedVersions: supportedServices, // DRA uses this field to describe provided services (e.g. "v1beta1.DRAPlugin").
},
}
s, err := startGRPCServer(logger, grpcVerbosity, interceptors, streamInterceptors, pluginRegistrationEndpoint, func(grpcServer *grpc.Server) {
s, err := startGRPCServer(logger, grpcVerbosity, interceptors, streamInterceptors, pluginRegistrationEndpoint, errHandler, func(grpcServer *grpc.Server) {
registerapi.RegisterRegistrationServer(grpcServer, n)
})
if err != nil {

View File

@@ -39,7 +39,10 @@ type registerService func(s *grpc.Server)
// startGRPCServer sets up the GRPC server on a Unix domain socket and spawns a goroutine
// which handles requests for arbitrary services.
func startGRPCServer(logger klog.Logger, grpcVerbosity int, unaryInterceptors []grpc.UnaryServerInterceptor, streamInterceptors []grpc.StreamServerInterceptor, endpoint endpoint, services ...registerService) (*grpcServer, error) {
//
// errHandler gets invoked in the background when errors are encountered there.
// They are fatal and should cause the process to exit.
func startGRPCServer(logger klog.Logger, grpcVerbosity int, unaryInterceptors []grpc.UnaryServerInterceptor, streamInterceptors []grpc.StreamServerInterceptor, endpoint endpoint, errHandler func(ctx context.Context, err error), services ...registerService) (*grpcServer, error) {
ctx := klog.NewContext(context.Background(), logger)
s := &grpcServer{
@@ -78,13 +81,13 @@ func startGRPCServer(logger klog.Logger, grpcVerbosity int, unaryInterceptors []
defer s.wg.Done()
err := s.server.Serve(listener)
if err != nil {
logger.Error(err, "GRPC server failed")
errHandler(ctx, err)
} else {
logger.V(3).Info("GRPC server terminated gracefully")
logger.V(3).Info("GRPC server terminated gracefully", "endpoint", endpoint.path())
}
}()
logger.V(3).Info("GRPC server started")
logger.V(3).Info("GRPC server started", "endpoint", endpoint.path())
return s, nil
}

View File

@@ -70,6 +70,10 @@ type ExamplePlugin struct {
unprepareResourcesFailure error
failUnprepareResourcesMutex sync.Mutex
// cancelMainContext is used to cancel an upper-level context.
// It's called from HandleError if set.
cancelMainContext context.CancelCauseFunc
}
type GRPCCall struct {
@@ -119,15 +123,16 @@ type FileOperations struct {
// file does not exist.
Remove func(name string) error
// ErrorHandler is an optional callback for ResourceSlice publishing problems.
ErrorHandler func(ctx context.Context, err error, msg string)
// HandleError is an optional callback for ResourceSlice publishing problems.
HandleError func(ctx context.Context, err error, msg string)
// DriverResources provides the information that the driver will use to
// construct the ResourceSlices that it will publish.
DriverResources *resourceslice.DriverResources
}
// StartPlugin sets up the servers that are necessary for a DRA kubelet plugin.
func StartPlugin(ctx context.Context, cdiDir, driverName string, kubeClient kubernetes.Interface, nodeName string, fileOps FileOperations, opts ...kubeletplugin.Option) (*ExamplePlugin, error) {
func StartPlugin(ctx context.Context, cdiDir, driverName string, kubeClient kubernetes.Interface, nodeName string, fileOps FileOperations, opts ...any) (*ExamplePlugin, error) {
logger := klog.FromContext(ctx)
if fileOps.Create == nil {
@@ -143,25 +148,44 @@ func StartPlugin(ctx context.Context, cdiDir, driverName string, kubeClient kube
return nil
}
}
ex := &ExamplePlugin{
stopCh: ctx.Done(),
logger: logger,
resourceClient: draclient.New(kubeClient),
fileOps: fileOps,
cdiDir: cdiDir,
driverName: driverName,
nodeName: nodeName,
prepared: make(map[ClaimID][]kubeletplugin.Device),
}
opts = append(opts,
publicOpts := []kubeletplugin.Option{
kubeletplugin.DriverName(driverName),
kubeletplugin.NodeName(nodeName),
kubeletplugin.KubeClient(kubeClient),
}
testOpts := &options{}
for _, opt := range opts {
switch typedOpt := opt.(type) {
case TestOption:
if err := typedOpt(testOpts); err != nil {
return nil, fmt.Errorf("apply test option: %w", err)
}
case kubeletplugin.Option:
publicOpts = append(publicOpts, typedOpt)
default:
return nil, fmt.Errorf("unexpected option type %T", opt)
}
}
ex := &ExamplePlugin{
stopCh: ctx.Done(),
logger: logger,
resourceClient: draclient.New(kubeClient),
fileOps: fileOps,
cdiDir: cdiDir,
driverName: driverName,
nodeName: nodeName,
prepared: make(map[ClaimID][]kubeletplugin.Device),
cancelMainContext: testOpts.cancelMainContext,
}
publicOpts = append(publicOpts,
kubeletplugin.GRPCInterceptor(ex.recordGRPCCall),
kubeletplugin.GRPCStreamInterceptor(ex.recordGRPCStream),
)
d, err := kubeletplugin.Start(ctx, ex, opts...)
d, err := kubeletplugin.Start(ctx, ex, publicOpts...)
if err != nil {
return nil, fmt.Errorf("start kubelet plugin: %w", err)
}
@@ -189,12 +213,15 @@ func (ex *ExamplePlugin) IsRegistered() bool {
return status.PluginRegistered
}
func (ex *ExamplePlugin) ErrorHandler(ctx context.Context, err error, msg string) {
if ex.fileOps.ErrorHandler != nil {
ex.fileOps.ErrorHandler(ctx, err, msg)
func (ex *ExamplePlugin) HandleError(ctx context.Context, err error, msg string) {
if ex.fileOps.HandleError != nil {
ex.fileOps.HandleError(ctx, err, msg)
return
}
utilruntime.HandleErrorWithContext(ctx, err, msg)
if ex.cancelMainContext != nil {
ex.cancelMainContext(err)
}
}
// BlockNodePrepareResources locks blockPrepareResourcesMutex and returns unlocking function for it

View File

@@ -0,0 +1,40 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package app
import "context"
type options struct {
// cancelMainContext is used to cancel upper level
// context when a background error occurs.
// It's called by HandleError if set.
cancelMainContext context.CancelCauseFunc
}
// TestOption implements the functional options pattern
// dedicated for usage in testing code.
type TestOption func(o *options) error
// CancelMainContext sets a context cancellation function for
// the plugin. This function is called by HandleError
// when an error occurs in the background.
func CancelMainContext(cancel context.CancelCauseFunc) TestOption {
return func(o *options) error {
o.cancelMainContext = cancel
return nil
}
}

View File

@@ -487,7 +487,7 @@ func (d *Driver) SetUp(nodes *Nodes, driverResources map[string]resourceslice.Dr
}
return d.removeFile(&pod, name)
},
ErrorHandler: func(ctx context.Context, err error, msg string) {
HandleError: func(ctx context.Context, err error, msg string) {
// Record a failure, but don't kill the background goroutine.
defer ginkgo.GinkgoRecover()
// During tests when canceling the context it is possible to get all kinds of

View File

@@ -26,12 +26,15 @@ package e2enode
import (
"context"
"errors"
"fmt"
"net"
"os"
"path"
"regexp"
"sort"
"strings"
"sync/atomic"
"time"
"github.com/onsi/ginkgo/v2"
@@ -381,17 +384,17 @@ var _ = framework.SIGDescribe("node")(framework.WithLabel("DRA"), feature.Dynami
}).WithTimeout(retryTestTimeout).Should(gomega.Equal(calls))
})
functionalListenAfterRegistration := func(ctx context.Context, socketPath string) {
functionalListenAfterRegistration := func(ctx context.Context, datadir string, opts ...any) {
nodeName := getNodeName(ctx, f)
ginkgo.By("start DRA registrar")
registrar := newRegistrar(ctx, f.ClientSet, nodeName, driverName, socketPath)
registrar := newRegistrar(ctx, f.ClientSet, nodeName, driverName, opts...)
ginkgo.By("wait for registration to complete")
gomega.Eventually(registrar.GetGRPCCalls).WithTimeout(pluginRegistrationTimeout).Should(testdrivergomega.BeRegistered)
ginkgo.By("start DRA plugin service")
draService := newDRAService(ctx, f.ClientSet, nodeName, driverName, socketPath)
draService := newDRAService(ctx, f.ClientSet, nodeName, driverName, datadir, opts...)
pod := createTestObjects(ctx, f.ClientSet, nodeName, f.Namespace.Name, "draclass", "external-claim", "drapod", false, []string{driverName})
@@ -405,20 +408,25 @@ var _ = framework.SIGDescribe("node")(framework.WithLabel("DRA"), feature.Dynami
ginkgo.DescribeTable("must be functional when plugin starts to listen on a service socket after registration",
functionalListenAfterRegistration,
ginkgo.Entry("2 sockets", ""),
ginkgo.Entry("1 common socket", path.Join(kubeletplugin.KubeletRegistryDir, driverName+"-common.sock")),
ginkgo.Entry(
"1 common socket",
kubeletplugin.KubeletRegistryDir,
kubeletplugin.PluginDataDirectoryPath(kubeletplugin.KubeletRegistryDir),
kubeletplugin.PluginSocket(driverName+"-common.sock"),
),
)
functionalAfterServiceReconnect := func(ctx context.Context, socketPath string) {
functionalAfterServiceReconnect := func(ctx context.Context, datadir string, opts ...any) {
nodeName := getNodeName(ctx, f)
ginkgo.By("start DRA registrar")
registrar := newRegistrar(ctx, f.ClientSet, nodeName, driverName, socketPath)
registrar := newRegistrar(ctx, f.ClientSet, nodeName, driverName, opts...)
ginkgo.By("wait for registration to complete")
gomega.Eventually(registrar.GetGRPCCalls).WithTimeout(pluginRegistrationTimeout).Should(testdrivergomega.BeRegistered)
ginkgo.By("start DRA plugin service")
draService := newDRAService(ctx, f.ClientSet, nodeName, driverName, socketPath)
draService := newDRAService(ctx, f.ClientSet, nodeName, driverName, datadir, opts...)
pod := createTestObjects(ctx, f.ClientSet, getNodeName(ctx, f), f.Namespace.Name, "draclass", "external-claim", "drasleeppod" /* enables sleeping */, false /* pod is deleted below */, []string{driverName})
@@ -436,7 +444,7 @@ var _ = framework.SIGDescribe("node")(framework.WithLabel("DRA"), feature.Dynami
gomega.Eventually(ctx, listResources(f.ClientSet)).Should(gomega.BeEmpty(), "ResourceSlices without plugin")
ginkgo.By("restarting plugin")
draService = newDRAService(ctx, f.ClientSet, nodeName, driverName, socketPath)
draService = newDRAService(ctx, f.ClientSet, nodeName, driverName, datadir, opts...)
ginkgo.By("stopping pod")
err = f.ClientSet.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})
@@ -446,7 +454,59 @@ var _ = framework.SIGDescribe("node")(framework.WithLabel("DRA"), feature.Dynami
ginkgo.DescribeTable("must be functional after service reconnect",
functionalAfterServiceReconnect,
ginkgo.Entry("2 sockets", ""),
ginkgo.Entry("1 common socket", path.Join(kubeletplugin.KubeletRegistryDir, driverName+"-common.sock")),
ginkgo.Entry(
"1 common socket",
kubeletplugin.KubeletRegistryDir,
kubeletplugin.PluginDataDirectoryPath(kubeletplugin.KubeletRegistryDir),
kubeletplugin.PluginSocket(driverName+"-common.sock"),
),
)
failOnClosedListener := func(
ctx context.Context,
service func(ctx context.Context, clientSet kubernetes.Interface, nodeName, driverName, datadir string, opts ...any) *testdriver.ExamplePlugin,
listenerOptionFun func(listen func(ctx context.Context, path string) (net.Listener, error)) kubeletplugin.Option,
) {
ginkgo.By("create a custom listener")
var listener net.Listener
errorMsg := "simulated listener failure"
getListener := func(ctx context.Context, socketPath string) (net.Listener, error) {
listener = newErrorOnCloseListener(errors.New(errorMsg))
return listener, nil
}
ginkgo.By("create a context with a cancel function")
tCtx, cancel := context.WithCancelCause(ctx)
defer cancel(nil)
ginkgo.By("start service")
service(
ctx,
f.ClientSet,
getNodeName(ctx, f),
driverName,
"",
listenerOptionFun(getListener),
testdriver.CancelMainContext(cancel),
)
ginkgo.By("close listener to make the grpc.Server.Serve() fail")
framework.ExpectNoError(listener.Close())
ginkgo.By("check that the context is canceled with an expected error and cause")
gomega.Eventually(tCtx.Err).Should(gomega.MatchError(gomega.ContainSubstring("context canceled")), "Context should be canceled by the error handler")
gomega.Expect(context.Cause(tCtx).Error()).To(gomega.ContainSubstring(errorMsg), "Context should be canceled with the expected cause")
}
// The wrappedNewRegistrar function is used to create a new registrar
// with the same signature as the newDRAService function, so that it can be
// used in the DescribeTable.
wrappedNewRegistrar := func(ctx context.Context, clientSet kubernetes.Interface, nodeName, driverName, datadir string, opts ...any) *testdriver.ExamplePlugin {
return newRegistrar(ctx, clientSet, nodeName, driverName, opts...)
}
ginkgo.DescribeTable("must report gRPC serving error",
failOnClosedListener,
ginkgo.Entry("for registrar", wrappedNewRegistrar, kubeletplugin.RegistrarListener),
ginkgo.Entry("for DRA service", newDRAService, kubeletplugin.PluginListener),
)
})
@@ -664,17 +724,17 @@ var _ = framework.SIGDescribe("node")(framework.WithLabel("DRA"), feature.Dynami
gomega.Consistently(ctx, listResources(f.ClientSet)).WithTimeout(5*time.Second).Should(gomega.BeEmpty(), "ResourceSlices with no plugin")
})
removedIfPluginStopsAfterRegistration := func(ctx context.Context, socketPath string) {
removedIfPluginStopsAfterRegistration := func(ctx context.Context, datadir string, opts ...any) {
nodeName := getNodeName(ctx, f)
ginkgo.By("start DRA registrar")
registrar := newRegistrar(ctx, f.ClientSet, nodeName, driverName, socketPath)
registrar := newRegistrar(ctx, f.ClientSet, nodeName, driverName, opts...)
ginkgo.By("wait for registration to complete")
gomega.Eventually(registrar.GetGRPCCalls).WithTimeout(pluginRegistrationTimeout).Should(testdrivergomega.BeRegistered)
ginkgo.By("start DRA plugin service")
kubeletPlugin := newDRAService(ctx, f.ClientSet, nodeName, driverName, socketPath)
kubeletPlugin := newDRAService(ctx, f.ClientSet, nodeName, driverName, datadir, opts...)
ginkgo.By("wait for ResourceSlice to be created by plugin")
matchNode := gomega.ConsistOf(matchResourcesByNodeName(nodeName))
@@ -691,14 +751,19 @@ var _ = framework.SIGDescribe("node")(framework.WithLabel("DRA"), feature.Dynami
ginkgo.DescribeTable("must be removed if plugin stops after registration",
removedIfPluginStopsAfterRegistration,
ginkgo.Entry("2 sockets", ""),
ginkgo.Entry("1 common socket", path.Join(kubeletplugin.KubeletRegistryDir, driverName+"-common.sock")),
ginkgo.Entry(
"1 common socket",
kubeletplugin.KubeletRegistryDir,
kubeletplugin.PluginDataDirectoryPath(kubeletplugin.KubeletRegistryDir),
kubeletplugin.PluginSocket(driverName+"-common.sock"),
),
)
f.It("must be removed if plugin is unresponsive after registration", func(ctx context.Context) {
nodeName := getNodeName(ctx, f)
ginkgo.By("start DRA registrar")
registrar := newRegistrar(ctx, f.ClientSet, nodeName, driverName, "")
registrar := newRegistrar(ctx, f.ClientSet, nodeName, driverName)
ginkgo.By("wait for registration to complete")
gomega.Eventually(registrar.GetGRPCCalls).WithTimeout(pluginRegistrationTimeout).Should(testdrivergomega.BeRegistered)
@@ -711,17 +776,17 @@ var _ = framework.SIGDescribe("node")(framework.WithLabel("DRA"), feature.Dynami
gomega.Consistently(ctx, listResources(f.ClientSet)).WithTimeout(5*time.Second).Should(gomega.BeEmpty(), "ResourceSlices without plugin")
})
testRemoveIfRestartsQuickly := func(ctx context.Context, socketPath string) {
testRemoveIfRestartsQuickly := func(ctx context.Context, datadir string, opts ...any) {
nodeName := getNodeName(ctx, f)
ginkgo.By("start DRA registrar")
registrar := newRegistrar(ctx, f.ClientSet, nodeName, driverName, "")
registrar := newRegistrar(ctx, f.ClientSet, nodeName, driverName, opts...)
ginkgo.By("wait for registration to complete")
gomega.Eventually(registrar.GetGRPCCalls).WithTimeout(pluginRegistrationTimeout).Should(testdrivergomega.BeRegistered)
ginkgo.By("start DRA plugin service")
kubeletPlugin := newDRAService(ctx, f.ClientSet, nodeName, driverName, "")
kubeletPlugin := newDRAService(ctx, f.ClientSet, nodeName, driverName, datadir, opts...)
ginkgo.By("wait for ResourceSlice to be created by plugin")
matchNode := gomega.ConsistOf(matchResourcesByNodeName(nodeName))
@@ -739,7 +804,7 @@ var _ = framework.SIGDescribe("node")(framework.WithLabel("DRA"), feature.Dynami
time.Sleep(5 * time.Second)
ginkgo.By("restarting plugin")
newDRAService(ctx, f.ClientSet, nodeName, driverName, "")
newDRAService(ctx, f.ClientSet, nodeName, driverName, datadir, opts...)
ginkgo.By("ensuring unchanged ResourceSlices")
gomega.Consistently(ctx, listResources(f.ClientSet)).WithTimeout(time.Minute).Should(gomega.Equal(slices), "ResourceSlices")
@@ -747,7 +812,12 @@ var _ = framework.SIGDescribe("node")(framework.WithLabel("DRA"), feature.Dynami
ginkgo.DescribeTable("must not be removed if plugin restarts quickly enough",
testRemoveIfRestartsQuickly,
ginkgo.Entry("2 sockets", ""),
ginkgo.Entry("1 common socket", path.Join(kubeletplugin.KubeletRegistryDir, driverName+"-common.sock")),
ginkgo.Entry(
"1 common socket",
kubeletplugin.KubeletRegistryDir,
kubeletplugin.PluginDataDirectoryPath(kubeletplugin.KubeletRegistryDir),
kubeletplugin.PluginSocket(driverName+"-common.sock"),
),
)
})
})
@@ -804,18 +874,11 @@ func newKubeletPlugin(ctx context.Context, clientSet kubernetes.Interface, nodeN
}
// newRegistrar starts a registrar for the specified DRA driver, without the DRA gRPC service.
func newRegistrar(ctx context.Context, clientSet kubernetes.Interface, nodeName, driverName, serviceSocketPath string) *testdriver.ExamplePlugin {
func newRegistrar(ctx context.Context, clientSet kubernetes.Interface, nodeName, driverName string, opts ...any) *testdriver.ExamplePlugin {
ginkgo.By("start only Kubelet plugin registrar")
logger := klog.LoggerWithValues(klog.LoggerWithName(klog.Background(), "kubelet plugin registrar "+driverName))
ctx = klog.NewContext(ctx, logger)
opts := []kubeletplugin.Option{
kubeletplugin.DRAService(false),
}
if serviceSocketPath != "" {
dir, file := path.Split(serviceSocketPath)
opts = append(opts, kubeletplugin.PluginDataDirectoryPath(dir))
opts = append(opts, kubeletplugin.PluginSocket(file))
}
opts = append(opts, kubeletplugin.DRAService(false))
registrar, err := testdriver.StartPlugin(
ctx,
cdiDir,
@@ -829,31 +892,35 @@ func newRegistrar(ctx context.Context, clientSet kubernetes.Interface, nodeName,
return registrar
}
// newDRAService starts the DRA gRPC service for the specified DRA driver, without the registrar.
func newDRAService(ctx context.Context, clientSet kubernetes.Interface, nodeName, driverName, socketPath string) *testdriver.ExamplePlugin {
// newDRAService starts the DRA gRPC service for the specified node and driver.
// It ensures that necessary directories exist, starts the plugin and registers
// cleanup functions to remove created resources after the test.
// Parameters:
// - ctx: The context for controlling cancellation and logging.
// - clientSet: Kubernetes client interface for interacting with the cluster.
// - nodeName: The name of the node where the plugin will run.
// - driverName: The name of the DRA driver.
// - datadir: The directory for the DRA socket and state files.
// Must match what is specified via [kubeletplugin.PluginDataDirectoryPath].
// May be empty if that option is not used, then the default path is used.
// - opts: Additional options for plugin configuration.
//
// Returns:
// - A pointer to the started ExamplePlugin instance.
func newDRAService(ctx context.Context, clientSet kubernetes.Interface, nodeName, driverName, datadir string, opts ...any) *testdriver.ExamplePlugin {
ginkgo.By("start only Kubelet plugin")
logger := klog.LoggerWithValues(klog.LoggerWithName(klog.Background(), "kubelet plugin "+driverName), "node", nodeName)
ctx = klog.NewContext(ctx, logger)
opts = append(opts, kubeletplugin.RegistrationService(false))
// Ensure that directories exist, creating them if necessary. We want
// to know early if there is a setup problem that would prevent
// creating those directories.
err := os.MkdirAll(cdiDir, os.FileMode(0750))
framework.ExpectNoError(err, "create CDI directory")
opts := []kubeletplugin.Option{
kubeletplugin.RegistrationService(false),
}
var datadir string
if socketPath == "" {
// The default, not set as option.
if datadir == "" {
datadir = path.Join(kubeletplugin.KubeletPluginsDir, driverName)
} else {
dir, file := path.Split(socketPath)
opts = append(opts,
kubeletplugin.PluginDataDirectoryPath(dir),
kubeletplugin.PluginSocket(file),
)
datadir = dir
}
err = os.MkdirAll(datadir, 0750)
framework.ExpectNoError(err, "create DRA socket directory")
@@ -1074,3 +1141,44 @@ func listAndStoreResources(client kubernetes.Interface, lastSlices *[]resourceap
func matchResourcesByNodeName(nodeName string) types.GomegaMatcher {
return gomega.HaveField("Spec.NodeName", gomega.Equal(nodeName))
}
// errorOnCloseListener is a mock net.Listener that blocks on Accept()
// until Close() is called, at which point Accept() returns a predefined error.
//
// This is useful in tests or simulated environments to trigger grpc.Server.Serve()
// to exit cleanly with a known error, without needing real network activity.
type errorOnCloseListener struct {
ch chan struct{}
closed atomic.Bool
err error
}
// newErrorOnCloseListener creates a new listener that causes Accept to fail
// with the given error after Close is called.
func newErrorOnCloseListener(err error) *errorOnCloseListener {
return &errorOnCloseListener{
ch: make(chan struct{}),
err: err,
}
}
// Accept blocks until Close is called, then returns the configured error.
func (l *errorOnCloseListener) Accept() (net.Conn, error) {
<-l.ch
return nil, l.err
}
// Close unblocks Accept and causes it to return the configured error.
// It is safe to call multiple times.
func (l *errorOnCloseListener) Close() error {
if l.closed.Swap(true) {
return nil // already closed
}
close(l.ch)
return nil
}
// Addr returns a dummy Unix address. Required to satisfy net.Listener.
func (*errorOnCloseListener) Addr() net.Addr {
return &net.UnixAddr{Name: "errorOnCloseListener", Net: "unix"}
}