Merge pull request #130852 from Chulong-Li/contextual-logging

Migrate pkg/kubelet/status to contextual logging
This commit is contained in:
Kubernetes Prow Robot
2025-04-23 13:31:43 -07:00
committed by GitHub
12 changed files with 307 additions and 247 deletions

View File

@@ -177,6 +177,7 @@ linters-settings: # please keep this alphabetized
contextual k8s.io/kubernetes/pkg/kubelet/token/.*
contextual k8s.io/kubernetes/pkg/kubelet/cadvisor/.*
contextual k8s.io/kubernetes/pkg/kubelet/oom/.*
contextual k8s.io/kubernetes/pkg/kubelet/status/.*
contextual k8s.io/kubernetes/pkg/kubelet/sysctl/.*
# As long as contextual logging is alpha or beta, all WithName, WithValues,

View File

@@ -243,6 +243,7 @@ linters-settings: # please keep this alphabetized
contextual k8s.io/kubernetes/pkg/kubelet/token/.*
contextual k8s.io/kubernetes/pkg/kubelet/cadvisor/.*
contextual k8s.io/kubernetes/pkg/kubelet/oom/.*
contextual k8s.io/kubernetes/pkg/kubelet/status/.*
contextual k8s.io/kubernetes/pkg/kubelet/sysctl/.*
# As long as contextual logging is alpha or beta, all WithName, WithValues,

View File

@@ -55,6 +55,7 @@ contextual k8s.io/kubernetes/pkg/kubelet/clustertrustbundle/.*
contextual k8s.io/kubernetes/pkg/kubelet/token/.*
contextual k8s.io/kubernetes/pkg/kubelet/cadvisor/.*
contextual k8s.io/kubernetes/pkg/kubelet/oom/.*
contextual k8s.io/kubernetes/pkg/kubelet/status/.*
contextual k8s.io/kubernetes/pkg/kubelet/sysctl/.*
# As long as contextual logging is alpha or beta, all WithName, WithValues,

View File

