mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-03 19:58:17 +00:00 
			
		
		
		
	Merge pull request #127566 from zhifei92/integrate-watchdog
integrate kubelet with the systemd watchdog
This commit is contained in:
		@@ -1658,6 +1658,7 @@ After=network-online.target
 | 
				
			|||||||
[Service]
 | 
					[Service]
 | 
				
			||||||
Restart=always
 | 
					Restart=always
 | 
				
			||||||
RestartSec=10
 | 
					RestartSec=10
 | 
				
			||||||
 | 
					WatchdogSec=30s
 | 
				
			||||||
EnvironmentFile=${kubelet_env_file}
 | 
					EnvironmentFile=${kubelet_env_file}
 | 
				
			||||||
ExecStart=${kubelet_bin} \$KUBELET_OPTS
 | 
					ExecStart=${kubelet_bin} \$KUBELET_OPTS
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -749,6 +749,16 @@ const (
 | 
				
			|||||||
	//
 | 
						//
 | 
				
			||||||
	// Enables the image volume source.
 | 
						// Enables the image volume source.
 | 
				
			||||||
	ImageVolume featuregate.Feature = "ImageVolume"
 | 
						ImageVolume featuregate.Feature = "ImageVolume"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// owner: @zhifei92
 | 
				
			||||||
 | 
						// beta: v1.32
 | 
				
			||||||
 | 
						//
 | 
				
			||||||
 | 
						// Enables the systemd watchdog for the kubelet. When enabled, the kubelet will
 | 
				
			||||||
 | 
						// periodically notify the systemd watchdog to indicate that it is still alive.
 | 
				
			||||||
 | 
						// This allows systemd to detect and restart the kubelet if it becomes
 | 
				
			||||||
 | 
						// unresponsive. The feature gate is enabled by default, but should only be used
 | 
				
			||||||
 | 
						// if the system supports the systemd watchdog feature and has it configured properly.
 | 
				
			||||||
 | 
						SystemdWatchdog = featuregate.Feature("SystemdWatchdog")
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func init() {
 | 
					func init() {
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -697,6 +697,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
 | 
				
			|||||||
		{Version: version.MustParse("1.31"), Default: false, PreRelease: featuregate.Alpha},
 | 
							{Version: version.MustParse("1.31"), Default: false, PreRelease: featuregate.Alpha},
 | 
				
			||||||
	},
 | 
						},
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						SystemdWatchdog: {
 | 
				
			||||||
 | 
							{Version: version.MustParse("1.32"), Default: true, PreRelease: featuregate.Beta},
 | 
				
			||||||
 | 
						},
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	TopologyAwareHints: {
 | 
						TopologyAwareHints: {
 | 
				
			||||||
		{Version: version.MustParse("1.21"), Default: false, PreRelease: featuregate.Alpha},
 | 
							{Version: version.MustParse("1.21"), Default: false, PreRelease: featuregate.Alpha},
 | 
				
			||||||
		{Version: version.MustParse("1.23"), Default: false, PreRelease: featuregate.Beta},
 | 
							{Version: version.MustParse("1.23"), Default: false, PreRelease: featuregate.Beta},
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -118,6 +118,7 @@ import (
 | 
				
			|||||||
	"k8s.io/kubernetes/pkg/kubelet/util/queue"
 | 
						"k8s.io/kubernetes/pkg/kubelet/util/queue"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/kubelet/util/sliceutils"
 | 
						"k8s.io/kubernetes/pkg/kubelet/util/sliceutils"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/kubelet/volumemanager"
 | 
						"k8s.io/kubernetes/pkg/kubelet/volumemanager"
 | 
				
			||||||
 | 
						"k8s.io/kubernetes/pkg/kubelet/watchdog"
 | 
				
			||||||
	httpprobe "k8s.io/kubernetes/pkg/probe/http"
 | 
						httpprobe "k8s.io/kubernetes/pkg/probe/http"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/security/apparmor"
 | 
						"k8s.io/kubernetes/pkg/security/apparmor"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/util/oom"
 | 
						"k8s.io/kubernetes/pkg/util/oom"
 | 
				
			||||||
@@ -957,6 +958,14 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
 | 
				
			|||||||
	// since this relies on the rest of the Kubelet having been constructed.
 | 
						// since this relies on the rest of the Kubelet having been constructed.
 | 
				
			||||||
	klet.setNodeStatusFuncs = klet.defaultNodeStatusFuncs()
 | 
						klet.setNodeStatusFuncs = klet.defaultNodeStatusFuncs()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if utilfeature.DefaultFeatureGate.Enabled(features.SystemdWatchdog) {
 | 
				
			||||||
 | 
							// NewHealthChecker returns an error when the watchdog is configured but its configuration is incorrect; in that case
 | 
				
			||||||
 | 
							// the kubelet will not be started.
 | 
				
			||||||
 | 
							klet.healthChecker, err = watchdog.NewHealthChecker(klet)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								return nil, fmt.Errorf("create health checker: %w", err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
	return klet, nil
 | 
						return klet, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -1344,6 +1353,9 @@ type Kubelet struct {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	// Track node startup latencies
 | 
						// Track node startup latencies
 | 
				
			||||||
	nodeStartupLatencyTracker util.NodeStartupLatencyTracker
 | 
						nodeStartupLatencyTracker util.NodeStartupLatencyTracker
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
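						// healthChecker runs the kubelet health checks and notifies the systemd watchdog; it is only set when the SystemdWatchdog feature gate is enabled.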
 | 
				
			||||||
 | 
						healthChecker watchdog.HealthChecker
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// ListPodStats is delegated to StatsProvider, which implements stats.Provider interface
 | 
					// ListPodStats is delegated to StatsProvider, which implements stats.Provider interface
 | 
				
			||||||
@@ -1698,6 +1710,10 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
 | 
				
			|||||||
		kl.eventedPleg.Start()
 | 
							kl.eventedPleg.Start()
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if utilfeature.DefaultFeatureGate.Enabled(features.SystemdWatchdog) {
 | 
				
			||||||
 | 
							kl.healthChecker.Start()
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	kl.syncLoop(ctx, updates, kl)
 | 
						kl.syncLoop(ctx, updates, kl)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -2876,6 +2892,20 @@ func (kl *Kubelet) LatestLoopEntryTime() time.Time {
 | 
				
			|||||||
	return val.(time.Time)
 | 
						return val.(time.Time)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// SyncLoopHealthCheck checks if kubelet's sync loop that updates containers is working.
 | 
				
			||||||
 | 
					func (kl *Kubelet) SyncLoopHealthCheck(req *http.Request) error {
 | 
				
			||||||
 | 
						duration := kl.resyncInterval * 2
 | 
				
			||||||
 | 
						minDuration := time.Minute * 5
 | 
				
			||||||
 | 
						if duration < minDuration {
 | 
				
			||||||
 | 
							duration = minDuration
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						enterLoopTime := kl.LatestLoopEntryTime()
 | 
				
			||||||
 | 
						if !enterLoopTime.IsZero() && time.Now().After(enterLoopTime.Add(duration)) {
 | 
				
			||||||
 | 
							return fmt.Errorf("sync Loop took longer than expected")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// updateRuntimeUp calls the container runtime status callback, initializing
 | 
					// updateRuntimeUp calls the container runtime status callback, initializing
 | 
				
			||||||
// the runtime dependent modules when the container runtime first comes up,
 | 
					// the runtime dependent modules when the container runtime first comes up,
 | 
				
			||||||
// and returns an error if the status check fails.  If the status check is OK,
 | 
					// and returns an error if the status check fails.  If the status check is OK,
 | 
				
			||||||
@@ -2935,11 +2965,6 @@ func (kl *Kubelet) BirthCry() {
 | 
				
			|||||||
	kl.recorder.Eventf(kl.nodeRef, v1.EventTypeNormal, events.StartingKubelet, "Starting kubelet.")
 | 
						kl.recorder.Eventf(kl.nodeRef, v1.EventTypeNormal, events.StartingKubelet, "Starting kubelet.")
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// ResyncInterval returns the interval used for periodic syncs.
 | 
					 | 
				
			||||||
func (kl *Kubelet) ResyncInterval() time.Duration {
 | 
					 | 
				
			||||||
	return kl.resyncInterval
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// ListenAndServe runs the kubelet HTTP server.
 | 
					// ListenAndServe runs the kubelet HTTP server.
 | 
				
			||||||
func (kl *Kubelet) ListenAndServe(kubeCfg *kubeletconfiginternal.KubeletConfiguration, tlsOptions *server.TLSOptions,
 | 
					func (kl *Kubelet) ListenAndServe(kubeCfg *kubeletconfiginternal.KubeletConfiguration, tlsOptions *server.TLSOptions,
 | 
				
			||||||
	auth server.AuthInterface, tp trace.TracerProvider) {
 | 
						auth server.AuthInterface, tp trace.TracerProvider) {
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -265,9 +265,8 @@ type HostInterface interface {
 | 
				
			|||||||
	CheckpointContainer(ctx context.Context, podUID types.UID, podFullName, containerName string, options *runtimeapi.CheckpointContainerRequest) error
 | 
						CheckpointContainer(ctx context.Context, podUID types.UID, podFullName, containerName string, options *runtimeapi.CheckpointContainerRequest) error
 | 
				
			||||||
	GetKubeletContainerLogs(ctx context.Context, podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error
 | 
						GetKubeletContainerLogs(ctx context.Context, podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error
 | 
				
			||||||
	ServeLogs(w http.ResponseWriter, req *http.Request)
 | 
						ServeLogs(w http.ResponseWriter, req *http.Request)
 | 
				
			||||||
	ResyncInterval() time.Duration
 | 
					 | 
				
			||||||
	GetHostname() string
 | 
						GetHostname() string
 | 
				
			||||||
	LatestLoopEntryTime() time.Time
 | 
						SyncLoopHealthCheck(req *http.Request) error
 | 
				
			||||||
	GetExec(ctx context.Context, podFullName string, podUID types.UID, containerName string, cmd []string, streamOpts remotecommandserver.Options) (*url.URL, error)
 | 
						GetExec(ctx context.Context, podFullName string, podUID types.UID, containerName string, cmd []string, streamOpts remotecommandserver.Options) (*url.URL, error)
 | 
				
			||||||
	GetAttach(ctx context.Context, podFullName string, podUID types.UID, containerName string, streamOpts remotecommandserver.Options) (*url.URL, error)
 | 
						GetAttach(ctx context.Context, podFullName string, podUID types.UID, containerName string, streamOpts remotecommandserver.Options) (*url.URL, error)
 | 
				
			||||||
	GetPortForward(ctx context.Context, podName, podNamespace string, podUID types.UID, portForwardOpts portforward.V4Options) (*url.URL, error)
 | 
						GetPortForward(ctx context.Context, podName, podNamespace string, podUID types.UID, portForwardOpts portforward.V4Options) (*url.URL, error)
 | 
				
			||||||
@@ -396,7 +395,7 @@ func (s *Server) InstallDefaultHandlers() {
 | 
				
			|||||||
	healthz.InstallHandler(s.restfulCont,
 | 
						healthz.InstallHandler(s.restfulCont,
 | 
				
			||||||
		healthz.PingHealthz,
 | 
							healthz.PingHealthz,
 | 
				
			||||||
		healthz.LogHealthz,
 | 
							healthz.LogHealthz,
 | 
				
			||||||
		healthz.NamedCheck("syncloop", s.syncLoopHealthCheck),
 | 
							healthz.NamedCheck("syncloop", s.host.SyncLoopHealthCheck),
 | 
				
			||||||
	)
 | 
						)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	slis.SLIMetricsWithReset{}.Install(s.restfulCont)
 | 
						slis.SLIMetricsWithReset{}.Install(s.restfulCont)
 | 
				
			||||||
@@ -678,20 +677,6 @@ func (s *Server) InstallProfilingHandler(enableProfilingLogHandler bool, enableC
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Checks if kubelet's sync loop  that updates containers is working.
 | 
					 | 
				
			||||||
func (s *Server) syncLoopHealthCheck(req *http.Request) error {
 | 
					 | 
				
			||||||
	duration := s.host.ResyncInterval() * 2
 | 
					 | 
				
			||||||
	minDuration := time.Minute * 5
 | 
					 | 
				
			||||||
	if duration < minDuration {
 | 
					 | 
				
			||||||
		duration = minDuration
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	enterLoopTime := s.host.LatestLoopEntryTime()
 | 
					 | 
				
			||||||
	if !enterLoopTime.IsZero() && time.Now().After(enterLoopTime.Add(duration)) {
 | 
					 | 
				
			||||||
		return fmt.Errorf("sync Loop took longer than expected")
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	return nil
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// getContainerLogs handles containerLogs request against the Kubelet
 | 
					// getContainerLogs handles containerLogs request against the Kubelet
 | 
				
			||||||
func (s *Server) getContainerLogs(request *restful.Request, response *restful.Response) {
 | 
					func (s *Server) getContainerLogs(request *restful.Request, response *restful.Response) {
 | 
				
			||||||
	podNamespace := request.PathParameter("podNamespace")
 | 
						podNamespace := request.PathParameter("podNamespace")
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -91,10 +91,6 @@ type fakeKubelet struct {
 | 
				
			|||||||
	streamingRuntime  streaming.Server
 | 
						streamingRuntime  streaming.Server
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (fk *fakeKubelet) ResyncInterval() time.Duration {
 | 
					 | 
				
			||||||
	return fk.resyncInterval
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (fk *fakeKubelet) LatestLoopEntryTime() time.Time {
 | 
					func (fk *fakeKubelet) LatestLoopEntryTime() time.Time {
 | 
				
			||||||
	return fk.loopEntryTime
 | 
						return fk.loopEntryTime
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
@@ -154,6 +150,19 @@ func (fk *fakeKubelet) ListPodSandboxMetrics(ctx context.Context) ([]*runtimeapi
 | 
				
			|||||||
	return nil, nil
 | 
						return nil, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (fk *fakeKubelet) SyncLoopHealthCheck(req *http.Request) error {
 | 
				
			||||||
 | 
						duration := fk.resyncInterval * 2
 | 
				
			||||||
 | 
						minDuration := time.Minute * 5
 | 
				
			||||||
 | 
						if duration < minDuration {
 | 
				
			||||||
 | 
							duration = minDuration
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						enterLoopTime := fk.LatestLoopEntryTime()
 | 
				
			||||||
 | 
						if !enterLoopTime.IsZero() && time.Now().After(enterLoopTime.Add(duration)) {
 | 
				
			||||||
 | 
							return fmt.Errorf("sync Loop took longer than expected")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type fakeRuntime struct {
 | 
					type fakeRuntime struct {
 | 
				
			||||||
	execFunc        func(string, []string, io.Reader, io.WriteCloser, io.WriteCloser, bool, <-chan remotecommand.TerminalSize) error
 | 
						execFunc        func(string, []string, io.Reader, io.WriteCloser, io.WriteCloser, bool, <-chan remotecommand.TerminalSize) error
 | 
				
			||||||
	attachFunc      func(string, io.Reader, io.WriteCloser, io.WriteCloser, bool, <-chan remotecommand.TerminalSize) error
 | 
						attachFunc      func(string, io.Reader, io.WriteCloser, io.WriteCloser, bool, <-chan remotecommand.TerminalSize) error
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										29
									
								
								pkg/kubelet/watchdog/types.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										29
									
								
								pkg/kubelet/watchdog/types.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,29 @@
 | 
				
			|||||||
 | 
					/*
 | 
				
			||||||
 | 
					Copyright 2024 The Kubernetes Authors.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Licensed under the Apache License, Version 2.0 (the "License");
 | 
				
			||||||
 | 
					you may not use this file except in compliance with the License.
 | 
				
			||||||
 | 
					You may obtain a copy of the License at
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    http://www.apache.org/licenses/LICENSE-2.0
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Unless required by applicable law or agreed to in writing, software
 | 
				
			||||||
 | 
					distributed under the License is distributed on an "AS IS" BASIS,
 | 
				
			||||||
 | 
					WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
				
			||||||
 | 
					See the License for the specific language governing permissions and
 | 
				
			||||||
 | 
					limitations under the License.
 | 
				
			||||||
 | 
					*/
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					package watchdog
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import "net/http"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// HealthChecker defines the interface of health checkers.
 | 
				
			||||||
 | 
					type HealthChecker interface {
 | 
				
			||||||
 | 
						Start()
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// syncLoopHealthChecker contains the health check method for syncLoop.
 | 
				
			||||||
 | 
					type syncLoopHealthChecker interface {
 | 
				
			||||||
 | 
						SyncLoopHealthCheck(req *http.Request) error
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
							
								
								
									
										158
									
								
								pkg/kubelet/watchdog/watchdog_linux.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										158
									
								
								pkg/kubelet/watchdog/watchdog_linux.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,158 @@
 | 
				
			|||||||
 | 
					//go:build linux
 | 
				
			||||||
 | 
					// +build linux
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					/*
 | 
				
			||||||
 | 
					Copyright 2024 The Kubernetes Authors.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Licensed under the Apache License, Version 2.0 (the "License");
 | 
				
			||||||
 | 
					you may not use this file except in compliance with the License.
 | 
				
			||||||
 | 
					You may obtain a copy of the License at
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    http://www.apache.org/licenses/LICENSE-2.0
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Unless required by applicable law or agreed to in writing, software
 | 
				
			||||||
 | 
					distributed under the License is distributed on an "AS IS" BASIS,
 | 
				
			||||||
 | 
					WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
				
			||||||
 | 
					See the License for the specific language governing permissions and
 | 
				
			||||||
 | 
					limitations under the License.
 | 
				
			||||||
 | 
					*/
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					package watchdog
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"fmt"
 | 
				
			||||||
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						"github.com/coreos/go-systemd/v22/daemon"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						"k8s.io/apimachinery/pkg/util/wait"
 | 
				
			||||||
 | 
						"k8s.io/apiserver/pkg/server/healthz"
 | 
				
			||||||
 | 
						"k8s.io/klog/v2"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// WatchdogClient defines the interface for interacting with the systemd watchdog.
 | 
				
			||||||
 | 
					type WatchdogClient interface {
 | 
				
			||||||
 | 
						SdWatchdogEnabled(unsetEnvironment bool) (time.Duration, error)
 | 
				
			||||||
 | 
						SdNotify(unsetEnvironment bool) (bool, error)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// DefaultWatchdogClient implements the WatchdogClient interface using the actual systemd daemon functions.
 | 
				
			||||||
 | 
					type DefaultWatchdogClient struct{}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					var _ WatchdogClient = &DefaultWatchdogClient{}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (d *DefaultWatchdogClient) SdWatchdogEnabled(unsetEnvironment bool) (time.Duration, error) {
 | 
				
			||||||
 | 
						return daemon.SdWatchdogEnabled(unsetEnvironment)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (d *DefaultWatchdogClient) SdNotify(unsetEnvironment bool) (bool, error) {
 | 
				
			||||||
 | 
						return daemon.SdNotify(unsetEnvironment, daemon.SdNotifyWatchdog)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Option defines optional parameters for initializing the healthChecker
 | 
				
			||||||
 | 
					// structure.
 | 
				
			||||||
 | 
					type Option func(*healthChecker)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func WithWatchdogClient(watchdog WatchdogClient) Option {
 | 
				
			||||||
 | 
						return func(hc *healthChecker) {
 | 
				
			||||||
 | 
							hc.watchdog = watchdog
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type healthChecker struct {
 | 
				
			||||||
 | 
						checkers     []healthz.HealthChecker
 | 
				
			||||||
 | 
						retryBackoff wait.Backoff
 | 
				
			||||||
 | 
						interval     time.Duration
 | 
				
			||||||
 | 
						watchdog     WatchdogClient
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					var _ HealthChecker = &healthChecker{}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					const minimalNotifyInterval = time.Second
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// NewHealthChecker creates a new HealthChecker instance.
 | 
				
			||||||
 | 
					// This function initializes the health checker and configures its behavior based on the status of the systemd watchdog.
 | 
				
			||||||
 | 
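					// If the watchdog is not enabled, it returns a no-op HealthChecker and a nil error; an error is returned only when the watchdog configuration cannot be read or its timeout is too small.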
 | 
				
			||||||
 | 
					func NewHealthChecker(syncLoop syncLoopHealthChecker, opts ...Option) (HealthChecker, error) {
 | 
				
			||||||
 | 
						hc := &healthChecker{
 | 
				
			||||||
 | 
							watchdog: &DefaultWatchdogClient{},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						for _, o := range opts {
 | 
				
			||||||
 | 
							o(hc)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// get watchdog information
 | 
				
			||||||
 | 
						watchdogVal, err := hc.watchdog.SdWatchdogEnabled(false)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							// Failed to get watchdog configuration information.
 | 
				
			||||||
 | 
							// This occurs when we want to start the watchdog but the configuration is incorrect,
 | 
				
			||||||
 | 
							// for example, the time is not configured correctly.
 | 
				
			||||||
 | 
							return nil, fmt.Errorf("configure watchdog: %w", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if watchdogVal == 0 {
 | 
				
			||||||
 | 
							klog.InfoS("Systemd watchdog is not enabled")
 | 
				
			||||||
 | 
							return &healthChecker{}, nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if watchdogVal <= minimalNotifyInterval {
 | 
				
			||||||
 | 
							return nil, fmt.Errorf("configure watchdog timeout too small: %v", watchdogVal)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// The health checks performed by checkers are the same as those for "/healthz".
 | 
				
			||||||
 | 
						checkers := []healthz.HealthChecker{
 | 
				
			||||||
 | 
							healthz.PingHealthz,
 | 
				
			||||||
 | 
							healthz.LogHealthz,
 | 
				
			||||||
 | 
							healthz.NamedCheck("syncloop", syncLoop.SyncLoopHealthCheck),
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						retryBackoff := wait.Backoff{
 | 
				
			||||||
 | 
							Duration: time.Second,
 | 
				
			||||||
 | 
							Factor:   2.0,
 | 
				
			||||||
 | 
							Jitter:   0.1,
 | 
				
			||||||
 | 
							Steps:    2,
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						hc.checkers = checkers
 | 
				
			||||||
 | 
						hc.retryBackoff = retryBackoff
 | 
				
			||||||
 | 
						hc.interval = watchdogVal / 2
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return hc, nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (hc *healthChecker) Start() {
 | 
				
			||||||
 | 
						if hc.interval <= 0 {
 | 
				
			||||||
 | 
							klog.InfoS("Systemd watchdog is not enabled or the interval is invalid, so health checking will not be started.")
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						klog.InfoS("Starting systemd watchdog with interval", "interval", hc.interval)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						go wait.Forever(func() {
 | 
				
			||||||
 | 
							if err := hc.doCheck(); err != nil {
 | 
				
			||||||
 | 
								klog.ErrorS(err, "Do not notify watchdog this iteration as the kubelet is reportedly not healthy")
 | 
				
			||||||
 | 
								return
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							err := wait.ExponentialBackoff(hc.retryBackoff, func() (bool, error) {
 | 
				
			||||||
 | 
								ack, err := hc.watchdog.SdNotify(false)
 | 
				
			||||||
 | 
								if err != nil {
 | 
				
			||||||
 | 
									klog.V(5).InfoS("Failed to notify systemd watchdog, retrying", "error", err)
 | 
				
			||||||
 | 
									return false, nil
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								if !ack {
 | 
				
			||||||
 | 
									return false, fmt.Errorf("failed to notify systemd watchdog, notification not supported - (i.e. NOTIFY_SOCKET is unset)")
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								klog.V(5).InfoS("Watchdog plugin notified", "acknowledgment", ack, "state", daemon.SdNotifyWatchdog)
 | 
				
			||||||
 | 
								return true, nil
 | 
				
			||||||
 | 
							})
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								klog.ErrorS(err, "Failed to notify watchdog")
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}, hc.interval)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (hc *healthChecker) doCheck() error {
 | 
				
			||||||
 | 
						for _, hc := range hc.checkers {
 | 
				
			||||||
 | 
							if err := hc.Check(nil); err != nil {
 | 
				
			||||||
 | 
								return fmt.Errorf("checker %s failed: %w", hc.Name(), err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
							
								
								
									
										180
									
								
								pkg/kubelet/watchdog/watchdog_linux_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										180
									
								
								pkg/kubelet/watchdog/watchdog_linux_test.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,180 @@
 | 
				
			|||||||
 | 
					//go:build linux
 | 
				
			||||||
 | 
					// +build linux
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					/*
 | 
				
			||||||
 | 
					Copyright 2024 The Kubernetes Authors.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Licensed under the Apache License, Version 2.0 (the "License");
 | 
				
			||||||
 | 
					you may not use this file except in compliance with the License.
 | 
				
			||||||
 | 
					You may obtain a copy of the License at
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    http://www.apache.org/licenses/LICENSE-2.0
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Unless required by applicable law or agreed to in writing, software
 | 
				
			||||||
 | 
					distributed under the License is distributed on an "AS IS" BASIS,
 | 
				
			||||||
 | 
					WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
				
			||||||
 | 
					See the License for the specific language governing permissions and
 | 
				
			||||||
 | 
					limitations under the License.
 | 
				
			||||||
 | 
					*/
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					package watchdog
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"bytes"
 | 
				
			||||||
 | 
						"errors"
 | 
				
			||||||
 | 
						"flag"
 | 
				
			||||||
 | 
						"net/http"
 | 
				
			||||||
 | 
						"strings"
 | 
				
			||||||
 | 
						"testing"
 | 
				
			||||||
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						"k8s.io/klog/v2"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Mock syncLoopHealthChecker
 | 
				
			||||||
 | 
					type mockSyncLoopHealthChecker struct {
 | 
				
			||||||
 | 
						healthCheckErr error
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (m *mockSyncLoopHealthChecker) SyncLoopHealthCheck(req *http.Request) error {
 | 
				
			||||||
 | 
						return m.healthCheckErr
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Mock WatchdogClient
 | 
				
			||||||
 | 
					type mockWatchdogClient struct {
 | 
				
			||||||
 | 
						enabledVal time.Duration
 | 
				
			||||||
 | 
						enabledErr error
 | 
				
			||||||
 | 
						notifyAck  bool
 | 
				
			||||||
 | 
						notifyErr  error
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (m *mockWatchdogClient) SdWatchdogEnabled(unsetEnvironment bool) (time.Duration, error) {
 | 
				
			||||||
 | 
						return m.enabledVal, m.enabledErr
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (m *mockWatchdogClient) SdNotify(unsetEnvironment bool) (bool, error) {
 | 
				
			||||||
 | 
						return m.notifyAck, m.notifyErr
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					const (
 | 
				
			||||||
 | 
						interval      = 4 * time.Second
 | 
				
			||||||
 | 
						intervalSmall = 1 * time.Second
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// TestNewHealthChecker tests the NewHealthChecker function.
 | 
				
			||||||
 | 
					func TestNewHealthChecker(t *testing.T) {
 | 
				
			||||||
 | 
						// Test cases
 | 
				
			||||||
 | 
						tests := []struct {
 | 
				
			||||||
 | 
							name        string
 | 
				
			||||||
 | 
							mockEnabled time.Duration
 | 
				
			||||||
 | 
							mockErr     error
 | 
				
			||||||
 | 
							wantErr     bool
 | 
				
			||||||
 | 
						}{
 | 
				
			||||||
 | 
							{"Watchdog enabled", interval, nil, false},
 | 
				
			||||||
 | 
							{"Watchdog not enabled", 0, nil, false},
 | 
				
			||||||
 | 
							{"Watchdog enabled with error", interval, errors.New("mock error"), true},
 | 
				
			||||||
 | 
							{"Watchdog timeout too small", intervalSmall, nil, true},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						for _, tt := range tests {
 | 
				
			||||||
 | 
							t.Run(tt.name, func(t *testing.T) {
 | 
				
			||||||
 | 
								mockClient := &mockWatchdogClient{
 | 
				
			||||||
 | 
									enabledVal: tt.mockEnabled,
 | 
				
			||||||
 | 
									enabledErr: tt.mockErr,
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								_, err := NewHealthChecker(&mockSyncLoopHealthChecker{}, WithWatchdogClient(mockClient))
 | 
				
			||||||
 | 
								if (err != nil) != tt.wantErr {
 | 
				
			||||||
 | 
									t.Errorf("NewHealthChecker() error = %v, wantErr %v", err, tt.wantErr)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							})
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// TestHealthCheckerStart tests the Start method of the healthChecker.
 | 
				
			||||||
 | 
					func TestHealthCheckerStart(t *testing.T) {
 | 
				
			||||||
 | 
						// Test cases
 | 
				
			||||||
 | 
						tests := []struct {
 | 
				
			||||||
 | 
							name           string
 | 
				
			||||||
 | 
							enabledVal     time.Duration
 | 
				
			||||||
 | 
							healthCheckErr error
 | 
				
			||||||
 | 
							notifyAck      bool
 | 
				
			||||||
 | 
							notifyErr      error
 | 
				
			||||||
 | 
							expectedLogs   []string
 | 
				
			||||||
 | 
						}{
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name:           "Watchdog enabled and notify succeeds",
 | 
				
			||||||
 | 
								enabledVal:     interval,
 | 
				
			||||||
 | 
								healthCheckErr: nil,
 | 
				
			||||||
 | 
								notifyAck:      true,
 | 
				
			||||||
 | 
								notifyErr:      nil,
 | 
				
			||||||
 | 
								expectedLogs:   []string{"Starting systemd watchdog with interval", "Watchdog plugin notified"},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name:           "Watchdog enabled and notify fails, notification not supported",
 | 
				
			||||||
 | 
								enabledVal:     interval,
 | 
				
			||||||
 | 
								healthCheckErr: nil,
 | 
				
			||||||
 | 
								notifyAck:      false,
 | 
				
			||||||
 | 
								notifyErr:      nil,
 | 
				
			||||||
 | 
								expectedLogs:   []string{"Starting systemd watchdog with interval", "Failed to notify watchdog", "notification not supported"},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name:           "Watchdog enabled and notify fails, transmission failed",
 | 
				
			||||||
 | 
								enabledVal:     interval,
 | 
				
			||||||
 | 
								healthCheckErr: nil,
 | 
				
			||||||
 | 
								notifyAck:      false,
 | 
				
			||||||
 | 
								notifyErr:      errors.New("mock notify error"),
 | 
				
			||||||
 | 
								expectedLogs:   []string{"Starting systemd watchdog with interval", "Failed to notify watchdog"},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name:           "Watchdog enabled and health check fails",
 | 
				
			||||||
 | 
								enabledVal:     interval,
 | 
				
			||||||
 | 
								healthCheckErr: errors.New("mock healthy error"),
 | 
				
			||||||
 | 
								notifyAck:      true,
 | 
				
			||||||
 | 
								notifyErr:      nil,
 | 
				
			||||||
 | 
								expectedLogs:   []string{"Starting systemd watchdog with interval", "Do not notify watchdog this iteration as the kubelet is reportedly not healthy"},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						for _, tt := range tests {
 | 
				
			||||||
 | 
							t.Run(tt.name, func(t *testing.T) {
 | 
				
			||||||
 | 
								// Capture logs
 | 
				
			||||||
 | 
								var logBuffer bytes.Buffer
 | 
				
			||||||
 | 
								flags := &flag.FlagSet{}
 | 
				
			||||||
 | 
								klog.InitFlags(flags)
 | 
				
			||||||
 | 
								if err := flags.Set("v", "5"); err != nil {
 | 
				
			||||||
 | 
									t.Fatal(err)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								klog.LogToStderr(false)
 | 
				
			||||||
 | 
								klog.SetOutput(&logBuffer)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								// Mock SdWatchdogEnabled to return a valid value
 | 
				
			||||||
 | 
								mockClient := &mockWatchdogClient{
 | 
				
			||||||
 | 
									enabledVal: tt.enabledVal,
 | 
				
			||||||
 | 
									notifyAck:  tt.notifyAck,
 | 
				
			||||||
 | 
									notifyErr:  tt.notifyErr,
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								// Create a healthChecker
 | 
				
			||||||
 | 
								hc, err := NewHealthChecker(&mockSyncLoopHealthChecker{healthCheckErr: tt.healthCheckErr}, WithWatchdogClient(mockClient))
 | 
				
			||||||
 | 
								if err != nil {
 | 
				
			||||||
 | 
									t.Fatalf("NewHealthChecker() failed: %v", err)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								// Start the health checker
 | 
				
			||||||
 | 
								hc.Start()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								// Wait for a short period to allow the health check to run
 | 
				
			||||||
 | 
								time.Sleep(2 * interval)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								// Check logs to verify the health check ran
 | 
				
			||||||
 | 
								klog.Flush()
 | 
				
			||||||
 | 
								logs := logBuffer.String()
 | 
				
			||||||
 | 
								for _, expectedLog := range tt.expectedLogs {
 | 
				
			||||||
 | 
									if !strings.Contains(logs, expectedLog) {
 | 
				
			||||||
 | 
										t.Errorf("Expected log '%s' not found in logs: %s", expectedLog, logs)
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							})
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
							
								
								
									
										33
									
								
								pkg/kubelet/watchdog/watchdog_unsupported.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										33
									
								
								pkg/kubelet/watchdog/watchdog_unsupported.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,33 @@
 | 
				
			|||||||
 | 
					//go:build !linux
 | 
				
			||||||
 | 
					// +build !linux
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					/*
 | 
				
			||||||
 | 
					Copyright 2024 The Kubernetes Authors.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Licensed under the Apache License, Version 2.0 (the "License");
 | 
				
			||||||
 | 
					you may not use this file except in compliance with the License.
 | 
				
			||||||
 | 
					You may obtain a copy of the License at
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    http://www.apache.org/licenses/LICENSE-2.0
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Unless required by applicable law or agreed to in writing, software
 | 
				
			||||||
 | 
					distributed under the License is distributed on an "AS IS" BASIS,
 | 
				
			||||||
 | 
					WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
				
			||||||
 | 
					See the License for the specific language governing permissions and
 | 
				
			||||||
 | 
					limitations under the License.
 | 
				
			||||||
 | 
					*/
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					package watchdog
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type healthCheckerUnsupported struct{}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					var _ HealthChecker = &healthCheckerUnsupported{}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// NewHealthChecker creates a fake one here
 | 
				
			||||||
 | 
					func NewHealthChecker(_ syncLoopHealthChecker) (HealthChecker, error) {
 | 
				
			||||||
 | 
						return &healthCheckerUnsupported{}, nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (ow *healthCheckerUnsupported) Start() {
 | 
				
			||||||
 | 
						return
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@@ -1222,6 +1222,12 @@
 | 
				
			|||||||
    lockToDefault: false
 | 
					    lockToDefault: false
 | 
				
			||||||
    preRelease: Alpha
 | 
					    preRelease: Alpha
 | 
				
			||||||
    version: "1.31"
 | 
					    version: "1.31"
 | 
				
			||||||
 | 
					- name: SystemdWatchdog
 | 
				
			||||||
 | 
					  versionedSpecs:
 | 
				
			||||||
 | 
					  - default: true
 | 
				
			||||||
 | 
					    lockToDefault: false
 | 
				
			||||||
 | 
					    preRelease: Beta
 | 
				
			||||||
 | 
					    version: "1.32"
 | 
				
			||||||
- name: TopologyAwareHints
 | 
					- name: TopologyAwareHints
 | 
				
			||||||
  versionedSpecs:
 | 
					  versionedSpecs:
 | 
				
			||||||
  - default: false
 | 
					  - default: false
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user