migrate pkg/kubelet/kubeletconfig to contextual logging

This commit is contained in:
Ed Bartosh
2025-02-01 09:37:41 +02:00
parent 7dad9e2af6
commit 75ccd69bab
13 changed files with 40 additions and 23 deletions

View File

@@ -18,6 +18,7 @@ package main
import (
"bytes"
"context"
"fmt"
"io"
"os"
@@ -45,6 +46,7 @@ func main() {
os.Exit(1)
}
ctx := context.Background()
outDir, err := genutils.OutDir(path)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to get output directory: %v\n", err)
@@ -70,7 +72,7 @@ func main() {
doc.GenMarkdownTree(scheduler, outDir)
case "kubelet":
// generate docs for kubelet
kubelet := kubeletapp.NewKubeletCommand()
kubelet := kubeletapp.NewKubeletCommand(ctx)
doc.GenMarkdownTree(kubelet, outDir)
case "kubeadm":
// resets global flags created by kubelet or other commands e.g.

View File

@@ -18,6 +18,7 @@ package main
import (
"bytes"
"context"
"fmt"
"io"
"os"
@@ -58,6 +59,7 @@ func main() {
// Set environment variables used by command so the output is consistent,
// regardless of where we run.
os.Setenv("HOME", "/home/username")
ctx := context.Background()
switch module {
case "kube-apiserver":
@@ -90,7 +92,7 @@ func main() {
}
case "kubelet":
// generate manpage for kubelet
kubelet := kubeletapp.NewKubeletCommand()
kubelet := kubeletapp.NewKubeletCommand(ctx)
genMarkdown(kubelet, "", outDir)
for _, c := range kubelet.Commands() {
genMarkdown(c, "kubelet", outDir)

View File

@@ -135,7 +135,7 @@ func init() {
}
// NewKubeletCommand creates a *cobra.Command object with default parameters
func NewKubeletCommand() *cobra.Command {
func NewKubeletCommand(ctx context.Context) *cobra.Command {
cleanFlagSet := pflag.NewFlagSet(server.ComponentKubelet, pflag.ContinueOnError)
cleanFlagSet.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
kubeletFlags := options.NewKubeletFlags()
@@ -215,7 +215,7 @@ is checked every 20 seconds (also configurable with a flag).`,
// load kubelet config file, if provided
if len(kubeletFlags.KubeletConfigFile) > 0 {
kubeletConfig, err = loadConfigFile(kubeletFlags.KubeletConfigFile)
kubeletConfig, err = loadConfigFile(ctx, kubeletFlags.KubeletConfigFile)
if err != nil {
return fmt.Errorf("failed to load kubelet config file, path: %s, error: %w", kubeletFlags.KubeletConfigFile, err)
}
@@ -431,7 +431,7 @@ func kubeletConfigFlagPrecedence(kc *kubeletconfiginternal.KubeletConfiguration,
return nil
}
func loadConfigFile(name string) (*kubeletconfiginternal.KubeletConfiguration, error) {
func loadConfigFile(ctx context.Context, name string) (*kubeletconfiginternal.KubeletConfiguration, error) {
const errFmt = "failed to load Kubelet config file %s, error %v"
// compute absolute path based on current working dir
kubeletConfigFile, err := filepath.Abs(name)
@@ -442,7 +442,7 @@ func loadConfigFile(name string) (*kubeletconfiginternal.KubeletConfiguration, e
if err != nil {
return nil, fmt.Errorf(errFmt, name, err)
}
kc, err := loader.Load()
kc, err := loader.Load(ctx)
if err != nil {
return nil, fmt.Errorf(errFmt, name, err)
}

View File

@@ -22,6 +22,7 @@ limitations under the License.
package main
import (
"context"
"os"
"k8s.io/component-base/cli"
@@ -32,7 +33,7 @@ import (
)
func main() {
command := app.NewKubeletCommand()
command := app.NewKubeletCommand(context.Background())
code := cli.Run(command)
os.Exit(code)
}

View File

@@ -212,6 +212,7 @@ linters:
contextual k8s.io/kubernetes/pkg/kubelet/status/.*
contextual k8s.io/kubernetes/pkg/kubelet/sysctl/.*
contextual k8s.io/kubernetes/pkg/kubelet/apis/.*
contextual k8s.io/kubernetes/pkg/kubelet/kubeletconfig/.*
# As long as contextual logging is alpha or beta, all WithName, WithValues,
# NewContext calls have to go through klog. Once it is GA, we can lift

View File

@@ -226,6 +226,7 @@ linters:
contextual k8s.io/kubernetes/pkg/kubelet/status/.*
contextual k8s.io/kubernetes/pkg/kubelet/sysctl/.*
contextual k8s.io/kubernetes/pkg/kubelet/apis/.*
contextual k8s.io/kubernetes/pkg/kubelet/kubeletconfig/.*
# As long as contextual logging is alpha or beta, all WithName, WithValues,
# NewContext calls have to go through klog. Once it is GA, we can lift

View File

@@ -58,6 +58,7 @@ contextual k8s.io/kubernetes/pkg/kubelet/oom/.*
contextual k8s.io/kubernetes/pkg/kubelet/status/.*
contextual k8s.io/kubernetes/pkg/kubelet/sysctl/.*
contextual k8s.io/kubernetes/pkg/kubelet/apis/.*
contextual k8s.io/kubernetes/pkg/kubelet/kubeletconfig/.*
# As long as contextual logging is alpha or beta, all WithName, WithValues,
# NewContext calls have to go through klog. Once it is GA, we can lift

View File

@@ -17,6 +17,7 @@ limitations under the License.
package configfiles
import (
"context"
"fmt"
"path/filepath"
@@ -31,7 +32,7 @@ import (
// Loader loads configuration from a storage layer
type Loader interface {
// Load loads and returns the KubeletConfiguration from the storage layer, or an error if a configuration could not be loaded
Load() (*kubeletconfig.KubeletConfiguration, error)
Load(context.Context) (*kubeletconfig.KubeletConfiguration, error)
// LoadIntoJSON loads and returns the KubeletConfiguration from the storage layer, or an error if a configuration could not be
// loaded. It returns the configuration as a JSON byte slice
LoadIntoJSON() ([]byte, *schema.GroupVersionKind, error)
@@ -61,7 +62,7 @@ func NewFsLoader(fs utilfs.Filesystem, kubeletFile string) (Loader, error) {
}, nil
}
func (loader *fsLoader) Load() (*kubeletconfig.KubeletConfiguration, error) {
func (loader *fsLoader) Load(ctx context.Context) (*kubeletconfig.KubeletConfiguration, error) {
data, err := loader.fs.ReadFile(loader.kubeletFile)
if err != nil {
return nil, fmt.Errorf("failed to read kubelet config file %q, error: %v", loader.kubeletFile, err)
@@ -72,7 +73,7 @@ func (loader *fsLoader) Load() (*kubeletconfig.KubeletConfiguration, error) {
return nil, fmt.Errorf("kubelet config file %q was empty", loader.kubeletFile)
}
kc, err := utilcodec.DecodeKubeletConfiguration(loader.kubeletCodecs, data)
kc, err := utilcodec.DecodeKubeletConfiguration(ctx, loader.kubeletCodecs, data)
if err != nil {
return nil, err
}

View File

@@ -30,6 +30,7 @@ import (
kubeletscheme "k8s.io/kubernetes/pkg/kubelet/apis/config/scheme"
utiltest "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/test"
utilfs "k8s.io/kubernetes/pkg/util/filesystem"
"k8s.io/kubernetes/test/utils/ktesting"
"k8s.io/utils/ptr"
)
@@ -38,6 +39,7 @@ const relativePath = "relative/path/test"
const kubeletFile = "kubelet"
func TestLoad(t *testing.T) {
tCtx := ktesting.Init(t)
cases := []struct {
desc string
file *string
@@ -176,7 +178,7 @@ foo: bar`),
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
kc, err := loader.Load()
kc, err := loader.Load(tCtx)
if c.strictErr && !runtime.IsStrictDecodingError(errors.Unwrap(err)) {
t.Fatalf("got error: %v, want strict decoding error", err)

View File

@@ -17,6 +17,7 @@ limitations under the License.
package codec
import (
"context"
"encoding/json"
"fmt"
@@ -64,7 +65,7 @@ func NewKubeletconfigYAMLEncoder(targetVersion schema.GroupVersion) (runtime.Enc
}
// DecodeKubeletConfiguration decodes a serialized KubeletConfiguration to the internal type.
func DecodeKubeletConfiguration(kubeletCodecs *serializer.CodecFactory, data []byte) (*kubeletconfig.KubeletConfiguration, error) {
func DecodeKubeletConfiguration(ctx context.Context, kubeletCodecs *serializer.CodecFactory, data []byte) (*kubeletconfig.KubeletConfiguration, error) {
var (
obj runtime.Object
gvk *schema.GroupVersionKind
@@ -73,6 +74,7 @@ func DecodeKubeletConfiguration(kubeletCodecs *serializer.CodecFactory, data []b
// The UniversalDecoder runs defaulting and returns the internal type by default.
obj, gvk, err := kubeletCodecs.UniversalDecoder().Decode(data, nil, nil)
if err != nil {
logger := klog.FromContext(ctx)
// Try strict decoding first. If that fails decode with a lenient
// decoder, which has only v1beta1 registered, and log a warning.
// The lenient path is to be dropped when support for v1beta1 is dropped.
@@ -97,7 +99,7 @@ func DecodeKubeletConfiguration(kubeletCodecs *serializer.CodecFactory, data []b
return nil, fmt.Errorf("failed lenient decoding: %v", err)
}
// Continue with the v1beta1 object that was decoded leniently, but emit a warning.
klog.InfoS("Using lenient decoding as strict decoding failed", "err", err)
logger.Info("Using lenient decoding as strict decoding failed", "err", err)
}
internalKC, ok := obj.(*kubeletconfig.KubeletConfiguration)

View File

@@ -61,6 +61,7 @@ import (
_ "k8s.io/kubernetes/test/e2e/framework/metrics/init"
_ "k8s.io/kubernetes/test/e2e/framework/node/init"
_ "k8s.io/kubernetes/test/utils/format"
"k8s.io/kubernetes/test/utils/ktesting"
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
@@ -158,6 +159,7 @@ func TestMain(m *testing.M) {
const rootfs = "/rootfs"
func TestE2eNode(t *testing.T) {
tCtx := ktesting.Init(t)
// Make sure we are not limited by sshd when it comes to open files
if err := rlimit.SetNumFiles(1000000); err != nil {
klog.Infof("failed to set rlimit on max file handles: %v", err)
@@ -170,7 +172,7 @@ func TestE2eNode(t *testing.T) {
}
if *runKubeletMode {
// If run-kubelet-mode is specified, only start kubelet.
services.RunKubelet(featureGates)
services.RunKubelet(tCtx, featureGates)
return
}
if *systemValidateMode {
@@ -268,7 +270,7 @@ var _ = ginkgo.SynchronizedBeforeSuite(func(ctx context.Context) []byte {
// If the services are expected to stop after test, they should monitor the test process.
// If the services are expected to keep running after test, they should not monitor the test process.
e2es = services.NewE2EServices(*stopServices)
gomega.Expect(e2es.Start(featureGates)).To(gomega.Succeed(), "should be able to start node services.")
gomega.Expect(e2es.Start(ctx, featureGates)).To(gomega.Succeed(), "should be able to start node services.")
} else {
klog.Infof("Running tests without starting services.")
}

View File

@@ -17,6 +17,7 @@ limitations under the License.
package services
import (
"context"
"flag"
"fmt"
"os"
@@ -74,13 +75,13 @@ func init() {
// RunKubelet starts kubelet and waits for termination signal. Once receives the
// termination signal, it will stop the kubelet gracefully.
func RunKubelet(featureGates map[string]bool) {
func RunKubelet(ctx context.Context, featureGates map[string]bool) {
var err error
// Enable monitorParent to make sure kubelet will receive termination signal
// when test process exits.
e := NewE2EServices(true /* monitorParent */)
defer e.Stop()
e.kubelet, err = e.startKubelet(featureGates)
e.kubelet, err = e.startKubelet(ctx, featureGates)
if err != nil {
klog.Fatalf("Failed to start kubelet: %v", err)
}
@@ -96,7 +97,7 @@ const (
// Health check url of kubelet
var kubeletHealthCheckURL = fmt.Sprintf("http://127.0.0.1:%d/healthz", ports.KubeletHealthzPort)
func baseKubeConfiguration(cfgPath string) (*kubeletconfig.KubeletConfiguration, error) {
func baseKubeConfiguration(ctx context.Context, cfgPath string) (*kubeletconfig.KubeletConfiguration, error) {
cfgPath, err := filepath.Abs(cfgPath)
if err != nil {
return nil, err
@@ -147,12 +148,12 @@ func baseKubeConfiguration(cfgPath string) (*kubeletconfig.KubeletConfiguration,
return nil, err
}
return loader.Load()
return loader.Load(ctx)
}
// startKubelet starts the Kubelet in a separate process or returns an error
// if the Kubelet fails to start.
func (e *E2EServices) startKubelet(featureGates map[string]bool) (*server, error) {
func (e *E2EServices) startKubelet(ctx context.Context, featureGates map[string]bool) (*server, error) {
klog.Info("Starting kubelet")
framework.Logf("Standalone mode: %v", framework.TestContext.StandaloneMode)
@@ -195,7 +196,7 @@ func (e *E2EServices) startKubelet(featureGates map[string]bool) (*server, error
if lookup != nil {
kubeletConfigFile = lookup.Value.String()
}
kc, err := baseKubeConfiguration(kubeletConfigFile)
kc, err := baseKubeConfiguration(ctx, kubeletConfigFile)
if err != nil {
return nil, fmt.Errorf("failed to load base kubelet configuration: %w", err)
}

View File

@@ -17,6 +17,7 @@ limitations under the License.
package services
import (
"context"
"fmt"
"os"
"os/exec"
@@ -60,7 +61,7 @@ func NewE2EServices(monitorParent bool) *E2EServices {
// namespace controller.
// * kubelet: kubelet binary is outside. (We plan to move main kubelet start logic out when we have
// standard kubelet launcher)
func (e *E2EServices) Start(featureGates map[string]bool) error {
func (e *E2EServices) Start(ctx context.Context, featureGates map[string]bool) error {
var err error
if e.services, err = e.startInternalServices(); err != nil {
return fmt.Errorf("failed to start internal services: %w", err)
@@ -71,7 +72,7 @@ func (e *E2EServices) Start(featureGates map[string]bool) error {
klog.Info("nothing to do in node-e2e-services, running conformance suite")
} else {
// Start kubelet
e.kubelet, err = e.startKubelet(featureGates)
e.kubelet, err = e.startKubelet(ctx, featureGates)
if err != nil {
return fmt.Errorf("failed to start kubelet: %w", err)
}