diff --git a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go index 3b1f1353fc6..51aa219d299 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go @@ -119,20 +119,43 @@ type DRAPlugin interface { // must have an entry in the response, even if that entry is nil. UnprepareResourceClaims(ctx context.Context, claims []NamespacedObject) (result map[types.UID]error, err error) - // ErrorHandler gets called for each error encountered while publishing - // ResourceSlices. See [resourceslice.Options.ErrorHandler] for details. + // HandleError gets called for errors encountered in the background, + // for example while publishing ResourceSlices. + // See [resourceslice.Options.ErrorHandler] for details on that. // - // A simple implementation is to only log with k8s.io/apimachinery/pkg/util/runtime.HandleErrorWithContext: - // runtime.HandleErrorWithContext(ctx, err, msg) + // The recommended implementation is to log with + // runtime.HandleErrorWithContext(ctx, err, msg) (from + // k8s.io/apimachinery/pkg/util/runtime.HandleErrorWithContext) + // and then to exit the process if the error is fatal. + // Ideally the process should shut down gracefully, which can be + // achieved by canceling the main context of the DRA driver. 
+ // + // Fatal errors can be distinguished from recoverable errors via + // errors.Is(err, kubeletplugin.ErrRecoverable) // // This is a mandatory method because drivers should check for errors // which won't get resolved by retrying and then fail or change the // slices that they are trying to publish: // - dropped fields (see [resourceslice.DroppedFieldsError]) // - validation errors (see [apierrors.IsInvalid]) - ErrorHandler(ctx context.Context, err error, msg string) + HandleError(ctx context.Context, err error, msg string) } +// ErrRecoverable distinguishes recoverable errors from those errors which are fatal +// and should cause the process to exit. Use with: +// +// errors.Is(err, ErrRecoverable) +var ErrRecoverable = errors.New("recoverable error") + +type recoverableError struct { + error +} + +var _ error = recoverableError{} + +func (err recoverableError) Is(other error) bool { return other == ErrRecoverable } +func (err recoverableError) Unwrap() error { return err.error } + // PrepareResult contains the result of preparing one particular ResourceClaim. type PrepareResult struct { // Err, if non-nil, describes a problem that occurred while preparing @@ -275,7 +298,7 @@ func PluginSocket(name string) Option { } } -// PluginListener configures how to create the registrar socket. +// PluginListener configures how to create the DRA service socket. // The default is to remove the file if it exists and to then // create a socket. // @@ -589,12 +612,22 @@ func Start(ctx context.Context, plugin DRAPlugin, opts ...Option) (result *Helpe if o.draService { // Run the node plugin gRPC server first to ensure that it is ready. 
- pluginServer, err := startGRPCServer(klog.LoggerWithName(logger, "dra"), o.grpcVerbosity, o.unaryInterceptors, o.streamInterceptors, draEndpoint, func(grpcServer *grpc.Server) { - if o.nodeV1beta1 { - logger.V(5).Info("registering v1beta1.DRAPlugin gRPC service") - drapb.RegisterDRAPluginServer(grpcServer, &nodePluginImplementation{Helper: d}) - } - }) + pluginServer, err := startGRPCServer( + klog.LoggerWithName(logger, "dra"), + o.grpcVerbosity, + o.unaryInterceptors, + o.streamInterceptors, + draEndpoint, + func(ctx context.Context, err error) { + plugin.HandleError(ctx, err, "DRA gRPC server failed") + }, + func(grpcServer *grpc.Server) { + if o.nodeV1beta1 { + logger.V(5).Info("registering v1beta1.DRAPlugin gRPC service") + drapb.RegisterDRAPluginServer(grpcServer, &nodePluginImplementation{Helper: d}) + } + }, + ) if err != nil { return nil, fmt.Errorf("start DRA service: %w", err) } @@ -603,7 +636,19 @@ func Start(ctx context.Context, plugin DRAPlugin, opts ...Option) (result *Helpe if o.registrationService { // Now make it available to kubelet. 
- registrar, err := startRegistrar(klog.LoggerWithName(logger, "registrar"), o.grpcVerbosity, o.unaryInterceptors, o.streamInterceptors, o.driverName, supportedServices, draEndpoint.path(), o.pluginRegistrationEndpoint) + registrar, err := startRegistrar( + klog.LoggerWithName(logger, "registrar"), + o.grpcVerbosity, + o.unaryInterceptors, + o.streamInterceptors, + o.driverName, + supportedServices, + draEndpoint.path(), + o.pluginRegistrationEndpoint, + func(ctx context.Context, err error) { + plugin.HandleError(ctx, err, "registrar gRPC server failed") + }, + ) if err != nil { return nil, fmt.Errorf("start registrar: %w", err) } @@ -690,11 +735,17 @@ func (d *Helper) PublishResources(_ context.Context, resources resourceslice.Dri var err error if d.resourceSliceController, err = resourceslice.StartController(controllerCtx, resourceslice.Options{ - DriverName: d.driverName, - KubeClient: d.kubeClient, - Owner: &owner, - Resources: driverResources, - ErrorHandler: d.plugin.ErrorHandler, + DriverName: d.driverName, + KubeClient: d.kubeClient, + Owner: &owner, + Resources: driverResources, + ErrorHandler: func(ctx context.Context, err error, msg string) { + // ResourceSlice publishing errors like dropped fields or + // invalid spec are not going to get resolved by retrying, + // but neither is restarting the process going to help + // -> all errors are recoverable. + d.plugin.HandleError(ctx, recoverableError{error: err}, msg) + }, }); err != nil { return fmt.Errorf("start ResourceSlice controller: %w", err) } diff --git a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/noderegistrar.go b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/noderegistrar.go index 9c60a8aaadc..9630699ea04 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/noderegistrar.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/noderegistrar.go @@ -17,6 +17,7 @@ limitations under the License. 
package kubeletplugin import ( + "context" "fmt" "google.golang.org/grpc" @@ -30,7 +31,7 @@ type nodeRegistrar struct { } // startRegistrar returns a running instance. -func startRegistrar(logger klog.Logger, grpcVerbosity int, interceptors []grpc.UnaryServerInterceptor, streamInterceptors []grpc.StreamServerInterceptor, driverName string, supportedServices []string, draEndpointPath string, pluginRegistrationEndpoint endpoint) (*nodeRegistrar, error) { +func startRegistrar(logger klog.Logger, grpcVerbosity int, interceptors []grpc.UnaryServerInterceptor, streamInterceptors []grpc.StreamServerInterceptor, driverName string, supportedServices []string, draEndpointPath string, pluginRegistrationEndpoint endpoint, errHandler func(ctx context.Context, err error)) (*nodeRegistrar, error) { n := &nodeRegistrar{ registrationServer: registrationServer{ driverName: driverName, @@ -38,7 +39,7 @@ func startRegistrar(logger klog.Logger, grpcVerbosity int, interceptors []grpc.U supportedVersions: supportedServices, // DRA uses this field to describe provided services (e.g. "v1beta1.DRAPlugin"). 
}, } - s, err := startGRPCServer(logger, grpcVerbosity, interceptors, streamInterceptors, pluginRegistrationEndpoint, func(grpcServer *grpc.Server) { + s, err := startGRPCServer(logger, grpcVerbosity, interceptors, streamInterceptors, pluginRegistrationEndpoint, errHandler, func(grpcServer *grpc.Server) { registerapi.RegisterRegistrationServer(grpcServer, n) }) if err != nil { diff --git a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/nonblockinggrpcserver.go b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/nonblockinggrpcserver.go index cae3b1668b7..49e9dab4fa0 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/nonblockinggrpcserver.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/nonblockinggrpcserver.go @@ -39,7 +39,10 @@ type registerService func(s *grpc.Server) // startGRPCServer sets up the GRPC server on a Unix domain socket and spawns a goroutine // which handles requests for arbitrary services. -func startGRPCServer(logger klog.Logger, grpcVerbosity int, unaryInterceptors []grpc.UnaryServerInterceptor, streamInterceptors []grpc.StreamServerInterceptor, endpoint endpoint, services ...registerService) (*grpcServer, error) { +// +// errHandler gets invoked in the background when errors are encountered there. +// They are fatal and should cause the process to exit. 
+func startGRPCServer(logger klog.Logger, grpcVerbosity int, unaryInterceptors []grpc.UnaryServerInterceptor, streamInterceptors []grpc.StreamServerInterceptor, endpoint endpoint, errHandler func(ctx context.Context, err error), services ...registerService) (*grpcServer, error) { ctx := klog.NewContext(context.Background(), logger) s := &grpcServer{ @@ -78,13 +81,13 @@ func startGRPCServer(logger klog.Logger, grpcVerbosity int, unaryInterceptors [] defer s.wg.Done() err := s.server.Serve(listener) if err != nil { - logger.Error(err, "GRPC server failed") + errHandler(ctx, err) } else { - logger.V(3).Info("GRPC server terminated gracefully") + logger.V(3).Info("GRPC server terminated gracefully", "endpoint", endpoint.path()) } }() - logger.V(3).Info("GRPC server started") + logger.V(3).Info("GRPC server started", "endpoint", endpoint.path()) return s, nil } diff --git a/test/e2e/dra/test-driver/app/kubeletplugin.go b/test/e2e/dra/test-driver/app/kubeletplugin.go index 67dfe7ec452..8feccf6f94b 100644 --- a/test/e2e/dra/test-driver/app/kubeletplugin.go +++ b/test/e2e/dra/test-driver/app/kubeletplugin.go @@ -70,6 +70,10 @@ type ExamplePlugin struct { unprepareResourcesFailure error failUnprepareResourcesMutex sync.Mutex + + // cancelMainContext is used to cancel an upper-level context. + // It's called from HandleError if set. + cancelMainContext context.CancelCauseFunc } type GRPCCall struct { @@ -119,15 +123,16 @@ type FileOperations struct { // file does not exist. Remove func(name string) error - // ErrorHandler is an optional callback for ResourceSlice publishing problems. - ErrorHandler func(ctx context.Context, err error, msg string) + // HandleError is an optional callback for ResourceSlice publishing problems. + HandleError func(ctx context.Context, err error, msg string) + // DriverResources provides the information that the driver will use to // construct the ResourceSlices that it will publish. 
DriverResources *resourceslice.DriverResources } // StartPlugin sets up the servers that are necessary for a DRA kubelet plugin. -func StartPlugin(ctx context.Context, cdiDir, driverName string, kubeClient kubernetes.Interface, nodeName string, fileOps FileOperations, opts ...kubeletplugin.Option) (*ExamplePlugin, error) { +func StartPlugin(ctx context.Context, cdiDir, driverName string, kubeClient kubernetes.Interface, nodeName string, fileOps FileOperations, opts ...any) (*ExamplePlugin, error) { logger := klog.FromContext(ctx) if fileOps.Create == nil { @@ -143,25 +148,44 @@ func StartPlugin(ctx context.Context, cdiDir, driverName string, kubeClient kube return nil } } - ex := &ExamplePlugin{ - stopCh: ctx.Done(), - logger: logger, - resourceClient: draclient.New(kubeClient), - fileOps: fileOps, - cdiDir: cdiDir, - driverName: driverName, - nodeName: nodeName, - prepared: make(map[ClaimID][]kubeletplugin.Device), - } - opts = append(opts, + publicOpts := []kubeletplugin.Option{ kubeletplugin.DriverName(driverName), kubeletplugin.NodeName(nodeName), kubeletplugin.KubeClient(kubeClient), + } + + testOpts := &options{} + for _, opt := range opts { + switch typedOpt := opt.(type) { + case TestOption: + if err := typedOpt(testOpts); err != nil { + return nil, fmt.Errorf("apply test option: %w", err) + } + case kubeletplugin.Option: + publicOpts = append(publicOpts, typedOpt) + default: + return nil, fmt.Errorf("unexpected option type %T", opt) + } + } + + ex := &ExamplePlugin{ + stopCh: ctx.Done(), + logger: logger, + resourceClient: draclient.New(kubeClient), + fileOps: fileOps, + cdiDir: cdiDir, + driverName: driverName, + nodeName: nodeName, + prepared: make(map[ClaimID][]kubeletplugin.Device), + cancelMainContext: testOpts.cancelMainContext, + } + + publicOpts = append(publicOpts, kubeletplugin.GRPCInterceptor(ex.recordGRPCCall), kubeletplugin.GRPCStreamInterceptor(ex.recordGRPCStream), ) - d, err := kubeletplugin.Start(ctx, ex, opts...) 
+ d, err := kubeletplugin.Start(ctx, ex, publicOpts...) if err != nil { return nil, fmt.Errorf("start kubelet plugin: %w", err) } @@ -189,12 +213,15 @@ func (ex *ExamplePlugin) IsRegistered() bool { return status.PluginRegistered } -func (ex *ExamplePlugin) ErrorHandler(ctx context.Context, err error, msg string) { - if ex.fileOps.ErrorHandler != nil { - ex.fileOps.ErrorHandler(ctx, err, msg) +func (ex *ExamplePlugin) HandleError(ctx context.Context, err error, msg string) { + if ex.fileOps.HandleError != nil { + ex.fileOps.HandleError(ctx, err, msg) return } utilruntime.HandleErrorWithContext(ctx, err, msg) + if ex.cancelMainContext != nil { + ex.cancelMainContext(err) + } } // BlockNodePrepareResources locks blockPrepareResourcesMutex and returns unlocking function for it diff --git a/test/e2e/dra/test-driver/app/options.go b/test/e2e/dra/test-driver/app/options.go new file mode 100644 index 00000000000..b0518ef35a3 --- /dev/null +++ b/test/e2e/dra/test-driver/app/options.go @@ -0,0 +1,40 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package app + +import "context" + +type options struct { + // cancelMainContext is used to cancel upper level + // context when a background error occurs. + // It's called by HandleError if set. + cancelMainContext context.CancelCauseFunc +} + +// TestOption implements the functional options pattern +// dedicated for usage in testing code. 
+type TestOption func(o *options) error + +// CancelMainContext sets a context cancellation function for +// the plugin. This function is called by HandleError +// when an error occurs in the background. +func CancelMainContext(cancel context.CancelCauseFunc) TestOption { + return func(o *options) error { + o.cancelMainContext = cancel + return nil + } +} diff --git a/test/e2e/dra/utils/deploy.go b/test/e2e/dra/utils/deploy.go index ecf45b367a2..f9d40ea2f09 100644 --- a/test/e2e/dra/utils/deploy.go +++ b/test/e2e/dra/utils/deploy.go @@ -487,7 +487,7 @@ func (d *Driver) SetUp(nodes *Nodes, driverResources map[string]resourceslice.Dr } return d.removeFile(&pod, name) }, - ErrorHandler: func(ctx context.Context, err error, msg string) { + HandleError: func(ctx context.Context, err error, msg string) { // Record a failure, but don't kill the background goroutine. defer ginkgo.GinkgoRecover() // During tests when canceling the context it is possible to get all kinds of diff --git a/test/e2e_node/dra_test.go b/test/e2e_node/dra_test.go index 9b5051f6fa7..bcc25f186d8 100644 --- a/test/e2e_node/dra_test.go +++ b/test/e2e_node/dra_test.go @@ -26,12 +26,15 @@ package e2enode import ( "context" + "errors" "fmt" + "net" "os" "path" "regexp" "sort" "strings" + "sync/atomic" "time" "github.com/onsi/ginkgo/v2" @@ -381,17 +384,17 @@ var _ = framework.SIGDescribe("node")(framework.WithLabel("DRA"), feature.Dynami }).WithTimeout(retryTestTimeout).Should(gomega.Equal(calls)) }) - functionalListenAfterRegistration := func(ctx context.Context, socketPath string) { + functionalListenAfterRegistration := func(ctx context.Context, datadir string, opts ...any) { nodeName := getNodeName(ctx, f) ginkgo.By("start DRA registrar") - registrar := newRegistrar(ctx, f.ClientSet, nodeName, driverName, socketPath) + registrar := newRegistrar(ctx, f.ClientSet, nodeName, driverName, opts...) 
ginkgo.By("wait for registration to complete") gomega.Eventually(registrar.GetGRPCCalls).WithTimeout(pluginRegistrationTimeout).Should(testdrivergomega.BeRegistered) ginkgo.By("start DRA plugin service") - draService := newDRAService(ctx, f.ClientSet, nodeName, driverName, socketPath) + draService := newDRAService(ctx, f.ClientSet, nodeName, driverName, datadir, opts...) pod := createTestObjects(ctx, f.ClientSet, nodeName, f.Namespace.Name, "draclass", "external-claim", "drapod", false, []string{driverName}) @@ -405,20 +408,25 @@ var _ = framework.SIGDescribe("node")(framework.WithLabel("DRA"), feature.Dynami ginkgo.DescribeTable("must be functional when plugin starts to listen on a service socket after registration", functionalListenAfterRegistration, ginkgo.Entry("2 sockets", ""), - ginkgo.Entry("1 common socket", path.Join(kubeletplugin.KubeletRegistryDir, driverName+"-common.sock")), + ginkgo.Entry( + "1 common socket", + kubeletplugin.KubeletRegistryDir, + kubeletplugin.PluginDataDirectoryPath(kubeletplugin.KubeletRegistryDir), + kubeletplugin.PluginSocket(driverName+"-common.sock"), + ), ) - functionalAfterServiceReconnect := func(ctx context.Context, socketPath string) { + functionalAfterServiceReconnect := func(ctx context.Context, datadir string, opts ...any) { nodeName := getNodeName(ctx, f) ginkgo.By("start DRA registrar") - registrar := newRegistrar(ctx, f.ClientSet, nodeName, driverName, socketPath) + registrar := newRegistrar(ctx, f.ClientSet, nodeName, driverName, opts...) ginkgo.By("wait for registration to complete") gomega.Eventually(registrar.GetGRPCCalls).WithTimeout(pluginRegistrationTimeout).Should(testdrivergomega.BeRegistered) ginkgo.By("start DRA plugin service") - draService := newDRAService(ctx, f.ClientSet, nodeName, driverName, socketPath) + draService := newDRAService(ctx, f.ClientSet, nodeName, driverName, datadir, opts...) 
pod := createTestObjects(ctx, f.ClientSet, getNodeName(ctx, f), f.Namespace.Name, "draclass", "external-claim", "drasleeppod" /* enables sleeping */, false /* pod is deleted below */, []string{driverName}) @@ -436,7 +444,7 @@ var _ = framework.SIGDescribe("node")(framework.WithLabel("DRA"), feature.Dynami gomega.Eventually(ctx, listResources(f.ClientSet)).Should(gomega.BeEmpty(), "ResourceSlices without plugin") ginkgo.By("restarting plugin") - draService = newDRAService(ctx, f.ClientSet, nodeName, driverName, socketPath) + draService = newDRAService(ctx, f.ClientSet, nodeName, driverName, datadir, opts...) ginkgo.By("stopping pod") err = f.ClientSet.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}) @@ -446,7 +454,59 @@ var _ = framework.SIGDescribe("node")(framework.WithLabel("DRA"), feature.Dynami ginkgo.DescribeTable("must be functional after service reconnect", functionalAfterServiceReconnect, ginkgo.Entry("2 sockets", ""), - ginkgo.Entry("1 common socket", path.Join(kubeletplugin.KubeletRegistryDir, driverName+"-common.sock")), + ginkgo.Entry( + "1 common socket", + kubeletplugin.KubeletRegistryDir, + kubeletplugin.PluginDataDirectoryPath(kubeletplugin.KubeletRegistryDir), + kubeletplugin.PluginSocket(driverName+"-common.sock"), + ), + ) + + failOnClosedListener := func( + ctx context.Context, + service func(ctx context.Context, clientSet kubernetes.Interface, nodeName, driverName, datadir string, opts ...any) *testdriver.ExamplePlugin, + listenerOptionFun func(listen func(ctx context.Context, path string) (net.Listener, error)) kubeletplugin.Option, + ) { + ginkgo.By("create a custom listener") + var listener net.Listener + errorMsg := "simulated listener failure" + getListener := func(ctx context.Context, socketPath string) (net.Listener, error) { + listener = newErrorOnCloseListener(errors.New(errorMsg)) + return listener, nil + } + + ginkgo.By("create a context with a cancel function") + tCtx, cancel := context.WithCancelCause(ctx) 
+ defer cancel(nil) + + ginkgo.By("start service") + service( + ctx, + f.ClientSet, + getNodeName(ctx, f), + driverName, + "", + listenerOptionFun(getListener), + testdriver.CancelMainContext(cancel), + ) + + ginkgo.By("close listener to make the grpc.Server.Serve() fail") + framework.ExpectNoError(listener.Close()) + + ginkgo.By("check that the context is canceled with an expected error and cause") + gomega.Eventually(tCtx.Err).Should(gomega.MatchError(gomega.ContainSubstring("context canceled")), "Context should be canceled by the error handler") + gomega.Expect(context.Cause(tCtx).Error()).To(gomega.ContainSubstring(errorMsg), "Context should be canceled with the expected cause") + } + // The wrappedNewRegistrar function is used to create a new registrar + // with the same signature as the newDRAService function, so that it can be + // used in the DescribeTable. + wrappedNewRegistrar := func(ctx context.Context, clientSet kubernetes.Interface, nodeName, driverName, datadir string, opts ...any) *testdriver.ExamplePlugin { + return newRegistrar(ctx, clientSet, nodeName, driverName, opts...) 
+ } + ginkgo.DescribeTable("must report gRPC serving error", + failOnClosedListener, + ginkgo.Entry("for registrar", wrappedNewRegistrar, kubeletplugin.RegistrarListener), + ginkgo.Entry("for DRA service", newDRAService, kubeletplugin.PluginListener), ) }) @@ -664,17 +724,17 @@ var _ = framework.SIGDescribe("node")(framework.WithLabel("DRA"), feature.Dynami gomega.Consistently(ctx, listResources(f.ClientSet)).WithTimeout(5*time.Second).Should(gomega.BeEmpty(), "ResourceSlices with no plugin") }) - removedIfPluginStopsAfterRegistration := func(ctx context.Context, socketPath string) { + removedIfPluginStopsAfterRegistration := func(ctx context.Context, datadir string, opts ...any) { nodeName := getNodeName(ctx, f) ginkgo.By("start DRA registrar") - registrar := newRegistrar(ctx, f.ClientSet, nodeName, driverName, socketPath) + registrar := newRegistrar(ctx, f.ClientSet, nodeName, driverName, opts...) ginkgo.By("wait for registration to complete") gomega.Eventually(registrar.GetGRPCCalls).WithTimeout(pluginRegistrationTimeout).Should(testdrivergomega.BeRegistered) ginkgo.By("start DRA plugin service") - kubeletPlugin := newDRAService(ctx, f.ClientSet, nodeName, driverName, socketPath) + kubeletPlugin := newDRAService(ctx, f.ClientSet, nodeName, driverName, datadir, opts...) 
ginkgo.By("wait for ResourceSlice to be created by plugin") matchNode := gomega.ConsistOf(matchResourcesByNodeName(nodeName)) @@ -691,14 +751,19 @@ var _ = framework.SIGDescribe("node")(framework.WithLabel("DRA"), feature.Dynami ginkgo.DescribeTable("must be removed if plugin stops after registration", removedIfPluginStopsAfterRegistration, ginkgo.Entry("2 sockets", ""), - ginkgo.Entry("1 common socket", path.Join(kubeletplugin.KubeletRegistryDir, driverName+"-common.sock")), + ginkgo.Entry( + "1 common socket", + kubeletplugin.KubeletRegistryDir, + kubeletplugin.PluginDataDirectoryPath(kubeletplugin.KubeletRegistryDir), + kubeletplugin.PluginSocket(driverName+"-common.sock"), + ), ) f.It("must be removed if plugin is unresponsive after registration", func(ctx context.Context) { nodeName := getNodeName(ctx, f) ginkgo.By("start DRA registrar") - registrar := newRegistrar(ctx, f.ClientSet, nodeName, driverName, "") + registrar := newRegistrar(ctx, f.ClientSet, nodeName, driverName) ginkgo.By("wait for registration to complete") gomega.Eventually(registrar.GetGRPCCalls).WithTimeout(pluginRegistrationTimeout).Should(testdrivergomega.BeRegistered) @@ -711,17 +776,17 @@ var _ = framework.SIGDescribe("node")(framework.WithLabel("DRA"), feature.Dynami gomega.Consistently(ctx, listResources(f.ClientSet)).WithTimeout(5*time.Second).Should(gomega.BeEmpty(), "ResourceSlices without plugin") }) - testRemoveIfRestartsQuickly := func(ctx context.Context, socketPath string) { + testRemoveIfRestartsQuickly := func(ctx context.Context, datadir string, opts ...any) { nodeName := getNodeName(ctx, f) ginkgo.By("start DRA registrar") - registrar := newRegistrar(ctx, f.ClientSet, nodeName, driverName, "") + registrar := newRegistrar(ctx, f.ClientSet, nodeName, driverName, opts...) 
ginkgo.By("wait for registration to complete") gomega.Eventually(registrar.GetGRPCCalls).WithTimeout(pluginRegistrationTimeout).Should(testdrivergomega.BeRegistered) ginkgo.By("start DRA plugin service") - kubeletPlugin := newDRAService(ctx, f.ClientSet, nodeName, driverName, "") + kubeletPlugin := newDRAService(ctx, f.ClientSet, nodeName, driverName, datadir, opts...) ginkgo.By("wait for ResourceSlice to be created by plugin") matchNode := gomega.ConsistOf(matchResourcesByNodeName(nodeName)) @@ -739,7 +804,7 @@ var _ = framework.SIGDescribe("node")(framework.WithLabel("DRA"), feature.Dynami time.Sleep(5 * time.Second) ginkgo.By("restarting plugin") - newDRAService(ctx, f.ClientSet, nodeName, driverName, "") + newDRAService(ctx, f.ClientSet, nodeName, driverName, datadir, opts...) ginkgo.By("ensuring unchanged ResourceSlices") gomega.Consistently(ctx, listResources(f.ClientSet)).WithTimeout(time.Minute).Should(gomega.Equal(slices), "ResourceSlices") @@ -747,7 +812,12 @@ var _ = framework.SIGDescribe("node")(framework.WithLabel("DRA"), feature.Dynami ginkgo.DescribeTable("must not be removed if plugin restarts quickly enough", testRemoveIfRestartsQuickly, ginkgo.Entry("2 sockets", ""), - ginkgo.Entry("1 common socket", path.Join(kubeletplugin.KubeletRegistryDir, driverName+"-common.sock")), + ginkgo.Entry( + "1 common socket", + kubeletplugin.KubeletRegistryDir, + kubeletplugin.PluginDataDirectoryPath(kubeletplugin.KubeletRegistryDir), + kubeletplugin.PluginSocket(driverName+"-common.sock"), + ), ) }) }) @@ -804,18 +874,11 @@ func newKubeletPlugin(ctx context.Context, clientSet kubernetes.Interface, nodeN } // newRegistrar starts a registrar for the specified DRA driver, without the DRA gRPC service. 
-func newRegistrar(ctx context.Context, clientSet kubernetes.Interface, nodeName, driverName, serviceSocketPath string) *testdriver.ExamplePlugin { +func newRegistrar(ctx context.Context, clientSet kubernetes.Interface, nodeName, driverName string, opts ...any) *testdriver.ExamplePlugin { ginkgo.By("start only Kubelet plugin registrar") logger := klog.LoggerWithValues(klog.LoggerWithName(klog.Background(), "kubelet plugin registrar "+driverName)) ctx = klog.NewContext(ctx, logger) - opts := []kubeletplugin.Option{ - kubeletplugin.DRAService(false), - } - if serviceSocketPath != "" { - dir, file := path.Split(serviceSocketPath) - opts = append(opts, kubeletplugin.PluginDataDirectoryPath(dir)) - opts = append(opts, kubeletplugin.PluginSocket(file)) - } + opts = append(opts, kubeletplugin.DRAService(false)) registrar, err := testdriver.StartPlugin( ctx, cdiDir, @@ -829,31 +892,35 @@ func newRegistrar(ctx context.Context, clientSet kubernetes.Interface, nodeName, return registrar } -// newDRAService starts the DRA gRPC service for the specified DRA driver, without the registrar. -func newDRAService(ctx context.Context, clientSet kubernetes.Interface, nodeName, driverName, socketPath string) *testdriver.ExamplePlugin { +// newDRAService starts the DRA gRPC service for the specified node and driver. +// It ensures that necessary directories exist, starts the plugin and registers +// cleanup functions to remove created resources after the test. +// Parameters: +// - ctx: The context for controlling cancellation and logging. +// - clientSet: Kubernetes client interface for interacting with the cluster. +// - nodeName: The name of the node where the plugin will run. +// - driverName: The name of the DRA driver. +// - datadir: The directory for the DRA socket and state files. +// Must match what is specified via [kubeletplugin.PluginDataDirectoryPath]. +// May be empty if that option is not used, then the default path is used. 
+// - opts: Additional options for plugin configuration. +// +// Returns: +// - A pointer to the started ExamplePlugin instance. +func newDRAService(ctx context.Context, clientSet kubernetes.Interface, nodeName, driverName, datadir string, opts ...any) *testdriver.ExamplePlugin { ginkgo.By("start only Kubelet plugin") logger := klog.LoggerWithValues(klog.LoggerWithName(klog.Background(), "kubelet plugin "+driverName), "node", nodeName) ctx = klog.NewContext(ctx, logger) + opts = append(opts, kubeletplugin.RegistrationService(false)) + // Ensure that directories exist, creating them if necessary. We want // to know early if there is a setup problem that would prevent // creating those directories. err := os.MkdirAll(cdiDir, os.FileMode(0750)) framework.ExpectNoError(err, "create CDI directory") - opts := []kubeletplugin.Option{ - kubeletplugin.RegistrationService(false), - } - var datadir string - if socketPath == "" { - // The default, not set as option. + if datadir == "" { datadir = path.Join(kubeletplugin.KubeletPluginsDir, driverName) - } else { - dir, file := path.Split(socketPath) - opts = append(opts, - kubeletplugin.PluginDataDirectoryPath(dir), - kubeletplugin.PluginSocket(file), - ) - datadir = dir } err = os.MkdirAll(datadir, 0750) framework.ExpectNoError(err, "create DRA socket directory") @@ -1074,3 +1141,44 @@ func listAndStoreResources(client kubernetes.Interface, lastSlices *[]resourceap func matchResourcesByNodeName(nodeName string) types.GomegaMatcher { return gomega.HaveField("Spec.NodeName", gomega.Equal(nodeName)) } + +// errorOnCloseListener is a mock net.Listener that blocks on Accept() +// until Close() is called, at which point Accept() returns a predefined error. +// +// This is useful in tests or simulated environments to trigger grpc.Server.Serve() +// to exit cleanly with a known error, without needing real network activity. 
type errorOnCloseListener struct {
	// ch is closed by Close; Accept blocks receiving from it until then.
	ch chan struct{}

	// closed records whether Close already ran, so that ch is closed only once.
	closed atomic.Bool

	// err is the error that Accept returns once the listener has been closed.
	err error
}

// Compile-time check that errorOnCloseListener implements net.Listener.
var _ net.Listener = (*errorOnCloseListener)(nil)

// newErrorOnCloseListener creates a new listener that causes Accept to fail
// with the given error after Close is called.
//
// A nil err is replaced with net.ErrClosed. Without that, Accept would
// return a nil connection together with a nil error after Close, which
// gives the caller no way to tell that the listener is gone.
func newErrorOnCloseListener(err error) *errorOnCloseListener {
	if err == nil {
		err = net.ErrClosed
	}
	return &errorOnCloseListener{
		ch:  make(chan struct{}),
		err: err,
	}
}

// Accept blocks until Close is called, then returns the configured error.
// It never returns a connection. Calls made after Close return immediately
// because receiving from the closed channel does not block.
func (l *errorOnCloseListener) Accept() (net.Conn, error) {
	<-l.ch
	return nil, l.err
}

// Close unblocks Accept and causes it to return the configured error.
// It is safe to call multiple times and from multiple goroutines: only the
// first call closes the channel. Close itself always returns nil.
func (l *errorOnCloseListener) Close() error {
	if l.closed.Swap(true) {
		return nil // already closed
	}
	close(l.ch)
	return nil
}

// Addr returns a dummy Unix address. Required to satisfy net.Listener.
func (*errorOnCloseListener) Addr() net.Addr {
	return &net.UnixAddr{Name: "errorOnCloseListener", Net: "unix"}
}