Merge pull request #129942 from bart0sh/PR171-migrate-some-kubelet-components-to-contextual-logging

Migrate kubelet/{apis,kubeletconfig,nodeshutdown,pod,preemption} to contextual logging
Committed by Kubernetes Prow Robot on 2025-07-18 20:28:25 -07:00 (via GitHub)
32 changed files with 154 additions and 100 deletions
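Reviewer note: the pattern applied throughout this change is the standard contextual-logging migration. Instead of logging through the global klog functions, a function accepts a context.Context (or a klog.Logger), retrieves the logger with klog.FromContext, and logs through it, so callers can attach names and key/value pairs. A minimal sketch of the before/after shape (function names are illustrative, not from this PR):

    package main

    import (
        "context"

        "k8s.io/klog/v2"
    )

    // Legacy style: logs through the global klog state.
    func syncLegacy(podName string) {
        klog.InfoS("Syncing pod", "pod", podName)
    }

    // Contextual style: the logger travels with the context.
    func sync(ctx context.Context, podName string) {
        logger := klog.FromContext(ctx)
        logger.Info("Syncing pod", "pod", podName)
    }

    func main() {
        // Callers can enrich the logger before handing the context on.
        logger := klog.LoggerWithName(klog.Background(), "kubelet")
        sync(klog.NewContext(context.Background(), logger), "pod-name")
        syncLegacy("pod-name")
    }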

View File

@@ -18,6 +18,7 @@ package main
import (
"bytes"
+"context"
"fmt"
"io"
"os"
@@ -45,6 +46,7 @@ func main() {
os.Exit(1)
}
+ctx := context.Background()
outDir, err := genutils.OutDir(path)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to get output directory: %v\n", err)
@@ -70,7 +72,7 @@ func main() {
doc.GenMarkdownTree(scheduler, outDir)
case "kubelet":
// generate docs for kubelet
-kubelet := kubeletapp.NewKubeletCommand()
+kubelet := kubeletapp.NewKubeletCommand(ctx)
doc.GenMarkdownTree(kubelet, outDir)
case "kubeadm":
// resets global flags created by kubelet or other commands e.g.

View File

@@ -18,6 +18,7 @@ package main
import (
"bytes"
+"context"
"fmt"
"io"
"os"
@@ -58,6 +59,7 @@ func main() {
// Set environment variables used by command so the output is consistent,
// regardless of where we run.
os.Setenv("HOME", "/home/username")
+ctx := context.Background()
switch module {
case "kube-apiserver":
@@ -90,7 +92,7 @@ func main() {
}
case "kubelet":
// generate manpage for kubelet
-kubelet := kubeletapp.NewKubeletCommand()
+kubelet := kubeletapp.NewKubeletCommand(ctx)
genMarkdown(kubelet, "", outDir)
for _, c := range kubelet.Commands() {
genMarkdown(c, "kubelet", outDir)

View File

@@ -135,7 +135,7 @@ func init() {
}
// NewKubeletCommand creates a *cobra.Command object with default parameters
-func NewKubeletCommand() *cobra.Command {
+func NewKubeletCommand(ctx context.Context) *cobra.Command {
cleanFlagSet := pflag.NewFlagSet(server.ComponentKubelet, pflag.ContinueOnError)
cleanFlagSet.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
kubeletFlags := options.NewKubeletFlags()
@@ -215,7 +215,7 @@ is checked every 20 seconds (also configurable with a flag).`,
// load kubelet config file, if provided
if len(kubeletFlags.KubeletConfigFile) > 0 {
-kubeletConfig, err = loadConfigFile(kubeletFlags.KubeletConfigFile)
+kubeletConfig, err = loadConfigFile(ctx, kubeletFlags.KubeletConfigFile)
if err != nil {
return fmt.Errorf("failed to load kubelet config file, path: %s, error: %w", kubeletFlags.KubeletConfigFile, err)
}
@@ -431,7 +431,7 @@ func kubeletConfigFlagPrecedence(kc *kubeletconfiginternal.KubeletConfiguration,
return nil
}
-func loadConfigFile(name string) (*kubeletconfiginternal.KubeletConfiguration, error) {
+func loadConfigFile(ctx context.Context, name string) (*kubeletconfiginternal.KubeletConfiguration, error) {
const errFmt = "failed to load Kubelet config file %s, error %v"
// compute absolute path based on current working dir
kubeletConfigFile, err := filepath.Abs(name)
@@ -442,7 +442,7 @@ func loadConfigFile(name string) (*kubeletconfiginternal.KubeletConfiguration, e
if err != nil {
return nil, fmt.Errorf(errFmt, name, err)
}
-kc, err := loader.Load()
+kc, err := loader.Load(ctx)
if err != nil {
return nil, fmt.Errorf(errFmt, name, err)
}
@@ -1250,13 +1250,13 @@ func RunKubelet(ctx context.Context, kubeServer *options.KubeletServer, kubeDeps
klog.ErrorS(err, "Failed to set rlimit on max file handles")
}
-startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer)
+startKubelet(ctx, k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer)
klog.InfoS("Started kubelet")
return nil
}
-func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) {
+func startKubelet(ctx context.Context, k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) {
// start the kubelet
go k.Run(podCfg.Updates())
@@ -1267,7 +1267,7 @@ func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubele
if kubeCfg.ReadOnlyPort > 0 {
go k.ListenAndServeReadOnly(netutils.ParseIPSloppy(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort), kubeDeps.TracerProvider)
}
-go k.ListenAndServePodResources()
+go k.ListenAndServePodResources(ctx)
}
func createAndInitKubelet(kubeServer *options.KubeletServer,

View File

@@ -22,6 +22,7 @@ limitations under the License.
package main
import (
+"context"
"os"
"k8s.io/component-base/cli"
@@ -32,7 +33,7 @@ import (
)
func main() {
-command := app.NewKubeletCommand()
+command := app.NewKubeletCommand(context.Background())
code := cli.Run(command)
os.Exit(code)
}

View File

@@ -211,6 +211,11 @@ linters:
contextual k8s.io/kubernetes/pkg/kubelet/oom/.*
contextual k8s.io/kubernetes/pkg/kubelet/status/.*
contextual k8s.io/kubernetes/pkg/kubelet/sysctl/.*
+contextual k8s.io/kubernetes/pkg/kubelet/apis/.*
+contextual k8s.io/kubernetes/pkg/kubelet/kubeletconfig/.*
+contextual k8s.io/kubernetes/pkg/kubelet/nodeshutdown/.*
+contextual k8s.io/kubernetes/pkg/kubelet/pod/.*
+contextual k8s.io/kubernetes/pkg/kubelet/preemption/.*
# As long as contextual logging is alpha or beta, all WithName, WithValues,
# NewContext calls have to go through klog. Once it is GA, we can lift
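These contextual entries extend the logcheck linter configuration: once a package is listed, direct calls to the global klog functions (klog.InfoS, klog.ErrorS, ...) become lint errors there, which is what drives the mechanical rewrites in the hunks below. Roughly, the linter pushes code from the commented form to the contextual form (a sketch, assuming logcheck's contextual mode):

    package pod

    import (
        "context"

        "k8s.io/klog/v2"
    )

    // klog.ErrorS(err, "Failed to parse a pod full name", ...) would now be
    // flagged in this package; the accepted form goes through the context.
    func logParseFailure(ctx context.Context, err error, name string) {
        klog.FromContext(ctx).Error(err, "Failed to parse a pod full name", "podFullName", name)
    }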

View File

@@ -225,6 +225,11 @@ linters:
contextual k8s.io/kubernetes/pkg/kubelet/oom/.*
contextual k8s.io/kubernetes/pkg/kubelet/status/.*
contextual k8s.io/kubernetes/pkg/kubelet/sysctl/.*
+contextual k8s.io/kubernetes/pkg/kubelet/apis/.*
+contextual k8s.io/kubernetes/pkg/kubelet/kubeletconfig/.*
+contextual k8s.io/kubernetes/pkg/kubelet/nodeshutdown/.*
+contextual k8s.io/kubernetes/pkg/kubelet/pod/.*
+contextual k8s.io/kubernetes/pkg/kubelet/preemption/.*
# As long as contextual logging is alpha or beta, all WithName, WithValues,
# NewContext calls have to go through klog. Once it is GA, we can lift

View File

@@ -57,6 +57,11 @@ contextual k8s.io/kubernetes/pkg/kubelet/cadvisor/.*
contextual k8s.io/kubernetes/pkg/kubelet/oom/.*
contextual k8s.io/kubernetes/pkg/kubelet/status/.*
contextual k8s.io/kubernetes/pkg/kubelet/sysctl/.*
+contextual k8s.io/kubernetes/pkg/kubelet/apis/.*
+contextual k8s.io/kubernetes/pkg/kubelet/kubeletconfig/.*
+contextual k8s.io/kubernetes/pkg/kubelet/nodeshutdown/.*
+contextual k8s.io/kubernetes/pkg/kubelet/pod/.*
+contextual k8s.io/kubernetes/pkg/kubelet/preemption/.*
# As long as contextual logging is alpha or beta, all WithName, WithValues,
# NewContext calls have to go through klog. Once it is GA, we can lift

View File

@@ -49,9 +49,10 @@ func LimiterUnaryServerInterceptor(limiter Limiter) grpc.UnaryServerInterceptor
}
// WithRateLimiter creates new rate limiter with unary interceptor.
-func WithRateLimiter(serviceName string, qps, burstTokens int32) grpc.ServerOption {
+func WithRateLimiter(ctx context.Context, serviceName string, qps, burstTokens int32) grpc.ServerOption {
+logger := klog.FromContext(ctx)
qpsVal := gotimerate.Limit(qps)
burstVal := int(burstTokens)
-klog.InfoS("Setting rate limiting for endpoint", "service", serviceName, "qps", qpsVal, "burstTokens", burstVal)
+logger.Info("Setting rate limiting for endpoint", "service", serviceName, "qps", qpsVal, "burstTokens", burstVal)
return grpc.UnaryInterceptor(LimiterUnaryServerInterceptor(gotimerate.NewLimiter(qpsVal, burstVal)))
}
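Passing ctx here only changes where the entry is routed: klog.FromContext returns the logger stored in the context, or the global klog logger when none was attached, so existing callers that pass a bare context keep today's output. Both cases, sketched:

    package main

    import (
        "context"

        "k8s.io/klog/v2"
    )

    func main() {
        // No logger in the context: FromContext falls back to the global logger.
        klog.FromContext(context.Background()).Info("Setting rate limiting for endpoint", "service", "podresources")

        // Logger attached: FromContext returns exactly that logger.
        ctx := klog.NewContext(context.Background(), klog.LoggerWithName(klog.Background(), "grpc"))
        klog.FromContext(ctx).Info("Setting rate limiting for endpoint", "service", "podresources")
    }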

View File

@@ -25,7 +25,7 @@ import (
"google.golang.org/grpc/credentials/insecure"
"k8s.io/cri-client/pkg/util"
-"k8s.io/kubelet/pkg/apis/podresources/v1"
+v1 "k8s.io/kubelet/pkg/apis/podresources/v1"
"k8s.io/kubelet/pkg/apis/podresources/v1alpha1"
)
@@ -39,7 +39,7 @@ func GetV1alpha1Client(socket string, connectionTimeout time.Duration, maxMsgSiz
if err != nil {
return nil, nil, err
}
-ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout)
+ctx, cancel := context.WithTimeout(context.TODO(), connectionTimeout)
defer cancel()
conn, err := grpc.DialContext(ctx, addr,
@@ -58,7 +58,7 @@ func GetV1Client(socket string, connectionTimeout time.Duration, maxMsgSize int)
if err != nil {
return nil, nil, err
}
-ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout)
+ctx, cancel := context.WithTimeout(context.TODO(), connectionTimeout)
defer cancel()
conn, err := grpc.DialContext(ctx, addr,

View File

@@ -43,9 +43,10 @@ type v1PodResourcesServer struct {
// NewV1PodResourcesServer returns a PodResourcesListerServer which lists pods provided by the PodsProvider
// with device information provided by the DevicesProvider
-func NewV1PodResourcesServer(providers PodResourcesProviders) podresourcesv1.PodResourcesListerServer {
+func NewV1PodResourcesServer(ctx context.Context, providers PodResourcesProviders) podresourcesv1.PodResourcesListerServer {
+logger := klog.FromContext(ctx)
useActivePods := utilfeature.DefaultFeatureGate.Enabled(kubefeatures.KubeletPodResourcesListUseActivePods)
-klog.InfoS("podresources", "method", "list", "useActivePods", useActivePods)
+logger.Info("podresources", "method", "list", "useActivePods", useActivePods)
return &v1PodResourcesServer{
podsProvider: providers.Pods,
devicesProvider: providers.Devices,

View File

@@ -17,7 +17,6 @@ limitations under the License.
package podresources
import (
-"context"
"fmt"
"sort"
"testing"
@@ -34,11 +33,13 @@ import (
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
pkgfeatures "k8s.io/kubernetes/pkg/features"
podresourcetest "k8s.io/kubernetes/pkg/kubelet/apis/podresources/testing"
+"k8s.io/kubernetes/test/utils/ktesting"
)
func TestListPodResourcesV1(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, pkgfeatures.KubeletPodResourcesDynamicResources, true)
+tCtx := ktesting.Init(t)
podName := "pod-name"
podNamespace := "pod-namespace"
podUID := types.UID("pod-uid")
@@ -240,8 +241,8 @@ func TestListPodResourcesV1(t *testing.T) {
Memory: mockMemoryProvider,
DynamicResources: mockDynamicResourcesProvider,
}
-server := NewV1PodResourcesServer(providers)
-resp, err := server.List(context.TODO(), &podresourcesapi.ListPodResourcesRequest{})
+server := NewV1PodResourcesServer(tCtx, providers)
+resp, err := server.List(tCtx, &podresourcesapi.ListPodResourcesRequest{})
if err != nil {
t.Errorf("want err = %v, got %q", nil, err)
}
@@ -293,6 +294,7 @@ func collectNamespacedNamesFromPodResources(prs []*podresourcesapi.PodResources)
}
func TestListPodResourcesUsesOnlyActivePodsV1(t *testing.T) {
+tCtx := ktesting.Init(t)
numaID := int64(1)
// we abuse the fact that we don't care about the assignments,
@@ -391,8 +393,8 @@ func TestListPodResourcesUsesOnlyActivePodsV1(t *testing.T) {
Memory: mockMemoryProvider,
DynamicResources: mockDynamicResourcesProvider,
}
-server := NewV1PodResourcesServer(providers)
-resp, err := server.List(context.TODO(), &podresourcesapi.ListPodResourcesRequest{})
+server := NewV1PodResourcesServer(tCtx, providers)
+resp, err := server.List(tCtx, &podresourcesapi.ListPodResourcesRequest{})
if err != nil {
t.Errorf("want err = %v, got %q", nil, err)
}
@@ -407,7 +409,7 @@ func TestListPodResourcesUsesOnlyActivePodsV1(t *testing.T) {
func TestListPodResourcesWithInitContainersV1(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, pkgfeatures.KubeletPodResourcesDynamicResources, true)
+tCtx := ktesting.Init(t)
podName := "pod-name"
podNamespace := "pod-namespace"
podUID := types.UID("pod-uid")
@@ -589,8 +591,8 @@ func TestListPodResourcesWithInitContainersV1(t *testing.T) {
Memory: mockMemoryProvider,
DynamicResources: mockDynamicResourcesProvider,
}
-server := NewV1PodResourcesServer(providers)
-resp, err := server.List(context.TODO(), &podresourcesapi.ListPodResourcesRequest{})
+server := NewV1PodResourcesServer(tCtx, providers)
+resp, err := server.List(tCtx, &podresourcesapi.ListPodResourcesRequest{})
if err != nil {
t.Errorf("want err = %v, got %q", nil, err)
}
@@ -602,6 +604,7 @@ func TestListPodResourcesWithInitContainersV1(t *testing.T) {
}
func TestAllocatableResources(t *testing.T) {
+tCtx := ktesting.Init(t)
allDevs := []*podresourcesapi.ContainerDevices{
{
ResourceName: "resource",
@@ -878,9 +881,9 @@ func TestAllocatableResources(t *testing.T) {
Cpus: mockCPUsProvider,
Memory: mockMemoryProvider,
}
-server := NewV1PodResourcesServer(providers)
+server := NewV1PodResourcesServer(tCtx, providers)
-resp, err := server.GetAllocatableResources(context.TODO(), &podresourcesapi.AllocatableResourcesRequest{})
+resp, err := server.GetAllocatableResources(tCtx, &podresourcesapi.AllocatableResourcesRequest{})
if err != nil {
t.Errorf("want err = %v, got %q", nil, err)
}
@@ -896,6 +899,7 @@ func TestGetPodResourcesV1(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, pkgfeatures.KubeletPodResourcesGet, true)
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, pkgfeatures.KubeletPodResourcesDynamicResources, true)
+tCtx := ktesting.Init(t)
podName := "pod-name"
podNamespace := "pod-namespace"
podUID := types.UID("pod-uid")
@@ -1047,9 +1051,9 @@ func TestGetPodResourcesV1(t *testing.T) {
Memory: mockMemoryProvider,
DynamicResources: mockDynamicResourcesProvider,
}
-server := NewV1PodResourcesServer(providers)
+server := NewV1PodResourcesServer(tCtx, providers)
podReq := &podresourcesapi.GetPodResourcesRequest{PodName: podName, PodNamespace: podNamespace}
-resp, err := server.Get(context.TODO(), podReq)
+resp, err := server.Get(tCtx, podReq)
if err != nil {
if err.Error() != tc.err.Error() {
@@ -1073,6 +1077,7 @@ func TestGetPodResourcesWithInitContainersV1(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, pkgfeatures.KubeletPodResourcesGet, true)
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, pkgfeatures.KubeletPodResourcesDynamicResources, true)
+tCtx := ktesting.Init(t)
podName := "pod-name"
podNamespace := "pod-namespace"
podUID := types.UID("pod-uid")
@@ -1245,9 +1250,9 @@ func TestGetPodResourcesWithInitContainersV1(t *testing.T) {
Memory: mockMemoryProvider,
DynamicResources: mockDynamicResourcesProvider,
}
-server := NewV1PodResourcesServer(providers)
+server := NewV1PodResourcesServer(tCtx, providers)
podReq := &podresourcesapi.GetPodResourcesRequest{PodName: podName, PodNamespace: podNamespace}
-resp, err := server.Get(context.TODO(), podReq)
+resp, err := server.Get(tCtx, podReq)
if err != nil {
t.Errorf("want err = %v, got %q", nil, err)
}
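The tests replace context.TODO() with ktesting.Init(t), which returns a context that carries a per-test logger and is canceled when the test finishes, so log output from the code under test is attributed to the right test. The pattern in isolation (a sketch, assuming k8s.io/kubernetes/test/utils/ktesting):

    package example

    import (
        "testing"

        "k8s.io/klog/v2"
        "k8s.io/kubernetes/test/utils/ktesting"
    )

    func TestContextualPattern(t *testing.T) {
        tCtx := ktesting.Init(t)
        // tCtx satisfies context.Context, so it can stand in for the
        // context.TODO() previously passed to the server under test.
        klog.FromContext(tCtx).Info("running test step", "step", 1)
    }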

View File

@@ -17,7 +17,6 @@ limitations under the License.
package podresources
import (
-"context"
"testing"
v1 "k8s.io/api/core/v1"
@@ -26,9 +25,11 @@ import (
podresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1"
"k8s.io/kubelet/pkg/apis/podresources/v1alpha1"
podresourcetest "k8s.io/kubernetes/pkg/kubelet/apis/podresources/testing"
+"k8s.io/kubernetes/test/utils/ktesting"
)
func TestListPodResourcesV1alpha1(t *testing.T) {
+tCtx := ktesting.Init(t)
podName := "pod-name"
podNamespace := "pod-namespace"
podUID := types.UID("pod-uid")
@@ -135,7 +136,7 @@ func TestListPodResourcesV1alpha1(t *testing.T) {
Devices: mockDevicesProvider,
}
server := NewV1alpha1PodResourcesServer(providers)
-resp, err := server.List(context.TODO(), &v1alpha1.ListPodResourcesRequest{})
+resp, err := server.List(tCtx, &v1alpha1.ListPodResourcesRequest{})
if err != nil {
t.Errorf("want err = %v, got %q", nil, err)
}

View File

@@ -287,7 +287,7 @@ type Bootstrap interface {
StartGarbageCollection()
ListenAndServe(kubeCfg *kubeletconfiginternal.KubeletConfiguration, tlsOptions *server.TLSOptions, auth server.AuthInterface, tp trace.TracerProvider)
ListenAndServeReadOnly(address net.IP, port uint, tp trace.TracerProvider)
-ListenAndServePodResources()
+ListenAndServePodResources(ctx context.Context)
Run(<-chan kubetypes.PodUpdate)
}
@@ -2006,7 +2006,7 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType
}
// Create Mirror Pod for Static Pod if it doesn't already exist
-kl.tryReconcileMirrorPods(pod, mirrorPod)
+kl.tryReconcileMirrorPods(ctx, pod, mirrorPod)
// Make data directories for the pod
if err := kl.makePodDataDirs(pod); err != nil {
@@ -2984,7 +2984,7 @@ func (pp *kubeletPodsProvider) GetPodByName(namespace, name string) (*v1.Pod, bo
}
// ListenAndServePodResources runs the kubelet podresources grpc service
-func (kl *Kubelet) ListenAndServePodResources() {
+func (kl *Kubelet) ListenAndServePodResources(ctx context.Context) {
endpoint, err := util.LocalEndpoint(kl.getPodResourcesDir(), podresources.Socket)
if err != nil {
klog.V(2).InfoS("Failed to get local endpoint for PodResources endpoint", "err", err)
@@ -2999,7 +2999,7 @@ func (kl *Kubelet) ListenAndServePodResources() {
DynamicResources: kl.containerManager,
}
-server.ListenAndServePodResources(endpoint, providers)
+server.ListenAndServePodResources(ctx, endpoint, providers)
}
// Delete the eligible dead container instances in a pod. Depending on the configuration, the latest dead containers may be kept around.
@@ -3105,7 +3105,7 @@ func (kl *Kubelet) UnprepareDynamicResources(ctx context.Context, pod *v1.Pod) e
// Ensure Mirror Pod for Static Pod exists and matches the current pod definition.
// The function logs and ignores any errors.
-func (kl *Kubelet) tryReconcileMirrorPods(staticPod, mirrorPod *v1.Pod) {
+func (kl *Kubelet) tryReconcileMirrorPods(ctx context.Context, staticPod, mirrorPod *v1.Pod) {
if !kubetypes.IsStaticPod(staticPod) {
return
}
@@ -3114,9 +3114,9 @@ func (kl *Kubelet) tryReconcileMirrorPods(staticPod, mirrorPod *v1.Pod) {
if mirrorPod.DeletionTimestamp != nil || !kubepod.IsMirrorPodOf(mirrorPod, staticPod) {
// The mirror pod is semantically different from the static pod. Remove
// it. The mirror pod will get recreated later.
-klog.InfoS("Trying to delete pod", "pod", klog.KObj(mirrorPod), "podUID", mirrorPod.ObjectMeta.UID)
+klog.InfoS("Trying to delete pod", "pod", klog.KObj(mirrorPod), "podUID", mirrorPod.UID)
podFullName := kubecontainer.GetPodFullName(staticPod)
-if ok, err := kl.mirrorPodClient.DeleteMirrorPod(podFullName, &mirrorPod.ObjectMeta.UID); err != nil {
+if ok, err := kl.mirrorPodClient.DeleteMirrorPod(ctx, podFullName, &mirrorPod.UID); err != nil {
klog.ErrorS(err, "Failed deleting mirror pod", "pod", klog.KObj(mirrorPod))
} else if ok {
deleted = ok
@@ -3132,7 +3132,7 @@ func (kl *Kubelet) tryReconcileMirrorPods(staticPod, mirrorPod *v1.Pod) {
klog.InfoS("No need to create a mirror pod, since node has been removed from the cluster", "node", klog.KRef("", string(kl.nodeName)))
} else {
klog.InfoS("Creating a mirror pod for static pod", "pod", klog.KObj(staticPod))
-if err := kl.mirrorPodClient.CreateMirrorPod(staticPod); err != nil {
+if err := kl.mirrorPodClient.CreateMirrorPod(ctx, staticPod); err != nil {
klog.ErrorS(err, "Failed creating a mirror pod", "pod", klog.KObj(staticPod))
}
}
@@ -3155,7 +3155,7 @@ func (kl *Kubelet) fastStaticPodsRegistration(ctx context.Context) {
staticPodToMirrorPodMap := kl.podManager.GetStaticPodToMirrorPodMap()
for staticPod, mirrorPod := range staticPodToMirrorPodMap {
-kl.tryReconcileMirrorPods(staticPod, mirrorPod)
+kl.tryReconcileMirrorPods(ctx, staticPod, mirrorPod)
}
}

View File

@@ -1245,7 +1245,7 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error {
klog.V(3).InfoS("Clean up orphaned mirror pods")
for _, podFullname := range orphanedMirrorPodFullnames {
if !kl.podWorkers.IsPodForMirrorPodTerminatingByFullName(podFullname) {
-_, err := kl.mirrorPodClient.DeleteMirrorPod(podFullname, nil)
+_, err := kl.mirrorPodClient.DeleteMirrorPod(ctx, podFullname, nil)
if err != nil {
klog.ErrorS(err, "Encountered error when deleting mirror pod", "podName", podFullname)
} else {

View File

@@ -17,6 +17,7 @@ limitations under the License.
package configfiles
import (
+"context"
"fmt"
"path/filepath"
@@ -31,7 +32,7 @@ import (
// Loader loads configuration from a storage layer
type Loader interface {
// Load loads and returns the KubeletConfiguration from the storage layer, or an error if a configuration could not be loaded
-Load() (*kubeletconfig.KubeletConfiguration, error)
+Load(context.Context) (*kubeletconfig.KubeletConfiguration, error)
// LoadIntoJSON loads and returns the KubeletConfiguration from the storage layer, or an error if a configuration could not be
// loaded. It returns the configuration as a JSON byte slice
LoadIntoJSON() ([]byte, *schema.GroupVersionKind, error)
@@ -61,7 +62,7 @@ func NewFsLoader(fs utilfs.Filesystem, kubeletFile string) (Loader, error) {
}, nil
}
-func (loader *fsLoader) Load() (*kubeletconfig.KubeletConfiguration, error) {
+func (loader *fsLoader) Load(ctx context.Context) (*kubeletconfig.KubeletConfiguration, error) {
data, err := loader.fs.ReadFile(loader.kubeletFile)
if err != nil {
return nil, fmt.Errorf("failed to read kubelet config file %q, error: %v", loader.kubeletFile, err)
@@ -72,7 +73,7 @@ func (loader *fsLoader) Load() (*kubeletconfig.KubeletConfiguration, error) {
return nil, fmt.Errorf("kubelet config file %q was empty", loader.kubeletFile)
}
-kc, err := utilcodec.DecodeKubeletConfiguration(loader.kubeletCodecs, data)
+kc, err := utilcodec.DecodeKubeletConfiguration(ctx, loader.kubeletCodecs, data)
if err != nil {
return nil, err
}
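Load only needs the context so DecodeKubeletConfiguration can log its lenient-decoding warning (next file); callers without a request context can pass context.Background(). A usage sketch (the config path is illustrative):

    package main

    import (
        "context"
        "fmt"

        "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/configfiles"
        utilfs "k8s.io/kubernetes/pkg/util/filesystem"
    )

    func main() {
        loader, err := configfiles.NewFsLoader(&utilfs.DefaultFs{}, "/var/lib/kubelet/config.yaml")
        if err != nil {
            panic(err)
        }
        kc, err := loader.Load(context.Background())
        if err != nil {
            panic(err)
        }
        fmt.Println(kc.Address)
    }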

View File

@@ -30,6 +30,7 @@ import (
kubeletscheme "k8s.io/kubernetes/pkg/kubelet/apis/config/scheme"
utiltest "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/test"
utilfs "k8s.io/kubernetes/pkg/util/filesystem"
+"k8s.io/kubernetes/test/utils/ktesting"
"k8s.io/utils/ptr"
)
@@ -38,6 +39,7 @@ const relativePath = "relative/path/test"
const kubeletFile = "kubelet"
func TestLoad(t *testing.T) {
+tCtx := ktesting.Init(t)
cases := []struct {
desc string
file *string
@@ -176,7 +178,7 @@ foo: bar`),
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
-kc, err := loader.Load()
+kc, err := loader.Load(tCtx)
if c.strictErr && !runtime.IsStrictDecodingError(errors.Unwrap(err)) {
t.Fatalf("got error: %v, want strict decoding error", err)

View File

@@ -17,6 +17,7 @@ limitations under the License.
package codec
import (
+"context"
"encoding/json"
"fmt"
@@ -64,7 +65,7 @@ func NewKubeletconfigYAMLEncoder(targetVersion schema.GroupVersion) (runtime.Enc
}
// DecodeKubeletConfiguration decodes a serialized KubeletConfiguration to the internal type.
-func DecodeKubeletConfiguration(kubeletCodecs *serializer.CodecFactory, data []byte) (*kubeletconfig.KubeletConfiguration, error) {
+func DecodeKubeletConfiguration(ctx context.Context, kubeletCodecs *serializer.CodecFactory, data []byte) (*kubeletconfig.KubeletConfiguration, error) {
var (
obj runtime.Object
gvk *schema.GroupVersionKind
@@ -73,6 +74,7 @@ func DecodeKubeletConfiguration(kubeletCodecs *serializer.CodecFactory, data []b
// The UniversalDecoder runs defaulting and returns the internal type by default.
obj, gvk, err := kubeletCodecs.UniversalDecoder().Decode(data, nil, nil)
if err != nil {
+logger := klog.FromContext(ctx)
// Try strict decoding first. If that fails decode with a lenient
// decoder, which has only v1beta1 registered, and log a warning.
// The lenient path is to be dropped when support for v1beta1 is dropped.
@@ -97,7 +99,7 @@ func DecodeKubeletConfiguration(kubeletCodecs *serializer.CodecFactory, data []b
return nil, fmt.Errorf("failed lenient decoding: %v", err)
}
// Continue with the v1beta1 object that was decoded leniently, but emit a warning.
-klog.InfoS("Using lenient decoding as strict decoding failed", "err", err)
+logger.Info("Using lenient decoding as strict decoding failed", "err", err)
}
internalKC, ok := obj.(*kubeletconfig.KubeletConfiguration)

View File

@@ -17,7 +17,9 @@ limitations under the License.
package lifecycle
import (
-"k8s.io/api/core/v1"
+"context"
+v1 "k8s.io/api/core/v1"
)
// AdmissionFailureHandlerStub is an AdmissionFailureHandler that does not perform any handling of admission failure.
@@ -32,6 +34,6 @@ func NewAdmissionFailureHandlerStub() *AdmissionFailureHandlerStub {
}
// HandleAdmissionFailure simply passes admission rejection on, with no special handling.
-func (n *AdmissionFailureHandlerStub) HandleAdmissionFailure(admitPod *v1.Pod, failureReasons []PredicateFailureReason) ([]PredicateFailureReason, error) {
+func (n *AdmissionFailureHandlerStub) HandleAdmissionFailure(ctx context.Context, admitPod *v1.Pod, failureReasons []PredicateFailureReason) ([]PredicateFailureReason, error) {
return failureReasons, nil
}

View File

@@ -17,6 +17,7 @@ limitations under the License.
package lifecycle
import (
+"context"
"fmt"
"runtime"
@@ -92,7 +93,7 @@ type pluginResourceUpdateFuncType func(*schedulerframework.NodeInfo, *PodAdmitAt
// AdmissionFailureHandler is an interface which defines how to deal with a failure to admit a pod.
// This allows for the graceful handling of pod admission failure.
type AdmissionFailureHandler interface {
-HandleAdmissionFailure(admitPod *v1.Pod, failureReasons []PredicateFailureReason) ([]PredicateFailureReason, error)
+HandleAdmissionFailure(ctx context.Context, admitPod *v1.Pod, failureReasons []PredicateFailureReason) ([]PredicateFailureReason, error)
}
type predicateAdmitHandler struct {
@@ -114,6 +115,10 @@ func NewPredicateAdmitHandler(getNodeAnyWayFunc getNodeAnyWayFuncType, admission
}
func (w *predicateAdmitHandler) Admit(attrs *PodAdmitAttributes) PodAdmitResult {
+// TODO: pass context to Admit when migrating this component to
+// contextual logging
+ctx := context.TODO()
+logger := klog.FromContext(ctx)
node, err := w.getNodeAnyWayFunc()
if err != nil {
klog.ErrorS(err, "Cannot get Node info")
@@ -158,7 +163,7 @@ func (w *predicateAdmitHandler) Admit(attrs *PodAdmitAttributes) PodAdmitResult
// ensure the node has enough plugin resources for that required in pods
if err = w.pluginResourceUpdateFunc(nodeInfo, attrs); err != nil {
message := fmt.Sprintf("Update plugin resources failed due to %v, which is unexpected.", err)
-klog.InfoS("Failed to admit pod", "pod", klog.KObj(admitPod), "message", message)
+logger.Info("Failed to admit pod", "pod", klog.KObj(admitPod), "message", message)
return PodAdmitResult{
Admit: false,
Reason: UnexpectedAdmissionError,
@@ -179,11 +184,11 @@ func (w *predicateAdmitHandler) Admit(attrs *PodAdmitAttributes) PodAdmitResult
reasons := generalFilter(podWithoutMissingExtendedResources, nodeInfo)
fit := len(reasons) == 0
if !fit {
-reasons, err = w.admissionFailureHandler.HandleAdmissionFailure(admitPod, reasons)
+reasons, err = w.admissionFailureHandler.HandleAdmissionFailure(ctx, admitPod, reasons)
fit = len(reasons) == 0 && err == nil
if err != nil {
message := fmt.Sprintf("Unexpected error while attempting to recover from admission failure: %v", err)
-klog.InfoS("Failed to admit pod, unexpected error while attempting to recover from admission failure", "pod", klog.KObj(admitPod), "err", err)
+logger.Info("Failed to admit pod, unexpected error while attempting to recover from admission failure", "pod", klog.KObj(admitPod), "err", err)
return PodAdmitResult{
Admit: fit,
Reason: UnexpectedAdmissionError,
@@ -196,7 +201,7 @@ func (w *predicateAdmitHandler) Admit(attrs *PodAdmitAttributes) PodAdmitResult
var message string
if len(reasons) == 0 {
message = fmt.Sprint("GeneralPredicates failed due to unknown reason, which is unexpected.")
-klog.InfoS("Failed to admit pod: GeneralPredicates failed due to unknown reason, which is unexpected", "pod", klog.KObj(admitPod))
+logger.Info("Failed to admit pod: GeneralPredicates failed due to unknown reason, which is unexpected", "pod", klog.KObj(admitPod))
return PodAdmitResult{
Admit: fit,
Reason: UnknownReason,
@@ -209,7 +214,7 @@ func (w *predicateAdmitHandler) Admit(attrs *PodAdmitAttributes) PodAdmitResult
case *PredicateFailureError:
reason = re.PredicateName
message = re.Error()
-klog.V(2).InfoS("Predicate failed on Pod", "pod", klog.KObj(admitPod), "err", message)
+logger.V(2).Info("Predicate failed on Pod", "pod", klog.KObj(admitPod), "err", message)
case *InsufficientResourceError:
switch re.ResourceName {
case v1.ResourceCPU:
@@ -224,11 +229,11 @@ func (w *predicateAdmitHandler) Admit(attrs *PodAdmitAttributes) PodAdmitResult
reason = fmt.Sprintf("%s%s", InsufficientResourcePrefix, re.ResourceName)
}
message = re.Error()
-klog.V(2).InfoS("Predicate failed on Pod", "pod", klog.KObj(admitPod), "err", message)
+logger.V(2).Info("Predicate failed on Pod", "pod", klog.KObj(admitPod), "err", message)
default:
reason = UnexpectedPredicateFailureType
message = fmt.Sprintf("GeneralPredicates failed due to %v, which is unexpected.", r)
-klog.InfoS("Failed to admit pod", "pod", klog.KObj(admitPod), "err", message)
+logger.Info("Failed to admit pod", "pod", klog.KObj(admitPod), "err", message)
}
return PodAdmitResult{
Admit: fit,
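Admit itself is not migrated yet: nothing in its signature carries a context, so the change bridges with ctx := context.TODO() and derives the logger from that. The logging calls are already in their final contextual form; once Admit (or PodAdmitAttributes) grows a context parameter, only the TODO line has to change. The bridging idiom in miniature:

    package lifecycle

    import (
        "context"

        "k8s.io/klog/v2"
    )

    // notYetMigrated sits behind an interface without a context parameter,
    // so it bridges with context.TODO(); its log calls need no later edits.
    func notYetMigrated() {
        ctx := context.TODO() // TODO: accept ctx from the caller
        logger := klog.FromContext(ctx)
        logger.Info("doing work")
    }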

View File

@@ -51,7 +51,7 @@ type dbusInhibiter interface {
InhibitShutdown() (systemd.InhibitLock, error)
ReleaseInhibitLock(lock systemd.InhibitLock) error
ReloadLogindConf() error
-MonitorShutdown() (<-chan bool, error)
+MonitorShutdown(klog.Logger) (<-chan bool, error)
OverrideInhibitDelay(inhibitDelayMax time.Duration) error
}
@@ -207,7 +207,7 @@ func (m *managerImpl) start() (chan struct{}, error) {
return nil, err
}
-events, err := m.dbusCon.MonitorShutdown()
+events, err := m.dbusCon.MonitorShutdown(m.logger)
if err != nil {
releaseErr := m.dbusCon.ReleaseInhibitLock(m.inhibitLock)
if releaseErr != nil {

View File

@@ -37,6 +37,7 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/record"
featuregatetesting "k8s.io/component-base/featuregate/testing"
+"k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting"
_ "k8s.io/klog/v2/ktesting/init" // activate ktesting command line flags
"k8s.io/kubernetes/pkg/apis/scheduling"
@@ -81,7 +82,7 @@ func (f *fakeDbus) ReloadLogindConf() error {
return nil
}
-func (f *fakeDbus) MonitorShutdown() (<-chan bool, error) {
+func (f *fakeDbus) MonitorShutdown(_ klog.Logger) (<-chan bool, error) {
return f.shutdownChan, nil
}

View File

@@ -135,7 +135,7 @@ func (bus *DBusCon) ReloadLogindConf() error {
// MonitorShutdown detects the node shutdown by watching for "PrepareForShutdown" logind events.
// see https://www.freedesktop.org/wiki/Software/systemd/inhibit/ for more details.
-func (bus *DBusCon) MonitorShutdown() (<-chan bool, error) {
+func (bus *DBusCon) MonitorShutdown(logger klog.Logger) (<-chan bool, error) {
err := bus.SystemBus.AddMatchSignal(dbus.WithMatchInterface(logindInterface), dbus.WithMatchMember("PrepareForShutdown"), dbus.WithMatchObjectPath("/org/freedesktop/login1"))
if err != nil {
@@ -155,12 +155,12 @@ func (bus *DBusCon) MonitorShutdown() (<-chan bool, error) {
return
}
if event == nil || len(event.Body) == 0 {
-klog.ErrorS(nil, "Failed obtaining shutdown event, PrepareForShutdown event was empty")
+logger.Error(nil, "Failed obtaining shutdown event, PrepareForShutdown event was empty")
continue
}
shutdownActive, ok := event.Body[0].(bool)
if !ok {
-klog.ErrorS(nil, "Failed obtaining shutdown event, PrepareForShutdown event was not bool type as expected")
+logger.Error(nil, "Failed obtaining shutdown event, PrepareForShutdown event was not bool type as expected")
continue
}
shutdownChan <- shutdownActive
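MonitorShutdown takes a klog.Logger value rather than a context: the logger is captured by a goroutine that watches the D-Bus signal channel and outlives any single call, so a per-call context would have the wrong lifetime. The same shape, reduced (channel handling simplified):

    package main

    import "k8s.io/klog/v2"

    // watch logs from a long-lived goroutine; it receives the Logger by
    // value instead of pulling it from a short-lived request context.
    func watch(logger klog.Logger, events <-chan bool) <-chan bool {
        out := make(chan bool)
        go func() {
            defer close(out)
            for active := range events {
                logger.Info("shutdown signal", "active", active)
                out <- active
            }
        }()
        return out
    }

    func main() {
        events := make(chan bool, 1)
        events <- true
        close(events)
        for range watch(klog.Background(), events) {
        }
    }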

View File

@@ -27,6 +27,7 @@ import (
"github.com/godbus/dbus/v5"
"github.com/stretchr/testify/assert"
+"k8s.io/klog/v2/ktesting"
)
type fakeDBusObject struct {
@@ -145,6 +146,7 @@ func TestReloadLogindConf(t *testing.T) {
}
func TestMonitorShutdown(t *testing.T) {
+logger, _ := ktesting.NewTestContext(t)
var tests = []struct {
desc string
shutdownActive bool
@@ -167,7 +169,7 @@ func TestMonitorShutdown(t *testing.T) {
SystemBus: fakeSystemBus,
}
-outChan, err := bus.MonitorShutdown()
+outChan, err := bus.MonitorShutdown(logger)
assert.NoError(t, err)
done := make(chan bool)

View File

@@ -36,10 +36,10 @@ type MirrorClient interface {
// pod or returns an error. The mirror pod will have the same annotations
// as the given pod as well as an extra annotation containing the hash of
// the static pod.
-CreateMirrorPod(pod *v1.Pod) error
+CreateMirrorPod(ctx context.Context, pod *v1.Pod) error
// DeleteMirrorPod deletes the mirror pod with the given full name from
// the API server or returns an error.
-DeleteMirrorPod(podFullName string, uid *types.UID) (bool, error)
+DeleteMirrorPod(ctx context.Context, podFullName string, uid *types.UID) (bool, error)
}
// nodeGetter is a subset of NodeLister, simplified for testing.
@@ -66,7 +66,7 @@ func NewBasicMirrorClient(apiserverClient clientset.Interface, nodeName string,
}
}
-func (mc *basicMirrorClient) CreateMirrorPod(pod *v1.Pod) error {
+func (mc *basicMirrorClient) CreateMirrorPod(ctx context.Context, pod *v1.Pod) error {
if mc.apiserverClient == nil {
return nil
}
@@ -96,7 +96,7 @@ func (mc *basicMirrorClient) CreateMirrorPod(pod *v1.Pod) error {
Controller: &controller,
}}
-apiPod, err := mc.apiserverClient.CoreV1().Pods(copyPod.Namespace).Create(context.TODO(), &copyPod, metav1.CreateOptions{})
+apiPod, err := mc.apiserverClient.CoreV1().Pods(copyPod.Namespace).Create(ctx, &copyPod, metav1.CreateOptions{})
if err != nil && apierrors.IsAlreadyExists(err) {
// Check if the existing pod is the same as the pod we want to create.
if h, ok := apiPod.Annotations[kubetypes.ConfigMirrorAnnotationKey]; ok && h == hash {
@@ -113,13 +113,14 @@ func (mc *basicMirrorClient) CreateMirrorPod(pod *v1.Pod) error {
// while parsing the name of the pod.
// Non-existence of the pod or UID mismatch is not treated as an error; the
// routine simply returns false in that case.
-func (mc *basicMirrorClient) DeleteMirrorPod(podFullName string, uid *types.UID) (bool, error) {
+func (mc *basicMirrorClient) DeleteMirrorPod(ctx context.Context, podFullName string, uid *types.UID) (bool, error) {
if mc.apiserverClient == nil {
return false, nil
}
+logger := klog.FromContext(ctx)
name, namespace, err := kubecontainer.ParsePodFullName(podFullName)
if err != nil {
-klog.ErrorS(err, "Failed to parse a pod full name", "podFullName", podFullName)
+logger.Error(err, "Failed to parse a pod full name", "podFullName", podFullName)
return false, err
}
@@ -127,15 +128,15 @@ func (mc *basicMirrorClient) DeleteMirrorPod(podFullName string, uid *types.UID)
if uid != nil {
uidValue = *uid
}
-klog.V(2).InfoS("Deleting a mirror pod", "pod", klog.KRef(namespace, name), "podUID", uidValue)
+logger.V(2).Info("Deleting a mirror pod", "pod", klog.KRef(namespace, name), "podUID", uidValue)
var GracePeriodSeconds int64
-if err := mc.apiserverClient.CoreV1().Pods(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{GracePeriodSeconds: &GracePeriodSeconds, Preconditions: &metav1.Preconditions{UID: uid}}); err != nil {
+if err := mc.apiserverClient.CoreV1().Pods(namespace).Delete(ctx, name, metav1.DeleteOptions{GracePeriodSeconds: &GracePeriodSeconds, Preconditions: &metav1.Preconditions{UID: uid}}); err != nil {
// Unfortunately, there's no generic error for failing a precondition
if !(apierrors.IsNotFound(err) || apierrors.IsConflict(err)) {
// We should return the error here, but historically this routine does
// not return an error unless it can't parse the pod name
-klog.ErrorS(err, "Failed deleting a mirror pod", "pod", klog.KRef(namespace, name))
+logger.Error(err, "Failed deleting a mirror pod", "pod", klog.KRef(namespace, name))
}
return false, nil
}
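In the mirror client the threaded context does double duty: it feeds klog.FromContext for the log lines above, and it replaces the context.TODO() previously handed to the API-server calls, so a caller's cancellation now also aborts the Create/Delete requests. Reduced to its shape (a sketch; clientset construction omitted):

    package pod

    import (
        "context"

        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        clientset "k8s.io/client-go/kubernetes"
        "k8s.io/klog/v2"
    )

    func deleteMirror(ctx context.Context, client clientset.Interface, ns, name string) error {
        logger := klog.FromContext(ctx)
        logger.V(2).Info("Deleting a mirror pod", "pod", klog.KRef(ns, name))
        // The same ctx governs the API call: canceling it aborts the request.
        return client.CoreV1().Pods(ns).Delete(ctx, name, metav1.DeleteOptions{})
    }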

View File

@@ -17,7 +17,6 @@ limitations under the License.
package pod
import (
-"context"
"errors"
"testing"
@@ -29,6 +28,7 @@ import (
"k8s.io/client-go/kubernetes/fake"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
+"k8s.io/kubernetes/test/utils/ktesting"
"k8s.io/utils/ptr"
)
@@ -64,6 +64,7 @@ func TestParsePodFullName(t *testing.T) {
}
func TestCreateMirrorPod(t *testing.T) {
+tCtx := ktesting.Init(t)
const (
testNodeName = "test-node-name"
testNodeUID = types.UID("test-node-uid-1234")
@@ -120,13 +121,13 @@ func TestCreateMirrorPod(t *testing.T) {
},
}
-err := mc.CreateMirrorPod(pod)
+err := mc.CreateMirrorPod(tCtx, pod)
if !test.expectSuccess {
assert.Error(t, err)
return
}
-createdPod, err := clientset.CoreV1().Pods(testPodNS).Get(context.TODO(), testPodName, metav1.GetOptions{})
+createdPod, err := clientset.CoreV1().Pods(testPodNS).Get(tCtx, testPodName, metav1.GetOptions{})
require.NoError(t, err)
// Validate created pod

View File

@@ -17,6 +17,7 @@ limitations under the License.
package testing
import (
+"context"
"sync"
v1 "k8s.io/api/core/v1"
@@ -42,7 +43,7 @@ func NewFakeMirrorClient() *FakeMirrorClient {
return &m
}
-func (fmc *FakeMirrorClient) CreateMirrorPod(pod *v1.Pod) error {
+func (fmc *FakeMirrorClient) CreateMirrorPod(_ context.Context, pod *v1.Pod) error {
fmc.mirrorPodLock.Lock()
defer fmc.mirrorPodLock.Unlock()
podFullName := kubecontainer.GetPodFullName(pod)
@@ -52,7 +53,7 @@ func (fmc *FakeMirrorClient) CreateMirrorPod(pod *v1.Pod) error {
}
// TODO (Robert Krawitz): Implement UID checking
-func (fmc *FakeMirrorClient) DeleteMirrorPod(podFullName string, _ *types.UID) (bool, error) {
+func (fmc *FakeMirrorClient) DeleteMirrorPod(_ context.Context, podFullName string, _ *types.UID) (bool, error) {
fmc.mirrorPodLock.Lock()
defer fmc.mirrorPodLock.Unlock()
fmc.mirrorPods.Delete(podFullName)

View File

@@ -17,6 +17,7 @@ limitations under the License.
package preemption
import (
+"context"
"fmt"
"math"
@@ -60,7 +61,7 @@ func NewCriticalPodAdmissionHandler(getPodsFunc eviction.ActivePodsFunc, killPod
// HandleAdmissionFailure gracefully handles admission rejection and, in some cases,
// evicts other pods to allow admission of the pod despite its previous failure.
-func (c *CriticalPodAdmissionHandler) HandleAdmissionFailure(admitPod *v1.Pod, failureReasons []lifecycle.PredicateFailureReason) ([]lifecycle.PredicateFailureReason, error) {
+func (c *CriticalPodAdmissionHandler) HandleAdmissionFailure(ctx context.Context, admitPod *v1.Pod, failureReasons []lifecycle.PredicateFailureReason) ([]lifecycle.PredicateFailureReason, error) {
if !kubetypes.IsCriticalPod(admitPod) {
return failureReasons, nil
}
@@ -82,7 +83,7 @@ func (c *CriticalPodAdmissionHandler) HandleAdmissionFailure(admitPod *v1.Pod, f
// Return only reasons that are not resource related, since critical pods cannot fail admission for resource reasons.
return nonResourceReasons, nil
}
-err := c.evictPodsToFreeRequests(admitPod, admissionRequirementList(resourceReasons))
+err := c.evictPodsToFreeRequests(ctx, admitPod, admissionRequirementList(resourceReasons))
// if no error is returned, preemption succeeded and the pod is safe to admit.
return nil, err
}
@@ -90,7 +91,8 @@ func (c *CriticalPodAdmissionHandler) HandleAdmissionFailure(admitPod *v1.Pod, f
// evictPodsToFreeRequests takes a list of insufficient resources, and attempts to free them by evicting pods
// based on requests. For example, if the only insufficient resource is 200Mb of memory, this function could
// evict a pod with request=250Mb.
-func (c *CriticalPodAdmissionHandler) evictPodsToFreeRequests(admitPod *v1.Pod, insufficientResources admissionRequirementList) error {
+func (c *CriticalPodAdmissionHandler) evictPodsToFreeRequests(ctx context.Context, admitPod *v1.Pod, insufficientResources admissionRequirementList) error {
+logger := klog.FromContext(ctx)
podsToPreempt, err := getPodsToPreempt(admitPod, c.getPodsFunc(), insufficientResources)
if err != nil {
return fmt.Errorf("preemption: error finding a set of pods to preempt: %v", err)
@@ -99,7 +101,7 @@ func (c *CriticalPodAdmissionHandler) evictPodsToFreeRequests(admitPod *v1.Pod,
// record that we are evicting the pod
c.recorder.Eventf(pod, v1.EventTypeWarning, events.PreemptContainer, message)
// this is a blocking call and should only return when the pod and its containers are killed.
-klog.V(2).InfoS("Preempting pod to free up resources", "pod", klog.KObj(pod), "podUID", pod.UID, "insufficientResources", insufficientResources.toString(), "requestingPod", klog.KObj(admitPod))
+logger.V(2).Info("Preempting pod to free up resources", "pod", klog.KObj(pod), "podUID", pod.UID, "insufficientResources", insufficientResources.toString(), "requestingPod", klog.KObj(admitPod))
err := c.killPodFunc(pod, true, nil, func(status *v1.PodStatus) {
status.Phase = v1.PodFailed
status.Reason = events.PreemptContainer
@@ -113,7 +115,7 @@ func (c *CriticalPodAdmissionHandler) evictPodsToFreeRequests(admitPod *v1.Pod,
})
})
if err != nil {
-klog.ErrorS(err, "Failed to evict pod", "pod", klog.KObj(pod))
+logger.Error(err, "Failed to evict pod", "pod", klog.KObj(pod))
// In future syncPod loops, the kubelet will retry the pod deletion steps that it was stuck on.
continue
}
@@ -122,7 +124,7 @@ func (c *CriticalPodAdmissionHandler) evictPodsToFreeRequests(admitPod *v1.Pod,
} else {
metrics.Preemptions.WithLabelValues("").Inc()
}
-klog.InfoS("Pod evicted successfully", "pod", klog.KObj(pod))
+logger.Info("Pod evicted successfully", "pod", klog.KObj(pod))
}
return nil
}
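A small consistency point visible here and elsewhere in the PR: the logger is extracted once at the top of the function and reused, rather than calling klog.FromContext before every log line. The lookup is cheap, but hoisting keeps the call sites short:

    package preemption

    import (
        "context"

        "k8s.io/klog/v2"
    )

    func evictSketch(ctx context.Context, pods []string) {
        logger := klog.FromContext(ctx) // hoisted once per call
        for _, p := range pods {
            logger.V(2).Info("Preempting pod to free up resources", "pod", p)
        }
    }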

View File

@@ -26,6 +26,7 @@ import (
"k8s.io/client-go/tools/record"
kubeapi "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/apis/scheduling"
+"k8s.io/kubernetes/test/utils/ktesting"
)
const (
@@ -91,6 +92,7 @@ func getTestCriticalPodAdmissionHandler(podProvider *fakePodProvider, podKiller
}
func TestEvictPodsToFreeRequests(t *testing.T) {
+tCtx := ktesting.Init(t)
type testRun struct {
testName string
isPodKillerWithError bool
@@ -144,7 +146,7 @@ func TestEvictPodsToFreeRequests(t *testing.T) {
podKiller := newFakePodKiller(r.isPodKillerWithError)
criticalPodAdmissionHandler := getTestCriticalPodAdmissionHandler(podProvider, podKiller)
podProvider.setPods(r.inputPods)
-outErr := criticalPodAdmissionHandler.evictPodsToFreeRequests(allPods[clusterCritical], r.insufficientResources)
+outErr := criticalPodAdmissionHandler.evictPodsToFreeRequests(tCtx, allPods[clusterCritical], r.insufficientResources)
outputPods := podKiller.getKilledPods()
if !r.expectErr && outErr != nil {
t.Errorf("evictPodsToFreeRequests returned an unexpected error during the %s test. Err: %v", r.testName, outErr)

View File

@@ -234,11 +234,11 @@ func ListenAndServeKubeletReadOnlyServer(
}
// ListenAndServePodResources initializes a gRPC server to serve the PodResources service
-func ListenAndServePodResources(endpoint string, providers podresources.PodResourcesProviders) {
-server := grpc.NewServer(apisgrpc.WithRateLimiter("podresources", podresources.DefaultQPS, podresources.DefaultBurstTokens))
+func ListenAndServePodResources(ctx context.Context, endpoint string, providers podresources.PodResourcesProviders) {
+server := grpc.NewServer(apisgrpc.WithRateLimiter(ctx, "podresources", podresources.DefaultQPS, podresources.DefaultBurstTokens))
podresourcesapiv1alpha1.RegisterPodResourcesListerServer(server, podresources.NewV1alpha1PodResourcesServer(providers))
-podresourcesapi.RegisterPodResourcesListerServer(server, podresources.NewV1PodResourcesServer(providers))
+podresourcesapi.RegisterPodResourcesListerServer(server, podresources.NewV1PodResourcesServer(ctx, providers))
l, err := util.CreateListener(endpoint)
if err != nil {
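ListenAndServePodResources now receives the kubelet's run context and fans it out to the rate limiter (for its startup log line) and the v1 server constructor; the v1alpha1 constructor is untouched because it logs nothing at construction time. The caller side looks roughly like this (a sketch; the socket path and empty providers are illustrative):

    package main

    import (
        "context"

        "k8s.io/klog/v2"
        "k8s.io/kubernetes/pkg/kubelet/apis/podresources"
        kubeletserver "k8s.io/kubernetes/pkg/kubelet/server"
    )

    func main() {
        // Reuse the kubelet's run context so the gRPC service inherits its logger.
        ctx := klog.NewContext(context.Background(), klog.Background())
        kubeletserver.ListenAndServePodResources(ctx, "/var/lib/kubelet/pod-resources/kubelet.sock", podresources.PodResourcesProviders{})
    }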

View File

@@ -61,6 +61,7 @@ import (
_ "k8s.io/kubernetes/test/e2e/framework/metrics/init"
_ "k8s.io/kubernetes/test/e2e/framework/node/init"
_ "k8s.io/kubernetes/test/utils/format"
+"k8s.io/kubernetes/test/utils/ktesting"
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
@@ -158,6 +159,7 @@ func TestMain(m *testing.M) {
const rootfs = "/rootfs"
func TestE2eNode(t *testing.T) {
+tCtx := ktesting.Init(t)
// Make sure we are not limited by sshd when it comes to open files
if err := rlimit.SetNumFiles(1000000); err != nil {
klog.Infof("failed to set rlimit on max file handles: %v", err)
@@ -170,7 +172,7 @@ func TestE2eNode(t *testing.T) {
}
if *runKubeletMode {
// If run-kubelet-mode is specified, only start kubelet.
-services.RunKubelet(featureGates)
+services.RunKubelet(tCtx, featureGates)
return
}
if *systemValidateMode {
@@ -268,7 +270,7 @@ var _ = ginkgo.SynchronizedBeforeSuite(func(ctx context.Context) []byte {
// If the services are expected to stop after test, they should monitor the test process.
// If the services are expected to keep running after test, they should not monitor the test process.
e2es = services.NewE2EServices(*stopServices)
-gomega.Expect(e2es.Start(featureGates)).To(gomega.Succeed(), "should be able to start node services.")
+gomega.Expect(e2es.Start(ctx, featureGates)).To(gomega.Succeed(), "should be able to start node services.")
} else {
klog.Infof("Running tests without starting services.")
}

View File

@@ -17,6 +17,7 @@ limitations under the License.
package services
import (
+"context"
"flag"
"fmt"
"os"
@@ -74,13 +75,13 @@ func init() {
// RunKubelet starts the kubelet and waits for a termination signal. Once it
// receives the termination signal, it stops the kubelet gracefully.
-func RunKubelet(featureGates map[string]bool) {
+func RunKubelet(ctx context.Context, featureGates map[string]bool) {
var err error
// Enable monitorParent to make sure kubelet will receive termination signal
// when test process exits.
e := NewE2EServices(true /* monitorParent */)
defer e.Stop()
-e.kubelet, err = e.startKubelet(featureGates)
+e.kubelet, err = e.startKubelet(ctx, featureGates)
if err != nil {
klog.Fatalf("Failed to start kubelet: %v", err)
}
@@ -96,7 +97,7 @@ const (
// Health check url of kubelet
var kubeletHealthCheckURL = fmt.Sprintf("http://127.0.0.1:%d/healthz", ports.KubeletHealthzPort)
-func baseKubeConfiguration(cfgPath string) (*kubeletconfig.KubeletConfiguration, error) {
+func baseKubeConfiguration(ctx context.Context, cfgPath string) (*kubeletconfig.KubeletConfiguration, error) {
cfgPath, err := filepath.Abs(cfgPath)
if err != nil {
return nil, err
@@ -147,12 +148,12 @@ func baseKubeConfiguration(cfgPath string) (*kubeletconfig.KubeletConfiguration,
return nil, err
}
-return loader.Load()
+return loader.Load(ctx)
}
// startKubelet starts the Kubelet in a separate process or returns an error
// if the Kubelet fails to start.
-func (e *E2EServices) startKubelet(featureGates map[string]bool) (*server, error) {
+func (e *E2EServices) startKubelet(ctx context.Context, featureGates map[string]bool) (*server, error) {
klog.Info("Starting kubelet")
framework.Logf("Standalone mode: %v", framework.TestContext.StandaloneMode)
@@ -195,7 +196,7 @@ func (e *E2EServices) startKubelet(featureGates map[string]bool) (*server, error
if lookup != nil {
kubeletConfigFile = lookup.Value.String()
}
-kc, err := baseKubeConfiguration(kubeletConfigFile)
+kc, err := baseKubeConfiguration(ctx, kubeletConfigFile)
if err != nil {
return nil, fmt.Errorf("failed to load base kubelet configuration: %w", err)
}

View File

@@ -17,6 +17,7 @@ limitations under the License.
package services
import (
+"context"
"fmt"
"os"
"os/exec"
@@ -60,7 +61,7 @@ func NewE2EServices(monitorParent bool) *E2EServices {
// namespace controller.
// * kubelet: kubelet binary is outside. (We plan to move main kubelet start logic out when we have
// standard kubelet launcher)
-func (e *E2EServices) Start(featureGates map[string]bool) error {
+func (e *E2EServices) Start(ctx context.Context, featureGates map[string]bool) error {
var err error
if e.services, err = e.startInternalServices(); err != nil {
return fmt.Errorf("failed to start internal services: %w", err)
@@ -71,7 +72,7 @@ func (e *E2EServices) Start(featureGates map[string]bool) error {
klog.Info("nothing to do in node-e2e-services, running conformance suite")
} else {
// Start kubelet
-e.kubelet, err = e.startKubelet(featureGates)
+e.kubelet, err = e.startKubelet(ctx, featureGates)
if err != nil {
return fmt.Errorf("failed to start kubelet: %w", err)
}