@@ -1810,7 +1810,7 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
}
// Start component sync loops.
kl.statusManager.Start()
kl.statusManager.Start(ctx)
// Start syncing RuntimeClasses if enabled.
if kl.runtimeClassManager != nil {
@@ -1889,6 +1889,7 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType
attribute.String("k8s.pod.update_type", updateType.String()),
semconv.K8SNamespaceNameKey.String(pod.Namespace),
))
logger := klog.FromContext(ctx)
klog.V(4).InfoS("SyncPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
defer func() {
if err != nil {
@@ -1949,7 +1950,7 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType
// If the pod is terminal, we don't need to continue to setup the pod
if apiPodStatus.Phase == v1.PodSucceeded || apiPodStatus.Phase == v1.PodFailed {
kl.statusManager.SetPodStatus(pod, apiPodStatus)
kl.statusManager.SetPodStatus(logger, pod, apiPodStatus)
isTerminal = true
return isTerminal, nil
}
@@ -1962,7 +1963,7 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType
metrics.PodStartDuration.Observe(metrics.SinceInSeconds(firstSeenTime))
}
kl.statusManager.SetPodStatus(pod, apiPodStatus)
kl.statusManager.SetPodStatus(logger, pod, apiPodStatus)
// If the network plugin is not ready, only start the pod if it uses the host network
if err := kl.runtimeState.networkErrors(); err != nil && !kubecontainer.IsHostNetworkPod(pod) {
@@ -2100,6 +2101,7 @@ func (kl *Kubelet) SyncTerminatingPod(_ context.Context, pod *v1.Pod, podStatus
semconv.K8SPodNameKey.String(pod.Name),
semconv.K8SNamespaceNameKey.String(pod.Namespace),
))
logger := klog.FromContext(ctx)
defer otelSpan.End()
klog.V(4).InfoS("SyncTerminatingPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
defer klog.V(4).InfoS("SyncTerminatingPod exit", "pod", klog.KObj(pod), "podUID", pod.UID)
@@ -2113,7 +2115,7 @@ func (kl *Kubelet) SyncTerminatingPod(_ context.Context, pod *v1.Pod, podStatus
if podStatusFn != nil {
podStatusFn(&apiPodStatus)
}
kl.statusManager.SetPodStatus(pod, apiPodStatus)
kl.statusManager.SetPodStatus(logger, pod, apiPodStatus)
if gracePeriod != nil {
klog.V(4).InfoS("Pod terminating with grace period", "pod", klog.KObj(pod), "podUID", pod.UID, "gracePeriod", *gracePeriod)
@@ -2188,7 +2190,7 @@ func (kl *Kubelet) SyncTerminatingPod(_ context.Context, pod *v1.Pod, podStatus
// information about the container end states (including exit codes) - when
// SyncTerminatedPod is called the containers may already be removed.
apiPodStatus = kl.generateAPIPodStatus(pod, stoppedPodStatus, true)
kl.statusManager.SetPodStatus(pod, apiPodStatus)
kl.statusManager.SetPodStatus(logger, pod, apiPodStatus)
// we have successfully stopped all containers, the pod is terminating, our status is "done"
klog.V(4).InfoS("Pod termination stopped all running containers", "pod", klog.KObj(pod), "podUID", pod.UID)
@@ -2250,6 +2252,7 @@ func (kl *Kubelet) SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus
semconv.K8SPodNameKey.String(pod.Name),
semconv.K8SNamespaceNameKey.String(pod.Namespace),
))
logger := klog.FromContext(ctx)
defer otelSpan.End()
klog.V(4).InfoS("SyncTerminatedPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
defer klog.V(4).InfoS("SyncTerminatedPod exit", "pod", klog.KObj(pod), "podUID", pod.UID)
@@ -2263,7 +2266,7 @@ func (kl *Kubelet) SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus
// TODO: should we simply fold this into TerminatePod? that would give a single pod update
apiPodStatus := kl.generateAPIPodStatus(pod, podStatus, true)
kl.statusManager.SetPodStatus(pod, apiPodStatus)
kl.statusManager.SetPodStatus(logger, pod, apiPodStatus)
// volumes are unmounted after the pod worker reports ShouldPodRuntimeBeRemoved (which is satisfied
// before syncTerminatedPod is invoked)
@@ -2310,7 +2313,7 @@ func (kl *Kubelet) SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus
kl.usernsManager.Release(pod.UID)
// mark the final pod status
kl.statusManager.TerminatePod(pod)
kl.statusManager.TerminatePod(logger, pod)
klog.V(4).InfoS("Pod is terminated and will need no more status updates", "pod", klog.KObj(pod), "podUID", pod.UID)
return nil
@@ -2380,7 +2383,7 @@ func (kl *Kubelet) deletePod(pod *v1.Pod) error {
// and updates the pod to the failed phase in the status manager.
func (kl *Kubelet) rejectPod(pod *v1.Pod, reason, message string) {
kl.recorder.Eventf(pod, v1.EventTypeWarning, reason, message)
kl.statusManager.SetPodStatus(pod, v1.PodStatus{
kl.statusManager.SetPodStatus(klog.TODO(), pod, v1.PodStatus{
QOSClass: v1qos.GetPodQOS(pod), // keep it as is
Phase: v1.PodFailed,
Reason: reason,
@@ -2508,6 +2511,7 @@ func (kl *Kubelet) syncLoop(ctx context.Context, updates <-chan kubetypes.PodUpd
// containers have failed health checks
func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
logger := klog.FromContext(ctx)
select {
case u, open := <-configCh:
// Update from a config source; dispatch it to the right handler
@@ -2578,7 +2582,7 @@ func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubety
}
case update := <-kl.readinessManager.Updates():
ready := update.Result == proberesults.Success
kl.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
kl.statusManager.SetContainerReadiness(logger, update.PodUID, update.ContainerID, ready)
status := "not ready"
if ready {
@@ -2587,7 +2591,7 @@ func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubety
handleProbeSync(kl, update, handler, "readiness", status)
case update := <-kl.startupManager.Updates():
started := update.Result == proberesults.Success
kl.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started)
kl.statusManager.SetContainerStartup(logger, update.PodUID, update.ContainerID, started)
status := "unhealthy"
if started {

View File

@@ -1113,7 +1113,7 @@ func (kl *Kubelet) removeOrphanedPodStatuses(pods []*v1.Pod, mirrorPods []*v1.Po
for _, pod := range mirrorPods {
podUIDs[pod.UID] = true
}
kl.statusManager.RemoveOrphanedStatuses(podUIDs)
kl.statusManager.RemoveOrphanedStatuses(klog.TODO(), podUIDs)
}
// HandlePodCleanups performs a series of cleanup work, including terminating

View File

@@ -47,6 +47,7 @@ import (
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/component-base/metrics/testutil"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/klog/v2/ktesting"
"k8s.io/kubelet/pkg/cri/streaming/portforward"
"k8s.io/kubelet/pkg/cri/streaming/remotecommand"
_ "k8s.io/kubernetes/pkg/apis/core/install"
@@ -3866,11 +3867,12 @@ func Test_generateAPIPodStatus(t *testing.T) {
for _, test := range tests {
for _, enablePodReadyToStartContainersCondition := range []bool{false, true} {
t.Run(test.name, func(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodReadyToStartContainersCondition, enablePodReadyToStartContainersCondition)
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kl := testKubelet.kubelet
kl.statusManager.SetPodStatus(test.pod, test.previousStatus)
kl.statusManager.SetPodStatus(logger, test.pod, test.previousStatus)
for _, name := range test.unreadyContainer {
kl.readinessManager.Set(kubecontainer.BuildContainerID("", findContainerStatusByName(test.expected, name).ContainerID), results.Failure, test.pod)
}
@@ -3987,12 +3989,13 @@ func Test_generateAPIPodStatusForInPlaceVPAEnabled(t *testing.T) {
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kl := testKubelet.kubelet
oldStatus := test.pod.Status
kl.statusManager.SetPodStatus(test.pod, oldStatus)
kl.statusManager.SetPodStatus(logger, test.pod, oldStatus)
actual := kl.generateAPIPodStatus(test.pod, &testKubecontainerPodStatus /* criStatus */, false /* test.isPodTerminal */)
for _, c := range actual.Conditions {
if c.Type == v1.PodResizePending || c.Type == v1.PodResizeInProgress {

View File

@@ -1523,6 +1523,7 @@ func TestNetworkErrorsWithoutHostNetwork(t *testing.T) {
}
func TestFilterOutInactivePods(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kubelet := testKubelet.kubelet
@@ -1550,7 +1551,7 @@ func TestFilterOutInactivePods(t *testing.T) {
// pod that is running but has been rejected by admission is excluded
pods[5].Status.Phase = v1.PodRunning
kubelet.statusManager.SetPodStatus(pods[5], v1.PodStatus{Phase: v1.PodFailed})
kubelet.statusManager.SetPodStatus(logger, pods[5], v1.PodStatus{Phase: v1.PodFailed})
// pod that is running according to the api but is known terminated is excluded
pods[6].Status.Phase = v1.PodRunning
@@ -1839,6 +1840,7 @@ func syncAndVerifyPodDir(t *testing.T, testKubelet *TestKubelet, pods []*v1.Pod,
}
func TestDoesNotDeletePodDirsForTerminatedPods(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kl := testKubelet.kubelet
@@ -1850,8 +1852,8 @@ func TestDoesNotDeletePodDirsForTerminatedPods(t *testing.T) {
syncAndVerifyPodDir(t, testKubelet, pods, pods, true)
// Pod 1 failed, and pod 2 succeeded. None of the pod directories should be
// deleted.
kl.statusManager.SetPodStatus(pods[1], v1.PodStatus{Phase: v1.PodFailed})
kl.statusManager.SetPodStatus(pods[2], v1.PodStatus{Phase: v1.PodSucceeded})
kl.statusManager.SetPodStatus(logger, pods[1], v1.PodStatus{Phase: v1.PodFailed})
kl.statusManager.SetPodStatus(logger, pods[2], v1.PodStatus{Phase: v1.PodSucceeded})
syncAndVerifyPodDir(t, testKubelet, pods, pods, true)
}

View File

@@ -27,9 +27,11 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/probe"
"k8s.io/kubernetes/test/utils/ktesting"
)
func init() {
@@ -547,14 +549,15 @@ func TestUpdatePodStatusWithInitContainers(t *testing.T) {
}
}
func (m *manager) extractedReadinessHandling() {
func (m *manager) extractedReadinessHandling(logger klog.Logger) {
update := <-m.readinessManager.Updates()
// This code corresponds to an extract from kubelet.syncLoopIteration()
ready := update.Result == results.Success
m.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
m.statusManager.SetContainerReadiness(logger, update.PodUID, update.ContainerID, ready)
}
func TestUpdateReadiness(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
testPod := getTestPod()
setTestProbe(testPod, readiness, v1.Probe{})
m := newTestManager()
@@ -562,7 +565,7 @@ func TestUpdateReadiness(t *testing.T) {
// Start syncing readiness without leaking goroutine.
stopCh := make(chan struct{})
go wait.Until(m.extractedReadinessHandling, 0, stopCh)
go wait.Until(func() { m.extractedReadinessHandling(logger) }, 0, stopCh)
defer func() {
close(stopCh)
// Send an update to exit extractedReadinessHandling()
@@ -573,7 +576,7 @@ func TestUpdateReadiness(t *testing.T) {
exec.set(probe.Success, nil)
m.prober.exec = &exec
m.statusManager.SetPodStatus(testPod, getTestRunningStatus())
m.statusManager.SetPodStatus(logger, testPod, getTestRunningStatus())
m.AddPod(testPod)
probePaths := []probeKey{{testPodUID, testContainerName, readiness}}

View File

@@ -37,6 +37,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/status"
statustest "k8s.io/kubernetes/pkg/kubelet/status/testing"
kubeletutil "k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/test/utils/ktesting"
"k8s.io/utils/ptr"
)
@@ -80,6 +81,7 @@ func TestTCPPortExhaustion(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
podManager := kubepod.NewBasicPodManager()
podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker()
m := NewManager(
@@ -133,7 +135,7 @@ func TestTCPPortExhaustion(t *testing.T) {
})
}
podManager.AddPod(&pod)
m.statusManager.SetPodStatus(&pod, pod.Status)
m.statusManager.SetPodStatus(logger, &pod, pod.Status)
m.AddPod(&pod)
}
t.Logf("Adding %d pods with %d containers each in %v", numTestPods, numContainers, time.Since(now))

View File

@@ -17,7 +17,6 @@ limitations under the License.
package prober
import (
"context"
"fmt"
"testing"
"time"
@@ -32,12 +31,14 @@ import (
statustest "k8s.io/kubernetes/pkg/kubelet/status/testing"
kubeletutil "k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/pkg/probe"
"k8s.io/kubernetes/test/utils/ktesting"
)
func init() {
}
func TestDoProbe(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
m := newTestManager()
for _, probeType := range [...]probeType{liveness, readiness, startup} {
@@ -131,10 +132,9 @@ func TestDoProbe(t *testing.T) {
}
for i, test := range tests {
ctx := context.Background()
w := newTestWorker(m, probeType, test.probe)
if test.podStatus != nil {
m.statusManager.SetPodStatus(w.pod, *test.podStatus)
m.statusManager.SetPodStatus(logger, w.pod, *test.podStatus)
}
if test.setDeletionTimestamp {
now := metav1.Now()
@@ -158,14 +158,14 @@ func TestDoProbe(t *testing.T) {
}
func TestInitialDelay(t *testing.T) {
ctx := context.Background()
logger, ctx := ktesting.NewTestContext(t)
m := newTestManager()
for _, probeType := range [...]probeType{liveness, readiness, startup} {
w := newTestWorker(m, probeType, v1.Probe{
InitialDelaySeconds: 10,
})
m.statusManager.SetPodStatus(w.pod, getTestRunningStatusWithStarted(probeType != startup))
m.statusManager.SetPodStatus(logger, w.pod, getTestRunningStatusWithStarted(probeType != startup))
expectContinue(t, w, w.doProbe(ctx), "during initial delay")
// Default value depends on probe, Success for liveness, Failure for readiness, Unknown for startup
@@ -182,7 +182,7 @@ func TestInitialDelay(t *testing.T) {
laterStatus := getTestRunningStatusWithStarted(probeType != startup)
laterStatus.ContainerStatuses[0].State.Running.StartedAt.Time =
time.Now().Add(-100 * time.Second)
m.statusManager.SetPodStatus(w.pod, laterStatus)
m.statusManager.SetPodStatus(logger, w.pod, laterStatus)
// Second call should succeed (already waited).
expectContinue(t, w, w.doProbe(ctx), "after initial delay")
@@ -191,10 +191,10 @@ func TestInitialDelay(t *testing.T) {
}
func TestFailureThreshold(t *testing.T) {
ctx := context.Background()
logger, ctx := ktesting.NewTestContext(t)
m := newTestManager()
w := newTestWorker(m, readiness, v1.Probe{SuccessThreshold: 1, FailureThreshold: 3})
m.statusManager.SetPodStatus(w.pod, getTestRunningStatus())
m.statusManager.SetPodStatus(logger, w.pod, getTestRunningStatus())
for i := 0; i < 2; i++ {
// First probe should succeed.
@@ -226,10 +226,10 @@ func TestFailureThreshold(t *testing.T) {
}
func TestSuccessThreshold(t *testing.T) {
ctx := context.Background()
logger, ctx := ktesting.NewTestContext(t)
m := newTestManager()
w := newTestWorker(m, readiness, v1.Probe{SuccessThreshold: 3, FailureThreshold: 1})
m.statusManager.SetPodStatus(w.pod, getTestRunningStatus())
m.statusManager.SetPodStatus(logger, w.pod, getTestRunningStatus())
// Start out failure.
w.resultsManager.Set(testContainerID, results.Failure, &v1.Pod{})
@@ -261,12 +261,12 @@ func TestSuccessThreshold(t *testing.T) {
}
func TestStartupProbeSuccessThreshold(t *testing.T) {
ctx := context.Background()
logger, ctx := ktesting.NewTestContext(t)
m := newTestManager()
successThreshold := 1
failureThreshold := 3
w := newTestWorker(m, startup, v1.Probe{SuccessThreshold: int32(successThreshold), FailureThreshold: int32(failureThreshold)})
m.statusManager.SetPodStatus(w.pod, getTestNotRunningStatus())
m.statusManager.SetPodStatus(logger, w.pod, getTestNotRunningStatus())
m.prober.exec = fakeExecProber{probe.Success, nil}
for i := 0; i < successThreshold+1; i++ {
@@ -294,12 +294,12 @@ func TestStartupProbeSuccessThreshold(t *testing.T) {
}
func TestStartupProbeFailureThreshold(t *testing.T) {
ctx := context.Background()
logger, ctx := ktesting.NewTestContext(t)
m := newTestManager()
successThreshold := 1
failureThreshold := 3
w := newTestWorker(m, startup, v1.Probe{SuccessThreshold: int32(successThreshold), FailureThreshold: int32(failureThreshold)})
m.statusManager.SetPodStatus(w.pod, getTestNotRunningStatus())
m.statusManager.SetPodStatus(logger, w.pod, getTestNotRunningStatus())
m.prober.exec = fakeExecProber{probe.Failure, nil}
for i := 0; i < failureThreshold+1; i++ {
@@ -346,12 +346,13 @@ func TestStartupProbeFailureThreshold(t *testing.T) {
}
func TestCleanUp(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
m := newTestManager()
for _, probeType := range [...]probeType{liveness, readiness, startup} {
key := probeKey{testPodUID, testContainerName, probeType}
w := newTestWorker(m, probeType, v1.Probe{})
m.statusManager.SetPodStatus(w.pod, getTestRunningStatusWithStarted(probeType != startup))
m.statusManager.SetPodStatus(logger, w.pod, getTestRunningStatusWithStarted(probeType != startup))
go w.run()
m.workers[key] = w
@@ -411,13 +412,13 @@ func resultsManager(m *manager, probeType probeType) results.Manager {
}
func TestOnHoldOnLivenessOrStartupCheckFailure(t *testing.T) {
ctx := context.Background()
logger, ctx := ktesting.NewTestContext(t)
m := newTestManager()
for _, probeType := range [...]probeType{liveness, startup} {
w := newTestWorker(m, probeType, v1.Probe{SuccessThreshold: 1, FailureThreshold: 1})
status := getTestRunningStatusWithStarted(probeType != startup)
m.statusManager.SetPodStatus(w.pod, status)
m.statusManager.SetPodStatus(logger, w.pod, status)
// First probe should fail.
m.prober.exec = fakeExecProber{probe.Failure, nil}
@@ -439,7 +440,7 @@ func TestOnHoldOnLivenessOrStartupCheckFailure(t *testing.T) {
// Set a new container ID to lift the hold. The next probe will succeed.
status.ContainerStatuses[0].ContainerID = "test://newCont_ID"
m.statusManager.SetPodStatus(w.pod, status)
m.statusManager.SetPodStatus(logger, w.pod, status)
msg = "hold lifted"
expectContinue(t, w, w.doProbe(ctx), msg)
expectResult(t, w, results.Success, msg)
@@ -452,10 +453,10 @@ func TestOnHoldOnLivenessOrStartupCheckFailure(t *testing.T) {
}
func TestResultRunOnLivenessCheckFailure(t *testing.T) {
ctx := context.Background()
logger, ctx := ktesting.NewTestContext(t)
m := newTestManager()
w := newTestWorker(m, liveness, v1.Probe{SuccessThreshold: 1, FailureThreshold: 3})
m.statusManager.SetPodStatus(w.pod, getTestRunningStatus())
m.statusManager.SetPodStatus(logger, w.pod, getTestRunningStatus())
m.prober.exec = fakeExecProber{probe.Success, nil}
msg := "initial probe success"
@@ -494,10 +495,10 @@ func TestResultRunOnLivenessCheckFailure(t *testing.T) {
}
func TestResultRunOnStartupCheckFailure(t *testing.T) {
ctx := context.Background()
logger, ctx := ktesting.NewTestContext(t)
m := newTestManager()
w := newTestWorker(m, startup, v1.Probe{SuccessThreshold: 1, FailureThreshold: 3})
m.statusManager.SetPodStatus(w.pod, getTestRunningStatusWithStarted(false))
m.statusManager.SetPodStatus(logger, w.pod, getTestRunningStatusWithStarted(false))
// Below FailureThreshold leaves probe state unchanged
// which is failed for startup at first.
@@ -530,17 +531,17 @@ func TestResultRunOnStartupCheckFailure(t *testing.T) {
}
func TestLivenessProbeDisabledByStarted(t *testing.T) {
ctx := context.Background()
logger, ctx := ktesting.NewTestContext(t)
m := newTestManager()
w := newTestWorker(m, liveness, v1.Probe{SuccessThreshold: 1, FailureThreshold: 1})
m.statusManager.SetPodStatus(w.pod, getTestRunningStatusWithStarted(false))
m.statusManager.SetPodStatus(logger, w.pod, getTestRunningStatusWithStarted(false))
// livenessProbe fails, but is disabled
m.prober.exec = fakeExecProber{probe.Failure, nil}
msg := "Not started, probe failure, result success"
expectContinue(t, w, w.doProbe(ctx), msg)
expectResult(t, w, results.Success, msg)
// setting started state
m.statusManager.SetContainerStartup(w.pod.UID, w.containerID, true)
m.statusManager.SetContainerStartup(logger, w.pod.UID, w.containerID, true)
// livenessProbe fails
m.prober.exec = fakeExecProber{probe.Failure, nil}
msg = "Started, probe failure, result failure"
@@ -549,10 +550,10 @@ func TestLivenessProbeDisabledByStarted(t *testing.T) {
}
func TestStartupProbeDisabledByStarted(t *testing.T) {
ctx := context.Background()
logger, ctx := ktesting.NewTestContext(t)
m := newTestManager()
w := newTestWorker(m, startup, v1.Probe{SuccessThreshold: 1, FailureThreshold: 2})
m.statusManager.SetPodStatus(w.pod, getTestRunningStatusWithStarted(false))
m.statusManager.SetPodStatus(logger, w.pod, getTestRunningStatusWithStarted(false))
// startupProbe fails < FailureThreshold, stays unknown
m.prober.exec = fakeExecProber{probe.Failure, nil}
msg := "Not started, probe failure, result unknown"
@@ -564,7 +565,7 @@ func TestStartupProbeDisabledByStarted(t *testing.T) {
expectContinue(t, w, w.doProbe(ctx), msg)
expectResult(t, w, results.Success, msg)
// setting started state
m.statusManager.SetContainerStartup(w.pod.UID, w.containerID, true)
m.statusManager.SetContainerStartup(logger, w.pod.UID, w.containerID, true)
// startupProbe fails, but is disabled
m.prober.exec = fakeExecProber{probe.Failure, nil}
msg = "Started, probe failure, result success"

View File

@@ -131,26 +131,26 @@ type Manager interface {
PodStatusProvider
// Start the API server status sync loop.
Start()
Start(ctx context.Context)
// SetPodStatus caches updates the cached status for the given pod, and triggers a status update.
SetPodStatus(pod *v1.Pod, status v1.PodStatus)
SetPodStatus(logger klog.Logger, pod *v1.Pod, status v1.PodStatus)
// SetContainerReadiness updates the cached container status with the given readiness, and
// triggers a status update.
SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool)
SetContainerReadiness(logger klog.Logger, podUID types.UID, containerID kubecontainer.ContainerID, ready bool)
// SetContainerStartup updates the cached container status with the given startup, and
// triggers a status update.
SetContainerStartup(podUID types.UID, containerID kubecontainer.ContainerID, started bool)
SetContainerStartup(logger klog.Logger, podUID types.UID, containerID kubecontainer.ContainerID, started bool)
// TerminatePod resets the container status for the provided pod to terminated and triggers
// a status update.
TerminatePod(pod *v1.Pod)
TerminatePod(logger klog.Logger, pod *v1.Pod)
// RemoveOrphanedStatuses scans the status cache and removes any entries for pods not included in
// the provided podUIDs.
RemoveOrphanedStatuses(podUIDs map[types.UID]bool)
RemoveOrphanedStatuses(logger klog.Logger, podUIDs map[types.UID]bool)
// GetPodResizeConditions returns cached PodStatus Resize conditions value
GetPodResizeConditions(podUID types.UID) []*v1.PodCondition
@@ -218,16 +218,17 @@ func isPodStatusByKubeletEqual(oldStatus, status *v1.PodStatus) bool {
return apiequality.Semantic.DeepEqual(oldCopy, status)
}
func (m *manager) Start() {
func (m *manager) Start(ctx context.Context) {
logger := klog.FromContext(ctx)
// Don't start the status manager if we don't have a client. This will happen
// on the master, where the kubelet is responsible for bootstrapping the pods
// of the master components.
if m.kubeClient == nil {
klog.InfoS("Kubernetes client is nil, not starting status manager")
logger.Info("Kubernetes client is nil, not starting status manager")
return
}
klog.InfoS("Starting to sync pod status with apiserver")
logger.Info("Starting to sync pod status with apiserver")
//nolint:staticcheck // SA1015 Ticker can leak since this is only called once and doesn't handle termination.
syncTicker := time.NewTicker(syncPeriod).C
@@ -237,11 +238,11 @@ func (m *manager) Start() {
for {
select {
case <-m.podStatusChannel:
klog.V(4).InfoS("Syncing updated statuses")
m.syncBatch(false)
logger.V(4).Info("Syncing updated statuses")
m.syncBatch(ctx, false)
case <-syncTicker:
klog.V(4).InfoS("Syncing all statuses")
m.syncBatch(true)
logger.V(4).Info("Syncing all statuses")
m.syncBatch(ctx, true)
}
}
}, 0)
@@ -315,7 +316,7 @@ func (m *manager) GetPodStatus(uid types.UID) (v1.PodStatus, bool) {
return status.status, ok
}
func (m *manager) SetPodStatus(pod *v1.Pod, status v1.PodStatus) {
func (m *manager) SetPodStatus(logger klog.Logger, pod *v1.Pod, status v1.PodStatus) {
m.podStatusesLock.Lock()
defer m.podStatusesLock.Unlock()
@@ -328,22 +329,22 @@ func (m *manager) SetPodStatus(pod *v1.Pod, status v1.PodStatus) {
// Force a status update if deletion timestamp is set. This is necessary
// because if the pod is in the non-running state, the pod worker still
// needs to be able to trigger an update and/or deletion.
m.updateStatusInternal(pod, status, pod.DeletionTimestamp != nil, false)
m.updateStatusInternal(logger, pod, status, pod.DeletionTimestamp != nil, false)
}
func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool) {
func (m *manager) SetContainerReadiness(logger klog.Logger, podUID types.UID, containerID kubecontainer.ContainerID, ready bool) {
m.podStatusesLock.Lock()
defer m.podStatusesLock.Unlock()
pod, ok := m.podManager.GetPodByUID(podUID)
if !ok {
klog.V(4).InfoS("Pod has been deleted, no need to update readiness", "podUID", string(podUID))
logger.V(4).Info("Pod has been deleted, no need to update readiness", "podUID", podUID)
return
}
oldStatus, found := m.podStatuses[pod.UID]
if !found {
klog.InfoS("Container readiness changed before pod has synced",
logger.Info("Container readiness changed before pod has synced",
"pod", klog.KObj(pod),
"containerID", containerID.String())
return
@@ -352,14 +353,14 @@ func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontai
// Find the container to update.
containerStatus, _, ok := findContainerStatus(&oldStatus.status, containerID.String())
if !ok {
klog.InfoS("Container readiness changed for unknown container",
logger.Info("Container readiness changed for unknown container",
"pod", klog.KObj(pod),
"containerID", containerID.String())
return
}
if containerStatus.Ready == ready {
klog.V(4).InfoS("Container readiness unchanged",
logger.V(4).Info("Container readiness unchanged",
"ready", ready,
"pod", klog.KObj(pod),
"containerID", containerID.String())
@@ -383,7 +384,7 @@ func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontai
if conditionIndex != -1 {
status.Conditions[conditionIndex] = condition
} else {
klog.InfoS("PodStatus missing condition type", "conditionType", conditionType, "status", status)
logger.Info("PodStatus missing condition type", "conditionType", conditionType, "status", status)
status.Conditions = append(status.Conditions, condition)
}
}
@@ -391,22 +392,22 @@ func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontai
allContainerStatuses := append(status.InitContainerStatuses, status.ContainerStatuses...)
updateConditionFunc(v1.PodReady, GeneratePodReadyCondition(pod, &oldStatus.status, status.Conditions, allContainerStatuses, status.Phase))
updateConditionFunc(v1.ContainersReady, GenerateContainersReadyCondition(pod, &oldStatus.status, allContainerStatuses, status.Phase))
m.updateStatusInternal(pod, status, false, false)
m.updateStatusInternal(logger, pod, status, false, false)
}
func (m *manager) SetContainerStartup(podUID types.UID, containerID kubecontainer.ContainerID, started bool) {
func (m *manager) SetContainerStartup(logger klog.Logger, podUID types.UID, containerID kubecontainer.ContainerID, started bool) {
m.podStatusesLock.Lock()
defer m.podStatusesLock.Unlock()
pod, ok := m.podManager.GetPodByUID(podUID)
if !ok {
klog.V(4).InfoS("Pod has been deleted, no need to update startup", "podUID", string(podUID))
logger.V(4).Info("Pod has been deleted, no need to update startup", "podUID", string(podUID))
return
}
oldStatus, found := m.podStatuses[pod.UID]
if !found {
klog.InfoS("Container startup changed before pod has synced",
logger.Info("Container startup changed before pod has synced",
"pod", klog.KObj(pod),
"containerID", containerID.String())
return
@@ -415,14 +416,14 @@ func (m *manager) SetContainerStartup(podUID types.UID, containerID kubecontaine
// Find the container to update.
containerStatus, _, ok := findContainerStatus(&oldStatus.status, containerID.String())
if !ok {
klog.InfoS("Container startup changed for unknown container",
logger.Info("Container startup changed for unknown container",
"pod", klog.KObj(pod),
"containerID", containerID.String())
return
}
if containerStatus.Started != nil && *containerStatus.Started == started {
klog.V(4).InfoS("Container startup unchanged",
logger.V(4).Info("Container startup unchanged",
"pod", klog.KObj(pod),
"containerID", containerID.String())
return
@@ -433,7 +434,7 @@ func (m *manager) SetContainerStartup(podUID types.UID, containerID kubecontaine
containerStatus, _, _ = findContainerStatus(&status, containerID.String())
containerStatus.Started = &started
m.updateStatusInternal(pod, status, false, false)
m.updateStatusInternal(logger, pod, status, false, false)
}
func findContainerStatus(status *v1.PodStatus, containerID string) (containerStatus *v1.ContainerStatus, init bool, ok bool) {
@@ -464,7 +465,7 @@ func findContainerStatus(status *v1.PodStatus, containerID string) (containerSta
// to override waiting containers (unless there is evidence the pod previously started those containers).
// It also makes sure that pods are transitioned to a terminal phase (Failed or Succeeded) before
// their deletion.
func (m *manager) TerminatePod(pod *v1.Pod) {
func (m *manager) TerminatePod(logger klog.Logger, pod *v1.Pod) {
m.podStatusesLock.Lock()
defer m.podStatusesLock.Unlock()
@@ -478,7 +479,7 @@ func (m *manager) TerminatePod(pod *v1.Pod) {
status := *oldStatus.DeepCopy()
// once a pod has initialized, any missing status is treated as a failure
if hasPodInitialized(pod) {
if hasPodInitialized(logger, pod) {
for i := range status.ContainerStatuses {
if status.ContainerStatuses[i].State.Terminated != nil {
continue
@@ -516,23 +517,23 @@ func (m *manager) TerminatePod(pod *v1.Pod) {
// do nothing, already terminal
case v1.PodPending, v1.PodRunning:
if status.Phase == v1.PodRunning && isCached {
klog.InfoS("Terminal running pod should have already been marked as failed, programmer error", "pod", klog.KObj(pod), "podUID", pod.UID)
logger.Info("Terminal running pod should have already been marked as failed, programmer error", "pod", klog.KObj(pod), "podUID", pod.UID)
}
klog.V(3).InfoS("Marking terminal pod as failed", "oldPhase", status.Phase, "pod", klog.KObj(pod), "podUID", pod.UID)
logger.V(3).Info("Marking terminal pod as failed", "oldPhase", status.Phase, "pod", klog.KObj(pod), "podUID", pod.UID)
status.Phase = v1.PodFailed
default:
klog.ErrorS(fmt.Errorf("unknown phase: %v", status.Phase), "Unknown phase, programmer error", "pod", klog.KObj(pod), "podUID", pod.UID)
logger.Error(fmt.Errorf("unknown phase: %v", status.Phase), "Unknown phase, programmer error", "pod", klog.KObj(pod), "podUID", pod.UID)
status.Phase = v1.PodFailed
}
}
klog.V(5).InfoS("TerminatePod calling updateStatusInternal", "pod", klog.KObj(pod), "podUID", pod.UID)
m.updateStatusInternal(pod, status, true, true)
logger.V(5).Info("TerminatePod calling updateStatusInternal", "pod", klog.KObj(pod), "podUID", pod.UID)
m.updateStatusInternal(logger, pod, status, true, true)
}
// hasPodInitialized returns true if the pod has no evidence of ever starting a regular container, which
// implies those containers should not be transitioned to terminated status.
func hasPodInitialized(pod *v1.Pod) bool {
func hasPodInitialized(logger klog.Logger, pod *v1.Pod) bool {
// a pod without init containers is always initialized
if len(pod.Spec.InitContainers) == 0 {
return true
@@ -547,7 +548,7 @@ func hasPodInitialized(pod *v1.Pod) bool {
if l := len(pod.Status.InitContainerStatuses); l > 0 {
container, ok := kubeutil.GetContainerByIndex(pod.Spec.InitContainers, pod.Status.InitContainerStatuses, l-1)
if !ok {
klog.V(4).InfoS("Mismatch between pod spec and status, likely programmer error", "pod", klog.KObj(pod), "containerName", container.Name)
logger.V(4).Info("Mismatch between pod spec and status, likely programmer error", "pod", klog.KObj(pod), "containerName", container.Name)
return false
}
@@ -639,7 +640,7 @@ func checkContainerStateTransition(oldStatuses, newStatuses *v1.PodStatus, podSp
// updateStatusInternal updates the internal status cache, and queues an update to the api server if
// necessary.
// This method IS NOT THREAD SAFE and must be called from a locked function.
func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUpdate, podIsFinished bool) {
func (m *manager) updateStatusInternal(logger klog.Logger, pod *v1.Pod, status v1.PodStatus, forceUpdate, podIsFinished bool) {
var oldStatus v1.PodStatus
cachedStatus, isCached := m.podStatuses[pod.UID]
if isCached {
@@ -647,7 +648,7 @@ func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUp
// TODO(#116484): Also assign terminal phase to static pods.
if !kubetypes.IsStaticPod(pod) {
if cachedStatus.podIsFinished && !podIsFinished {
klog.InfoS("Got unexpected podIsFinished=false, while podIsFinished=true in status cache, programmer error.", "pod", klog.KObj(pod))
logger.Info("Got unexpected podIsFinished=false, while podIsFinished=true in status cache, programmer error", "pod", klog.KObj(pod))
podIsFinished = true
}
}
@@ -659,7 +660,7 @@ func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUp
// Check for illegal state transition in containers
if err := checkContainerStateTransition(&oldStatus, &status, &pod.Spec); err != nil {
klog.ErrorS(err, "Status update on pod aborted", "pod", klog.KObj(pod))
logger.Error(err, "Status update on pod aborted", "pod", klog.KObj(pod))
return
}
@@ -699,7 +700,7 @@ func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUp
// Perform some more extensive logging of container termination state to assist in
// debugging production races (generally not needed).
if klogV := klog.V(5); klogV.Enabled() {
if loggerV := logger.V(5); loggerV.Enabled() {
var containers []string
for _, s := range append(append([]v1.ContainerStatus(nil), status.InitContainerStatuses...), status.ContainerStatuses...) {
var current, previous string
@@ -726,13 +727,13 @@ func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUp
containers = append(containers, fmt.Sprintf("(%s state=%s previous=%s)", s.Name, current, previous))
}
sort.Strings(containers)
klogV.InfoS("updateStatusInternal", "version", cachedStatus.version+1, "podIsFinished", podIsFinished, "pod", klog.KObj(pod), "podUID", pod.UID, "containers", strings.Join(containers, " "))
loggerV.Info("updateStatusInternal", "version", cachedStatus.version+1, "podIsFinished", podIsFinished, "pod", klog.KObj(pod), "podUID", pod.UID, "containers", strings.Join(containers, " "))
}
// The intent here is to prevent concurrent updates to a pod's status from
// clobbering each other so the phase of a pod progresses monotonically.
if isCached && isPodStatusByKubeletEqual(&cachedStatus.status, &status) && !forceUpdate {
klog.V(3).InfoS("Ignoring same status for pod", "pod", klog.KObj(pod), "status", status)
logger.V(3).Info("Ignoring same status for pod", "pod", klog.KObj(pod), "status", status)
return
}
@@ -789,12 +790,12 @@ func (m *manager) deletePodStatus(uid types.UID) {
}
// TODO(filipg): It'd be cleaner if we can do this without signal from user.
func (m *manager) RemoveOrphanedStatuses(podUIDs map[types.UID]bool) {
func (m *manager) RemoveOrphanedStatuses(logger klog.Logger, podUIDs map[types.UID]bool) {
m.podStatusesLock.Lock()
defer m.podStatusesLock.Unlock()
for key := range m.podStatuses {
if _, ok := podUIDs[key]; !ok {
klog.V(5).InfoS("Removing pod from status map.", "podUID", key)
logger.V(5).Info("Removing pod from status map", "podUID", key)
delete(m.podStatuses, key)
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
delete(m.podResizeConditions, key)
@@ -805,7 +806,8 @@ func (m *manager) RemoveOrphanedStatuses(podUIDs map[types.UID]bool) {
// syncBatch syncs pods statuses with the apiserver. Returns the number of syncs
// attempted for testing.
func (m *manager) syncBatch(all bool) int {
func (m *manager) syncBatch(ctx context.Context, all bool) int {
logger := klog.FromContext(ctx)
type podSync struct {
podUID types.UID
statusUID kubetypes.MirrorPodUID
@@ -837,7 +839,7 @@ func (m *manager) syncBatch(all bool) int {
uidOfStatus := kubetypes.MirrorPodUID(uid)
if mirrorUID, ok := podToMirror[kubetypes.ResolvedPodUID(uid)]; ok {
if mirrorUID == "" {
klog.V(5).InfoS("Static pod does not have a corresponding mirror pod; skipping",
logger.V(5).Info("Static pod does not have a corresponding mirror pod; skipping",
"podUID", uid,
"pod", klog.KRef(status.podNamespace, status.podName))
continue
@@ -858,9 +860,9 @@ func (m *manager) syncBatch(all bool) int {
// Ensure that any new status, or mismatched status, or pod that is ready for
// deletion gets updated. If a status update fails we retry the next time any
// other pod is updated.
if m.needsUpdate(types.UID(uidOfStatus), status) {
if m.needsUpdate(logger, types.UID(uidOfStatus), status) {
updatedStatuses = append(updatedStatuses, podSync{uid, uidOfStatus, status})
} else if m.needsReconcile(uid, status.status) {
} else if m.needsReconcile(logger, uid, status.status) {
// Delete the apiStatusVersions here to force an update on the pod status
// In most cases the deleted apiStatusVersions here should be filled
// soon after the following syncPod() [If the syncPod() sync an update
@@ -872,19 +874,20 @@ func (m *manager) syncBatch(all bool) int {
}()
for _, update := range updatedStatuses {
klog.V(5).InfoS("Sync pod status", "podUID", update.podUID, "statusUID", update.statusUID, "version", update.status.version)
m.syncPod(update.podUID, update.status)
logger.V(5).Info("Sync pod status", "podUID", update.podUID, "statusUID", update.statusUID, "version", update.status.version)
m.syncPod(ctx, update.podUID, update.status)
}
return len(updatedStatuses)
}
// syncPod syncs the given status with the API server. The caller must not hold the status lock.
func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
func (m *manager) syncPod(ctx context.Context, uid types.UID, status versionedPodStatus) {
logger := klog.FromContext(ctx)
// TODO: make me easier to express from client code
pod, err := m.kubeClient.CoreV1().Pods(status.podNamespace).Get(context.TODO(), status.podName, metav1.GetOptions{})
pod, err := m.kubeClient.CoreV1().Pods(status.podNamespace).Get(ctx, status.podName, metav1.GetOptions{})
if errors.IsNotFound(err) {
klog.V(3).InfoS("Pod does not exist on the server",
logger.V(3).Info("Pod does not exist on the server",
"podUID", uid,
"pod", klog.KRef(status.podNamespace, status.podName))
// If the Pod is deleted the status will be cleared in
@@ -892,17 +895,16 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
return
}
if err != nil {
klog.InfoS("Failed to get status for pod",
logger.Error(err, "Failed to get status for pod",
"podUID", uid,
"pod", klog.KRef(status.podNamespace, status.podName),
"err", err)
"pod", klog.KRef(status.podNamespace, status.podName))
return
}
translatedUID := m.podManager.TranslatePodUID(pod.UID)
// Type convert original uid just for the purpose of comparison.
if len(translatedUID) > 0 && translatedUID != kubetypes.ResolvedPodUID(uid) {
klog.V(2).InfoS("Pod was deleted and then recreated, skipping status update",
logger.V(2).Info("Pod was deleted and then recreated, skipping status update",
"pod", klog.KObj(pod),
"oldPodUID", uid,
"podUID", translatedUID)
@@ -912,17 +914,17 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
mergedStatus := mergePodStatus(pod, pod.Status, status.status, m.podDeletionSafety.PodCouldHaveRunningContainers(pod))
newPod, patchBytes, unchanged, err := statusutil.PatchPodStatus(context.TODO(), m.kubeClient, pod.Namespace, pod.Name, pod.UID, pod.Status, mergedStatus)
klog.V(3).InfoS("Patch status for pod", "pod", klog.KObj(pod), "podUID", uid, "patch", string(patchBytes))
newPod, patchBytes, unchanged, err := statusutil.PatchPodStatus(ctx, m.kubeClient, pod.Namespace, pod.Name, pod.UID, pod.Status, mergedStatus)
logger.V(3).Info("Patch status for pod", "pod", klog.KObj(pod), "podUID", uid, "patch", string(patchBytes))
if err != nil {
klog.InfoS("Failed to update status for pod", "pod", klog.KObj(pod), "err", err)
logger.Error(err, "Failed to update status for pod", "pod", klog.KObj(pod))
return
}
if unchanged {
klog.V(3).InfoS("Status for pod is up-to-date", "pod", klog.KObj(pod), "statusVersion", status.version)
logger.V(3).Info("Status for pod is up-to-date", "pod", klog.KObj(pod), "statusVersion", status.version)
} else {
klog.V(3).InfoS("Status for pod updated successfully", "pod", klog.KObj(pod), "statusVersion", status.version, "status", mergedStatus)
logger.V(3).Info("Status for pod updated successfully", "pod", klog.KObj(pod), "statusVersion", status.version, "status", mergedStatus)
pod = newPod
// We pass a new object (result of API call which contains updated ResourceVersion)
m.podStartupLatencyHelper.RecordStatusUpdated(pod)
@@ -930,7 +932,7 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
// measure how long the status update took to propagate from generation to update on the server
if status.at.IsZero() {
klog.V(3).InfoS("Pod had no status time set", "pod", klog.KObj(pod), "podUID", uid, "version", status.version)
logger.V(3).Info("Pod had no status time set", "pod", klog.KObj(pod), "podUID", uid, "version", status.version)
} else {
duration := time.Since(status.at).Truncate(time.Millisecond)
metrics.PodStatusSyncDuration.Observe(duration.Seconds())
@@ -939,26 +941,26 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
m.apiStatusVersions[kubetypes.MirrorPodUID(pod.UID)] = status.version
// We don't handle graceful deletion of mirror pods.
if m.canBeDeleted(pod, status.status, status.podIsFinished) {
if m.canBeDeleted(logger, pod, status.status, status.podIsFinished) {
deleteOptions := metav1.DeleteOptions{
GracePeriodSeconds: new(int64),
// Use the pod UID as the precondition for deletion to prevent deleting a
// newly created pod with the same name and namespace.
Preconditions: metav1.NewUIDPreconditions(string(pod.UID)),
}
err = m.kubeClient.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, deleteOptions)
err = m.kubeClient.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, deleteOptions)
if err != nil {
klog.InfoS("Failed to delete status for pod", "pod", klog.KObj(pod), "err", err)
logger.Info("Failed to delete status for pod", "pod", klog.KObj(pod), "err", err)
return
}
klog.V(3).InfoS("Pod fully terminated and removed from etcd", "pod", klog.KObj(pod))
logger.V(3).Info("Pod fully terminated and removed from etcd", "pod", klog.KObj(pod))
m.deletePodStatus(uid)
}
}
// needsUpdate returns whether the status is stale for the given pod UID.
// This method is not thread safe, and must only be accessed by the sync thread.
func (m *manager) needsUpdate(uid types.UID, status versionedPodStatus) bool {
func (m *manager) needsUpdate(logger klog.Logger, uid types.UID, status versionedPodStatus) bool {
latest, ok := m.apiStatusVersions[kubetypes.MirrorPodUID(uid)]
if !ok || latest < status.version {
return true
@@ -967,10 +969,10 @@ func (m *manager) needsUpdate(uid types.UID, status versionedPodStatus) bool {
if !ok {
return false
}
return m.canBeDeleted(pod, status.status, status.podIsFinished)
return m.canBeDeleted(logger, pod, status.status, status.podIsFinished)
}
func (m *manager) canBeDeleted(pod *v1.Pod, status v1.PodStatus, podIsFinished bool) bool {
func (m *manager) canBeDeleted(logger klog.Logger, pod *v1.Pod, status v1.PodStatus, podIsFinished bool) bool {
if pod.DeletionTimestamp == nil || kubetypes.IsMirrorPod(pod) {
return false
}
@@ -978,12 +980,12 @@ func (m *manager) canBeDeleted(pod *v1.Pod, status v1.PodStatus, podIsFinished b
// which comes from pod manager.
if !podutil.IsPodPhaseTerminal(pod.Status.Phase) {
// For debugging purposes we also log the kubelet's local phase, when the deletion is delayed.
klog.V(3).InfoS("Delaying pod deletion as the phase is non-terminal", "phase", pod.Status.Phase, "localPhase", status.Phase, "pod", klog.KObj(pod), "podUID", pod.UID)
logger.V(3).Info("Delaying pod deletion as the phase is non-terminal", "phase", pod.Status.Phase, "localPhase", status.Phase, "pod", klog.KObj(pod), "podUID", pod.UID)
return false
}
// If this is an update completing pod termination then we know the pod termination is finished.
if podIsFinished {
klog.V(3).InfoS("The pod termination is finished as SyncTerminatedPod completes its execution", "phase", pod.Status.Phase, "localPhase", status.Phase, "pod", klog.KObj(pod), "podUID", pod.UID)
logger.V(3).Info("The pod termination is finished as SyncTerminatedPod completes its execution", "phase", pod.Status.Phase, "localPhase", status.Phase, "pod", klog.KObj(pod), "podUID", pod.UID)
return true
}
return false
@@ -998,18 +1000,18 @@ func (m *manager) canBeDeleted(pod *v1.Pod, status v1.PodStatus, podIsFinished b
// now the pod manager only supports getting mirror pod by static pod, so we have to pass
// static pod uid here.
// TODO(random-liu): Simplify the logic when mirror pod manager is added.
func (m *manager) needsReconcile(uid types.UID, status v1.PodStatus) bool {
func (m *manager) needsReconcile(logger klog.Logger, uid types.UID, status v1.PodStatus) bool {
// The pod could be a static pod, so we should translate first.
pod, ok := m.podManager.GetPodByUID(uid)
if !ok {
klog.V(4).InfoS("Pod has been deleted, no need to reconcile", "podUID", string(uid))
logger.V(4).Info("Pod has been deleted, no need to reconcile", "podUID", uid)
return false
}
// If the pod is a static pod, we should check its mirror pod, because only status in mirror pod is meaningful to us.
if kubetypes.IsStaticPod(pod) {
mirrorPod, ok := m.podManager.GetMirrorPodByPod(pod)
if !ok {
klog.V(4).InfoS("Static pod has no corresponding mirror pod, no need to reconcile", "pod", klog.KObj(pod))
logger.V(4).Info("Static pod has no corresponding mirror pod, no need to reconcile", "pod", klog.KObj(pod))
return false
}
pod = mirrorPod
@@ -1023,7 +1025,7 @@ func (m *manager) needsReconcile(uid types.UID, status v1.PodStatus) bool {
// reconcile is not needed. Just return.
return false
}
klog.V(3).InfoS("Pod status is inconsistent with cached status for pod, a reconciliation should be triggered",
logger.V(3).Info("Pod status is inconsistent with cached status for pod, a reconciliation should be triggered",
"pod", klog.KObj(pod),
"statusDiff", cmp.Diff(podStatus, &status))

View File

@@ -17,6 +17,7 @@ limitations under the License.
package status
import (
"context"
"fmt"
"math/rand"
"reflect"
@@ -39,6 +40,8 @@ import (
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
"k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
api "k8s.io/kubernetes/pkg/apis/core"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
@@ -73,7 +76,7 @@ func getTestPod() *v1.Pod {
// will be triggered, which will mess up all the old unit test.
// To simplify the implementation of unit test, we add testSyncBatch() here, it will make sure the statuses in
// pod manager the same with cached ones before syncBatch(true) so as to avoid reconciling.
func (m *manager) testSyncBatch() {
func (m *manager) testSyncBatch(ctx context.Context) {
for uid, status := range m.podStatuses {
pod, ok := m.podManager.GetPodByUID(uid)
if ok {
@@ -84,7 +87,7 @@ func (m *manager) testSyncBatch() {
pod.Status = status.status
}
}
m.syncBatch(true)
m.syncBatch(ctx, true)
}
func newTestManager(kubeClient clientset.Interface) *manager {
@@ -105,8 +108,9 @@ func getRandomPodStatus() v1.PodStatus {
}
func verifyActions(t *testing.T, manager *manager, expectedActions []core.Action) {
_, ctx := ktesting.NewTestContext(t)
t.Helper()
manager.consumeUpdates()
manager.consumeUpdates(ctx)
actions := manager.kubeClient.(*fake.Clientset).Actions()
defer manager.kubeClient.(*fake.Clientset).ClearActions()
if len(actions) != len(expectedActions) {
@@ -122,20 +126,21 @@ func verifyActions(t *testing.T, manager *manager, expectedActions []core.Action
}
func verifyUpdates(t *testing.T, manager *manager, expectedUpdates int) {
_, ctx := ktesting.NewTestContext(t)
t.Helper()
// Consume all updates in the channel.
numUpdates := manager.consumeUpdates()
numUpdates := manager.consumeUpdates(ctx)
if numUpdates != expectedUpdates {
t.Errorf("unexpected number of updates %d, expected %d", numUpdates, expectedUpdates)
}
}
func (m *manager) consumeUpdates() int {
func (m *manager) consumeUpdates(ctx context.Context) int {
updates := 0
for {
select {
case <-m.podStatusChannel:
updates += m.syncBatch(false)
updates += m.syncBatch(ctx, false)
default:
return updates
}
@@ -143,9 +148,10 @@ func (m *manager) consumeUpdates() int {
}
func TestNewStatus(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
syncer := newTestManager(&fake.Clientset{})
testPod := getTestPod()
syncer.SetPodStatus(testPod, getRandomPodStatus())
syncer.SetPodStatus(logger, testPod, getRandomPodStatus())
verifyUpdates(t, syncer, 1)
status := expectPodStatus(t, syncer, testPod)
@@ -155,6 +161,7 @@ func TestNewStatus(t *testing.T) {
}
func TestNewStatusPreservesPodStartTime(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
syncer := newTestManager(&fake.Clientset{})
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
@@ -167,7 +174,7 @@ func TestNewStatusPreservesPodStartTime(t *testing.T) {
now := metav1.Now()
startTime := metav1.NewTime(now.Time.Add(-1 * time.Minute))
pod.Status.StartTime = &startTime
syncer.SetPodStatus(pod, getRandomPodStatus())
syncer.SetPodStatus(logger, pod, getRandomPodStatus())
status := expectPodStatus(t, syncer, pod)
if !status.StartTime.Time.Equal(startTime.Time) {
@@ -187,6 +194,7 @@ func getReadyPodStatus() v1.PodStatus {
}
func TestNewStatusSetsReadyTransitionTime(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
syncer := newTestManager(&fake.Clientset{})
podStatus := getReadyPodStatus()
pod := &v1.Pod{
@@ -197,33 +205,35 @@ func TestNewStatusSetsReadyTransitionTime(t *testing.T) {
},
Status: v1.PodStatus{},
}
syncer.SetPodStatus(pod, podStatus)
syncer.SetPodStatus(logger, pod, podStatus)
verifyUpdates(t, syncer, 1)
status := expectPodStatus(t, syncer, pod)
readyCondition := podutil.GetPodReadyCondition(status)
if readyCondition.LastTransitionTime.IsZero() {
t.Errorf("Unexpected: last transition time not set")
logger.Error(nil, "Unexpected: last transition time not set")
}
}
func TestChangedStatus(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
syncer := newTestManager(&fake.Clientset{})
testPod := getTestPod()
syncer.SetPodStatus(testPod, getRandomPodStatus())
syncer.SetPodStatus(logger, testPod, getRandomPodStatus())
verifyUpdates(t, syncer, 1)
syncer.SetPodStatus(testPod, getRandomPodStatus())
syncer.SetPodStatus(logger, testPod, getRandomPodStatus())
verifyUpdates(t, syncer, 1)
}
func TestChangedStatusKeepsStartTime(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
syncer := newTestManager(&fake.Clientset{})
testPod := getTestPod()
now := metav1.Now()
firstStatus := getRandomPodStatus()
firstStatus.StartTime = &now
syncer.SetPodStatus(testPod, firstStatus)
syncer.SetPodStatus(logger, testPod, firstStatus)
verifyUpdates(t, syncer, 1)
syncer.SetPodStatus(testPod, getRandomPodStatus())
syncer.SetPodStatus(logger, testPod, getRandomPodStatus())
verifyUpdates(t, syncer, 1)
finalStatus := expectPodStatus(t, syncer, testPod)
if finalStatus.StartTime.IsZero() {
@@ -236,6 +246,7 @@ func TestChangedStatusKeepsStartTime(t *testing.T) {
}
func TestChangedStatusUpdatesLastTransitionTime(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
syncer := newTestManager(&fake.Clientset{})
podStatus := getReadyPodStatus()
pod := &v1.Pod{
@@ -246,12 +257,12 @@ func TestChangedStatusUpdatesLastTransitionTime(t *testing.T) {
},
Status: v1.PodStatus{},
}
syncer.SetPodStatus(pod, podStatus)
syncer.SetPodStatus(logger, pod, podStatus)
verifyUpdates(t, syncer, 1)
oldStatus := expectPodStatus(t, syncer, pod)
anotherStatus := getReadyPodStatus()
anotherStatus.Conditions[0].Status = v1.ConditionFalse
syncer.SetPodStatus(pod, anotherStatus)
syncer.SetPodStatus(logger, pod, anotherStatus)
verifyUpdates(t, syncer, 1)
newStatus := expectPodStatus(t, syncer, pod)
@@ -266,15 +277,17 @@ func TestChangedStatusUpdatesLastTransitionTime(t *testing.T) {
}
func TestUnchangedStatus(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
syncer := newTestManager(&fake.Clientset{})
testPod := getTestPod()
podStatus := getRandomPodStatus()
syncer.SetPodStatus(testPod, podStatus)
syncer.SetPodStatus(testPod, podStatus)
syncer.SetPodStatus(logger, testPod, podStatus)
syncer.SetPodStatus(logger, testPod, podStatus)
verifyUpdates(t, syncer, 1)
}
func TestUnchangedStatusPreservesLastTransitionTime(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
syncer := newTestManager(&fake.Clientset{})
podStatus := getReadyPodStatus()
pod := &v1.Pod{
@@ -285,11 +298,11 @@ func TestUnchangedStatusPreservesLastTransitionTime(t *testing.T) {
},
Status: v1.PodStatus{},
}
syncer.SetPodStatus(pod, podStatus)
syncer.SetPodStatus(logger, pod, podStatus)
verifyUpdates(t, syncer, 1)
oldStatus := expectPodStatus(t, syncer, pod)
anotherStatus := getReadyPodStatus()
syncer.SetPodStatus(pod, anotherStatus)
syncer.SetPodStatus(logger, pod, anotherStatus)
// No update.
verifyUpdates(t, syncer, 0)
newStatus := expectPodStatus(t, syncer, pod)
@@ -305,24 +318,27 @@ func TestUnchangedStatusPreservesLastTransitionTime(t *testing.T) {
}
func TestSyncPodIgnoresNotFound(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
client := fake.Clientset{}
syncer := newTestManager(&client)
client.AddReactor("get", "pods", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, errors.NewNotFound(api.Resource("pods"), "test-pod")
})
syncer.SetPodStatus(getTestPod(), getRandomPodStatus())
syncer.SetPodStatus(logger, getTestPod(), getRandomPodStatus())
verifyActions(t, syncer, []core.Action{getAction()})
}
func TestSyncPod(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
syncer := newTestManager(&fake.Clientset{})
testPod := getTestPod()
syncer.kubeClient = fake.NewSimpleClientset(testPod)
syncer.SetPodStatus(testPod, getRandomPodStatus())
syncer.SetPodStatus(logger, testPod, getRandomPodStatus())
verifyActions(t, syncer, []core.Action{getAction(), patchAction()})
}
func TestSyncPodChecksMismatchedUID(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
syncer := newTestManager(&fake.Clientset{})
pod := getTestPod()
pod.UID = "first"
@@ -331,11 +347,12 @@ func TestSyncPodChecksMismatchedUID(t *testing.T) {
differentPod.UID = "second"
syncer.podManager.(mutablePodManager).AddPod(differentPod)
syncer.kubeClient = fake.NewSimpleClientset(pod)
syncer.SetPodStatus(differentPod, getRandomPodStatus())
syncer.SetPodStatus(logger, differentPod, getRandomPodStatus())
verifyActions(t, syncer, []core.Action{getAction()})
}
func TestSyncPodNoDeadlock(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
client := &fake.Clientset{}
m := newTestManager(client)
pod := getTestPod()
@@ -360,71 +377,72 @@ func TestSyncPodNoDeadlock(t *testing.T) {
t.Logf("Pod not found.")
ret = nil
err = errors.NewNotFound(api.Resource("pods"), pod.Name)
m.SetPodStatus(pod, getRandomPodStatus())
m.SetPodStatus(logger, pod, getRandomPodStatus())
verifyActions(t, m, []core.Action{getAction()})
t.Logf("Pod was recreated.")
ret = getTestPod()
ret.UID = "other_pod"
err = nil
m.SetPodStatus(pod, getRandomPodStatus())
m.SetPodStatus(logger, pod, getRandomPodStatus())
verifyActions(t, m, []core.Action{getAction()})
t.Logf("Pod not deleted (success case).")
ret = getTestPod()
m.SetPodStatus(pod, getRandomPodStatus())
m.SetPodStatus(logger, pod, getRandomPodStatus())
verifyActions(t, m, []core.Action{getAction(), patchAction()})
t.Logf("Pod is terminated, but still running.")
pod.DeletionTimestamp = &metav1.Time{Time: time.Now()}
m.SetPodStatus(pod, getRandomPodStatus())
m.SetPodStatus(logger, pod, getRandomPodStatus())
verifyActions(t, m, []core.Action{getAction(), patchAction()})
t.Logf("Pod is terminated successfully.")
pod.Status.ContainerStatuses[0].State.Running = nil
pod.Status.ContainerStatuses[0].State.Terminated = &v1.ContainerStateTerminated{}
m.SetPodStatus(pod, getRandomPodStatus())
m.SetPodStatus(logger, pod, getRandomPodStatus())
verifyActions(t, m, []core.Action{getAction(), patchAction()})
t.Logf("Error case.")
ret = nil
err = fmt.Errorf("intentional test error")
m.SetPodStatus(pod, getRandomPodStatus())
m.SetPodStatus(logger, pod, getRandomPodStatus())
verifyActions(t, m, []core.Action{getAction()})
}
func TestStaleUpdates(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
pod := getTestPod()
client := fake.NewSimpleClientset(pod)
m := newTestManager(client)
status := v1.PodStatus{Message: "initial status"}
m.SetPodStatus(pod, status)
m.SetPodStatus(logger, pod, status)
status.Message = "first version bump"
m.SetPodStatus(pod, status)
m.SetPodStatus(logger, pod, status)
status.Message = "second version bump"
m.SetPodStatus(pod, status)
m.SetPodStatus(logger, pod, status)
t.Logf("sync batch before syncPods pushes latest status, resulting in one update during the batch")
m.syncBatch(true)
m.syncBatch(ctx, true)
verifyUpdates(t, m, 0)
verifyActions(t, m, []core.Action{getAction(), patchAction()})
t.Logf("Nothing left in the channel to sync")
verifyActions(t, m, []core.Action{})
t.Log("Unchanged status should not send an update")
m.SetPodStatus(pod, status)
logger.Info("Unchanged status should not send an update")
m.SetPodStatus(logger, pod, status)
verifyUpdates(t, m, 0)
t.Log("... even if it's stale as long as nothing changes")
logger.Info("... even if it's stale as long as nothing changes")
mirrorPodUID := kubetypes.MirrorPodUID(pod.UID)
m.apiStatusVersions[mirrorPodUID] = m.apiStatusVersions[mirrorPodUID] - 1
m.SetPodStatus(pod, status)
m.syncBatch(true)
m.SetPodStatus(logger, pod, status)
m.syncBatch(ctx, true)
verifyActions(t, m, []core.Action{getAction()})
t.Logf("Nothing stuck in the pipe.")
logger.Info("Nothing stuck in the pipe")
verifyUpdates(t, m, 0)
}
@@ -571,6 +589,7 @@ func TestStatusNormalizeTimeStamp(t *testing.T) {
}
func TestStaticPod(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
staticPod := getTestPod()
staticPod.Annotations = map[string]string{kubetypes.ConfigSourceAnnotationKey: "file"}
mirrorPod := getTestPod()
@@ -589,7 +608,7 @@ func TestStaticPod(t *testing.T) {
status := getRandomPodStatus()
now := metav1.Now()
status.StartTime = &now
m.SetPodStatus(staticPod, status)
m.SetPodStatus(logger, staticPod, status)
t.Logf("Should be able to get the static pod status from status manager")
retrievedStatus := expectPodStatus(t, m, staticPod)
@@ -597,7 +616,7 @@ func TestStaticPod(t *testing.T) {
assert.True(t, isPodStatusByKubeletEqual(&status, &retrievedStatus), "Expected: %+v, Got: %+v", status, retrievedStatus)
t.Logf("Should not sync pod in syncBatch because there is no corresponding mirror pod for the static pod.")
m.syncBatch(true)
m.syncBatch(ctx, true)
assert.Empty(t, m.kubeClient.(*fake.Clientset).Actions(), "Expected no updates after syncBatch")
t.Logf("Create the mirror pod")
@@ -610,11 +629,11 @@ func TestStaticPod(t *testing.T) {
assert.True(t, isPodStatusByKubeletEqual(&status, &retrievedStatus), "Expected: %+v, Got: %+v", status, retrievedStatus)
t.Logf("Should sync pod because the corresponding mirror pod is created")
assert.Equal(t, 1, m.syncBatch(true))
assert.Equal(t, 1, m.syncBatch(ctx, true))
verifyActions(t, m, []core.Action{getAction(), patchAction()})
t.Logf("syncBatch should not sync any pods because nothing is changed.")
m.testSyncBatch()
m.testSyncBatch(ctx)
verifyActions(t, m, []core.Action{})
t.Logf("Change mirror pod identity.")
@@ -624,11 +643,12 @@ func TestStaticPod(t *testing.T) {
m.podManager.(mutablePodManager).AddPod(mirrorPod)
t.Logf("Should not update to mirror pod, because UID has changed.")
assert.Equal(t, 1, m.syncBatch(true))
assert.Equal(t, 1, m.syncBatch(ctx, true))
verifyActions(t, m, []core.Action{getAction()})
}
func TestTerminatePod(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
syncer := newTestManager(&fake.Clientset{})
testPod := getTestPod()
testPod.Spec.InitContainers = []v1.Container{
@@ -641,7 +661,7 @@ func TestTerminatePod(t *testing.T) {
{Name: "test-2"},
{Name: "test-3"},
}
t.Logf("update the pod's status to Failed. TerminatePod should preserve this status update.")
logger.Info("update the pod's status to Failed. TerminatePod should preserve this status update.")
firstStatus := getRandomPodStatus()
firstStatus.Phase = v1.PodFailed
firstStatus.InitContainerStatuses = []v1.ContainerStatus{
@@ -657,9 +677,9 @@ func TestTerminatePod(t *testing.T) {
{Name: "test-2", State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{Reason: "Test", ExitCode: 2}}},
{Name: "test-3", State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{Reason: "Test", ExitCode: 0}}},
}
syncer.SetPodStatus(testPod, firstStatus)
syncer.SetPodStatus(logger, testPod, firstStatus)
t.Logf("set the testPod to a pod with Phase running, to simulate a stale pod")
logger.Info("set the testPod to a pod with Phase running, to simulate a stale pod")
testPod.Status = getRandomPodStatus()
testPod.Status.Phase = v1.PodRunning
testPod.Status.InitContainerStatuses = []v1.ContainerStatus{
@@ -673,9 +693,9 @@ func TestTerminatePod(t *testing.T) {
{Name: "test-3", State: v1.ContainerState{Running: &v1.ContainerStateRunning{}}},
}
syncer.TerminatePod(testPod)
syncer.TerminatePod(logger, testPod)
t.Logf("we expect the container statuses to have changed to terminated")
logger.Info("we expect the container statuses to have changed to terminated")
newStatus := expectPodStatus(t, syncer, testPod)
for i := range newStatus.ContainerStatuses {
assert.NotNil(t, newStatus.ContainerStatuses[i].State.Terminated, "expected containers to be terminated")
@@ -686,30 +706,31 @@ func TestTerminatePod(t *testing.T) {
expectUnknownState := v1.ContainerState{Terminated: &v1.ContainerStateTerminated{Reason: kubecontainer.ContainerReasonStatusUnknown, Message: "The container could not be located when the pod was terminated", ExitCode: 137}}
if !reflect.DeepEqual(newStatus.InitContainerStatuses[0].State, expectUnknownState) {
t.Errorf("terminated container state not defaulted: %s", cmp.Diff(newStatus.InitContainerStatuses[0].State, expectUnknownState))
logger.Error(nil, "terminated container state not defaulted", "diff", cmp.Diff(newStatus.InitContainerStatuses[0].State, expectUnknownState))
}
if !reflect.DeepEqual(newStatus.InitContainerStatuses[1].State, firstStatus.InitContainerStatuses[1].State) {
t.Errorf("existing terminated container state not preserved: %#v", newStatus.ContainerStatuses)
logger.Error(nil, "existing terminated container state not preserved", "containerStatuses", newStatus.ContainerStatuses)
}
if !reflect.DeepEqual(newStatus.InitContainerStatuses[2].State, firstStatus.InitContainerStatuses[2].State) {
t.Errorf("existing terminated container state not preserved: %#v", newStatus.ContainerStatuses)
logger.Error(nil, "existing terminated container state not preserved", "containerStatuses", newStatus.ContainerStatuses)
}
if !reflect.DeepEqual(newStatus.ContainerStatuses[0].State, expectUnknownState) {
t.Errorf("terminated container state not defaulted: %s", cmp.Diff(newStatus.ContainerStatuses[0].State, expectUnknownState))
logger.Error(nil, "terminated container state not defaulted", "diff", cmp.Diff(newStatus.ContainerStatuses[0].State, expectUnknownState))
}
if !reflect.DeepEqual(newStatus.ContainerStatuses[1].State, firstStatus.ContainerStatuses[1].State) {
t.Errorf("existing terminated container state not preserved: %#v", newStatus.ContainerStatuses)
logger.Error(nil, "existing terminated container state not preserved", "containerStatuses", newStatus.ContainerStatuses)
}
if !reflect.DeepEqual(newStatus.ContainerStatuses[2].State, firstStatus.ContainerStatuses[2].State) {
t.Errorf("existing terminated container state not preserved: %#v", newStatus.ContainerStatuses)
logger.Error(nil, "existing terminated container state not preserved", "containerStatuses", newStatus.ContainerStatuses)
}
t.Logf("we expect the previous status update to be preserved.")
logger.Info("we expect the previous status update to be preserved.")
assert.Equal(t, newStatus.Phase, firstStatus.Phase)
assert.Equal(t, newStatus.Message, firstStatus.Message)
}
func TestTerminatePodWaiting(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
syncer := newTestManager(&fake.Clientset{})
testPod := getTestPod()
testPod.Spec.InitContainers = []v1.Container{
@@ -722,7 +743,7 @@ func TestTerminatePodWaiting(t *testing.T) {
{Name: "test-2"},
{Name: "test-3"},
}
t.Logf("update the pod's status to Failed. TerminatePod should preserve this status update.")
logger.Info("update the pod's status to Failed. TerminatePod should preserve this status update.")
firstStatus := getRandomPodStatus()
firstStatus.Phase = v1.PodFailed
firstStatus.InitContainerStatuses = []v1.ContainerStatus{
@@ -739,9 +760,9 @@ func TestTerminatePodWaiting(t *testing.T) {
{Name: "test-2", State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{Reason: "Test", ExitCode: 2}}},
{Name: "test-3", State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{Reason: "Test"}}},
}
syncer.SetPodStatus(testPod, firstStatus)
syncer.SetPodStatus(logger, testPod, firstStatus)
t.Logf("set the testPod to a pod with Phase running, to simulate a stale pod")
logger.Info("set the testPod to a pod with Phase running, to simulate a stale pod")
testPod.Status = getRandomPodStatus()
testPod.Status.Phase = v1.PodRunning
testPod.Status.InitContainerStatuses = []v1.ContainerStatus{
@@ -755,9 +776,9 @@ func TestTerminatePodWaiting(t *testing.T) {
{Name: "test-3", State: v1.ContainerState{Running: &v1.ContainerStateRunning{}}},
}
syncer.TerminatePod(testPod)
syncer.TerminatePod(logger, testPod)
t.Logf("we expect the container statuses to have changed to terminated")
logger.Info("we expect the container statuses to have changed to terminated")
newStatus := expectPodStatus(t, syncer, testPod)
for _, container := range newStatus.ContainerStatuses {
assert.NotNil(t, container.State.Terminated, "expected containers to be terminated")
@@ -771,30 +792,31 @@ func TestTerminatePodWaiting(t *testing.T) {
expectUnknownState := v1.ContainerState{Terminated: &v1.ContainerStateTerminated{Reason: kubecontainer.ContainerReasonStatusUnknown, Message: "The container could not be located when the pod was terminated", ExitCode: 137}}
if !reflect.DeepEqual(newStatus.InitContainerStatuses[0].State, expectUnknownState) {
t.Errorf("terminated container state not defaulted: %s", cmp.Diff(newStatus.InitContainerStatuses[0].State, expectUnknownState))
logger.Error(nil, "terminated container state not defaulted", "diff", cmp.Diff(newStatus.InitContainerStatuses[0].State, expectUnknownState))
}
if !reflect.DeepEqual(newStatus.InitContainerStatuses[1].State, firstStatus.InitContainerStatuses[1].State) {
t.Errorf("existing terminated container state not preserved: %#v", newStatus.ContainerStatuses)
logger.Error(nil, "existing terminated container state not preserved", "containerStatuses", newStatus.ContainerStatuses)
}
if !reflect.DeepEqual(newStatus.InitContainerStatuses[2].State, firstStatus.InitContainerStatuses[2].State) {
t.Errorf("waiting container state not defaulted: %s", cmp.Diff(newStatus.InitContainerStatuses[2].State, firstStatus.InitContainerStatuses[2].State))
logger.Error(nil, "waiting container state not defaulted", "diff", cmp.Diff(newStatus.InitContainerStatuses[2].State, firstStatus.InitContainerStatuses[2].State))
}
if !reflect.DeepEqual(newStatus.ContainerStatuses[0].State, expectUnknownState) {
t.Errorf("terminated container state not defaulted: %s", cmp.Diff(newStatus.ContainerStatuses[0].State, expectUnknownState))
logger.Error(nil, "terminated container state not defaulted", "diff", cmp.Diff(newStatus.ContainerStatuses[0].State, expectUnknownState))
}
if !reflect.DeepEqual(newStatus.ContainerStatuses[1].State, firstStatus.ContainerStatuses[1].State) {
t.Errorf("existing terminated container state not preserved: %#v", newStatus.ContainerStatuses)
logger.Error(nil, "existing terminated container state not preserved", "containerStatuses", newStatus.ContainerStatuses)
}
if !reflect.DeepEqual(newStatus.ContainerStatuses[2].State, expectUnknownState) {
t.Errorf("waiting container state not defaulted: %s", cmp.Diff(newStatus.ContainerStatuses[2].State, expectUnknownState))
logger.Error(nil, "waiting container state not defaulted", "diff", cmp.Diff(newStatus.ContainerStatuses[2].State, expectUnknownState))
}
t.Logf("we expect the previous status update to be preserved.")
logger.Info("we expect the previous status update to be preserved.")
assert.Equal(t, newStatus.Phase, firstStatus.Phase)
assert.Equal(t, newStatus.Message, firstStatus.Message)
}
func TestTerminatePod_DefaultUnknownStatus(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
newPod := func(initContainers, containers int, fns ...func(*v1.Pod)) *v1.Pod {
pod := getTestPod()
for i := 0; i < initContainers; i++ {
@@ -816,25 +838,30 @@ func TestTerminatePod_DefaultUnknownStatus(t *testing.T) {
expectTerminatedUnknown := func(t *testing.T, state v1.ContainerState) {
t.Helper()
if state.Terminated == nil || state.Running != nil || state.Waiting != nil {
t.Fatalf("unexpected state: %#v", state)
logger.Error(nil, "unexpected state", "state", state)
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
if state.Terminated.ExitCode != 137 || state.Terminated.Reason != kubecontainer.ContainerReasonStatusUnknown || len(state.Terminated.Message) == 0 {
t.Fatalf("unexpected terminated state: %#v", state.Terminated)
logger.Error(nil, "unexpected terminated state", "terminatedState", state.Terminated)
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
}
expectTerminated := func(t *testing.T, state v1.ContainerState, exitCode int32) {
t.Helper()
if state.Terminated == nil || state.Running != nil || state.Waiting != nil {
t.Fatalf("unexpected state: %#v", state)
logger.Error(nil, "unexpected state", "state", state)
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
if state.Terminated.ExitCode != exitCode {
t.Fatalf("unexpected terminated state: %#v", state.Terminated)
logger.Error(nil, "unexpected terminated state", "terminatedState", state.Terminated)
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
}
expectWaiting := func(t *testing.T, state v1.ContainerState) {
t.Helper()
if state.Terminated != nil || state.Running != nil || state.Waiting == nil {
t.Fatalf("unexpected state: %#v", state)
logger.Error(nil, "unexpected state", "state", state)
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
}
@@ -880,7 +907,8 @@ func TestTerminatePod_DefaultUnknownStatus(t *testing.T) {
expectFn: func(t *testing.T, status v1.PodStatus) {
container := status.ContainerStatuses[0]
if container.LastTerminationState.Terminated.ExitCode != 2 {
t.Fatalf("unexpected last state: %#v", container.LastTerminationState)
logger.Error(nil, "unexpected last state", "lastTerminationState", container.LastTerminationState)
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
expectTerminatedUnknown(t, container.State)
},
@@ -1084,7 +1112,7 @@ func TestTerminatePod_DefaultUnknownStatus(t *testing.T) {
syncer := NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker).(*manager)
original := tc.pod.DeepCopy()
syncer.SetPodStatus(original, original.Status)
syncer.SetPodStatus(logger, original, original.Status)
copied := tc.pod.DeepCopy()
if tc.updateFn != nil {
@@ -1092,7 +1120,7 @@ func TestTerminatePod_DefaultUnknownStatus(t *testing.T) {
}
expected := copied.DeepCopy()
syncer.TerminatePod(copied)
syncer.TerminatePod(logger, copied)
status := expectPodStatus(t, syncer, tc.pod.DeepCopy())
if tc.expectFn != nil {
tc.expectFn(t, status)
@@ -1101,9 +1129,11 @@ func TestTerminatePod_DefaultUnknownStatus(t *testing.T) {
if !reflect.DeepEqual(expected.Status, status) {
diff := cmp.Diff(expected.Status, status)
if len(diff) == 0 {
t.Fatalf("diff returned no results for failed DeepEqual: %#v != %#v", expected.Status, status)
logger.Error(nil, "diff returned no results for failed DeepEqual", "expectedStatus", expected.Status, "actualStatus", status)
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
t.Fatalf("unexpected status: %s", diff)
logger.Error(nil, "unexpected status", "diff", diff)
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
})
}
@@ -1165,22 +1195,25 @@ func TestTerminatePod_EnsurePodPhaseIsTerminal(t *testing.T) {
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
podManager := kubepod.NewBasicPodManager()
podStartupLatencyTracker := util.NewPodStartupLatencyTracker()
syncer := NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker).(*manager)
pod := getTestPod()
pod.Status = tc.status
syncer.TerminatePod(pod)
syncer.TerminatePod(logger, pod)
gotStatus := expectPodStatus(t, syncer, pod.DeepCopy())
if diff := cmp.Diff(tc.wantStatus, gotStatus, cmpopts.IgnoreFields(v1.PodStatus{}, "StartTime")); diff != "" {
t.Fatalf("unexpected status: %s", diff)
logger.Error(nil, "unexpected status", "diff", diff)
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
})
}
}
func TestSetContainerReadiness(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
cID1 := kubecontainer.ContainerID{Type: "test", ID: "1"}
cID2 := kubecontainer.ContainerID{Type: "test", ID: "2"}
containerStatuses := []v1.ContainerStatus{
@@ -1232,44 +1265,45 @@ func TestSetContainerReadiness(t *testing.T) {
m.podManager.(mutablePodManager).AddPod(pod)
t.Log("Setting readiness before status should fail.")
m.SetContainerReadiness(pod.UID, cID1, true)
m.SetContainerReadiness(logger, pod.UID, cID1, true)
verifyUpdates(t, m, 0)
if status, ok := m.GetPodStatus(pod.UID); ok {
t.Errorf("Unexpected PodStatus: %+v", status)
}
t.Log("Setting initial status.")
m.SetPodStatus(pod, status)
m.SetPodStatus(logger, pod, status)
verifyUpdates(t, m, 1)
status = expectPodStatus(t, m, pod)
verifyReadiness("initial", &status, false, false, false)
t.Log("Setting unchanged readiness should do nothing.")
m.SetContainerReadiness(pod.UID, cID1, false)
m.SetContainerReadiness(logger, pod.UID, cID1, false)
verifyUpdates(t, m, 0)
status = expectPodStatus(t, m, pod)
verifyReadiness("unchanged", &status, false, false, false)
t.Log("Setting container readiness should generate update but not pod readiness.")
m.SetContainerReadiness(pod.UID, cID1, true)
m.SetContainerReadiness(logger, pod.UID, cID1, true)
verifyUpdates(t, m, 1)
status = expectPodStatus(t, m, pod)
verifyReadiness("c1 ready", &status, true, false, false)
t.Log("Setting both containers to ready should update pod readiness.")
m.SetContainerReadiness(pod.UID, cID2, true)
m.SetContainerReadiness(logger, pod.UID, cID2, true)
verifyUpdates(t, m, 1)
status = expectPodStatus(t, m, pod)
verifyReadiness("all ready", &status, true, true, true)
t.Log("Setting non-existent container readiness should fail.")
m.SetContainerReadiness(pod.UID, kubecontainer.ContainerID{Type: "test", ID: "foo"}, true)
m.SetContainerReadiness(logger, pod.UID, kubecontainer.ContainerID{Type: "test", ID: "foo"}, true)
verifyUpdates(t, m, 0)
status = expectPodStatus(t, m, pod)
verifyReadiness("ignore non-existent", &status, true, true, true)
}
func TestSetContainerStartup(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
cID1 := kubecontainer.ContainerID{Type: "test", ID: "1"}
cID2 := kubecontainer.ContainerID{Type: "test", ID: "2"}
containerStatuses := []v1.ContainerStatus{
@@ -1299,61 +1333,63 @@ func TestSetContainerStartup(t *testing.T) {
switch c.ContainerID {
case cID1.String():
if (c.Started != nil && *c.Started) != c1Started {
t.Errorf("[%s] Expected startup of c1 to be %v but was %v", step, c1Started, c.Started)
logger.Error(nil, "Error in startup of c1", "expected", c1Started, "current", c.Started)
}
case cID2.String():
if (c.Started != nil && *c.Started) != c2Started {
t.Errorf("[%s] Expected startup of c2 to be %v but was %v", step, c2Started, c.Started)
logger.Error(nil, "Error in startup of c2", "step", step, "expected", c2Started, "current", c.Started)
}
default:
t.Fatalf("[%s] Unexpected container: %+v", step, c)
logger.Error(nil, "Unexpected container", "step", step, "container", c)
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
}
}
m := newTestManager(&fake.Clientset{})
// Add test pod because the container spec has been changed.
// Add test pod because the container spec has been changed
m.podManager.(mutablePodManager).AddPod(pod)
t.Log("Setting startup before status should fail.")
m.SetContainerStartup(pod.UID, cID1, true)
logger.Info("Setting startup before status should fail")
m.SetContainerStartup(logger, pod.UID, cID1, true)
verifyUpdates(t, m, 0)
if status, ok := m.GetPodStatus(pod.UID); ok {
t.Errorf("Unexpected PodStatus: %+v", status)
}
t.Log("Setting initial status.")
m.SetPodStatus(pod, status)
logger.Info("Setting initial status")
m.SetPodStatus(logger, pod, status)
verifyUpdates(t, m, 1)
status = expectPodStatus(t, m, pod)
verifyStartup("initial", &status, false, false, false)
t.Log("Setting unchanged startup should do nothing.")
m.SetContainerStartup(pod.UID, cID1, false)
logger.Info("Setting unchanged startup should do nothing")
m.SetContainerStartup(logger, pod.UID, cID1, false)
verifyUpdates(t, m, 1)
status = expectPodStatus(t, m, pod)
verifyStartup("unchanged", &status, false, false, false)
t.Log("Setting container startup should generate update but not pod startup.")
m.SetContainerStartup(pod.UID, cID1, true)
logger.Info("Setting container startup should generate update but not pod startup")
m.SetContainerStartup(logger, pod.UID, cID1, true)
verifyUpdates(t, m, 1) // Started = nil to false
status = expectPodStatus(t, m, pod)
verifyStartup("c1 ready", &status, true, false, false)
t.Log("Setting both containers to ready should update pod startup.")
m.SetContainerStartup(pod.UID, cID2, true)
logger.Info("Setting both containers to ready should update pod startup")
m.SetContainerStartup(logger, pod.UID, cID2, true)
verifyUpdates(t, m, 1)
status = expectPodStatus(t, m, pod)
verifyStartup("all ready", &status, true, true, true)
t.Log("Setting non-existent container startup should fail.")
m.SetContainerStartup(pod.UID, kubecontainer.ContainerID{Type: "test", ID: "foo"}, true)
logger.Info("Setting non-existent container startup should fail")
m.SetContainerStartup(logger, pod.UID, kubecontainer.ContainerID{Type: "test", ID: "foo"}, true)
verifyUpdates(t, m, 0)
status = expectPodStatus(t, m, pod)
verifyStartup("ignore non-existent", &status, true, true, true)
}
func TestSyncBatchCleanupVersions(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
m := newTestManager(&fake.Clientset{})
testPod := getTestPod()
mirrorPod := getTestPod()
@@ -1367,16 +1403,16 @@ func TestSyncBatchCleanupVersions(t *testing.T) {
t.Logf("Orphaned pods should be removed.")
m.apiStatusVersions[kubetypes.MirrorPodUID(testPod.UID)] = 100
m.apiStatusVersions[kubetypes.MirrorPodUID(mirrorPod.UID)] = 200
m.syncBatch(true)
m.syncBatch(ctx, true)
if _, ok := m.apiStatusVersions[kubetypes.MirrorPodUID(testPod.UID)]; ok {
t.Errorf("Should have cleared status for testPod")
logger.Error(nil, "Should have cleared status for testPod")
}
if _, ok := m.apiStatusVersions[kubetypes.MirrorPodUID(mirrorPod.UID)]; ok {
t.Errorf("Should have cleared status for mirrorPod")
logger.Error(nil, "Should have cleared status for mirrorPod")
}
t.Logf("Non-orphaned pods should not be removed.")
m.SetPodStatus(testPod, getRandomPodStatus())
m.SetPodStatus(logger, testPod, getRandomPodStatus())
m.podManager.(mutablePodManager).AddPod(mirrorPod)
staticPod := mirrorPod
staticPod.UID = "static-uid"
@@ -1384,7 +1420,7 @@ func TestSyncBatchCleanupVersions(t *testing.T) {
m.podManager.(mutablePodManager).AddPod(staticPod)
m.apiStatusVersions[kubetypes.MirrorPodUID(testPod.UID)] = 100
m.apiStatusVersions[kubetypes.MirrorPodUID(mirrorPod.UID)] = 200
m.testSyncBatch()
m.testSyncBatch(ctx)
if _, ok := m.apiStatusVersions[kubetypes.MirrorPodUID(testPod.UID)]; !ok {
t.Errorf("Should not have cleared status for testPod")
}
@@ -1394,12 +1430,13 @@ func TestSyncBatchCleanupVersions(t *testing.T) {
}
func TestReconcilePodStatus(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
testPod := getTestPod()
client := fake.NewSimpleClientset(testPod)
syncer := newTestManager(client)
syncer.SetPodStatus(testPod, getRandomPodStatus())
syncer.SetPodStatus(logger, testPod, getRandomPodStatus())
t.Logf("Call syncBatch directly to test reconcile")
syncer.syncBatch(true) // The apiStatusVersions should be set now
syncer.syncBatch(ctx, true) // The apiStatusVersions should be set now
client.ClearActions()
podStatus, ok := syncer.GetPodStatus(testPod.UID)
@@ -1410,11 +1447,11 @@ func TestReconcilePodStatus(t *testing.T) {
t.Logf("If the pod status is the same, a reconciliation is not needed and syncBatch should do nothing")
syncer.podManager.(mutablePodManager).UpdatePod(testPod)
if syncer.needsReconcile(testPod.UID, podStatus) {
if syncer.needsReconcile(logger, testPod.UID, podStatus) {
t.Fatalf("Pod status is the same, a reconciliation is not needed")
}
syncer.SetPodStatus(testPod, podStatus)
syncer.syncBatch(true)
syncer.SetPodStatus(logger, testPod, podStatus)
syncer.syncBatch(ctx, true)
verifyActions(t, syncer, []core.Action{})
// If the pod status is the same, only the timestamp is in Rfc3339 format (lower precision without nanosecond),
@@ -1425,21 +1462,21 @@ func TestReconcilePodStatus(t *testing.T) {
normalizedStartTime := testPod.Status.StartTime.Rfc3339Copy()
testPod.Status.StartTime = &normalizedStartTime
syncer.podManager.(mutablePodManager).UpdatePod(testPod)
if syncer.needsReconcile(testPod.UID, podStatus) {
if syncer.needsReconcile(logger, testPod.UID, podStatus) {
t.Fatalf("Pod status only differs for timestamp format, a reconciliation is not needed")
}
syncer.SetPodStatus(testPod, podStatus)
syncer.syncBatch(true)
syncer.SetPodStatus(logger, testPod, podStatus)
syncer.syncBatch(ctx, true)
verifyActions(t, syncer, []core.Action{})
t.Logf("If the pod status is different, a reconciliation is needed, syncBatch should trigger an update")
changedPodStatus := getRandomPodStatus()
syncer.podManager.(mutablePodManager).UpdatePod(testPod)
if !syncer.needsReconcile(testPod.UID, changedPodStatus) {
if !syncer.needsReconcile(logger, testPod.UID, changedPodStatus) {
t.Fatalf("Pod status is different, a reconciliation is needed")
}
syncer.SetPodStatus(testPod, changedPodStatus)
syncer.syncBatch(true)
syncer.SetPodStatus(logger, testPod, changedPodStatus)
syncer.syncBatch(ctx, true)
verifyActions(t, syncer, []core.Action{getAction(), patchAction()})
}
@@ -1452,6 +1489,7 @@ func expectPodStatus(t *testing.T, m *manager, pod *v1.Pod) v1.PodStatus {
}
func TestDeletePodBeforeFinished(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
pod := getTestPod()
t.Logf("Set the deletion timestamp.")
pod.DeletionTimestamp = &metav1.Time{Time: time.Now()}
@@ -1460,12 +1498,13 @@ func TestDeletePodBeforeFinished(t *testing.T) {
m.podManager.(mutablePodManager).AddPod(pod)
status := getRandomPodStatus()
status.Phase = v1.PodFailed
m.SetPodStatus(pod, status)
m.SetPodStatus(logger, pod, status)
t.Logf("Expect not to see a delete action as the pod isn't finished yet (TerminatePod isn't called)")
verifyActions(t, m, []core.Action{getAction(), patchAction()})
}
func TestDeletePodFinished(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
pod := getTestPod()
t.Logf("Set the deletion timestamp.")
pod.DeletionTimestamp = &metav1.Time{Time: time.Now()}
@@ -1474,12 +1513,13 @@ func TestDeletePodFinished(t *testing.T) {
m.podManager.(mutablePodManager).AddPod(pod)
status := getRandomPodStatus()
status.Phase = v1.PodFailed
m.TerminatePod(pod)
m.TerminatePod(logger, pod)
t.Logf("Expect to see a delete action as the pod is finished (TerminatePod called)")
verifyActions(t, m, []core.Action{getAction(), patchAction(), deleteAction()})
}
func TestDoNotDeleteMirrorPods(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
staticPod := getTestPod()
staticPod.Annotations = map[string]string{kubetypes.ConfigSourceAnnotationKey: "file"}
mirrorPod := getTestPod()
@@ -1502,7 +1542,7 @@ func TestDoNotDeleteMirrorPods(t *testing.T) {
status := getRandomPodStatus()
now := metav1.Now()
status.StartTime = &now
m.SetPodStatus(staticPod, status)
m.SetPodStatus(logger, staticPod, status)
t.Logf("Expect not to see a delete action.")
verifyActions(t, m, []core.Action{getAction(), patchAction()})