From d868eeb0889b85b7348352a80ab606d07bdd26a1 Mon Sep 17 00:00:00 2001 From: Chulong Li Date: Sun, 16 Mar 2025 20:34:09 +0800 Subject: [PATCH] Migrate pkg/kubelet/status to contextual logging Update with logger from context Fix nil pointer issue in TestUpdateReadiness --- hack/golangci-hints.yaml | 1 + hack/golangci.yaml | 1 + hack/logcheck.conf | 1 + pkg/kubelet/kubelet.go | 24 +- pkg/kubelet/kubelet_pods.go | 2 +- pkg/kubelet/kubelet_pods_test.go | 7 +- pkg/kubelet/kubelet_test.go | 8 +- pkg/kubelet/prober/prober_manager_test.go | 11 +- pkg/kubelet/prober/scale_test.go | 4 +- pkg/kubelet/prober/worker_test.go | 57 ++--- pkg/kubelet/status/status_manager.go | 156 ++++++------ pkg/kubelet/status/status_manager_test.go | 282 ++++++++++++---------- 12 files changed, 307 insertions(+), 247 deletions(-) diff --git a/hack/golangci-hints.yaml b/hack/golangci-hints.yaml index c2ce0914f78..84a5bb63d5e 100644 --- a/hack/golangci-hints.yaml +++ b/hack/golangci-hints.yaml @@ -177,6 +177,7 @@ linters-settings: # please keep this alphabetized contextual k8s.io/kubernetes/pkg/kubelet/token/.* contextual k8s.io/kubernetes/pkg/kubelet/cadvisor/.* contextual k8s.io/kubernetes/pkg/kubelet/oom/.* + contextual k8s.io/kubernetes/pkg/kubelet/status/.* contextual k8s.io/kubernetes/pkg/kubelet/sysctl/.* # As long as contextual logging is alpha or beta, all WithName, WithValues, diff --git a/hack/golangci.yaml b/hack/golangci.yaml index 5daa137e02a..d0f05d84384 100644 --- a/hack/golangci.yaml +++ b/hack/golangci.yaml @@ -243,6 +243,7 @@ linters-settings: # please keep this alphabetized contextual k8s.io/kubernetes/pkg/kubelet/token/.* contextual k8s.io/kubernetes/pkg/kubelet/cadvisor/.* contextual k8s.io/kubernetes/pkg/kubelet/oom/.* + contextual k8s.io/kubernetes/pkg/kubelet/status/.* contextual k8s.io/kubernetes/pkg/kubelet/sysctl/.* # As long as contextual logging is alpha or beta, all WithName, WithValues, diff --git a/hack/logcheck.conf b/hack/logcheck.conf index 82507bd11ae..dfa502327b4 100644 --- a/hack/logcheck.conf +++ b/hack/logcheck.conf @@ -55,6 +55,7 @@ contextual k8s.io/kubernetes/pkg/kubelet/clustertrustbundle/.* contextual k8s.io/kubernetes/pkg/kubelet/token/.* contextual k8s.io/kubernetes/pkg/kubelet/cadvisor/.* contextual k8s.io/kubernetes/pkg/kubelet/oom/.* +contextual k8s.io/kubernetes/pkg/kubelet/status/.* contextual k8s.io/kubernetes/pkg/kubelet/sysctl/.* # As long as contextual logging is alpha or beta, all WithName, WithValues, diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index b327a713b9b..383c1d4cd7f 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -1809,7 +1809,7 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) { } // Start component sync loops. - kl.statusManager.Start() + kl.statusManager.Start(ctx) // Start syncing RuntimeClasses if enabled. if kl.runtimeClassManager != nil { @@ -1888,6 +1888,7 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType attribute.String("k8s.pod.update_type", updateType.String()), semconv.K8SNamespaceNameKey.String(pod.Namespace), )) + logger := klog.FromContext(ctx) klog.V(4).InfoS("SyncPod enter", "pod", klog.KObj(pod), "podUID", pod.UID) defer func() { if err != nil { @@ -1948,7 +1949,7 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType // If the pod is terminal, we don't need to continue to setup the pod if apiPodStatus.Phase == v1.PodSucceeded || apiPodStatus.Phase == v1.PodFailed { - kl.statusManager.SetPodStatus(pod, apiPodStatus) + kl.statusManager.SetPodStatus(logger, pod, apiPodStatus) isTerminal = true return isTerminal, nil } @@ -1961,7 +1962,7 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType metrics.PodStartDuration.Observe(metrics.SinceInSeconds(firstSeenTime)) } - kl.statusManager.SetPodStatus(pod, apiPodStatus) + kl.statusManager.SetPodStatus(logger, pod, apiPodStatus) // If the network plugin is not ready, only start the pod if it uses the host network if err := kl.runtimeState.networkErrors(); err != nil && !kubecontainer.IsHostNetworkPod(pod) { @@ -2099,6 +2100,7 @@ func (kl *Kubelet) SyncTerminatingPod(_ context.Context, pod *v1.Pod, podStatus semconv.K8SPodNameKey.String(pod.Name), semconv.K8SNamespaceNameKey.String(pod.Namespace), )) + logger := klog.FromContext(ctx) defer otelSpan.End() klog.V(4).InfoS("SyncTerminatingPod enter", "pod", klog.KObj(pod), "podUID", pod.UID) defer klog.V(4).InfoS("SyncTerminatingPod exit", "pod", klog.KObj(pod), "podUID", pod.UID) @@ -2112,7 +2114,7 @@ func (kl *Kubelet) SyncTerminatingPod(_ context.Context, pod *v1.Pod, podStatus if podStatusFn != nil { podStatusFn(&apiPodStatus) } - kl.statusManager.SetPodStatus(pod, apiPodStatus) + kl.statusManager.SetPodStatus(logger, pod, apiPodStatus) if gracePeriod != nil { klog.V(4).InfoS("Pod terminating with grace period", "pod", klog.KObj(pod), "podUID", pod.UID, "gracePeriod", *gracePeriod) @@ -2187,7 +2189,7 @@ func (kl *Kubelet) SyncTerminatingPod(_ context.Context, pod *v1.Pod, podStatus // information about the container end states (including exit codes) - when // SyncTerminatedPod is called the containers may already be removed. apiPodStatus = kl.generateAPIPodStatus(pod, stoppedPodStatus, true) - kl.statusManager.SetPodStatus(pod, apiPodStatus) + kl.statusManager.SetPodStatus(logger, pod, apiPodStatus) // we have successfully stopped all containers, the pod is terminating, our status is "done" klog.V(4).InfoS("Pod termination stopped all running containers", "pod", klog.KObj(pod), "podUID", pod.UID) @@ -2249,6 +2251,7 @@ func (kl *Kubelet) SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus semconv.K8SPodNameKey.String(pod.Name), semconv.K8SNamespaceNameKey.String(pod.Namespace), )) + logger := klog.FromContext(ctx) defer otelSpan.End() klog.V(4).InfoS("SyncTerminatedPod enter", "pod", klog.KObj(pod), "podUID", pod.UID) defer klog.V(4).InfoS("SyncTerminatedPod exit", "pod", klog.KObj(pod), "podUID", pod.UID) @@ -2262,7 +2265,7 @@ func (kl *Kubelet) SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus // TODO: should we simply fold this into TerminatePod? that would give a single pod update apiPodStatus := kl.generateAPIPodStatus(pod, podStatus, true) - kl.statusManager.SetPodStatus(pod, apiPodStatus) + kl.statusManager.SetPodStatus(logger, pod, apiPodStatus) // volumes are unmounted after the pod worker reports ShouldPodRuntimeBeRemoved (which is satisfied // before syncTerminatedPod is invoked) @@ -2309,7 +2312,7 @@ func (kl *Kubelet) SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus kl.usernsManager.Release(pod.UID) // mark the final pod status - kl.statusManager.TerminatePod(pod) + kl.statusManager.TerminatePod(logger, pod) klog.V(4).InfoS("Pod is terminated and will need no more status updates", "pod", klog.KObj(pod), "podUID", pod.UID) return nil @@ -2379,7 +2382,7 @@ func (kl *Kubelet) deletePod(pod *v1.Pod) error { // and updates the pod to the failed phase in the status manager. func (kl *Kubelet) rejectPod(pod *v1.Pod, reason, message string) { kl.recorder.Eventf(pod, v1.EventTypeWarning, reason, message) - kl.statusManager.SetPodStatus(pod, v1.PodStatus{ + kl.statusManager.SetPodStatus(klog.TODO(), pod, v1.PodStatus{ QOSClass: v1qos.GetPodQOS(pod), // keep it as is Phase: v1.PodFailed, Reason: reason, @@ -2507,6 +2510,7 @@ func (kl *Kubelet) syncLoop(ctx context.Context, updates <-chan kubetypes.PodUpd // containers have failed health checks func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubetypes.PodUpdate, handler SyncHandler, syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool { + logger := klog.FromContext(ctx) select { case u, open := <-configCh: // Update from a config source; dispatch it to the right handler @@ -2577,7 +2581,7 @@ func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubety } case update := <-kl.readinessManager.Updates(): ready := update.Result == proberesults.Success - kl.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready) + kl.statusManager.SetContainerReadiness(logger, update.PodUID, update.ContainerID, ready) status := "not ready" if ready { @@ -2586,7 +2590,7 @@ func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubety handleProbeSync(kl, update, handler, "readiness", status) case update := <-kl.startupManager.Updates(): started := update.Result == proberesults.Success - kl.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started) + kl.statusManager.SetContainerStartup(logger, update.PodUID, update.ContainerID, started) status := "unhealthy" if started { diff --git a/pkg/kubelet/kubelet_pods.go b/pkg/kubelet/kubelet_pods.go index b49afb38668..501815a4600 100644 --- a/pkg/kubelet/kubelet_pods.go +++ b/pkg/kubelet/kubelet_pods.go @@ -1113,7 +1113,7 @@ func (kl *Kubelet) removeOrphanedPodStatuses(pods []*v1.Pod, mirrorPods []*v1.Po for _, pod := range mirrorPods { podUIDs[pod.UID] = true } - kl.statusManager.RemoveOrphanedStatuses(podUIDs) + kl.statusManager.RemoveOrphanedStatuses(klog.TODO(), podUIDs) } // HandlePodCleanups performs a series of cleanup work, including terminating diff --git a/pkg/kubelet/kubelet_pods_test.go b/pkg/kubelet/kubelet_pods_test.go index 04ee389c6c5..1938f051dca 100644 --- a/pkg/kubelet/kubelet_pods_test.go +++ b/pkg/kubelet/kubelet_pods_test.go @@ -47,6 +47,7 @@ import ( featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/component-base/metrics/testutil" runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" + "k8s.io/klog/v2/ktesting" "k8s.io/kubelet/pkg/cri/streaming/portforward" "k8s.io/kubelet/pkg/cri/streaming/remotecommand" _ "k8s.io/kubernetes/pkg/apis/core/install" @@ -3800,11 +3801,12 @@ func Test_generateAPIPodStatus(t *testing.T) { for _, test := range tests { for _, enablePodReadyToStartContainersCondition := range []bool{false, true} { t.Run(test.name, func(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodReadyToStartContainersCondition, enablePodReadyToStartContainersCondition) testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kl := testKubelet.kubelet - kl.statusManager.SetPodStatus(test.pod, test.previousStatus) + kl.statusManager.SetPodStatus(logger, test.pod, test.previousStatus) for _, name := range test.unreadyContainer { kl.readinessManager.Set(kubecontainer.BuildContainerID("", findContainerStatusByName(test.expected, name).ContainerID), results.Failure, test.pod) } @@ -3921,12 +3923,13 @@ func Test_generateAPIPodStatusForInPlaceVPAEnabled(t *testing.T) { } for _, test := range tests { t.Run(test.name, func(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kl := testKubelet.kubelet oldStatus := test.pod.Status - kl.statusManager.SetPodStatus(test.pod, oldStatus) + kl.statusManager.SetPodStatus(logger, test.pod, oldStatus) actual := kl.generateAPIPodStatus(test.pod, &testKubecontainerPodStatus /* criStatus */, false /* test.isPodTerminal */) for _, c := range actual.Conditions { if c.Type == v1.PodResizePending || c.Type == v1.PodResizeInProgress { diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 58c315dcb1e..e98abe81b62 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -1523,6 +1523,7 @@ func TestNetworkErrorsWithoutHostNetwork(t *testing.T) { } func TestFilterOutInactivePods(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kubelet := testKubelet.kubelet @@ -1550,7 +1551,7 @@ func TestFilterOutInactivePods(t *testing.T) { // pod that is running but has been rejected by admission is excluded pods[5].Status.Phase = v1.PodRunning - kubelet.statusManager.SetPodStatus(pods[5], v1.PodStatus{Phase: v1.PodFailed}) + kubelet.statusManager.SetPodStatus(logger, pods[5], v1.PodStatus{Phase: v1.PodFailed}) // pod that is running according to the api but is known terminated is excluded pods[6].Status.Phase = v1.PodRunning @@ -1839,6 +1840,7 @@ func syncAndVerifyPodDir(t *testing.T, testKubelet *TestKubelet, pods []*v1.Pod, } func TestDoesNotDeletePodDirsForTerminatedPods(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kl := testKubelet.kubelet @@ -1850,8 +1852,8 @@ func TestDoesNotDeletePodDirsForTerminatedPods(t *testing.T) { syncAndVerifyPodDir(t, testKubelet, pods, pods, true) // Pod 1 failed, and pod 2 succeeded. None of the pod directories should be // deleted. - kl.statusManager.SetPodStatus(pods[1], v1.PodStatus{Phase: v1.PodFailed}) - kl.statusManager.SetPodStatus(pods[2], v1.PodStatus{Phase: v1.PodSucceeded}) + kl.statusManager.SetPodStatus(logger, pods[1], v1.PodStatus{Phase: v1.PodFailed}) + kl.statusManager.SetPodStatus(logger, pods[2], v1.PodStatus{Phase: v1.PodSucceeded}) syncAndVerifyPodDir(t, testKubelet, pods, pods, true) } diff --git a/pkg/kubelet/prober/prober_manager_test.go b/pkg/kubelet/prober/prober_manager_test.go index 0f714e6a255..b66d3460f45 100644 --- a/pkg/kubelet/prober/prober_manager_test.go +++ b/pkg/kubelet/prober/prober_manager_test.go @@ -27,9 +27,11 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/klog/v2" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/prober/results" "k8s.io/kubernetes/pkg/probe" + "k8s.io/kubernetes/test/utils/ktesting" ) func init() { @@ -547,14 +549,15 @@ func TestUpdatePodStatusWithInitContainers(t *testing.T) { } } -func (m *manager) extractedReadinessHandling() { +func (m *manager) extractedReadinessHandling(logger klog.Logger) { update := <-m.readinessManager.Updates() // This code corresponds to an extract from kubelet.syncLoopIteration() ready := update.Result == results.Success - m.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready) + m.statusManager.SetContainerReadiness(logger, update.PodUID, update.ContainerID, ready) } func TestUpdateReadiness(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) testPod := getTestPod() setTestProbe(testPod, readiness, v1.Probe{}) m := newTestManager() @@ -562,7 +565,7 @@ func TestUpdateReadiness(t *testing.T) { // Start syncing readiness without leaking goroutine. stopCh := make(chan struct{}) - go wait.Until(m.extractedReadinessHandling, 0, stopCh) + go wait.Until(func() { m.extractedReadinessHandling(logger) }, 0, stopCh) defer func() { close(stopCh) // Send an update to exit extractedReadinessHandling() @@ -573,7 +576,7 @@ func TestUpdateReadiness(t *testing.T) { exec.set(probe.Success, nil) m.prober.exec = &exec - m.statusManager.SetPodStatus(testPod, getTestRunningStatus()) + m.statusManager.SetPodStatus(logger, testPod, getTestRunningStatus()) m.AddPod(testPod) probePaths := []probeKey{{testPodUID, testContainerName, readiness}} diff --git a/pkg/kubelet/prober/scale_test.go b/pkg/kubelet/prober/scale_test.go index 35c1da6f427..6cef4cc5958 100644 --- a/pkg/kubelet/prober/scale_test.go +++ b/pkg/kubelet/prober/scale_test.go @@ -37,6 +37,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/status" statustest "k8s.io/kubernetes/pkg/kubelet/status/testing" kubeletutil "k8s.io/kubernetes/pkg/kubelet/util" + "k8s.io/kubernetes/test/utils/ktesting" "k8s.io/utils/ptr" ) @@ -80,6 +81,7 @@ func TestTCPPortExhaustion(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) podManager := kubepod.NewBasicPodManager() podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker() m := NewManager( @@ -133,7 +135,7 @@ func TestTCPPortExhaustion(t *testing.T) { }) } podManager.AddPod(&pod) - m.statusManager.SetPodStatus(&pod, pod.Status) + m.statusManager.SetPodStatus(logger, &pod, pod.Status) m.AddPod(&pod) } t.Logf("Adding %d pods with %d containers each in %v", numTestPods, numContainers, time.Since(now)) diff --git a/pkg/kubelet/prober/worker_test.go b/pkg/kubelet/prober/worker_test.go index 3e1fa335a16..ec8f8673ec5 100644 --- a/pkg/kubelet/prober/worker_test.go +++ b/pkg/kubelet/prober/worker_test.go @@ -17,7 +17,6 @@ limitations under the License. package prober import ( - "context" "fmt" "testing" "time" @@ -32,12 +31,14 @@ import ( statustest "k8s.io/kubernetes/pkg/kubelet/status/testing" kubeletutil "k8s.io/kubernetes/pkg/kubelet/util" "k8s.io/kubernetes/pkg/probe" + "k8s.io/kubernetes/test/utils/ktesting" ) func init() { } func TestDoProbe(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) m := newTestManager() for _, probeType := range [...]probeType{liveness, readiness, startup} { @@ -131,10 +132,9 @@ func TestDoProbe(t *testing.T) { } for i, test := range tests { - ctx := context.Background() w := newTestWorker(m, probeType, test.probe) if test.podStatus != nil { - m.statusManager.SetPodStatus(w.pod, *test.podStatus) + m.statusManager.SetPodStatus(logger, w.pod, *test.podStatus) } if test.setDeletionTimestamp { now := metav1.Now() @@ -158,14 +158,14 @@ func TestDoProbe(t *testing.T) { } func TestInitialDelay(t *testing.T) { - ctx := context.Background() + logger, ctx := ktesting.NewTestContext(t) m := newTestManager() for _, probeType := range [...]probeType{liveness, readiness, startup} { w := newTestWorker(m, probeType, v1.Probe{ InitialDelaySeconds: 10, }) - m.statusManager.SetPodStatus(w.pod, getTestRunningStatusWithStarted(probeType != startup)) + m.statusManager.SetPodStatus(logger, w.pod, getTestRunningStatusWithStarted(probeType != startup)) expectContinue(t, w, w.doProbe(ctx), "during initial delay") // Default value depends on probe, Success for liveness, Failure for readiness, Unknown for startup @@ -182,7 +182,7 @@ func TestInitialDelay(t *testing.T) { laterStatus := getTestRunningStatusWithStarted(probeType != startup) laterStatus.ContainerStatuses[0].State.Running.StartedAt.Time = time.Now().Add(-100 * time.Second) - m.statusManager.SetPodStatus(w.pod, laterStatus) + m.statusManager.SetPodStatus(logger, w.pod, laterStatus) // Second call should succeed (already waited). expectContinue(t, w, w.doProbe(ctx), "after initial delay") @@ -191,10 +191,10 @@ func TestInitialDelay(t *testing.T) { } func TestFailureThreshold(t *testing.T) { - ctx := context.Background() + logger, ctx := ktesting.NewTestContext(t) m := newTestManager() w := newTestWorker(m, readiness, v1.Probe{SuccessThreshold: 1, FailureThreshold: 3}) - m.statusManager.SetPodStatus(w.pod, getTestRunningStatus()) + m.statusManager.SetPodStatus(logger, w.pod, getTestRunningStatus()) for i := 0; i < 2; i++ { // First probe should succeed. @@ -226,10 +226,10 @@ func TestFailureThreshold(t *testing.T) { } func TestSuccessThreshold(t *testing.T) { - ctx := context.Background() + logger, ctx := ktesting.NewTestContext(t) m := newTestManager() w := newTestWorker(m, readiness, v1.Probe{SuccessThreshold: 3, FailureThreshold: 1}) - m.statusManager.SetPodStatus(w.pod, getTestRunningStatus()) + m.statusManager.SetPodStatus(logger, w.pod, getTestRunningStatus()) // Start out failure. w.resultsManager.Set(testContainerID, results.Failure, &v1.Pod{}) @@ -261,12 +261,12 @@ func TestSuccessThreshold(t *testing.T) { } func TestStartupProbeSuccessThreshold(t *testing.T) { - ctx := context.Background() + logger, ctx := ktesting.NewTestContext(t) m := newTestManager() successThreshold := 1 failureThreshold := 3 w := newTestWorker(m, startup, v1.Probe{SuccessThreshold: int32(successThreshold), FailureThreshold: int32(failureThreshold)}) - m.statusManager.SetPodStatus(w.pod, getTestNotRunningStatus()) + m.statusManager.SetPodStatus(logger, w.pod, getTestNotRunningStatus()) m.prober.exec = fakeExecProber{probe.Success, nil} for i := 0; i < successThreshold+1; i++ { @@ -294,12 +294,12 @@ func TestStartupProbeSuccessThreshold(t *testing.T) { } func TestStartupProbeFailureThreshold(t *testing.T) { - ctx := context.Background() + logger, ctx := ktesting.NewTestContext(t) m := newTestManager() successThreshold := 1 failureThreshold := 3 w := newTestWorker(m, startup, v1.Probe{SuccessThreshold: int32(successThreshold), FailureThreshold: int32(failureThreshold)}) - m.statusManager.SetPodStatus(w.pod, getTestNotRunningStatus()) + m.statusManager.SetPodStatus(logger, w.pod, getTestNotRunningStatus()) m.prober.exec = fakeExecProber{probe.Failure, nil} for i := 0; i < failureThreshold+1; i++ { @@ -346,12 +346,13 @@ func TestStartupProbeFailureThreshold(t *testing.T) { } func TestCleanUp(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) m := newTestManager() for _, probeType := range [...]probeType{liveness, readiness, startup} { key := probeKey{testPodUID, testContainerName, probeType} w := newTestWorker(m, probeType, v1.Probe{}) - m.statusManager.SetPodStatus(w.pod, getTestRunningStatusWithStarted(probeType != startup)) + m.statusManager.SetPodStatus(logger, w.pod, getTestRunningStatusWithStarted(probeType != startup)) go w.run() m.workers[key] = w @@ -411,13 +412,13 @@ func resultsManager(m *manager, probeType probeType) results.Manager { } func TestOnHoldOnLivenessOrStartupCheckFailure(t *testing.T) { - ctx := context.Background() + logger, ctx := ktesting.NewTestContext(t) m := newTestManager() for _, probeType := range [...]probeType{liveness, startup} { w := newTestWorker(m, probeType, v1.Probe{SuccessThreshold: 1, FailureThreshold: 1}) status := getTestRunningStatusWithStarted(probeType != startup) - m.statusManager.SetPodStatus(w.pod, status) + m.statusManager.SetPodStatus(logger, w.pod, status) // First probe should fail. m.prober.exec = fakeExecProber{probe.Failure, nil} @@ -439,7 +440,7 @@ func TestOnHoldOnLivenessOrStartupCheckFailure(t *testing.T) { // Set a new container ID to lift the hold. The next probe will succeed. status.ContainerStatuses[0].ContainerID = "test://newCont_ID" - m.statusManager.SetPodStatus(w.pod, status) + m.statusManager.SetPodStatus(logger, w.pod, status) msg = "hold lifted" expectContinue(t, w, w.doProbe(ctx), msg) expectResult(t, w, results.Success, msg) @@ -452,10 +453,10 @@ func TestOnHoldOnLivenessOrStartupCheckFailure(t *testing.T) { } func TestResultRunOnLivenessCheckFailure(t *testing.T) { - ctx := context.Background() + logger, ctx := ktesting.NewTestContext(t) m := newTestManager() w := newTestWorker(m, liveness, v1.Probe{SuccessThreshold: 1, FailureThreshold: 3}) - m.statusManager.SetPodStatus(w.pod, getTestRunningStatus()) + m.statusManager.SetPodStatus(logger, w.pod, getTestRunningStatus()) m.prober.exec = fakeExecProber{probe.Success, nil} msg := "initial probe success" @@ -494,10 +495,10 @@ func TestResultRunOnLivenessCheckFailure(t *testing.T) { } func TestResultRunOnStartupCheckFailure(t *testing.T) { - ctx := context.Background() + logger, ctx := ktesting.NewTestContext(t) m := newTestManager() w := newTestWorker(m, startup, v1.Probe{SuccessThreshold: 1, FailureThreshold: 3}) - m.statusManager.SetPodStatus(w.pod, getTestRunningStatusWithStarted(false)) + m.statusManager.SetPodStatus(logger, w.pod, getTestRunningStatusWithStarted(false)) // Below FailureThreshold leaves probe state unchanged // which is failed for startup at first. @@ -530,17 +531,17 @@ func TestResultRunOnStartupCheckFailure(t *testing.T) { } func TestLivenessProbeDisabledByStarted(t *testing.T) { - ctx := context.Background() + logger, ctx := ktesting.NewTestContext(t) m := newTestManager() w := newTestWorker(m, liveness, v1.Probe{SuccessThreshold: 1, FailureThreshold: 1}) - m.statusManager.SetPodStatus(w.pod, getTestRunningStatusWithStarted(false)) + m.statusManager.SetPodStatus(logger, w.pod, getTestRunningStatusWithStarted(false)) // livenessProbe fails, but is disabled m.prober.exec = fakeExecProber{probe.Failure, nil} msg := "Not started, probe failure, result success" expectContinue(t, w, w.doProbe(ctx), msg) expectResult(t, w, results.Success, msg) // setting started state - m.statusManager.SetContainerStartup(w.pod.UID, w.containerID, true) + m.statusManager.SetContainerStartup(logger, w.pod.UID, w.containerID, true) // livenessProbe fails m.prober.exec = fakeExecProber{probe.Failure, nil} msg = "Started, probe failure, result failure" @@ -549,10 +550,10 @@ func TestLivenessProbeDisabledByStarted(t *testing.T) { } func TestStartupProbeDisabledByStarted(t *testing.T) { - ctx := context.Background() + logger, ctx := ktesting.NewTestContext(t) m := newTestManager() w := newTestWorker(m, startup, v1.Probe{SuccessThreshold: 1, FailureThreshold: 2}) - m.statusManager.SetPodStatus(w.pod, getTestRunningStatusWithStarted(false)) + m.statusManager.SetPodStatus(logger, w.pod, getTestRunningStatusWithStarted(false)) // startupProbe fails < FailureThreshold, stays unknown m.prober.exec = fakeExecProber{probe.Failure, nil} msg := "Not started, probe failure, result unknown" @@ -564,7 +565,7 @@ func TestStartupProbeDisabledByStarted(t *testing.T) { expectContinue(t, w, w.doProbe(ctx), msg) expectResult(t, w, results.Success, msg) // setting started state - m.statusManager.SetContainerStartup(w.pod.UID, w.containerID, true) + m.statusManager.SetContainerStartup(logger, w.pod.UID, w.containerID, true) // startupProbe fails, but is disabled m.prober.exec = fakeExecProber{probe.Failure, nil} msg = "Started, probe failure, result success" diff --git a/pkg/kubelet/status/status_manager.go b/pkg/kubelet/status/status_manager.go index c86bef17ff3..69a423aaa65 100644 --- a/pkg/kubelet/status/status_manager.go +++ b/pkg/kubelet/status/status_manager.go @@ -131,26 +131,26 @@ type Manager interface { PodStatusProvider // Start the API server status sync loop. - Start() + Start(ctx context.Context) // SetPodStatus caches updates the cached status for the given pod, and triggers a status update. - SetPodStatus(pod *v1.Pod, status v1.PodStatus) + SetPodStatus(logger klog.Logger, pod *v1.Pod, status v1.PodStatus) // SetContainerReadiness updates the cached container status with the given readiness, and // triggers a status update. - SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool) + SetContainerReadiness(logger klog.Logger, podUID types.UID, containerID kubecontainer.ContainerID, ready bool) // SetContainerStartup updates the cached container status with the given startup, and // triggers a status update. - SetContainerStartup(podUID types.UID, containerID kubecontainer.ContainerID, started bool) + SetContainerStartup(logger klog.Logger, podUID types.UID, containerID kubecontainer.ContainerID, started bool) // TerminatePod resets the container status for the provided pod to terminated and triggers // a status update. - TerminatePod(pod *v1.Pod) + TerminatePod(logger klog.Logger, pod *v1.Pod) // RemoveOrphanedStatuses scans the status cache and removes any entries for pods not included in // the provided podUIDs. - RemoveOrphanedStatuses(podUIDs map[types.UID]bool) + RemoveOrphanedStatuses(logger klog.Logger, podUIDs map[types.UID]bool) // GetPodResizeConditions returns cached PodStatus Resize conditions value GetPodResizeConditions(podUID types.UID) []*v1.PodCondition @@ -218,16 +218,17 @@ func isPodStatusByKubeletEqual(oldStatus, status *v1.PodStatus) bool { return apiequality.Semantic.DeepEqual(oldCopy, status) } -func (m *manager) Start() { +func (m *manager) Start(ctx context.Context) { + logger := klog.FromContext(ctx) // Don't start the status manager if we don't have a client. This will happen // on the master, where the kubelet is responsible for bootstrapping the pods // of the master components. if m.kubeClient == nil { - klog.InfoS("Kubernetes client is nil, not starting status manager") + logger.Info("Kubernetes client is nil, not starting status manager") return } - klog.InfoS("Starting to sync pod status with apiserver") + logger.Info("Starting to sync pod status with apiserver") //nolint:staticcheck // SA1015 Ticker can leak since this is only called once and doesn't handle termination. syncTicker := time.NewTicker(syncPeriod).C @@ -237,11 +238,11 @@ func (m *manager) Start() { for { select { case <-m.podStatusChannel: - klog.V(4).InfoS("Syncing updated statuses") - m.syncBatch(false) + logger.V(4).Info("Syncing updated statuses") + m.syncBatch(ctx, false) case <-syncTicker: - klog.V(4).InfoS("Syncing all statuses") - m.syncBatch(true) + logger.V(4).Info("Syncing all statuses") + m.syncBatch(ctx, true) } } }, 0) @@ -315,7 +316,7 @@ func (m *manager) GetPodStatus(uid types.UID) (v1.PodStatus, bool) { return status.status, ok } -func (m *manager) SetPodStatus(pod *v1.Pod, status v1.PodStatus) { +func (m *manager) SetPodStatus(logger klog.Logger, pod *v1.Pod, status v1.PodStatus) { m.podStatusesLock.Lock() defer m.podStatusesLock.Unlock() @@ -328,22 +329,22 @@ func (m *manager) SetPodStatus(pod *v1.Pod, status v1.PodStatus) { // Force a status update if deletion timestamp is set. This is necessary // because if the pod is in the non-running state, the pod worker still // needs to be able to trigger an update and/or deletion. - m.updateStatusInternal(pod, status, pod.DeletionTimestamp != nil, false) + m.updateStatusInternal(logger, pod, status, pod.DeletionTimestamp != nil, false) } -func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool) { +func (m *manager) SetContainerReadiness(logger klog.Logger, podUID types.UID, containerID kubecontainer.ContainerID, ready bool) { m.podStatusesLock.Lock() defer m.podStatusesLock.Unlock() pod, ok := m.podManager.GetPodByUID(podUID) if !ok { - klog.V(4).InfoS("Pod has been deleted, no need to update readiness", "podUID", string(podUID)) + logger.V(4).Info("Pod has been deleted, no need to update readiness", "podUID", podUID) return } oldStatus, found := m.podStatuses[pod.UID] if !found { - klog.InfoS("Container readiness changed before pod has synced", + logger.Info("Container readiness changed before pod has synced", "pod", klog.KObj(pod), "containerID", containerID.String()) return @@ -352,14 +353,14 @@ func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontai // Find the container to update. containerStatus, _, ok := findContainerStatus(&oldStatus.status, containerID.String()) if !ok { - klog.InfoS("Container readiness changed for unknown container", + logger.Info("Container readiness changed for unknown container", "pod", klog.KObj(pod), "containerID", containerID.String()) return } if containerStatus.Ready == ready { - klog.V(4).InfoS("Container readiness unchanged", + logger.V(4).Info("Container readiness unchanged", "ready", ready, "pod", klog.KObj(pod), "containerID", containerID.String()) @@ -383,7 +384,7 @@ func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontai if conditionIndex != -1 { status.Conditions[conditionIndex] = condition } else { - klog.InfoS("PodStatus missing condition type", "conditionType", conditionType, "status", status) + logger.Info("PodStatus missing condition type", "conditionType", conditionType, "status", status) status.Conditions = append(status.Conditions, condition) } } @@ -391,22 +392,22 @@ func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontai allContainerStatuses := append(status.InitContainerStatuses, status.ContainerStatuses...) updateConditionFunc(v1.PodReady, GeneratePodReadyCondition(pod, &oldStatus.status, status.Conditions, allContainerStatuses, status.Phase)) updateConditionFunc(v1.ContainersReady, GenerateContainersReadyCondition(pod, &oldStatus.status, allContainerStatuses, status.Phase)) - m.updateStatusInternal(pod, status, false, false) + m.updateStatusInternal(logger, pod, status, false, false) } -func (m *manager) SetContainerStartup(podUID types.UID, containerID kubecontainer.ContainerID, started bool) { +func (m *manager) SetContainerStartup(logger klog.Logger, podUID types.UID, containerID kubecontainer.ContainerID, started bool) { m.podStatusesLock.Lock() defer m.podStatusesLock.Unlock() pod, ok := m.podManager.GetPodByUID(podUID) if !ok { - klog.V(4).InfoS("Pod has been deleted, no need to update startup", "podUID", string(podUID)) + logger.V(4).Info("Pod has been deleted, no need to update startup", "podUID", string(podUID)) return } oldStatus, found := m.podStatuses[pod.UID] if !found { - klog.InfoS("Container startup changed before pod has synced", + logger.Info("Container startup changed before pod has synced", "pod", klog.KObj(pod), "containerID", containerID.String()) return @@ -415,14 +416,14 @@ func (m *manager) SetContainerStartup(podUID types.UID, containerID kubecontaine // Find the container to update. containerStatus, _, ok := findContainerStatus(&oldStatus.status, containerID.String()) if !ok { - klog.InfoS("Container startup changed for unknown container", + logger.Info("Container startup changed for unknown container", "pod", klog.KObj(pod), "containerID", containerID.String()) return } if containerStatus.Started != nil && *containerStatus.Started == started { - klog.V(4).InfoS("Container startup unchanged", + logger.V(4).Info("Container startup unchanged", "pod", klog.KObj(pod), "containerID", containerID.String()) return @@ -433,7 +434,7 @@ func (m *manager) SetContainerStartup(podUID types.UID, containerID kubecontaine containerStatus, _, _ = findContainerStatus(&status, containerID.String()) containerStatus.Started = &started - m.updateStatusInternal(pod, status, false, false) + m.updateStatusInternal(logger, pod, status, false, false) } func findContainerStatus(status *v1.PodStatus, containerID string) (containerStatus *v1.ContainerStatus, init bool, ok bool) { @@ -464,7 +465,7 @@ func findContainerStatus(status *v1.PodStatus, containerID string) (containerSta // to override waiting containers (unless there is evidence the pod previously started those containers). // It also makes sure that pods are transitioned to a terminal phase (Failed or Succeeded) before // their deletion. -func (m *manager) TerminatePod(pod *v1.Pod) { +func (m *manager) TerminatePod(logger klog.Logger, pod *v1.Pod) { m.podStatusesLock.Lock() defer m.podStatusesLock.Unlock() @@ -478,7 +479,7 @@ func (m *manager) TerminatePod(pod *v1.Pod) { status := *oldStatus.DeepCopy() // once a pod has initialized, any missing status is treated as a failure - if hasPodInitialized(pod) { + if hasPodInitialized(logger, pod) { for i := range status.ContainerStatuses { if status.ContainerStatuses[i].State.Terminated != nil { continue @@ -516,23 +517,23 @@ func (m *manager) TerminatePod(pod *v1.Pod) { // do nothing, already terminal case v1.PodPending, v1.PodRunning: if status.Phase == v1.PodRunning && isCached { - klog.InfoS("Terminal running pod should have already been marked as failed, programmer error", "pod", klog.KObj(pod), "podUID", pod.UID) + logger.Info("Terminal running pod should have already been marked as failed, programmer error", "pod", klog.KObj(pod), "podUID", pod.UID) } - klog.V(3).InfoS("Marking terminal pod as failed", "oldPhase", status.Phase, "pod", klog.KObj(pod), "podUID", pod.UID) + logger.V(3).Info("Marking terminal pod as failed", "oldPhase", status.Phase, "pod", klog.KObj(pod), "podUID", pod.UID) status.Phase = v1.PodFailed default: - klog.ErrorS(fmt.Errorf("unknown phase: %v", status.Phase), "Unknown phase, programmer error", "pod", klog.KObj(pod), "podUID", pod.UID) + logger.Error(fmt.Errorf("unknown phase: %v", status.Phase), "Unknown phase, programmer error", "pod", klog.KObj(pod), "podUID", pod.UID) status.Phase = v1.PodFailed } } - klog.V(5).InfoS("TerminatePod calling updateStatusInternal", "pod", klog.KObj(pod), "podUID", pod.UID) - m.updateStatusInternal(pod, status, true, true) + logger.V(5).Info("TerminatePod calling updateStatusInternal", "pod", klog.KObj(pod), "podUID", pod.UID) + m.updateStatusInternal(logger, pod, status, true, true) } // hasPodInitialized returns true if the pod has no evidence of ever starting a regular container, which // implies those containers should not be transitioned to terminated status. -func hasPodInitialized(pod *v1.Pod) bool { +func hasPodInitialized(logger klog.Logger, pod *v1.Pod) bool { // a pod without init containers is always initialized if len(pod.Spec.InitContainers) == 0 { return true @@ -547,7 +548,7 @@ func hasPodInitialized(pod *v1.Pod) bool { if l := len(pod.Status.InitContainerStatuses); l > 0 { container, ok := kubeutil.GetContainerByIndex(pod.Spec.InitContainers, pod.Status.InitContainerStatuses, l-1) if !ok { - klog.V(4).InfoS("Mismatch between pod spec and status, likely programmer error", "pod", klog.KObj(pod), "containerName", container.Name) + logger.V(4).Info("Mismatch between pod spec and status, likely programmer error", "pod", klog.KObj(pod), "containerName", container.Name) return false } @@ -639,7 +640,7 @@ func checkContainerStateTransition(oldStatuses, newStatuses *v1.PodStatus, podSp // updateStatusInternal updates the internal status cache, and queues an update to the api server if // necessary. // This method IS NOT THREAD SAFE and must be called from a locked function. -func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUpdate, podIsFinished bool) { +func (m *manager) updateStatusInternal(logger klog.Logger, pod *v1.Pod, status v1.PodStatus, forceUpdate, podIsFinished bool) { var oldStatus v1.PodStatus cachedStatus, isCached := m.podStatuses[pod.UID] if isCached { @@ -647,7 +648,7 @@ func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUp // TODO(#116484): Also assign terminal phase to static pods. if !kubetypes.IsStaticPod(pod) { if cachedStatus.podIsFinished && !podIsFinished { - klog.InfoS("Got unexpected podIsFinished=false, while podIsFinished=true in status cache, programmer error.", "pod", klog.KObj(pod)) + logger.Info("Got unexpected podIsFinished=false, while podIsFinished=true in status cache, programmer error", "pod", klog.KObj(pod)) podIsFinished = true } } @@ -659,7 +660,7 @@ func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUp // Check for illegal state transition in containers if err := checkContainerStateTransition(&oldStatus, &status, &pod.Spec); err != nil { - klog.ErrorS(err, "Status update on pod aborted", "pod", klog.KObj(pod)) + logger.Error(err, "Status update on pod aborted", "pod", klog.KObj(pod)) return } @@ -699,7 +700,7 @@ func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUp // Perform some more extensive logging of container termination state to assist in // debugging production races (generally not needed). - if klogV := klog.V(5); klogV.Enabled() { + if loggerV := logger.V(5); loggerV.Enabled() { var containers []string for _, s := range append(append([]v1.ContainerStatus(nil), status.InitContainerStatuses...), status.ContainerStatuses...) { var current, previous string @@ -726,13 +727,13 @@ func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUp containers = append(containers, fmt.Sprintf("(%s state=%s previous=%s)", s.Name, current, previous)) } sort.Strings(containers) - klogV.InfoS("updateStatusInternal", "version", cachedStatus.version+1, "podIsFinished", podIsFinished, "pod", klog.KObj(pod), "podUID", pod.UID, "containers", strings.Join(containers, " ")) + loggerV.Info("updateStatusInternal", "version", cachedStatus.version+1, "podIsFinished", podIsFinished, "pod", klog.KObj(pod), "podUID", pod.UID, "containers", strings.Join(containers, " ")) } // The intent here is to prevent concurrent updates to a pod's status from // clobbering each other so the phase of a pod progresses monotonically. if isCached && isPodStatusByKubeletEqual(&cachedStatus.status, &status) && !forceUpdate { - klog.V(3).InfoS("Ignoring same status for pod", "pod", klog.KObj(pod), "status", status) + logger.V(3).Info("Ignoring same status for pod", "pod", klog.KObj(pod), "status", status) return } @@ -789,12 +790,12 @@ func (m *manager) deletePodStatus(uid types.UID) { } // TODO(filipg): It'd be cleaner if we can do this without signal from user. -func (m *manager) RemoveOrphanedStatuses(podUIDs map[types.UID]bool) { +func (m *manager) RemoveOrphanedStatuses(logger klog.Logger, podUIDs map[types.UID]bool) { m.podStatusesLock.Lock() defer m.podStatusesLock.Unlock() for key := range m.podStatuses { if _, ok := podUIDs[key]; !ok { - klog.V(5).InfoS("Removing pod from status map.", "podUID", key) + logger.V(5).Info("Removing pod from status map", "podUID", key) delete(m.podStatuses, key) if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) { delete(m.podResizeConditions, key) @@ -805,7 +806,8 @@ func (m *manager) RemoveOrphanedStatuses(podUIDs map[types.UID]bool) { // syncBatch syncs pods statuses with the apiserver. Returns the number of syncs // attempted for testing. -func (m *manager) syncBatch(all bool) int { +func (m *manager) syncBatch(ctx context.Context, all bool) int { + logger := klog.FromContext(ctx) type podSync struct { podUID types.UID statusUID kubetypes.MirrorPodUID @@ -837,7 +839,7 @@ func (m *manager) syncBatch(all bool) int { uidOfStatus := kubetypes.MirrorPodUID(uid) if mirrorUID, ok := podToMirror[kubetypes.ResolvedPodUID(uid)]; ok { if mirrorUID == "" { - klog.V(5).InfoS("Static pod does not have a corresponding mirror pod; skipping", + logger.V(5).Info("Static pod does not have a corresponding mirror pod; skipping", "podUID", uid, "pod", klog.KRef(status.podNamespace, status.podName)) continue @@ -858,9 +860,9 @@ func (m *manager) syncBatch(all bool) int { // Ensure that any new status, or mismatched status, or pod that is ready for // deletion gets updated. If a status update fails we retry the next time any // other pod is updated. - if m.needsUpdate(types.UID(uidOfStatus), status) { + if m.needsUpdate(logger, types.UID(uidOfStatus), status) { updatedStatuses = append(updatedStatuses, podSync{uid, uidOfStatus, status}) - } else if m.needsReconcile(uid, status.status) { + } else if m.needsReconcile(logger, uid, status.status) { // Delete the apiStatusVersions here to force an update on the pod status // In most cases the deleted apiStatusVersions here should be filled // soon after the following syncPod() [If the syncPod() sync an update @@ -872,19 +874,20 @@ func (m *manager) syncBatch(all bool) int { }() for _, update := range updatedStatuses { - klog.V(5).InfoS("Sync pod status", "podUID", update.podUID, "statusUID", update.statusUID, "version", update.status.version) - m.syncPod(update.podUID, update.status) + logger.V(5).Info("Sync pod status", "podUID", update.podUID, "statusUID", update.statusUID, "version", update.status.version) + m.syncPod(ctx, update.podUID, update.status) } return len(updatedStatuses) } // syncPod syncs the given status with the API server. The caller must not hold the status lock. -func (m *manager) syncPod(uid types.UID, status versionedPodStatus) { +func (m *manager) syncPod(ctx context.Context, uid types.UID, status versionedPodStatus) { + logger := klog.FromContext(ctx) // TODO: make me easier to express from client code - pod, err := m.kubeClient.CoreV1().Pods(status.podNamespace).Get(context.TODO(), status.podName, metav1.GetOptions{}) + pod, err := m.kubeClient.CoreV1().Pods(status.podNamespace).Get(ctx, status.podName, metav1.GetOptions{}) if errors.IsNotFound(err) { - klog.V(3).InfoS("Pod does not exist on the server", + logger.V(3).Info("Pod does not exist on the server", "podUID", uid, "pod", klog.KRef(status.podNamespace, status.podName)) // If the Pod is deleted the status will be cleared in @@ -892,17 +895,16 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) { return } if err != nil { - klog.InfoS("Failed to get status for pod", + logger.Error(err, "Failed to get status for pod", "podUID", uid, - "pod", klog.KRef(status.podNamespace, status.podName), - "err", err) + "pod", klog.KRef(status.podNamespace, status.podName)) return } translatedUID := m.podManager.TranslatePodUID(pod.UID) // Type convert original uid just for the purpose of comparison. if len(translatedUID) > 0 && translatedUID != kubetypes.ResolvedPodUID(uid) { - klog.V(2).InfoS("Pod was deleted and then recreated, skipping status update", + logger.V(2).Info("Pod was deleted and then recreated, skipping status update", "pod", klog.KObj(pod), "oldPodUID", uid, "podUID", translatedUID) @@ -912,17 +914,17 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) { mergedStatus := mergePodStatus(pod, pod.Status, status.status, m.podDeletionSafety.PodCouldHaveRunningContainers(pod)) - newPod, patchBytes, unchanged, err := statusutil.PatchPodStatus(context.TODO(), m.kubeClient, pod.Namespace, pod.Name, pod.UID, pod.Status, mergedStatus) - klog.V(3).InfoS("Patch status for pod", "pod", klog.KObj(pod), "podUID", uid, "patch", string(patchBytes)) + newPod, patchBytes, unchanged, err := statusutil.PatchPodStatus(ctx, m.kubeClient, pod.Namespace, pod.Name, pod.UID, pod.Status, mergedStatus) + logger.V(3).Info("Patch status for pod", "pod", klog.KObj(pod), "podUID", uid, "patch", string(patchBytes)) if err != nil { - klog.InfoS("Failed to update status for pod", "pod", klog.KObj(pod), "err", err) + logger.Error(err, "Failed to update status for pod", "pod", klog.KObj(pod)) return } if unchanged { - klog.V(3).InfoS("Status for pod is up-to-date", "pod", klog.KObj(pod), "statusVersion", status.version) + logger.V(3).Info("Status for pod is up-to-date", "pod", klog.KObj(pod), "statusVersion", status.version) } else { - klog.V(3).InfoS("Status for pod updated successfully", "pod", klog.KObj(pod), "statusVersion", status.version, "status", mergedStatus) + logger.V(3).Info("Status for pod updated successfully", "pod", klog.KObj(pod), "statusVersion", status.version, "status", mergedStatus) pod = newPod // We pass a new object (result of API call which contains updated ResourceVersion) m.podStartupLatencyHelper.RecordStatusUpdated(pod) @@ -930,7 +932,7 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) { // measure how long the status update took to propagate from generation to update on the server if status.at.IsZero() { - klog.V(3).InfoS("Pod had no status time set", "pod", klog.KObj(pod), "podUID", uid, "version", status.version) + logger.V(3).Info("Pod had no status time set", "pod", klog.KObj(pod), "podUID", uid, "version", status.version) } else { duration := time.Since(status.at).Truncate(time.Millisecond) metrics.PodStatusSyncDuration.Observe(duration.Seconds()) @@ -939,26 +941,26 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) { m.apiStatusVersions[kubetypes.MirrorPodUID(pod.UID)] = status.version // We don't handle graceful deletion of mirror pods. - if m.canBeDeleted(pod, status.status, status.podIsFinished) { + if m.canBeDeleted(logger, pod, status.status, status.podIsFinished) { deleteOptions := metav1.DeleteOptions{ GracePeriodSeconds: new(int64), // Use the pod UID as the precondition for deletion to prevent deleting a // newly created pod with the same name and namespace. Preconditions: metav1.NewUIDPreconditions(string(pod.UID)), } - err = m.kubeClient.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, deleteOptions) + err = m.kubeClient.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, deleteOptions) if err != nil { - klog.InfoS("Failed to delete status for pod", "pod", klog.KObj(pod), "err", err) + logger.Info("Failed to delete status for pod", "pod", klog.KObj(pod), "err", err) return } - klog.V(3).InfoS("Pod fully terminated and removed from etcd", "pod", klog.KObj(pod)) + logger.V(3).Info("Pod fully terminated and removed from etcd", "pod", klog.KObj(pod)) m.deletePodStatus(uid) } } // needsUpdate returns whether the status is stale for the given pod UID. // This method is not thread safe, and must only be accessed by the sync thread. -func (m *manager) needsUpdate(uid types.UID, status versionedPodStatus) bool { +func (m *manager) needsUpdate(logger klog.Logger, uid types.UID, status versionedPodStatus) bool { latest, ok := m.apiStatusVersions[kubetypes.MirrorPodUID(uid)] if !ok || latest < status.version { return true @@ -967,10 +969,10 @@ func (m *manager) needsUpdate(uid types.UID, status versionedPodStatus) bool { if !ok { return false } - return m.canBeDeleted(pod, status.status, status.podIsFinished) + return m.canBeDeleted(logger, pod, status.status, status.podIsFinished) } -func (m *manager) canBeDeleted(pod *v1.Pod, status v1.PodStatus, podIsFinished bool) bool { +func (m *manager) canBeDeleted(logger klog.Logger, pod *v1.Pod, status v1.PodStatus, podIsFinished bool) bool { if pod.DeletionTimestamp == nil || kubetypes.IsMirrorPod(pod) { return false } @@ -978,12 +980,12 @@ func (m *manager) canBeDeleted(pod *v1.Pod, status v1.PodStatus, podIsFinished b // which comes from pod manager. if !podutil.IsPodPhaseTerminal(pod.Status.Phase) { // For debugging purposes we also log the kubelet's local phase, when the deletion is delayed. - klog.V(3).InfoS("Delaying pod deletion as the phase is non-terminal", "phase", pod.Status.Phase, "localPhase", status.Phase, "pod", klog.KObj(pod), "podUID", pod.UID) + logger.V(3).Info("Delaying pod deletion as the phase is non-terminal", "phase", pod.Status.Phase, "localPhase", status.Phase, "pod", klog.KObj(pod), "podUID", pod.UID) return false } // If this is an update completing pod termination then we know the pod termination is finished. if podIsFinished { - klog.V(3).InfoS("The pod termination is finished as SyncTerminatedPod completes its execution", "phase", pod.Status.Phase, "localPhase", status.Phase, "pod", klog.KObj(pod), "podUID", pod.UID) + logger.V(3).Info("The pod termination is finished as SyncTerminatedPod completes its execution", "phase", pod.Status.Phase, "localPhase", status.Phase, "pod", klog.KObj(pod), "podUID", pod.UID) return true } return false @@ -998,18 +1000,18 @@ func (m *manager) canBeDeleted(pod *v1.Pod, status v1.PodStatus, podIsFinished b // now the pod manager only supports getting mirror pod by static pod, so we have to pass // static pod uid here. // TODO(random-liu): Simplify the logic when mirror pod manager is added. -func (m *manager) needsReconcile(uid types.UID, status v1.PodStatus) bool { +func (m *manager) needsReconcile(logger klog.Logger, uid types.UID, status v1.PodStatus) bool { // The pod could be a static pod, so we should translate first. pod, ok := m.podManager.GetPodByUID(uid) if !ok { - klog.V(4).InfoS("Pod has been deleted, no need to reconcile", "podUID", string(uid)) + logger.V(4).Info("Pod has been deleted, no need to reconcile", "podUID", uid) return false } // If the pod is a static pod, we should check its mirror pod, because only status in mirror pod is meaningful to us. if kubetypes.IsStaticPod(pod) { mirrorPod, ok := m.podManager.GetMirrorPodByPod(pod) if !ok { - klog.V(4).InfoS("Static pod has no corresponding mirror pod, no need to reconcile", "pod", klog.KObj(pod)) + logger.V(4).Info("Static pod has no corresponding mirror pod, no need to reconcile", "pod", klog.KObj(pod)) return false } pod = mirrorPod @@ -1023,7 +1025,7 @@ func (m *manager) needsReconcile(uid types.UID, status v1.PodStatus) bool { // reconcile is not needed. Just return. return false } - klog.V(3).InfoS("Pod status is inconsistent with cached status for pod, a reconciliation should be triggered", + logger.V(3).Info("Pod status is inconsistent with cached status for pod, a reconciliation should be triggered", "pod", klog.KObj(pod), "statusDiff", cmp.Diff(podStatus, &status)) diff --git a/pkg/kubelet/status/status_manager_test.go b/pkg/kubelet/status/status_manager_test.go index 57e0e45c541..0be94a49c63 100644 --- a/pkg/kubelet/status/status_manager_test.go +++ b/pkg/kubelet/status/status_manager_test.go @@ -17,6 +17,7 @@ limitations under the License. package status import ( + "context" "fmt" "math/rand" "reflect" @@ -39,6 +40,8 @@ import ( clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" core "k8s.io/client-go/testing" + "k8s.io/klog/v2" + "k8s.io/klog/v2/ktesting" podutil "k8s.io/kubernetes/pkg/api/v1/pod" api "k8s.io/kubernetes/pkg/apis/core" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" @@ -73,7 +76,7 @@ func getTestPod() *v1.Pod { // will be triggered, which will mess up all the old unit test. // To simplify the implementation of unit test, we add testSyncBatch() here, it will make sure the statuses in // pod manager the same with cached ones before syncBatch(true) so as to avoid reconciling. -func (m *manager) testSyncBatch() { +func (m *manager) testSyncBatch(ctx context.Context) { for uid, status := range m.podStatuses { pod, ok := m.podManager.GetPodByUID(uid) if ok { @@ -84,7 +87,7 @@ func (m *manager) testSyncBatch() { pod.Status = status.status } } - m.syncBatch(true) + m.syncBatch(ctx, true) } func newTestManager(kubeClient clientset.Interface) *manager { @@ -105,8 +108,9 @@ func getRandomPodStatus() v1.PodStatus { } func verifyActions(t *testing.T, manager *manager, expectedActions []core.Action) { + _, ctx := ktesting.NewTestContext(t) t.Helper() - manager.consumeUpdates() + manager.consumeUpdates(ctx) actions := manager.kubeClient.(*fake.Clientset).Actions() defer manager.kubeClient.(*fake.Clientset).ClearActions() if len(actions) != len(expectedActions) { @@ -122,20 +126,21 @@ func verifyActions(t *testing.T, manager *manager, expectedActions []core.Action } func verifyUpdates(t *testing.T, manager *manager, expectedUpdates int) { + _, ctx := ktesting.NewTestContext(t) t.Helper() // Consume all updates in the channel. - numUpdates := manager.consumeUpdates() + numUpdates := manager.consumeUpdates(ctx) if numUpdates != expectedUpdates { t.Errorf("unexpected number of updates %d, expected %d", numUpdates, expectedUpdates) } } -func (m *manager) consumeUpdates() int { +func (m *manager) consumeUpdates(ctx context.Context) int { updates := 0 for { select { case <-m.podStatusChannel: - updates += m.syncBatch(false) + updates += m.syncBatch(ctx, false) default: return updates } @@ -143,9 +148,10 @@ func (m *manager) consumeUpdates() int { } func TestNewStatus(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) syncer := newTestManager(&fake.Clientset{}) testPod := getTestPod() - syncer.SetPodStatus(testPod, getRandomPodStatus()) + syncer.SetPodStatus(logger, testPod, getRandomPodStatus()) verifyUpdates(t, syncer, 1) status := expectPodStatus(t, syncer, testPod) @@ -155,6 +161,7 @@ func TestNewStatus(t *testing.T) { } func TestNewStatusPreservesPodStartTime(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) syncer := newTestManager(&fake.Clientset{}) pod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -167,7 +174,7 @@ func TestNewStatusPreservesPodStartTime(t *testing.T) { now := metav1.Now() startTime := metav1.NewTime(now.Time.Add(-1 * time.Minute)) pod.Status.StartTime = &startTime - syncer.SetPodStatus(pod, getRandomPodStatus()) + syncer.SetPodStatus(logger, pod, getRandomPodStatus()) status := expectPodStatus(t, syncer, pod) if !status.StartTime.Time.Equal(startTime.Time) { @@ -187,6 +194,7 @@ func getReadyPodStatus() v1.PodStatus { } func TestNewStatusSetsReadyTransitionTime(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) syncer := newTestManager(&fake.Clientset{}) podStatus := getReadyPodStatus() pod := &v1.Pod{ @@ -197,33 +205,35 @@ func TestNewStatusSetsReadyTransitionTime(t *testing.T) { }, Status: v1.PodStatus{}, } - syncer.SetPodStatus(pod, podStatus) + syncer.SetPodStatus(logger, pod, podStatus) verifyUpdates(t, syncer, 1) status := expectPodStatus(t, syncer, pod) readyCondition := podutil.GetPodReadyCondition(status) if readyCondition.LastTransitionTime.IsZero() { - t.Errorf("Unexpected: last transition time not set") + logger.Error(nil, "Unexpected: last transition time not set") } } func TestChangedStatus(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) syncer := newTestManager(&fake.Clientset{}) testPod := getTestPod() - syncer.SetPodStatus(testPod, getRandomPodStatus()) + syncer.SetPodStatus(logger, testPod, getRandomPodStatus()) verifyUpdates(t, syncer, 1) - syncer.SetPodStatus(testPod, getRandomPodStatus()) + syncer.SetPodStatus(logger, testPod, getRandomPodStatus()) verifyUpdates(t, syncer, 1) } func TestChangedStatusKeepsStartTime(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) syncer := newTestManager(&fake.Clientset{}) testPod := getTestPod() now := metav1.Now() firstStatus := getRandomPodStatus() firstStatus.StartTime = &now - syncer.SetPodStatus(testPod, firstStatus) + syncer.SetPodStatus(logger, testPod, firstStatus) verifyUpdates(t, syncer, 1) - syncer.SetPodStatus(testPod, getRandomPodStatus()) + syncer.SetPodStatus(logger, testPod, getRandomPodStatus()) verifyUpdates(t, syncer, 1) finalStatus := expectPodStatus(t, syncer, testPod) if finalStatus.StartTime.IsZero() { @@ -236,6 +246,7 @@ func TestChangedStatusKeepsStartTime(t *testing.T) { } func TestChangedStatusUpdatesLastTransitionTime(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) syncer := newTestManager(&fake.Clientset{}) podStatus := getReadyPodStatus() pod := &v1.Pod{ @@ -246,12 +257,12 @@ func TestChangedStatusUpdatesLastTransitionTime(t *testing.T) { }, Status: v1.PodStatus{}, } - syncer.SetPodStatus(pod, podStatus) + syncer.SetPodStatus(logger, pod, podStatus) verifyUpdates(t, syncer, 1) oldStatus := expectPodStatus(t, syncer, pod) anotherStatus := getReadyPodStatus() anotherStatus.Conditions[0].Status = v1.ConditionFalse - syncer.SetPodStatus(pod, anotherStatus) + syncer.SetPodStatus(logger, pod, anotherStatus) verifyUpdates(t, syncer, 1) newStatus := expectPodStatus(t, syncer, pod) @@ -266,15 +277,17 @@ func TestChangedStatusUpdatesLastTransitionTime(t *testing.T) { } func TestUnchangedStatus(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) syncer := newTestManager(&fake.Clientset{}) testPod := getTestPod() podStatus := getRandomPodStatus() - syncer.SetPodStatus(testPod, podStatus) - syncer.SetPodStatus(testPod, podStatus) + syncer.SetPodStatus(logger, testPod, podStatus) + syncer.SetPodStatus(logger, testPod, podStatus) verifyUpdates(t, syncer, 1) } func TestUnchangedStatusPreservesLastTransitionTime(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) syncer := newTestManager(&fake.Clientset{}) podStatus := getReadyPodStatus() pod := &v1.Pod{ @@ -285,11 +298,11 @@ func TestUnchangedStatusPreservesLastTransitionTime(t *testing.T) { }, Status: v1.PodStatus{}, } - syncer.SetPodStatus(pod, podStatus) + syncer.SetPodStatus(logger, pod, podStatus) verifyUpdates(t, syncer, 1) oldStatus := expectPodStatus(t, syncer, pod) anotherStatus := getReadyPodStatus() - syncer.SetPodStatus(pod, anotherStatus) + syncer.SetPodStatus(logger, pod, anotherStatus) // No update. verifyUpdates(t, syncer, 0) newStatus := expectPodStatus(t, syncer, pod) @@ -305,24 +318,27 @@ func TestUnchangedStatusPreservesLastTransitionTime(t *testing.T) { } func TestSyncPodIgnoresNotFound(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) client := fake.Clientset{} syncer := newTestManager(&client) client.AddReactor("get", "pods", func(action core.Action) (bool, runtime.Object, error) { return true, nil, errors.NewNotFound(api.Resource("pods"), "test-pod") }) - syncer.SetPodStatus(getTestPod(), getRandomPodStatus()) + syncer.SetPodStatus(logger, getTestPod(), getRandomPodStatus()) verifyActions(t, syncer, []core.Action{getAction()}) } func TestSyncPod(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) syncer := newTestManager(&fake.Clientset{}) testPod := getTestPod() syncer.kubeClient = fake.NewSimpleClientset(testPod) - syncer.SetPodStatus(testPod, getRandomPodStatus()) + syncer.SetPodStatus(logger, testPod, getRandomPodStatus()) verifyActions(t, syncer, []core.Action{getAction(), patchAction()}) } func TestSyncPodChecksMismatchedUID(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) syncer := newTestManager(&fake.Clientset{}) pod := getTestPod() pod.UID = "first" @@ -331,11 +347,12 @@ func TestSyncPodChecksMismatchedUID(t *testing.T) { differentPod.UID = "second" syncer.podManager.(mutablePodManager).AddPod(differentPod) syncer.kubeClient = fake.NewSimpleClientset(pod) - syncer.SetPodStatus(differentPod, getRandomPodStatus()) + syncer.SetPodStatus(logger, differentPod, getRandomPodStatus()) verifyActions(t, syncer, []core.Action{getAction()}) } func TestSyncPodNoDeadlock(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) client := &fake.Clientset{} m := newTestManager(client) pod := getTestPod() @@ -360,71 +377,72 @@ func TestSyncPodNoDeadlock(t *testing.T) { t.Logf("Pod not found.") ret = nil err = errors.NewNotFound(api.Resource("pods"), pod.Name) - m.SetPodStatus(pod, getRandomPodStatus()) + m.SetPodStatus(logger, pod, getRandomPodStatus()) verifyActions(t, m, []core.Action{getAction()}) t.Logf("Pod was recreated.") ret = getTestPod() ret.UID = "other_pod" err = nil - m.SetPodStatus(pod, getRandomPodStatus()) + m.SetPodStatus(logger, pod, getRandomPodStatus()) verifyActions(t, m, []core.Action{getAction()}) t.Logf("Pod not deleted (success case).") ret = getTestPod() - m.SetPodStatus(pod, getRandomPodStatus()) + m.SetPodStatus(logger, pod, getRandomPodStatus()) verifyActions(t, m, []core.Action{getAction(), patchAction()}) t.Logf("Pod is terminated, but still running.") pod.DeletionTimestamp = &metav1.Time{Time: time.Now()} - m.SetPodStatus(pod, getRandomPodStatus()) + m.SetPodStatus(logger, pod, getRandomPodStatus()) verifyActions(t, m, []core.Action{getAction(), patchAction()}) t.Logf("Pod is terminated successfully.") pod.Status.ContainerStatuses[0].State.Running = nil pod.Status.ContainerStatuses[0].State.Terminated = &v1.ContainerStateTerminated{} - m.SetPodStatus(pod, getRandomPodStatus()) + m.SetPodStatus(logger, pod, getRandomPodStatus()) verifyActions(t, m, []core.Action{getAction(), patchAction()}) t.Logf("Error case.") ret = nil err = fmt.Errorf("intentional test error") - m.SetPodStatus(pod, getRandomPodStatus()) + m.SetPodStatus(logger, pod, getRandomPodStatus()) verifyActions(t, m, []core.Action{getAction()}) } func TestStaleUpdates(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) pod := getTestPod() client := fake.NewSimpleClientset(pod) m := newTestManager(client) status := v1.PodStatus{Message: "initial status"} - m.SetPodStatus(pod, status) + m.SetPodStatus(logger, pod, status) status.Message = "first version bump" - m.SetPodStatus(pod, status) + m.SetPodStatus(logger, pod, status) status.Message = "second version bump" - m.SetPodStatus(pod, status) + m.SetPodStatus(logger, pod, status) t.Logf("sync batch before syncPods pushes latest status, resulting in one update during the batch") - m.syncBatch(true) + m.syncBatch(ctx, true) verifyUpdates(t, m, 0) verifyActions(t, m, []core.Action{getAction(), patchAction()}) t.Logf("Nothing left in the channel to sync") verifyActions(t, m, []core.Action{}) - t.Log("Unchanged status should not send an update") - m.SetPodStatus(pod, status) + logger.Info("Unchanged status should not send an update") + m.SetPodStatus(logger, pod, status) verifyUpdates(t, m, 0) - t.Log("... even if it's stale as long as nothing changes") + logger.Info("... even if it's stale as long as nothing changes") mirrorPodUID := kubetypes.MirrorPodUID(pod.UID) m.apiStatusVersions[mirrorPodUID] = m.apiStatusVersions[mirrorPodUID] - 1 - m.SetPodStatus(pod, status) - m.syncBatch(true) + m.SetPodStatus(logger, pod, status) + m.syncBatch(ctx, true) verifyActions(t, m, []core.Action{getAction()}) - t.Logf("Nothing stuck in the pipe.") + logger.Info("Nothing stuck in the pipe") verifyUpdates(t, m, 0) } @@ -571,6 +589,7 @@ func TestStatusNormalizeTimeStamp(t *testing.T) { } func TestStaticPod(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) staticPod := getTestPod() staticPod.Annotations = map[string]string{kubetypes.ConfigSourceAnnotationKey: "file"} mirrorPod := getTestPod() @@ -589,7 +608,7 @@ func TestStaticPod(t *testing.T) { status := getRandomPodStatus() now := metav1.Now() status.StartTime = &now - m.SetPodStatus(staticPod, status) + m.SetPodStatus(logger, staticPod, status) t.Logf("Should be able to get the static pod status from status manager") retrievedStatus := expectPodStatus(t, m, staticPod) @@ -597,7 +616,7 @@ func TestStaticPod(t *testing.T) { assert.True(t, isPodStatusByKubeletEqual(&status, &retrievedStatus), "Expected: %+v, Got: %+v", status, retrievedStatus) t.Logf("Should not sync pod in syncBatch because there is no corresponding mirror pod for the static pod.") - m.syncBatch(true) + m.syncBatch(ctx, true) assert.Empty(t, m.kubeClient.(*fake.Clientset).Actions(), "Expected no updates after syncBatch") t.Logf("Create the mirror pod") @@ -610,11 +629,11 @@ func TestStaticPod(t *testing.T) { assert.True(t, isPodStatusByKubeletEqual(&status, &retrievedStatus), "Expected: %+v, Got: %+v", status, retrievedStatus) t.Logf("Should sync pod because the corresponding mirror pod is created") - assert.Equal(t, 1, m.syncBatch(true)) + assert.Equal(t, 1, m.syncBatch(ctx, true)) verifyActions(t, m, []core.Action{getAction(), patchAction()}) t.Logf("syncBatch should not sync any pods because nothing is changed.") - m.testSyncBatch() + m.testSyncBatch(ctx) verifyActions(t, m, []core.Action{}) t.Logf("Change mirror pod identity.") @@ -624,11 +643,12 @@ func TestStaticPod(t *testing.T) { m.podManager.(mutablePodManager).AddPod(mirrorPod) t.Logf("Should not update to mirror pod, because UID has changed.") - assert.Equal(t, 1, m.syncBatch(true)) + assert.Equal(t, 1, m.syncBatch(ctx, true)) verifyActions(t, m, []core.Action{getAction()}) } func TestTerminatePod(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) syncer := newTestManager(&fake.Clientset{}) testPod := getTestPod() testPod.Spec.InitContainers = []v1.Container{ @@ -641,7 +661,7 @@ func TestTerminatePod(t *testing.T) { {Name: "test-2"}, {Name: "test-3"}, } - t.Logf("update the pod's status to Failed. TerminatePod should preserve this status update.") + logger.Info("update the pod's status to Failed. TerminatePod should preserve this status update.") firstStatus := getRandomPodStatus() firstStatus.Phase = v1.PodFailed firstStatus.InitContainerStatuses = []v1.ContainerStatus{ @@ -657,9 +677,9 @@ func TestTerminatePod(t *testing.T) { {Name: "test-2", State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{Reason: "Test", ExitCode: 2}}}, {Name: "test-3", State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{Reason: "Test", ExitCode: 0}}}, } - syncer.SetPodStatus(testPod, firstStatus) + syncer.SetPodStatus(logger, testPod, firstStatus) - t.Logf("set the testPod to a pod with Phase running, to simulate a stale pod") + logger.Info("set the testPod to a pod with Phase running, to simulate a stale pod") testPod.Status = getRandomPodStatus() testPod.Status.Phase = v1.PodRunning testPod.Status.InitContainerStatuses = []v1.ContainerStatus{ @@ -673,9 +693,9 @@ func TestTerminatePod(t *testing.T) { {Name: "test-3", State: v1.ContainerState{Running: &v1.ContainerStateRunning{}}}, } - syncer.TerminatePod(testPod) + syncer.TerminatePod(logger, testPod) - t.Logf("we expect the container statuses to have changed to terminated") + logger.Info("we expect the container statuses to have changed to terminated") newStatus := expectPodStatus(t, syncer, testPod) for i := range newStatus.ContainerStatuses { assert.NotNil(t, newStatus.ContainerStatuses[i].State.Terminated, "expected containers to be terminated") @@ -686,30 +706,31 @@ func TestTerminatePod(t *testing.T) { expectUnknownState := v1.ContainerState{Terminated: &v1.ContainerStateTerminated{Reason: kubecontainer.ContainerReasonStatusUnknown, Message: "The container could not be located when the pod was terminated", ExitCode: 137}} if !reflect.DeepEqual(newStatus.InitContainerStatuses[0].State, expectUnknownState) { - t.Errorf("terminated container state not defaulted: %s", cmp.Diff(newStatus.InitContainerStatuses[0].State, expectUnknownState)) + logger.Error(nil, "terminated container state not defaulted", "diff", cmp.Diff(newStatus.InitContainerStatuses[0].State, expectUnknownState)) } if !reflect.DeepEqual(newStatus.InitContainerStatuses[1].State, firstStatus.InitContainerStatuses[1].State) { - t.Errorf("existing terminated container state not preserved: %#v", newStatus.ContainerStatuses) + logger.Error(nil, "existing terminated container state not preserved", "containerStatuses", newStatus.ContainerStatuses) } if !reflect.DeepEqual(newStatus.InitContainerStatuses[2].State, firstStatus.InitContainerStatuses[2].State) { - t.Errorf("existing terminated container state not preserved: %#v", newStatus.ContainerStatuses) + logger.Error(nil, "existing terminated container state not preserved", "containerStatuses", newStatus.ContainerStatuses) } if !reflect.DeepEqual(newStatus.ContainerStatuses[0].State, expectUnknownState) { - t.Errorf("terminated container state not defaulted: %s", cmp.Diff(newStatus.ContainerStatuses[0].State, expectUnknownState)) + logger.Error(nil, "terminated container state not defaulted", "diff", cmp.Diff(newStatus.ContainerStatuses[0].State, expectUnknownState)) } if !reflect.DeepEqual(newStatus.ContainerStatuses[1].State, firstStatus.ContainerStatuses[1].State) { - t.Errorf("existing terminated container state not preserved: %#v", newStatus.ContainerStatuses) + logger.Error(nil, "existing terminated container state not preserved", "containerStatuses", newStatus.ContainerStatuses) } if !reflect.DeepEqual(newStatus.ContainerStatuses[2].State, firstStatus.ContainerStatuses[2].State) { - t.Errorf("existing terminated container state not preserved: %#v", newStatus.ContainerStatuses) + logger.Error(nil, "existing terminated container state not preserved", "containerStatuses", newStatus.ContainerStatuses) } - t.Logf("we expect the previous status update to be preserved.") + logger.Info("we expect the previous status update to be preserved.") assert.Equal(t, newStatus.Phase, firstStatus.Phase) assert.Equal(t, newStatus.Message, firstStatus.Message) } func TestTerminatePodWaiting(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) syncer := newTestManager(&fake.Clientset{}) testPod := getTestPod() testPod.Spec.InitContainers = []v1.Container{ @@ -722,7 +743,7 @@ func TestTerminatePodWaiting(t *testing.T) { {Name: "test-2"}, {Name: "test-3"}, } - t.Logf("update the pod's status to Failed. TerminatePod should preserve this status update.") + logger.Info("update the pod's status to Failed. TerminatePod should preserve this status update.") firstStatus := getRandomPodStatus() firstStatus.Phase = v1.PodFailed firstStatus.InitContainerStatuses = []v1.ContainerStatus{ @@ -739,9 +760,9 @@ func TestTerminatePodWaiting(t *testing.T) { {Name: "test-2", State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{Reason: "Test", ExitCode: 2}}}, {Name: "test-3", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{Reason: "Test"}}}, } - syncer.SetPodStatus(testPod, firstStatus) + syncer.SetPodStatus(logger, testPod, firstStatus) - t.Logf("set the testPod to a pod with Phase running, to simulate a stale pod") + logger.Info("set the testPod to a pod with Phase running, to simulate a stale pod") testPod.Status = getRandomPodStatus() testPod.Status.Phase = v1.PodRunning testPod.Status.InitContainerStatuses = []v1.ContainerStatus{ @@ -755,9 +776,9 @@ func TestTerminatePodWaiting(t *testing.T) { {Name: "test-3", State: v1.ContainerState{Running: &v1.ContainerStateRunning{}}}, } - syncer.TerminatePod(testPod) + syncer.TerminatePod(logger, testPod) - t.Logf("we expect the container statuses to have changed to terminated") + logger.Info("we expect the container statuses to have changed to terminated") newStatus := expectPodStatus(t, syncer, testPod) for _, container := range newStatus.ContainerStatuses { assert.NotNil(t, container.State.Terminated, "expected containers to be terminated") @@ -771,30 +792,31 @@ func TestTerminatePodWaiting(t *testing.T) { expectUnknownState := v1.ContainerState{Terminated: &v1.ContainerStateTerminated{Reason: kubecontainer.ContainerReasonStatusUnknown, Message: "The container could not be located when the pod was terminated", ExitCode: 137}} if !reflect.DeepEqual(newStatus.InitContainerStatuses[0].State, expectUnknownState) { - t.Errorf("terminated container state not defaulted: %s", cmp.Diff(newStatus.InitContainerStatuses[0].State, expectUnknownState)) + logger.Error(nil, "terminated container state not defaulted", "diff", cmp.Diff(newStatus.InitContainerStatuses[0].State, expectUnknownState)) } if !reflect.DeepEqual(newStatus.InitContainerStatuses[1].State, firstStatus.InitContainerStatuses[1].State) { - t.Errorf("existing terminated container state not preserved: %#v", newStatus.ContainerStatuses) + logger.Error(nil, "existing terminated container state not preserved", "containerStatuses", newStatus.ContainerStatuses) } if !reflect.DeepEqual(newStatus.InitContainerStatuses[2].State, firstStatus.InitContainerStatuses[2].State) { - t.Errorf("waiting container state not defaulted: %s", cmp.Diff(newStatus.InitContainerStatuses[2].State, firstStatus.InitContainerStatuses[2].State)) + logger.Error(nil, "waiting container state not defaulted", "diff", cmp.Diff(newStatus.InitContainerStatuses[2].State, firstStatus.InitContainerStatuses[2].State)) } if !reflect.DeepEqual(newStatus.ContainerStatuses[0].State, expectUnknownState) { - t.Errorf("terminated container state not defaulted: %s", cmp.Diff(newStatus.ContainerStatuses[0].State, expectUnknownState)) + logger.Error(nil, "terminated container state not defaulted", "diff", cmp.Diff(newStatus.ContainerStatuses[0].State, expectUnknownState)) } if !reflect.DeepEqual(newStatus.ContainerStatuses[1].State, firstStatus.ContainerStatuses[1].State) { - t.Errorf("existing terminated container state not preserved: %#v", newStatus.ContainerStatuses) + logger.Error(nil, "existing terminated container state not preserved", "containerStatuses", newStatus.ContainerStatuses) } if !reflect.DeepEqual(newStatus.ContainerStatuses[2].State, expectUnknownState) { - t.Errorf("waiting container state not defaulted: %s", cmp.Diff(newStatus.ContainerStatuses[2].State, expectUnknownState)) + logger.Error(nil, "waiting container state not defaulted", "diff", cmp.Diff(newStatus.ContainerStatuses[2].State, expectUnknownState)) } - t.Logf("we expect the previous status update to be preserved.") + logger.Info("we expect the previous status update to be preserved.") assert.Equal(t, newStatus.Phase, firstStatus.Phase) assert.Equal(t, newStatus.Message, firstStatus.Message) } func TestTerminatePod_DefaultUnknownStatus(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) newPod := func(initContainers, containers int, fns ...func(*v1.Pod)) *v1.Pod { pod := getTestPod() for i := 0; i < initContainers; i++ { @@ -816,25 +838,30 @@ func TestTerminatePod_DefaultUnknownStatus(t *testing.T) { expectTerminatedUnknown := func(t *testing.T, state v1.ContainerState) { t.Helper() if state.Terminated == nil || state.Running != nil || state.Waiting != nil { - t.Fatalf("unexpected state: %#v", state) + logger.Error(nil, "unexpected state", "state", state) + klog.FlushAndExit(klog.ExitFlushTimeout, 1) } if state.Terminated.ExitCode != 137 || state.Terminated.Reason != kubecontainer.ContainerReasonStatusUnknown || len(state.Terminated.Message) == 0 { - t.Fatalf("unexpected terminated state: %#v", state.Terminated) + logger.Error(nil, "unexpected terminated state", "terminatedState", state.Terminated) + klog.FlushAndExit(klog.ExitFlushTimeout, 1) } } expectTerminated := func(t *testing.T, state v1.ContainerState, exitCode int32) { t.Helper() if state.Terminated == nil || state.Running != nil || state.Waiting != nil { - t.Fatalf("unexpected state: %#v", state) + logger.Error(nil, "unexpected state", "state", state) + klog.FlushAndExit(klog.ExitFlushTimeout, 1) } if state.Terminated.ExitCode != exitCode { - t.Fatalf("unexpected terminated state: %#v", state.Terminated) + logger.Error(nil, "unexpected terminated state", "terminatedState", state.Terminated) + klog.FlushAndExit(klog.ExitFlushTimeout, 1) } } expectWaiting := func(t *testing.T, state v1.ContainerState) { t.Helper() if state.Terminated != nil || state.Running != nil || state.Waiting == nil { - t.Fatalf("unexpected state: %#v", state) + logger.Error(nil, "unexpected state", "state", state) + klog.FlushAndExit(klog.ExitFlushTimeout, 1) } } @@ -880,7 +907,8 @@ func TestTerminatePod_DefaultUnknownStatus(t *testing.T) { expectFn: func(t *testing.T, status v1.PodStatus) { container := status.ContainerStatuses[0] if container.LastTerminationState.Terminated.ExitCode != 2 { - t.Fatalf("unexpected last state: %#v", container.LastTerminationState) + logger.Error(nil, "unexpected last state", "lastTerminationState", container.LastTerminationState) + klog.FlushAndExit(klog.ExitFlushTimeout, 1) } expectTerminatedUnknown(t, container.State) }, @@ -1084,7 +1112,7 @@ func TestTerminatePod_DefaultUnknownStatus(t *testing.T) { syncer := NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker).(*manager) original := tc.pod.DeepCopy() - syncer.SetPodStatus(original, original.Status) + syncer.SetPodStatus(logger, original, original.Status) copied := tc.pod.DeepCopy() if tc.updateFn != nil { @@ -1092,7 +1120,7 @@ func TestTerminatePod_DefaultUnknownStatus(t *testing.T) { } expected := copied.DeepCopy() - syncer.TerminatePod(copied) + syncer.TerminatePod(logger, copied) status := expectPodStatus(t, syncer, tc.pod.DeepCopy()) if tc.expectFn != nil { tc.expectFn(t, status) @@ -1101,9 +1129,11 @@ func TestTerminatePod_DefaultUnknownStatus(t *testing.T) { if !reflect.DeepEqual(expected.Status, status) { diff := cmp.Diff(expected.Status, status) if len(diff) == 0 { - t.Fatalf("diff returned no results for failed DeepEqual: %#v != %#v", expected.Status, status) + logger.Error(nil, "diff returned no results for failed DeepEqual", "expectedStatus", expected.Status, "actualStatus", status) + klog.FlushAndExit(klog.ExitFlushTimeout, 1) } - t.Fatalf("unexpected status: %s", diff) + logger.Error(nil, "unexpected status", "diff", diff) + klog.FlushAndExit(klog.ExitFlushTimeout, 1) } }) } @@ -1165,22 +1195,25 @@ func TestTerminatePod_EnsurePodPhaseIsTerminal(t *testing.T) { } for name, tc := range testCases { t.Run(name, func(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) podManager := kubepod.NewBasicPodManager() podStartupLatencyTracker := util.NewPodStartupLatencyTracker() syncer := NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker).(*manager) pod := getTestPod() pod.Status = tc.status - syncer.TerminatePod(pod) + syncer.TerminatePod(logger, pod) gotStatus := expectPodStatus(t, syncer, pod.DeepCopy()) if diff := cmp.Diff(tc.wantStatus, gotStatus, cmpopts.IgnoreFields(v1.PodStatus{}, "StartTime")); diff != "" { - t.Fatalf("unexpected status: %s", diff) + logger.Error(nil, "unexpected status", "diff", diff) + klog.FlushAndExit(klog.ExitFlushTimeout, 1) } }) } } func TestSetContainerReadiness(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) cID1 := kubecontainer.ContainerID{Type: "test", ID: "1"} cID2 := kubecontainer.ContainerID{Type: "test", ID: "2"} containerStatuses := []v1.ContainerStatus{ @@ -1232,44 +1265,45 @@ func TestSetContainerReadiness(t *testing.T) { m.podManager.(mutablePodManager).AddPod(pod) t.Log("Setting readiness before status should fail.") - m.SetContainerReadiness(pod.UID, cID1, true) + m.SetContainerReadiness(logger, pod.UID, cID1, true) verifyUpdates(t, m, 0) if status, ok := m.GetPodStatus(pod.UID); ok { t.Errorf("Unexpected PodStatus: %+v", status) } t.Log("Setting initial status.") - m.SetPodStatus(pod, status) + m.SetPodStatus(logger, pod, status) verifyUpdates(t, m, 1) status = expectPodStatus(t, m, pod) verifyReadiness("initial", &status, false, false, false) t.Log("Setting unchanged readiness should do nothing.") - m.SetContainerReadiness(pod.UID, cID1, false) + m.SetContainerReadiness(logger, pod.UID, cID1, false) verifyUpdates(t, m, 0) status = expectPodStatus(t, m, pod) verifyReadiness("unchanged", &status, false, false, false) t.Log("Setting container readiness should generate update but not pod readiness.") - m.SetContainerReadiness(pod.UID, cID1, true) + m.SetContainerReadiness(logger, pod.UID, cID1, true) verifyUpdates(t, m, 1) status = expectPodStatus(t, m, pod) verifyReadiness("c1 ready", &status, true, false, false) t.Log("Setting both containers to ready should update pod readiness.") - m.SetContainerReadiness(pod.UID, cID2, true) + m.SetContainerReadiness(logger, pod.UID, cID2, true) verifyUpdates(t, m, 1) status = expectPodStatus(t, m, pod) verifyReadiness("all ready", &status, true, true, true) t.Log("Setting non-existent container readiness should fail.") - m.SetContainerReadiness(pod.UID, kubecontainer.ContainerID{Type: "test", ID: "foo"}, true) + m.SetContainerReadiness(logger, pod.UID, kubecontainer.ContainerID{Type: "test", ID: "foo"}, true) verifyUpdates(t, m, 0) status = expectPodStatus(t, m, pod) verifyReadiness("ignore non-existent", &status, true, true, true) } func TestSetContainerStartup(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) cID1 := kubecontainer.ContainerID{Type: "test", ID: "1"} cID2 := kubecontainer.ContainerID{Type: "test", ID: "2"} containerStatuses := []v1.ContainerStatus{ @@ -1299,61 +1333,63 @@ func TestSetContainerStartup(t *testing.T) { switch c.ContainerID { case cID1.String(): if (c.Started != nil && *c.Started) != c1Started { - t.Errorf("[%s] Expected startup of c1 to be %v but was %v", step, c1Started, c.Started) + logger.Error(nil, "Error in startup of c1", "expected", c1Started, "current", c.Started) } case cID2.String(): if (c.Started != nil && *c.Started) != c2Started { - t.Errorf("[%s] Expected startup of c2 to be %v but was %v", step, c2Started, c.Started) + logger.Error(nil, "Error in startup of c2", "step", step, "expected", c2Started, "current", c.Started) } default: - t.Fatalf("[%s] Unexpected container: %+v", step, c) + logger.Error(nil, "Unexpected container", "step", step, "container", c) + klog.FlushAndExit(klog.ExitFlushTimeout, 1) } } } m := newTestManager(&fake.Clientset{}) - // Add test pod because the container spec has been changed. + // Add test pod because the container spec has been changed m.podManager.(mutablePodManager).AddPod(pod) - t.Log("Setting startup before status should fail.") - m.SetContainerStartup(pod.UID, cID1, true) + logger.Info("Setting startup before status should fail") + m.SetContainerStartup(logger, pod.UID, cID1, true) verifyUpdates(t, m, 0) if status, ok := m.GetPodStatus(pod.UID); ok { t.Errorf("Unexpected PodStatus: %+v", status) } - t.Log("Setting initial status.") - m.SetPodStatus(pod, status) + logger.Info("Setting initial status") + m.SetPodStatus(logger, pod, status) verifyUpdates(t, m, 1) status = expectPodStatus(t, m, pod) verifyStartup("initial", &status, false, false, false) - t.Log("Setting unchanged startup should do nothing.") - m.SetContainerStartup(pod.UID, cID1, false) + logger.Info("Setting unchanged startup should do nothing") + m.SetContainerStartup(logger, pod.UID, cID1, false) verifyUpdates(t, m, 1) status = expectPodStatus(t, m, pod) verifyStartup("unchanged", &status, false, false, false) - t.Log("Setting container startup should generate update but not pod startup.") - m.SetContainerStartup(pod.UID, cID1, true) + logger.Info("Setting container startup should generate update but not pod startup") + m.SetContainerStartup(logger, pod.UID, cID1, true) verifyUpdates(t, m, 1) // Started = nil to false status = expectPodStatus(t, m, pod) verifyStartup("c1 ready", &status, true, false, false) - t.Log("Setting both containers to ready should update pod startup.") - m.SetContainerStartup(pod.UID, cID2, true) + logger.Info("Setting both containers to ready should update pod startup") + m.SetContainerStartup(logger, pod.UID, cID2, true) verifyUpdates(t, m, 1) status = expectPodStatus(t, m, pod) verifyStartup("all ready", &status, true, true, true) - t.Log("Setting non-existent container startup should fail.") - m.SetContainerStartup(pod.UID, kubecontainer.ContainerID{Type: "test", ID: "foo"}, true) + logger.Info("Setting non-existent container startup should fail") + m.SetContainerStartup(logger, pod.UID, kubecontainer.ContainerID{Type: "test", ID: "foo"}, true) verifyUpdates(t, m, 0) status = expectPodStatus(t, m, pod) verifyStartup("ignore non-existent", &status, true, true, true) } func TestSyncBatchCleanupVersions(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) m := newTestManager(&fake.Clientset{}) testPod := getTestPod() mirrorPod := getTestPod() @@ -1367,16 +1403,16 @@ func TestSyncBatchCleanupVersions(t *testing.T) { t.Logf("Orphaned pods should be removed.") m.apiStatusVersions[kubetypes.MirrorPodUID(testPod.UID)] = 100 m.apiStatusVersions[kubetypes.MirrorPodUID(mirrorPod.UID)] = 200 - m.syncBatch(true) + m.syncBatch(ctx, true) if _, ok := m.apiStatusVersions[kubetypes.MirrorPodUID(testPod.UID)]; ok { - t.Errorf("Should have cleared status for testPod") + logger.Error(nil, "Should have cleared status for testPod") } if _, ok := m.apiStatusVersions[kubetypes.MirrorPodUID(mirrorPod.UID)]; ok { - t.Errorf("Should have cleared status for mirrorPod") + logger.Error(nil, "Should have cleared status for mirrorPod") } t.Logf("Non-orphaned pods should not be removed.") - m.SetPodStatus(testPod, getRandomPodStatus()) + m.SetPodStatus(logger, testPod, getRandomPodStatus()) m.podManager.(mutablePodManager).AddPod(mirrorPod) staticPod := mirrorPod staticPod.UID = "static-uid" @@ -1384,7 +1420,7 @@ func TestSyncBatchCleanupVersions(t *testing.T) { m.podManager.(mutablePodManager).AddPod(staticPod) m.apiStatusVersions[kubetypes.MirrorPodUID(testPod.UID)] = 100 m.apiStatusVersions[kubetypes.MirrorPodUID(mirrorPod.UID)] = 200 - m.testSyncBatch() + m.testSyncBatch(ctx) if _, ok := m.apiStatusVersions[kubetypes.MirrorPodUID(testPod.UID)]; !ok { t.Errorf("Should not have cleared status for testPod") } @@ -1394,12 +1430,13 @@ func TestSyncBatchCleanupVersions(t *testing.T) { } func TestReconcilePodStatus(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) testPod := getTestPod() client := fake.NewSimpleClientset(testPod) syncer := newTestManager(client) - syncer.SetPodStatus(testPod, getRandomPodStatus()) + syncer.SetPodStatus(logger, testPod, getRandomPodStatus()) t.Logf("Call syncBatch directly to test reconcile") - syncer.syncBatch(true) // The apiStatusVersions should be set now + syncer.syncBatch(ctx, true) // The apiStatusVersions should be set now client.ClearActions() podStatus, ok := syncer.GetPodStatus(testPod.UID) @@ -1410,11 +1447,11 @@ func TestReconcilePodStatus(t *testing.T) { t.Logf("If the pod status is the same, a reconciliation is not needed and syncBatch should do nothing") syncer.podManager.(mutablePodManager).UpdatePod(testPod) - if syncer.needsReconcile(testPod.UID, podStatus) { + if syncer.needsReconcile(logger, testPod.UID, podStatus) { t.Fatalf("Pod status is the same, a reconciliation is not needed") } - syncer.SetPodStatus(testPod, podStatus) - syncer.syncBatch(true) + syncer.SetPodStatus(logger, testPod, podStatus) + syncer.syncBatch(ctx, true) verifyActions(t, syncer, []core.Action{}) // If the pod status is the same, only the timestamp is in Rfc3339 format (lower precision without nanosecond), @@ -1425,21 +1462,21 @@ func TestReconcilePodStatus(t *testing.T) { normalizedStartTime := testPod.Status.StartTime.Rfc3339Copy() testPod.Status.StartTime = &normalizedStartTime syncer.podManager.(mutablePodManager).UpdatePod(testPod) - if syncer.needsReconcile(testPod.UID, podStatus) { + if syncer.needsReconcile(logger, testPod.UID, podStatus) { t.Fatalf("Pod status only differs for timestamp format, a reconciliation is not needed") } - syncer.SetPodStatus(testPod, podStatus) - syncer.syncBatch(true) + syncer.SetPodStatus(logger, testPod, podStatus) + syncer.syncBatch(ctx, true) verifyActions(t, syncer, []core.Action{}) t.Logf("If the pod status is different, a reconciliation is needed, syncBatch should trigger an update") changedPodStatus := getRandomPodStatus() syncer.podManager.(mutablePodManager).UpdatePod(testPod) - if !syncer.needsReconcile(testPod.UID, changedPodStatus) { + if !syncer.needsReconcile(logger, testPod.UID, changedPodStatus) { t.Fatalf("Pod status is different, a reconciliation is needed") } - syncer.SetPodStatus(testPod, changedPodStatus) - syncer.syncBatch(true) + syncer.SetPodStatus(logger, testPod, changedPodStatus) + syncer.syncBatch(ctx, true) verifyActions(t, syncer, []core.Action{getAction(), patchAction()}) } @@ -1452,6 +1489,7 @@ func expectPodStatus(t *testing.T, m *manager, pod *v1.Pod) v1.PodStatus { } func TestDeletePodBeforeFinished(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) pod := getTestPod() t.Logf("Set the deletion timestamp.") pod.DeletionTimestamp = &metav1.Time{Time: time.Now()} @@ -1460,12 +1498,13 @@ func TestDeletePodBeforeFinished(t *testing.T) { m.podManager.(mutablePodManager).AddPod(pod) status := getRandomPodStatus() status.Phase = v1.PodFailed - m.SetPodStatus(pod, status) + m.SetPodStatus(logger, pod, status) t.Logf("Expect not to see a delete action as the pod isn't finished yet (TerminatePod isn't called)") verifyActions(t, m, []core.Action{getAction(), patchAction()}) } func TestDeletePodFinished(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) pod := getTestPod() t.Logf("Set the deletion timestamp.") pod.DeletionTimestamp = &metav1.Time{Time: time.Now()} @@ -1474,12 +1513,13 @@ func TestDeletePodFinished(t *testing.T) { m.podManager.(mutablePodManager).AddPod(pod) status := getRandomPodStatus() status.Phase = v1.PodFailed - m.TerminatePod(pod) + m.TerminatePod(logger, pod) t.Logf("Expect to see a delete action as the pod is finished (TerminatePod called)") verifyActions(t, m, []core.Action{getAction(), patchAction(), deleteAction()}) } func TestDoNotDeleteMirrorPods(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) staticPod := getTestPod() staticPod.Annotations = map[string]string{kubetypes.ConfigSourceAnnotationKey: "file"} mirrorPod := getTestPod() @@ -1502,7 +1542,7 @@ func TestDoNotDeleteMirrorPods(t *testing.T) { status := getRandomPodStatus() now := metav1.Now() status.StartTime = &now - m.SetPodStatus(staticPod, status) + m.SetPodStatus(logger, staticPod, status) t.Logf("Expect not to see a delete action.") verifyActions(t, m, []core.Action{getAction(), patchAction()})