feat(cmd/kubelet): support structured and contextual logging

This commit is contained in:
soma00333
2024-10-06 16:35:53 +09:00
parent 1b6a6cc9c0
commit 7ddb042626
12 changed files with 102 additions and 75 deletions

View File

@@ -19,6 +19,8 @@ limitations under the License.
package app
func initForOS(service bool, priorityClass string) error {
import "context"
func initForOS(ctx context.Context, service bool, priorityClass string) error {
return nil
}

View File

@@ -20,12 +20,13 @@ limitations under the License.
package app
import (
"context"
"fmt"
"unsafe"
"golang.org/x/sys/windows"
"k8s.io/klog/v2"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/windows/service"
)
@@ -73,13 +74,14 @@ func createWindowsJobObject(pc uint32) (windows.Handle, error) {
return job, nil
}
func initForOS(windowsService bool, windowsPriorityClass string) error {
func initForOS(ctx context.Context, windowsService bool, windowsPriorityClass string) error {
logger := klog.FromContext(ctx)
priority := getPriorityValue(windowsPriorityClass)
if priority == 0 {
return fmt.Errorf("unknown priority class %s, valid ones are available at "+
"https://docs.microsoft.com/en-us/windows/win32/procthread/scheduling-priorities", windowsPriorityClass)
}
klog.InfoS("Creating a Windows job object and adding kubelet process to it", "windowsPriorityClass", windowsPriorityClass)
logger.Info("Creating a Windows job object and adding kubelet process to it", "windowsPriorityClass", windowsPriorityClass)
job, err := createWindowsJobObject(priority)
if err != nil {
return err

View File

@@ -18,6 +18,8 @@ package app
// This file exists to force the desired plugin implementations to be linked.
import (
"context"
"k8s.io/component-base/featuregate"
"k8s.io/utils/exec"
@@ -41,7 +43,7 @@ import (
)
// ProbeVolumePlugins collects all volume plugins into an easy to use list.
func ProbeVolumePlugins(featureGate featuregate.FeatureGate) ([]volume.VolumePlugin, error) {
func ProbeVolumePlugins(ctx context.Context, featureGate featuregate.FeatureGate) ([]volume.VolumePlugin, error) {
allPlugins := []volume.VolumePlugin{}
// The list of plugins to probe is decided by the kubelet binary, not
@@ -51,7 +53,7 @@ func ProbeVolumePlugins(featureGate featuregate.FeatureGate) ([]volume.VolumePlu
// Kubelet does not currently need to configure volume plugins.
// If/when it does, see kube-controller-manager/app/plugins.go for example of using volume.VolumeConfig
var err error
allPlugins, err = appendLegacyProviderVolumes(allPlugins, featureGate)
allPlugins, err = appendLegacyProviderVolumes(ctx, allPlugins, featureGate)
if err != nil {
return allPlugins, err
}

View File

@@ -17,6 +17,8 @@ limitations under the License.
package app
import (
"context"
"k8s.io/component-base/featuregate"
"k8s.io/csi-translation-lib/plugins"
"k8s.io/klog/v2"
@@ -28,18 +30,19 @@ import (
type probeFn func() []volume.VolumePlugin
func appendPluginBasedOnFeatureFlags(plugins []volume.VolumePlugin, inTreePluginName string,
func appendPluginBasedOnFeatureFlags(ctx context.Context, plugins []volume.VolumePlugin, inTreePluginName string,
featureGate featuregate.FeatureGate, pluginInfo pluginInfo) ([]volume.VolumePlugin, error) {
logger := klog.FromContext(ctx)
_, err := csimigration.CheckMigrationFeatureFlags(featureGate, pluginInfo.pluginMigrationFeature, pluginInfo.pluginUnregisterFeature)
if err != nil {
klog.InfoS("Unexpected CSI Migration Feature Flags combination detected, CSI Migration may not take effect", "err", err)
logger.Info("Unexpected CSI Migration Feature Flags combination detected, CSI Migration may not take effect", "err", err)
// TODO: fail and return here once alpha only tests can set the feature flags for a plugin correctly
}
// Skip appending the in-tree plugin to the list of plugins to be probed/initialized
// if the plugin unregister feature flag is set
if featureGate.Enabled(pluginInfo.pluginUnregisterFeature) {
klog.InfoS("Skipped registration of plugin since feature flag is enabled", "pluginName", inTreePluginName, "featureFlag", pluginInfo.pluginUnregisterFeature)
logger.Info("Skipped registration of plugin since feature flag is enabled", "pluginName", inTreePluginName, "featureFlag", pluginInfo.pluginUnregisterFeature)
return plugins, nil
}
@@ -53,12 +56,12 @@ type pluginInfo struct {
pluginProbeFunction probeFn
}
func appendLegacyProviderVolumes(allPlugins []volume.VolumePlugin, featureGate featuregate.FeatureGate) ([]volume.VolumePlugin, error) {
func appendLegacyProviderVolumes(ctx context.Context, allPlugins []volume.VolumePlugin, featureGate featuregate.FeatureGate) ([]volume.VolumePlugin, error) {
pluginMigrationStatus := make(map[string]pluginInfo)
pluginMigrationStatus[plugins.PortworxVolumePluginName] = pluginInfo{pluginMigrationFeature: features.CSIMigrationPortworx, pluginUnregisterFeature: features.InTreePluginPortworxUnregister, pluginProbeFunction: portworx.ProbeVolumePlugins}
var err error
for pluginName, pluginInfo := range pluginMigrationStatus {
allPlugins, err = appendPluginBasedOnFeatureFlags(allPlugins, pluginName, featureGate, pluginInfo)
allPlugins, err = appendPluginBasedOnFeatureFlags(ctx, allPlugins, pluginName, featureGate, pluginInfo)
if err != nil {
return allPlugins, err
}

View File

@@ -136,6 +136,7 @@ func init() {
// NewKubeletCommand creates a *cobra.Command object with default parameters
func NewKubeletCommand(ctx context.Context) *cobra.Command {
logger := klog.FromContext(ctx)
cleanFlagSet := pflag.NewFlagSet(server.ComponentKubelet, pflag.ContinueOnError)
cleanFlagSet.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
kubeletFlags := options.NewKubeletFlags()
@@ -143,7 +144,7 @@ func NewKubeletCommand(ctx context.Context) *cobra.Command {
kubeletConfig, err := options.NewKubeletConfiguration()
// programmer error
if err != nil {
klog.ErrorS(err, "Failed to create a new kubelet configuration")
logger.Error(err, "Failed to create a new kubelet configuration")
os.Exit(1)
}
@@ -209,7 +210,7 @@ is checked every 20 seconds (also configurable with a flag).`,
}
if cleanFlagSet.Changed("pod-infra-container-image") {
klog.InfoS("--pod-infra-container-image will not be pruned by the image garbage collector in kubelet and should also be set in the remote runtime")
logger.Info("--pod-infra-container-image will not be pruned by the image garbage collector in kubelet and should also be set in the remote runtime")
_ = cmd.Flags().MarkDeprecated("pod-infra-container-image", "--pod-infra-container-image will be removed in 1.35. Image garbage collector will get sandbox image information from CRI.")
}
@@ -242,6 +243,8 @@ is checked every 20 seconds (also configurable with a flag).`,
// Config and flags parsed, now we can initialize logging.
logs.InitLogs()
// Retrieve the logger again to reflect the updated log settings
logger = klog.FromContext(ctx)
if err := logsapi.ValidateAndApplyAsField(&kubeletConfig.Logging, utilfeature.DefaultFeatureGate, field.NewPath("logging")); err != nil {
return fmt.Errorf("initialize logging: %v", err)
}
@@ -254,7 +257,7 @@ is checked every 20 seconds (also configurable with a flag).`,
}
if (kubeletConfig.KubeletCgroups != "" && kubeletConfig.KubeReservedCgroup != "") && (strings.Index(kubeletConfig.KubeletCgroups, kubeletConfig.KubeReservedCgroup) != 0) {
klog.InfoS("unsupported configuration:KubeletCgroups is not within KubeReservedCgroup")
logger.Info("unsupported configuration:KubeletCgroups is not within KubeReservedCgroup")
}
// construct a KubeletServer from kubeletFlags and kubeletConfig
@@ -264,7 +267,7 @@ is checked every 20 seconds (also configurable with a flag).`,
}
// use kubeletServer to construct the default KubeletDeps
kubeletDeps, err := UnsecuredDependencies(kubeletServer, utilfeature.DefaultFeatureGate)
kubeletDeps, err := UnsecuredDependencies(ctx, kubeletServer, utilfeature.DefaultFeatureGate)
if err != nil {
return fmt.Errorf("failed to construct kubelet dependencies: %w", err)
}
@@ -276,8 +279,8 @@ is checked every 20 seconds (also configurable with a flag).`,
}
}
if err := checkPermissions(); err != nil {
klog.ErrorS(err, "kubelet running with insufficient permissions")
if err := checkPermissions(ctx); err != nil {
logger.Error(err, "Kubelet running with insufficient permissions")
}
// make the kubelet's config safe for logging
@@ -286,7 +289,7 @@ is checked every 20 seconds (also configurable with a flag).`,
config.StaticPodURLHeader[k] = []string{"<masked>"}
}
// log the kubelet's config for inspection
klog.V(5).InfoS("KubeletConfiguration", "configuration", klog.Format(config))
logger.V(5).Info("KubeletConfiguration", "configuration", klog.Format(config))
// set up signal context for kubelet shutdown
ctx := genericapiserver.SetupSignalContext()
@@ -478,9 +481,9 @@ func loadDropinConfigFileIntoJSON(name string) ([]byte, *schema.GroupVersionKind
// UnsecuredDependencies returns a Dependencies suitable for being run, or an error if the server setup
// is not valid. It will not start any background processes, and does not include authentication/authorization
func UnsecuredDependencies(s *options.KubeletServer, featureGate featuregate.FeatureGate) (*kubelet.Dependencies, error) {
func UnsecuredDependencies(ctx context.Context, s *options.KubeletServer, featureGate featuregate.FeatureGate) (*kubelet.Dependencies, error) {
// Initialize the TLS Options
tlsOptions, err := InitializeTLS(&s.KubeletFlags, &s.KubeletConfiguration)
tlsOptions, err := InitializeTLS(ctx, &s.KubeletFlags, &s.KubeletConfiguration)
if err != nil {
return nil, err
}
@@ -490,7 +493,7 @@ func UnsecuredDependencies(s *options.KubeletServer, featureGate featuregate.Fea
hu := hostutil.NewHostUtil()
pluginRunner := exec.New()
plugins, err := ProbeVolumePlugins(featureGate)
plugins, err := ProbeVolumePlugins(ctx, featureGate)
if err != nil {
return nil, err
}
@@ -521,12 +524,13 @@ func UnsecuredDependencies(s *options.KubeletServer, featureGate featuregate.Fea
// Otherwise, the caller is assumed to have set up the Dependencies object and a default one will
// not be generated.
func Run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) error {
logger := klog.FromContext(ctx)
// To help debugging, immediately log version
klog.InfoS("Kubelet version", "kubeletVersion", version.Get())
logger.Info("Kubelet version", "kubeletVersion", version.Get())
klog.InfoS("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))
logger.Info("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))
if err := initForOS(s.KubeletFlags.WindowsService, s.KubeletFlags.WindowsPriorityClass); err != nil {
if err := initForOS(ctx, s.WindowsService, s.WindowsPriorityClass); err != nil {
return fmt.Errorf("failed OS init: %w", err)
}
if err := run(ctx, s, kubeDeps, featureGate); err != nil {
@@ -548,14 +552,15 @@ func setConfigz(cz *configz.Config, kc *kubeletconfiginternal.KubeletConfigurati
return nil
}
func initConfigz(kc *kubeletconfiginternal.KubeletConfiguration) error {
func initConfigz(ctx context.Context, kc *kubeletconfiginternal.KubeletConfiguration) error {
logger := klog.FromContext(ctx)
cz, err := configz.New("kubeletconfig")
if err != nil {
klog.ErrorS(err, "Failed to register configz")
logger.Error(err, "Failed to register configz")
return err
}
if err := setConfigz(cz, kc); err != nil {
klog.ErrorS(err, "Failed to register config")
logger.Error(err, "Failed to register config")
return err
}
return nil
@@ -566,14 +571,15 @@ func makeEventRecorder(ctx context.Context, kubeDeps *kubelet.Dependencies, node
if kubeDeps.Recorder != nil {
return
}
logger := klog.FromContext(ctx)
eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: server.ComponentKubelet, Host: string(nodeName)})
eventBroadcaster.StartStructuredLogging(3)
if kubeDeps.EventClient != nil {
klog.V(4).InfoS("Sending events to api server")
logger.V(4).Info("Sending events to api server")
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")})
} else {
klog.InfoS("No api server defined - no events will be sent to API server")
logger.Info("No api server defined - no events will be sent to API server")
}
}
@@ -610,7 +616,7 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend
kubeDeps.HealthChecker = healthChecker
healthChecker.Start(ctx)
}
logger := klog.FromContext(ctx)
// Set global feature gates based on the value on the initial KubeletServer
err = utilfeature.DefaultMutableFeatureGate.SetFromMap(s.KubeletConfiguration.FeatureGates)
if err != nil {
@@ -624,7 +630,7 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend
// Warn if MemoryQoS enabled with cgroups v1
if utilfeature.DefaultFeatureGate.Enabled(features.MemoryQoS) &&
!kubeletutil.IsCgroup2UnifiedMode() {
klog.InfoS("Warning: MemoryQoS feature only works with cgroups v2 on Linux, but enabled with cgroups v1")
logger.Info("Warning: MemoryQoS feature only works with cgroups v2 on Linux, but enabled with cgroups v1")
}
// Obtain Kubelet Lock File
if s.ExitOnLockContention && s.LockFilePath == "" {
@@ -632,22 +638,22 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend
}
done := make(chan struct{})
if s.LockFilePath != "" {
klog.InfoS("Acquiring file lock", "path", s.LockFilePath)
logger.Info("Acquiring file lock", "path", s.LockFilePath)
if err := flock.Acquire(s.LockFilePath); err != nil {
return fmt.Errorf("unable to acquire file lock on %q: %w", s.LockFilePath, err)
}
if s.ExitOnLockContention {
klog.InfoS("Watching for inotify events", "path", s.LockFilePath)
if err := watchForLockfileContention(s.LockFilePath, done); err != nil {
logger.Info("Watching for inotify events", "path", s.LockFilePath)
if err := watchForLockfileContention(ctx, s.LockFilePath, done); err != nil {
return err
}
}
}
// Register current configuration with /configz endpoint
err = initConfigz(&s.KubeletConfiguration)
err = initConfigz(ctx, &s.KubeletConfiguration)
if err != nil {
klog.ErrorS(err, "Failed to register kubelet configuration with configz")
logger.Error(err, "Failed to register kubelet configuration with configz")
}
if len(s.ShowHiddenMetricsForVersion) > 0 {
@@ -661,7 +667,7 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend
}
if kubeDeps == nil {
kubeDeps, err = UnsecuredDependencies(s, featureGate)
kubeDeps, err = UnsecuredDependencies(ctx, s, featureGate)
if err != nil {
return err
}
@@ -679,7 +685,7 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend
kubeDeps.KubeClient = nil
kubeDeps.EventClient = nil
kubeDeps.HeartbeatClient = nil
klog.InfoS("Standalone mode, no API client")
logger.Info("Standalone mode, no API client")
case kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil:
clientConfig, onHeartbeatFailure, err := buildKubeletClientConfig(ctx, s, kubeDeps.TracerProvider, nodeName)
@@ -746,7 +752,7 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend
cgroupRoots = append(cgroupRoots, nodeAllocatableRoot)
kubeletCgroup, err := cm.GetKubeletContainer(s.KubeletCgroups)
if err != nil {
klog.InfoS("Failed to get the kubelet's cgroup. Kubelet system container metrics may be missing.", "err", err)
logger.Info("Failed to get the kubelet's cgroup. Kubelet system container metrics may be missing", "err", err)
} else if kubeletCgroup != "" {
cgroupRoots = append(cgroupRoots, kubeletCgroup)
}
@@ -774,7 +780,7 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend
if kubeDeps.ContainerManager == nil {
if s.CgroupsPerQOS && s.CgroupRoot == "" {
klog.InfoS("--cgroups-per-qos enabled, but --cgroup-root was not specified. defaulting to /")
logger.Info("--cgroups-per-qos enabled, but --cgroup-root was not specified. Defaulting to /")
s.CgroupRoot = "/"
}
@@ -788,7 +794,7 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend
}
if reservedSystemCPUs.Size() > 0 {
// at cmd option validation phase it is tested either --system-reserved-cgroup or --kube-reserved-cgroup is specified, so overwrite should be ok
klog.InfoS("Option --reserved-cpus is specified, it will overwrite the cpu setting in KubeReserved and SystemReserved", "kubeReserved", s.KubeReserved, "systemReserved", s.SystemReserved)
logger.Info("Option --reserved-cpus is specified, it will overwrite the cpu setting in KubeReserved and SystemReserved", "kubeReserved", s.KubeReserved, "systemReserved", s.SystemReserved)
if s.KubeReserved != nil {
delete(s.KubeReserved, "cpu")
}
@@ -796,7 +802,7 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend
s.SystemReserved = make(map[string]string)
}
s.SystemReserved["cpu"] = strconv.Itoa(reservedSystemCPUs.Size())
klog.InfoS("After cpu setting is overwritten", "kubeReserved", s.KubeReserved, "systemReserved", s.SystemReserved)
logger.Info("After cpu setting is overwritten", "kubeReserved", s.KubeReserved, "systemReserved", s.SystemReserved)
}
kubeReserved, err := parseResourceList(s.KubeReserved)
@@ -835,7 +841,7 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend
}
if !s.FailSwapOn && s.MemorySwap.SwapBehavior == "" {
// This is just a log because we are using a default of NoSwap.
klog.InfoS("NoSwap is set due to memorySwapBehavior not specified", "memorySwapBehavior", s.MemorySwap.SwapBehavior, "FailSwapOn", s.FailSwapOn)
logger.Info("NoSwap is set due to memorySwapBehavior not specified", "memorySwapBehavior", s.MemorySwap.SwapBehavior, "FailSwapOn", s.FailSwapOn)
}
}
@@ -896,7 +902,7 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend
// TODO(vmarmol): Do this through container config.
oomAdjuster := kubeDeps.OOMAdjuster
if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {
klog.InfoS("Failed to ApplyOOMScoreAdj", "err", err)
logger.Info("Failed to ApplyOOMScoreAdj", "err", err)
}
if err := RunKubelet(ctx, s, kubeDeps); err != nil {
@@ -909,7 +915,7 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend
go wait.Until(func() {
err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), mux)
if err != nil {
klog.ErrorS(err, "Failed to start healthz server")
logger.Error(err, "Failed to start healthz server")
}
}, 5*time.Second, wait.NeverStop)
}
@@ -930,6 +936,7 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend
// buildKubeletClientConfig constructs the appropriate client config for the kubelet depending on whether
// bootstrapping is enabled or client certificate rotation is enabled.
func buildKubeletClientConfig(ctx context.Context, s *options.KubeletServer, tp oteltrace.TracerProvider, nodeName types.NodeName) (*restclient.Config, func(), error) {
logger := klog.FromContext(ctx)
if s.RotateCertificates {
// Rules for client rotation and the handling of kube config files:
//
@@ -948,7 +955,7 @@ func buildKubeletClientConfig(ctx context.Context, s *options.KubeletServer, tp
// which provides a high powered kubeconfig on the master with cert/key data, we must
// bootstrap the cert manager with the contents of the initial client config.
klog.InfoS("Client rotation is on, will bootstrap in background")
logger.Info("Client rotation is on, will bootstrap in background")
certConfig, clientConfig, err := bootstrap.LoadClientConfig(s.KubeConfig, s.BootstrapKubeconfig, s.CertDirectory)
if err != nil {
return nil, nil, err
@@ -999,13 +1006,13 @@ func buildKubeletClientConfig(ctx context.Context, s *options.KubeletServer, tp
// control, users can still opt-in to the previous behavior for closing the connections by
// setting the environment variable DISABLE_HTTP2.
if s := os.Getenv("DISABLE_HTTP2"); len(s) > 0 {
klog.InfoS("HTTP2 has been explicitly disabled, Kubelet will forcefully close active connections on heartbeat failures")
logger.Info("HTTP2 has been explicitly disabled, Kubelet will forcefully close active connections on heartbeat failures")
onHeartbeatFailure = closeAllConns
} else {
onHeartbeatFailure = func() { utilnet.CloseIdleConnectionsFor(transportConfig.Transport) }
}
klog.V(2).InfoS("Starting client certificate rotation")
logger.V(2).Info("Starting client certificate rotation")
clientCertificateManager.Start()
return transportConfig, onHeartbeatFailure, nil
@@ -1034,7 +1041,7 @@ func buildKubeletClientConfig(ctx context.Context, s *options.KubeletServer, tp
// setting the environment variable DISABLE_HTTP2.
var onHeartbeatFailure func()
if s := os.Getenv("DISABLE_HTTP2"); len(s) > 0 {
klog.InfoS("HTTP2 has been explicitly disabled, updating Kubelet client Dialer to forcefully close active connections on heartbeat failures")
logger.Info("HTTP2 has been explicitly disabled, updating Kubelet client Dialer to forcefully close active connections on heartbeat failures")
onHeartbeatFailure, err = updateDialer(clientConfig)
if err != nil {
return nil, nil, err
@@ -1099,7 +1106,8 @@ func kubeClientConfigOverrides(s *options.KubeletServer, clientConfig *restclien
// InitializeTLS checks for a configured TLSCertFile and TLSPrivateKeyFile: if unspecified a new self-signed
// certificate and key file are generated. Returns a configured server.TLSOptions object.
func InitializeTLS(kf *options.KubeletFlags, kc *kubeletconfiginternal.KubeletConfiguration) (*server.TLSOptions, error) {
func InitializeTLS(ctx context.Context, kf *options.KubeletFlags, kc *kubeletconfiginternal.KubeletConfiguration) (*server.TLSOptions, error) {
logger := klog.FromContext(ctx)
if !kc.ServerTLSBootstrap && kc.TLSCertFile == "" && kc.TLSPrivateKeyFile == "" {
kc.TLSCertFile = filepath.Join(kf.CertDirectory, "kubelet.crt")
kc.TLSPrivateKeyFile = filepath.Join(kf.CertDirectory, "kubelet.key")
@@ -1126,7 +1134,7 @@ func InitializeTLS(kf *options.KubeletFlags, kc *kubeletconfiginternal.KubeletCo
return nil, err
}
klog.V(4).InfoS("Using self-signed cert", "TLSCertFile", kc.TLSCertFile, "TLSPrivateKeyFile", kc.TLSPrivateKeyFile)
logger.V(4).Info("Using self-signed cert", "TLSCertFile", kc.TLSCertFile, "TLSPrivateKeyFile", kc.TLSPrivateKeyFile)
}
}
@@ -1140,7 +1148,7 @@ func InitializeTLS(kf *options.KubeletFlags, kc *kubeletconfiginternal.KubeletCo
for i := 0; i < len(tlsCipherSuites); i++ {
for cipherName, cipherID := range insecureCiphers {
if tlsCipherSuites[i] == cipherID {
klog.InfoS("Use of insecure cipher detected.", "cipher", cipherName)
logger.Info("Use of insecure cipher detected.", "cipher", cipherName)
}
}
}
@@ -1153,7 +1161,7 @@ func InitializeTLS(kf *options.KubeletFlags, kc *kubeletconfiginternal.KubeletCo
if minTLSVersion == tls.VersionTLS13 {
if len(tlsCipherSuites) != 0 {
klog.InfoS("Warning: TLS 1.3 cipher suites are not configurable, ignoring --tls-cipher-suites")
logger.Info("Warning: TLS 1.3 cipher suites are not configurable, ignoring --tls-cipher-suites")
}
}
@@ -1203,6 +1211,7 @@ func setContentTypeForClient(cfg *restclient.Config, contentType string) {
//
// Eventually, #2 will be replaced with instances of #3
func RunKubelet(ctx context.Context, kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies) error {
logger := klog.FromContext(ctx)
hostname, err := nodeutil.GetHostname(kubeServer.HostnameOverride)
if err != nil {
return err
@@ -1216,7 +1225,7 @@ func RunKubelet(ctx context.Context, kubeServer *options.KubeletServer, kubeDeps
return fmt.Errorf("bad --node-ip %q: %v", kubeServer.NodeIP, err)
}
if len(invalidNodeIps) != 0 {
klog.FromContext(ctx).Info("Could not parse some node IP(s), ignoring them", "IPs", invalidNodeIps)
logger.Info("Could not parse some node IP(s), ignoring them", "IPs", invalidNodeIps)
}
capabilities.Initialize(capabilities.Capabilities{
@@ -1224,7 +1233,7 @@ func RunKubelet(ctx context.Context, kubeServer *options.KubeletServer, kubeDeps
})
credentialprovider.SetPreferredDockercfgPath(kubeServer.RootDirectory)
klog.V(2).InfoS("Using root directory", "path", kubeServer.RootDirectory)
logger.V(2).Info("Using root directory", "path", kubeServer.RootDirectory)
if kubeDeps.OSInterface == nil {
kubeDeps.OSInterface = kubecontainer.RealOS{}
@@ -1247,11 +1256,11 @@ func RunKubelet(ctx context.Context, kubeServer *options.KubeletServer, kubeDeps
podCfg := kubeDeps.PodConfig
if err := rlimit.SetNumFiles(uint64(kubeServer.MaxOpenFiles)); err != nil {
klog.ErrorS(err, "Failed to set rlimit on max file handles")
logger.Error(err, "Failed to set rlimit on max file handles")
}
startKubelet(ctx, k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer)
klog.InfoS("Started kubelet")
logger.Info("Started kubelet")
return nil
}
@@ -1362,7 +1371,8 @@ func newTracerProvider(s *options.KubeletServer) (oteltrace.TracerProvider, erro
}
func getCgroupDriverFromCRI(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies) error {
klog.V(4).InfoS("Getting CRI runtime configuration information")
logger := klog.FromContext(ctx)
logger.V(4).Info("Getting CRI runtime configuration information")
var (
runtimeConfig *runtimeapi.RuntimeConfigResponse
@@ -1382,7 +1392,7 @@ func getCgroupDriverFromCRI(ctx context.Context, s *options.KubeletServer, kubeD
continue
}
// CRI implementation doesn't support RuntimeConfig, fallback
klog.InfoS("CRI implementation should be updated to support RuntimeConfig when KubeletCgroupDriverFromCRI feature gate has been enabled. Falling back to using cgroupDriver from kubelet config.")
logger.Info("CRI implementation should be updated to support RuntimeConfig when KubeletCgroupDriverFromCRI feature gate has been enabled. Falling back to using cgroupDriver from kubelet config.")
return nil
}
}
@@ -1405,6 +1415,6 @@ func getCgroupDriverFromCRI(ctx context.Context, s *options.KubeletServer, kubeD
default:
return fmt.Errorf("runtime returned an unknown cgroup driver %d", d)
}
klog.InfoS("Using cgroup driver setting received from the CRI runtime", "cgroupDriver", s.CgroupDriver)
logger.Info("Using cgroup driver setting received from the CRI runtime", "cgroupDriver", s.CgroupDriver)
return nil
}

View File

@@ -17,27 +17,30 @@ limitations under the License.
package app
import (
"context"
"k8s.io/klog/v2"
"k8s.io/utils/inotify"
)
func watchForLockfileContention(path string, done chan struct{}) error {
func watchForLockfileContention(ctx context.Context, path string, done chan struct{}) error {
logger := klog.FromContext(ctx)
watcher, err := inotify.NewWatcher()
if err != nil {
klog.ErrorS(err, "Unable to create watcher for lockfile")
logger.Error(err, "Unable to create watcher for lockfile")
return err
}
if err = watcher.AddWatch(path, inotify.InOpen|inotify.InDeleteSelf); err != nil {
klog.ErrorS(err, "Unable to watch lockfile")
logger.Error(err, "Unable to watch lockfile")
watcher.Close()
return err
}
go func() {
select {
case ev := <-watcher.Event:
klog.InfoS("Inotify event", "event", ev)
logger.Info("Inotify event", "event", ev)
case err = <-watcher.Error:
klog.ErrorS(err, "inotify watcher error")
logger.Error(err, "inotify watcher error")
}
close(done)
watcher.Close()

View File

@@ -20,11 +20,12 @@ limitations under the License.
package app
import (
"context"
"fmt"
"os"
)
func checkPermissions() error {
func checkPermissions(ctx context.Context) error {
if uid := os.Getuid(); uid != 0 {
return fmt.Errorf("kubelet needs to run as uid `0`. It is being run as %d", uid)
}

View File

@@ -19,9 +19,12 @@ limitations under the License.
package app
import "errors"
import (
"context"
"errors"
)
func watchForLockfileContention(path string, done chan struct{}) error {
func watchForLockfileContention(ctx context.Context, path string, done chan struct{}) error {
return errors.New("kubelet unsupported in this build")
}

View File

@@ -20,6 +20,7 @@ limitations under the License.
package app
import (
"context"
"errors"
"os/user"
@@ -27,17 +28,17 @@ import (
"k8s.io/klog/v2"
)
func checkPermissions() error {
func checkPermissions(ctx context.Context) error {
logger := klog.FromContext(ctx)
u, err := user.Current()
if err != nil {
klog.ErrorS(err, "Unable to get current user")
logger.Error(err, "Unable to get current user")
return err
}
// For Windows user.UserName contains the login name and user.Name contains
// the user's display name - https://pkg.go.dev/os/user#User
klog.InfoS("Kubelet is running as", "login name", u.Username, "dispaly name", u.Name)
logger.Info("Kubelet is running as", "login name", u.Username, "dispaly name", u.Name)
if !windows.GetCurrentProcessToken().IsElevated() {
return errors.New("kubelet needs to run with elevated permissions!")

View File

@@ -168,7 +168,6 @@ linters:
-structured .*
# Now enable it again for migrated packages.
structured k8s.io/kubernetes/cmd/kubelet/.*
structured k8s.io/kubernetes/pkg/kubelet/.*
structured k8s.io/kubernetes/pkg/proxy/.*
structured k8s.io/kms/.*
@@ -200,6 +199,7 @@ linters:
contextual k8s.io/sample-controller/.*
contextual k8s.io/kubernetes/cmd/kube-proxy/.*
contextual k8s.io/kubernetes/cmd/kube-scheduler/.*
contextual k8s.io/kubernetes/cmd/kubelet/.*
contextual k8s.io/kubernetes/pkg/controller/.*
contextual k8s.io/kubernetes/pkg/scheduler/.*
contextual k8s.io/kubernetes/test/e2e/dra/.*

View File

@@ -182,7 +182,6 @@ linters:
-structured .*
# Now enable it again for migrated packages.
structured k8s.io/kubernetes/cmd/kubelet/.*
structured k8s.io/kubernetes/pkg/kubelet/.*
structured k8s.io/kubernetes/pkg/proxy/.*
structured k8s.io/kms/.*
@@ -214,6 +213,7 @@ linters:
contextual k8s.io/sample-controller/.*
contextual k8s.io/kubernetes/cmd/kube-proxy/.*
contextual k8s.io/kubernetes/cmd/kube-scheduler/.*
contextual k8s.io/kubernetes/cmd/kubelet/.*
contextual k8s.io/kubernetes/pkg/controller/.*
contextual k8s.io/kubernetes/pkg/scheduler/.*
contextual k8s.io/kubernetes/test/e2e/dra/.*

View File

@@ -14,7 +14,6 @@
-structured .*
# Now enable it again for migrated packages.
structured k8s.io/kubernetes/cmd/kubelet/.*
structured k8s.io/kubernetes/pkg/kubelet/.*
structured k8s.io/kubernetes/pkg/proxy/.*
structured k8s.io/kms/.*
@@ -46,6 +45,7 @@ contextual k8s.io/sample-cli-plugin/.*
contextual k8s.io/sample-controller/.*
contextual k8s.io/kubernetes/cmd/kube-proxy/.*
contextual k8s.io/kubernetes/cmd/kube-scheduler/.*
contextual k8s.io/kubernetes/cmd/kubelet/.*
contextual k8s.io/kubernetes/pkg/controller/.*
contextual k8s.io/kubernetes/pkg/scheduler/.*
contextual k8s.io/kubernetes/test/e2e/dra/.*