From bf98e45afb3a9a95cb4ef648eaa73d6d09587df4 Mon Sep 17 00:00:00 2001 From: Chulong Li Date: Sun, 13 Apr 2025 19:07:58 +0800 Subject: [PATCH] Migrate pkg/kubelet/volumemanager to contextual logging Remove context.TODO and context.Background Fix linter error in volume_manager_test Fix QF1008: Could remove embedded field "ObjectMeta" from selector Remove the extra code change Remove the extra change Update the NewTestContext --- hack/golangci-hints.yaml | 1 + hack/golangci.yaml | 1 + hack/logcheck.conf | 1 + .../cache/actual_state_of_world.go | 28 +++-- .../cache/actual_state_of_world_test.go | 15 ++- .../cache/desired_state_of_world.go | 24 ++-- .../cache/desired_state_of_world_test.go | 37 +++--- .../volumemanager/metrics/metrics_test.go | 4 +- .../desired_state_of_world_populator.go | 42 ++++--- .../desired_state_of_world_populator_test.go | 51 ++++---- .../volumemanager/reconciler/reconciler.go | 24 ++-- .../reconciler/reconciler_common.go | 74 ++++++------ .../reconciler/reconciler_test.go | 111 ++++++++++-------- .../volumemanager/reconciler/reconstruct.go | 57 ++++----- .../reconciler/reconstruct_common.go | 14 +-- .../reconciler/reconstruct_test.go | 14 ++- pkg/kubelet/volumemanager/volume_manager.go | 19 +-- .../volumemanager/volume_manager_test.go | 34 ++++-- 18 files changed, 311 insertions(+), 240 deletions(-) diff --git a/hack/golangci-hints.yaml b/hack/golangci-hints.yaml index 4f2d0910ea6..99d7190b35c 100644 --- a/hack/golangci-hints.yaml +++ b/hack/golangci-hints.yaml @@ -218,6 +218,7 @@ linters: contextual k8s.io/kubernetes/pkg/kubelet/nodeshutdown/.* contextual k8s.io/kubernetes/pkg/kubelet/pod/.* contextual k8s.io/kubernetes/pkg/kubelet/preemption/.* + contextual k8s.io/kubernetes/pkg/kubelet/volumemanager/.* # As long as contextual logging is alpha or beta, all WithName, WithValues, # NewContext calls have to go through klog. Once it is GA, we can lift diff --git a/hack/golangci.yaml b/hack/golangci.yaml index 314012cf5df..a076413522d 100644 --- a/hack/golangci.yaml +++ b/hack/golangci.yaml @@ -232,6 +232,7 @@ linters: contextual k8s.io/kubernetes/pkg/kubelet/nodeshutdown/.* contextual k8s.io/kubernetes/pkg/kubelet/pod/.* contextual k8s.io/kubernetes/pkg/kubelet/preemption/.* + contextual k8s.io/kubernetes/pkg/kubelet/volumemanager/.* # As long as contextual logging is alpha or beta, all WithName, WithValues, # NewContext calls have to go through klog. Once it is GA, we can lift diff --git a/hack/logcheck.conf b/hack/logcheck.conf index 2c27499bb5e..14535a0fa6f 100644 --- a/hack/logcheck.conf +++ b/hack/logcheck.conf @@ -64,6 +64,7 @@ contextual k8s.io/kubernetes/pkg/kubelet/kuberuntime/.* contextual k8s.io/kubernetes/pkg/kubelet/nodeshutdown/.* contextual k8s.io/kubernetes/pkg/kubelet/pod/.* contextual k8s.io/kubernetes/pkg/kubelet/preemption/.* +contextual k8s.io/kubernetes/pkg/kubelet/volumemanager/.* # As long as contextual logging is alpha or beta, all WithName, WithValues, # NewContext calls have to go through klog. Once it is GA, we can lift diff --git a/pkg/kubelet/volumemanager/cache/actual_state_of_world.go b/pkg/kubelet/volumemanager/cache/actual_state_of_world.go index ad324475182..ea5cf573da3 100644 --- a/pkg/kubelet/volumemanager/cache/actual_state_of_world.go +++ b/pkg/kubelet/volumemanager/cache/actual_state_of_world.go @@ -68,7 +68,7 @@ type ActualStateOfWorld interface { // volume indicates it requires remounting on pod updates). Atomically // updating volumes depend on this to update the contents of the volume on // pod update. - MarkRemountRequired(podName volumetypes.UniquePodName) + MarkRemountRequired(logger klog.Logger, podName volumetypes.UniquePodName) // SetDeviceMountState sets device mount state for the given volume. When deviceMountState is set to DeviceGloballyMounted // then device is mounted at a global mount point. When it is set to DeviceMountUncertain then also it means volume @@ -110,7 +110,7 @@ type ActualStateOfWorld interface { // volumes, depend on this to update the contents of the volume. // All volume mounting calls should be idempotent so a second mount call for // volumes that do not need to update contents should not fail. - PodExistsInVolume(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName, desiredVolumeSize resource.Quantity, seLinuxLabel string) (bool, string, error) + PodExistsInVolume(logger klog.Logger, podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName, desiredVolumeSize resource.Quantity, seLinuxLabel string) (bool, string, error) // PodRemovedFromVolume returns true if the given pod does not exist in the list of // mountedPods for the given volume in the cache, indicating that the pod has @@ -182,7 +182,7 @@ type ActualStateOfWorld interface { GetAttachedVolume(volumeName v1.UniqueVolumeName) (AttachedVolume, bool) // Add the specified volume to ASW as uncertainly attached. - AddAttachUncertainReconstructedVolume(volumeName v1.UniqueVolumeName, volumeSpec *volume.Spec, nodeName types.NodeName, devicePath string) error + AddAttachUncertainReconstructedVolume(logger klog.Logger, volumeName v1.UniqueVolumeName, volumeSpec *volume.Spec, nodeName types.NodeName, devicePath string) error // UpdateReconstructedDevicePath updates devicePath of a reconstructed volume // from Node.Status.VolumesAttached. The ASW is updated only when the volume is still @@ -393,13 +393,14 @@ func (asw *actualStateOfWorld) MarkVolumeAsAttached( pluginIsAttachable = volumeAttachabilityTrue } - return asw.addVolume(volumeName, volumeSpec, devicePath, pluginIsAttachable) + return asw.addVolume(logger, volumeName, volumeSpec, devicePath, pluginIsAttachable) } func (asw *actualStateOfWorld) AddAttachUncertainReconstructedVolume( + logger klog.Logger, volumeName v1.UniqueVolumeName, volumeSpec *volume.Spec, _ types.NodeName, devicePath string) error { - return asw.addVolume(volumeName, volumeSpec, devicePath, volumeAttachabilityUncertain) + return asw.addVolume(logger, volumeName, volumeSpec, devicePath, volumeAttachabilityUncertain) } func (asw *actualStateOfWorld) MarkVolumeAsUncertain( @@ -679,7 +680,7 @@ func (asw *actualStateOfWorld) IsVolumeMountedElsewhere(volumeName v1.UniqueVolu // volume with the same generated name already exists, this is a noop. If no // volume plugin can support the given volumeSpec or more than one plugin can // support it, an error is returned. -func (asw *actualStateOfWorld) addVolume( +func (asw *actualStateOfWorld) addVolume(logger klog.Logger, volumeName v1.UniqueVolumeName, volumeSpec *volume.Spec, devicePath string, attachability volumeAttachability) error { asw.Lock() defer asw.Unlock() @@ -717,7 +718,7 @@ func (asw *actualStateOfWorld) addVolume( } else { // If volume object already exists, update the fields such as device path volumeObj.devicePath = devicePath - klog.V(2).InfoS("Volume is already added to attachedVolume list, update device path", "volumeName", volumeName, "path", devicePath) + logger.V(2).Info("Volume is already added to attachedVolume list, update device path", "volumeName", volumeName, "path", devicePath) } asw.attachedVolumes[volumeName] = volumeObj @@ -807,6 +808,7 @@ func (asw *actualStateOfWorld) MarkVolumeAsResized(volumeName v1.UniqueVolumeNam } func (asw *actualStateOfWorld) MarkRemountRequired( + logger klog.Logger, podName volumetypes.UniquePodName) { asw.Lock() defer asw.Unlock() @@ -816,7 +818,7 @@ func (asw *actualStateOfWorld) MarkRemountRequired( asw.volumePluginMgr.FindPluginBySpec(podObj.volumeSpec) if err != nil || volumePlugin == nil { // Log and continue processing - klog.ErrorS(nil, "MarkRemountRequired failed to FindPluginBySpec for volume", "uniquePodName", podObj.podName, "podUID", podObj.podUID, "volumeName", volumeName, "volumeSpecName", podObj.volumeSpec.Name()) + logger.Error(nil, "MarkRemountRequired failed to FindPluginBySpec for volume", "uniquePodName", podObj.podName, "podUID", podObj.podUID, "volumeName", volumeName, "volumeSpecName", podObj.volumeSpec.Name()) continue } @@ -926,7 +928,7 @@ func (asw *actualStateOfWorld) DeleteVolume(volumeName v1.UniqueVolumeName) erro return nil } -func (asw *actualStateOfWorld) PodExistsInVolume(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName, desiredVolumeSize resource.Quantity, seLinuxLabel string) (bool, string, error) { +func (asw *actualStateOfWorld) PodExistsInVolume(logger klog.Logger, podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName, desiredVolumeSize resource.Quantity, seLinuxLabel string) (bool, string, error) { asw.RLock() defer asw.RUnlock() @@ -952,7 +954,7 @@ func (asw *actualStateOfWorld) PodExistsInVolume(podName volumetypes.UniquePodNa if podObj.remountRequired { return true, volumeObj.devicePath, newRemountRequiredError(volumeObj.volumeName, podObj.podName) } - if currentSize, expandVolume := asw.volumeNeedsExpansion(volumeObj, desiredVolumeSize); expandVolume { + if currentSize, expandVolume := asw.volumeNeedsExpansion(logger, volumeObj, desiredVolumeSize); expandVolume { return true, volumeObj.devicePath, newFsResizeRequiredError(volumeObj.volumeName, podObj.podName, currentSize) } } @@ -974,7 +976,7 @@ func (asw *actualStateOfWorld) PodHasMountedVolumes(podName volumetypes.UniquePo return false } -func (asw *actualStateOfWorld) volumeNeedsExpansion(volumeObj attachedVolume, desiredVolumeSize resource.Quantity) (resource.Quantity, bool) { +func (asw *actualStateOfWorld) volumeNeedsExpansion(logger klog.Logger, volumeObj attachedVolume, desiredVolumeSize resource.Quantity) (resource.Quantity, bool) { currentSize := volumeObj.persistentVolumeSize.DeepCopy() if volumeObj.volumeInUseErrorForExpansion { return currentSize, false @@ -983,13 +985,13 @@ func (asw *actualStateOfWorld) volumeNeedsExpansion(volumeObj attachedVolume, de return currentSize, false } - klog.V(5).InfoS("NodeExpandVolume checking size", "actualSize", volumeObj.persistentVolumeSize.String(), "desiredSize", desiredVolumeSize.String(), "volume", volumeObj.volumeName) + logger.V(5).Info("NodeExpandVolume checking size", "actualSize", volumeObj.persistentVolumeSize.String(), "desiredSize", desiredVolumeSize.String(), "volume", volumeObj.volumeName) if desiredVolumeSize.Cmp(volumeObj.persistentVolumeSize) > 0 { volumePlugin, err := asw.volumePluginMgr.FindNodeExpandablePluginBySpec(volumeObj.spec) if err != nil || volumePlugin == nil { // Log and continue processing - klog.InfoS("PodExistsInVolume failed to find expandable plugin", + logger.Info("PodExistsInVolume failed to find expandable plugin", "volume", volumeObj.volumeName, "volumeSpecName", volumeObj.spec.Name()) return currentSize, false diff --git a/pkg/kubelet/volumemanager/cache/actual_state_of_world_test.go b/pkg/kubelet/volumemanager/cache/actual_state_of_world_test.go index 5191c8edef7..f7ccaded3ce 100644 --- a/pkg/kubelet/volumemanager/cache/actual_state_of_world_test.go +++ b/pkg/kubelet/volumemanager/cache/actual_state_of_world_test.go @@ -470,6 +470,7 @@ func Test_AddTwoPodsToVolume_Positive(t *testing.T) { // Test if volumes that were recorded to be read from disk during reconstruction // are handled correctly by the ASOW. func TestActualStateOfWorld_FoundDuringReconstruction(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) tests := []struct { name string opCallback func(asw ActualStateOfWorld, volumeOpts operationexecutor.MarkVolumeOpts) error @@ -585,7 +586,7 @@ func TestActualStateOfWorld_FoundDuringReconstruction(t *testing.T) { generatedVolumeName1, err := util.GetUniqueVolumeNameFromSpec( plugin, volumeSpec1) require.NoError(t, err) - err = asw.AddAttachUncertainReconstructedVolume(generatedVolumeName1, volumeSpec1, "" /* nodeName */, devicePath) + err = asw.AddAttachUncertainReconstructedVolume(logger, generatedVolumeName1, volumeSpec1, "" /* nodeName */, devicePath) if err != nil { t.Fatalf("MarkVolumeAsAttached failed. Expected: Actual: <%v>", err) } @@ -1077,7 +1078,7 @@ func TestUncertainVolumeMounts(t *testing.T) { t.Fatalf("expected volume %s to be found in aws.GetPossiblyMountedVolumesForPod", volumeSpec1.Name()) } - volExists, _, _ := asw.PodExistsInVolume(podName1, generatedVolumeName1, resource.Quantity{}, "") + volExists, _, _ := asw.PodExistsInVolume(logger, podName1, generatedVolumeName1, resource.Quantity{}, "") if volExists { t.Fatalf("expected volume %s to not exist in asw", generatedVolumeName1) } @@ -1215,8 +1216,9 @@ func verifyPodExistsInVolumeAswWithSELinux( expectedDevicePath string, expectedSELinuxLabel string, asw ActualStateOfWorld) { + logger, _ := ktesting.NewTestContext(t) podExistsInVolume, devicePath, err := - asw.PodExistsInVolume(expectedPodName, expectedVolumeName, resource.Quantity{}, expectedSELinuxLabel) + asw.PodExistsInVolume(logger, expectedPodName, expectedVolumeName, resource.Quantity{}, expectedSELinuxLabel) if err != nil { t.Fatalf( "ASW PodExistsInVolume failed. Expected: Actual: <%v>", err) @@ -1257,8 +1259,9 @@ func verifyPodDoesntExistInVolumeAsw( volumeToCheck v1.UniqueVolumeName, expectVolumeToExist bool, asw ActualStateOfWorld) { + logger, _ := ktesting.NewTestContext(t) podExistsInVolume, devicePath, err := - asw.PodExistsInVolume(podToCheck, volumeToCheck, resource.Quantity{}, "") + asw.PodExistsInVolume(logger, podToCheck, volumeToCheck, resource.Quantity{}, "") if !expectVolumeToExist && err == nil { t.Fatalf( "ASW PodExistsInVolume did not return error. Expected: Actual: <%v>", err) @@ -1288,8 +1291,8 @@ func verifyPodExistsInVolumeSELinuxMismatch( volumeToCheck v1.UniqueVolumeName, unexpectedSELinuxLabel string, asw ActualStateOfWorld) { - - podExistsInVolume, _, err := asw.PodExistsInVolume(podToCheck, volumeToCheck, resource.Quantity{}, unexpectedSELinuxLabel) + logger, _ := ktesting.NewTestContext(t) + podExistsInVolume, _, err := asw.PodExistsInVolume(logger, podToCheck, volumeToCheck, resource.Quantity{}, unexpectedSELinuxLabel) if podExistsInVolume { t.Errorf("expected Pod %s not to exists, but it does", podToCheck) } diff --git a/pkg/kubelet/volumemanager/cache/desired_state_of_world.go b/pkg/kubelet/volumemanager/cache/desired_state_of_world.go index afde30bf7f7..f09c4bec21b 100644 --- a/pkg/kubelet/volumemanager/cache/desired_state_of_world.go +++ b/pkg/kubelet/volumemanager/cache/desired_state_of_world.go @@ -61,7 +61,7 @@ type DesiredStateOfWorld interface { // added. // If a pod with the same unique name already exists under the specified // volume, this is a no-op. - AddPodToVolume(podName types.UniquePodName, pod *v1.Pod, volumeSpec *volume.Spec, outerVolumeSpecName string, volumeGIDValue string, seLinuxContainerContexts []*v1.SELinuxOptions) (v1.UniqueVolumeName, error) + AddPodToVolume(logger klog.Logger, podName types.UniquePodName, pod *v1.Pod, volumeSpec *volume.Spec, outerVolumeSpecName string, volumeGIDValue string, seLinuxContainerContexts []*v1.SELinuxOptions) (v1.UniqueVolumeName, error) // MarkVolumesReportedInUse sets the ReportedInUse value to true for the // reportedVolumes. For volumes not in the reportedVolumes list, the @@ -258,6 +258,7 @@ const ( ) func (dsw *desiredStateOfWorld) AddPodToVolume( + logger klog.Logger, podName types.UniquePodName, pod *v1.Pod, volumeSpec *volume.Spec, @@ -274,7 +275,7 @@ func (dsw *desiredStateOfWorld) AddPodToVolume( volumeSpec.Name(), err) } - volumePluginName := getVolumePluginNameWithDriver(volumePlugin, volumeSpec) + volumePluginName := getVolumePluginNameWithDriver(logger, volumePlugin, volumeSpec) accessMode := getVolumeAccessMode(volumeSpec) var volumeName v1.UniqueVolumeName @@ -301,11 +302,11 @@ func (dsw *desiredStateOfWorld) AddPodToVolume( volumeName = util.GetUniqueVolumeNameFromSpecWithPod(podName, volumePlugin, volumeSpec) } - seLinuxFileLabel, pluginSupportsSELinuxContextMount, err := dsw.getSELinuxLabel(volumeSpec, seLinuxContainerContexts, pod.Spec.SecurityContext) + seLinuxFileLabel, pluginSupportsSELinuxContextMount, err := dsw.getSELinuxLabel(logger, volumeSpec, seLinuxContainerContexts, pod.Spec.SecurityContext) if err != nil { return "", err } - klog.V(4).InfoS("expected volume SELinux label context", "volume", volumeSpec.Name(), "label", seLinuxFileLabel) + logger.V(4).Info("expected volume SELinux label context", "volume", volumeSpec.Name(), "label", seLinuxFileLabel) if _, volumeExists := dsw.volumesToMount[volumeName]; !volumeExists { var sizeLimit *resource.Quantity @@ -325,7 +326,7 @@ func (dsw *desiredStateOfWorld) AddPodToVolume( effectiveSELinuxMountLabel := seLinuxFileLabel if !util.VolumeSupportsSELinuxMount(volumeSpec) { // Clear SELinux label for the volume with unsupported access modes. - klog.V(4).InfoS("volume does not support SELinux context mount, clearing the expected label", "volume", volumeSpec.Name()) + logger.V(4).Info("volume does not support SELinux context mount, clearing the expected label", "volume", volumeSpec.Name()) effectiveSELinuxMountLabel = "" } if seLinuxFileLabel != "" { @@ -368,6 +369,7 @@ func (dsw *desiredStateOfWorld) AddPodToVolume( fullErr := fmt.Errorf("conflicting SELinux labels of volume %s: %q and %q", volumeSpec.Name(), existingVolume.originalSELinuxLabel, seLinuxFileLabel) supported := util.VolumeSupportsSELinuxMount(volumeSpec) err := handleSELinuxMetricError( + logger, fullErr, supported, seLinuxVolumeContextMismatchWarnings.WithLabelValues(volumePluginName, accessMode), @@ -396,7 +398,7 @@ func (dsw *desiredStateOfWorld) AddPodToVolume( // if the plugin supports mounting the volume with SELinux context. // It returns error if the SELinux label cannot be constructed or when the volume is used with multiple SELinux // labels. -func (dsw *desiredStateOfWorld) getSELinuxLabel(volumeSpec *volume.Spec, seLinuxContainerContexts []*v1.SELinuxOptions, podSecurityContext *v1.PodSecurityContext) (seLinuxFileLabel string, pluginSupportsSELinuxContextMount bool, err error) { +func (dsw *desiredStateOfWorld) getSELinuxLabel(logger klog.Logger, volumeSpec *volume.Spec, seLinuxContainerContexts []*v1.SELinuxOptions, podSecurityContext *v1.PodSecurityContext) (seLinuxFileLabel string, pluginSupportsSELinuxContextMount bool, err error) { labelInfo, err := util.GetMountSELinuxLabel(volumeSpec, seLinuxContainerContexts, podSecurityContext, dsw.volumePluginMgr, dsw.seLinuxTranslator) if err != nil { accessMode := getVolumeAccessMode(volumeSpec) @@ -404,6 +406,7 @@ func (dsw *desiredStateOfWorld) getSELinuxLabel(volumeSpec *volume.Spec, seLinux if util.IsSELinuxLabelTranslationError(err) { err := handleSELinuxMetricError( + logger, err, seLinuxSupported, seLinuxContainerContextWarnings.WithLabelValues(accessMode), @@ -412,6 +415,7 @@ func (dsw *desiredStateOfWorld) getSELinuxLabel(volumeSpec *volume.Spec, seLinux } if util.IsMultipleSELinuxLabelsError(err) { err := handleSELinuxMetricError( + logger, err, seLinuxSupported, seLinuxPodContextMismatchWarnings.WithLabelValues(accessMode), @@ -637,7 +641,7 @@ func (dsw *desiredStateOfWorld) MarkVolumeAttachability(volumeName v1.UniqueVolu } // Based on isRWOP, bump the right warning / error metric and either consume the error or return it. -func handleSELinuxMetricError(err error, seLinuxSupported bool, warningMetric, errorMetric metrics.GaugeMetric) error { +func handleSELinuxMetricError(logger klog.Logger, err error, seLinuxSupported bool, warningMetric, errorMetric metrics.GaugeMetric) error { if seLinuxSupported { errorMetric.Add(1.0) return err @@ -645,12 +649,12 @@ func handleSELinuxMetricError(err error, seLinuxSupported bool, warningMetric, e // This is not an error yet, but it will be when support for other access modes is added. warningMetric.Add(1.0) - klog.V(4).ErrorS(err, "Please report this error in https://github.com/kubernetes/enhancements/issues/1710, together with full Pod yaml file") + logger.V(4).Error(err, "Please report this error in https://github.com/kubernetes/enhancements/issues/1710, together with full Pod yaml file") return nil } // Return the volume plugin name, together with the CSI driver name if it's a CSI volume. -func getVolumePluginNameWithDriver(plugin volume.VolumePlugin, spec *volume.Spec) string { +func getVolumePluginNameWithDriver(logger klog.Logger, plugin volume.VolumePlugin, spec *volume.Spec) string { pluginName := plugin.GetPluginName() if pluginName != csi.CSIPluginName { return pluginName @@ -660,7 +664,7 @@ func getVolumePluginNameWithDriver(plugin volume.VolumePlugin, spec *volume.Spec driverName, err := csi.GetCSIDriverName(spec) if err != nil { // In theory this is unreachable - such volume would not pass validation. - klog.V(4).ErrorS(err, "failed to get CSI driver name from volume spec") + logger.V(4).Error(err, "failed to get CSI driver name from volume spec") driverName = "unknown" } // `/` is used to separate plugin + CSI driver in util.GetUniqueVolumeName() too diff --git a/pkg/kubelet/volumemanager/cache/desired_state_of_world_test.go b/pkg/kubelet/volumemanager/cache/desired_state_of_world_test.go index 970d52b8aa6..33b21e7a503 100644 --- a/pkg/kubelet/volumemanager/cache/desired_state_of_world_test.go +++ b/pkg/kubelet/volumemanager/cache/desired_state_of_world_test.go @@ -25,6 +25,7 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/component-base/featuregate" featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/klog/v2/ktesting" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/volume" volumetesting "k8s.io/kubernetes/pkg/volume/testing" @@ -37,6 +38,7 @@ import ( // Verifies newly added pod/volume exists via // PodExistsInVolume() VolumeExists() and GetVolumesToMount() func Test_AddPodToVolume_Positive_NewPodNewVolume(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) // Arrange volumePluginMgr, _ := volumetesting.GetTestKubeletVolumePluginMgr(t) seLinuxTranslator := util.NewFakeSELinuxLabelTranslator() @@ -65,7 +67,7 @@ func Test_AddPodToVolume_Positive_NewPodNewVolume(t *testing.T) { // Act generatedVolumeName, err := dsw.AddPodToVolume( - podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxContainerContexts */) + logger, podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxContainerContexts */) // Assert if err != nil { @@ -83,6 +85,7 @@ func Test_AddPodToVolume_Positive_NewPodNewVolume(t *testing.T) { // Verifies newly added pod/volume exists via // PodExistsInVolume() VolumeExists() and GetVolumesToMount() and no errors. func Test_AddPodToVolume_Positive_ExistingPodExistingVolume(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) // Arrange volumePluginMgr, _ := volumetesting.GetTestKubeletVolumePluginMgr(t) seLinuxTranslator := util.NewFakeSELinuxLabelTranslator() @@ -111,12 +114,12 @@ func Test_AddPodToVolume_Positive_ExistingPodExistingVolume(t *testing.T) { // Act generatedVolumeName, err := dsw.AddPodToVolume( - podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxContainerContexts */) + logger, podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxContainerContexts */) if err != nil { t.Fatalf("AddPodToVolume failed. Expected: Actual: <%v>", err) } generatedVolumeName2, err := dsw.AddPodToVolume( - podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxContainerContexts */) + logger, podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxContainerContexts */) if err != nil { t.Fatalf("AddPodToVolume failed. Expected: Actual: <%v>", err) } @@ -136,6 +139,7 @@ func Test_AddPodToVolume_Positive_ExistingPodExistingVolume(t *testing.T) { // Verities generated names are same for different pods if volume is device mountable or attachable // Verities generated names are different for different pods if volume is not device mountble and attachable func Test_AddPodToVolume_Positive_NamesForDifferentPodsAndDifferentVolumes(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) // Arrange fakeVolumeHost := volumetesting.NewFakeVolumeHost(t, "", /* rootDir */ @@ -274,8 +278,8 @@ func Test_AddPodToVolume_Positive_NamesForDifferentPodsAndDifferentVolumes(t *te for name, v := range testcases { volumeSpec1 := &volume.Spec{Volume: &v.pod1.Spec.Volumes[0]} volumeSpec2 := &volume.Spec{Volume: &v.pod2.Spec.Volumes[0]} - generatedVolumeName1, err1 := dsw.AddPodToVolume(util.GetUniquePodName(v.pod1), v.pod1, volumeSpec1, volumeSpec1.Name(), "", nil) - generatedVolumeName2, err2 := dsw.AddPodToVolume(util.GetUniquePodName(v.pod2), v.pod2, volumeSpec2, volumeSpec2.Name(), "", nil) + generatedVolumeName1, err1 := dsw.AddPodToVolume(logger, util.GetUniquePodName(v.pod1), v.pod1, volumeSpec1, volumeSpec1.Name(), "", nil) + generatedVolumeName2, err2 := dsw.AddPodToVolume(logger, util.GetUniquePodName(v.pod2), v.pod2, volumeSpec2, volumeSpec2.Name(), "", nil) if err1 != nil { t.Fatalf("test %q: AddPodToVolume failed. Expected: Actual: <%v>", name, err1) } @@ -299,6 +303,7 @@ func Test_AddPodToVolume_Positive_NamesForDifferentPodsAndDifferentVolumes(t *te // Calls DeletePodFromVolume() to removes the pod // Verifies newly added pod/volume are deleted func Test_DeletePodFromVolume_Positive_PodExistsVolumeExists(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) // Arrange volumePluginMgr, _ := volumetesting.GetTestKubeletVolumePluginMgr(t) seLinuxTranslator := util.NewFakeSELinuxLabelTranslator() @@ -325,7 +330,7 @@ func Test_DeletePodFromVolume_Positive_PodExistsVolumeExists(t *testing.T) { volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]} podName := util.GetUniquePodName(pod) generatedVolumeName, err := dsw.AddPodToVolume( - podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxContainerContexts */) + logger, podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxContainerContexts */) if err != nil { t.Fatalf("AddPodToVolume failed. Expected: Actual: <%v>", err) } @@ -352,6 +357,7 @@ func Test_DeletePodFromVolume_Positive_PodExistsVolumeExists(t *testing.T) { // Marks only first volume as reported in use. // Verifies only that volume is marked reported in use func Test_MarkVolumesReportedInUse_Positive_NewPodNewVolume(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) // Arrange volumePluginMgr, _ := volumetesting.GetTestKubeletVolumePluginMgr(t) seLinuxTranslator := util.NewFakeSELinuxLabelTranslator() @@ -424,19 +430,19 @@ func Test_MarkVolumesReportedInUse_Positive_NewPodNewVolume(t *testing.T) { pod3Name := util.GetUniquePodName(pod3) generatedVolume1Name, err := dsw.AddPodToVolume( - pod1Name, pod1, volume1Spec, volume1Spec.Name(), "" /* volumeGIDValue */, nil /* seLinuxContainerContexts */) + logger, pod1Name, pod1, volume1Spec, volume1Spec.Name(), "" /* volumeGIDValue */, nil /* seLinuxContainerContexts */) if err != nil { t.Fatalf("AddPodToVolume failed. Expected: Actual: <%v>", err) } generatedVolume2Name, err := dsw.AddPodToVolume( - pod2Name, pod2, volume2Spec, volume2Spec.Name(), "" /* volumeGIDValue */, nil /* seLinuxContainerContexts */) + logger, pod2Name, pod2, volume2Spec, volume2Spec.Name(), "" /* volumeGIDValue */, nil /* seLinuxContainerContexts */) if err != nil { t.Fatalf("AddPodToVolume failed. Expected: Actual: <%v>", err) } generatedVolume3Name, err := dsw.AddPodToVolume( - pod3Name, pod3, volume3Spec, volume3Spec.Name(), "" /* volumeGIDValue */, nil /* seLinuxContainerContexts */) + logger, pod3Name, pod3, volume3Spec, volume3Spec.Name(), "" /* volumeGIDValue */, nil /* seLinuxContainerContexts */) if err != nil { t.Fatalf("AddPodToVolume failed. Expected: Actual: <%v>", err) } @@ -479,6 +485,7 @@ func Test_MarkVolumesReportedInUse_Positive_NewPodNewVolume(t *testing.T) { } func Test_AddPodToVolume_WithEmptyDirSizeLimit(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) volumePluginMgr, _ := volumetesting.GetTestKubeletVolumePluginMgr(t) seLinuxTranslator := util.NewFakeSELinuxLabelTranslator() dsw := NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator) @@ -598,14 +605,14 @@ func Test_AddPodToVolume_WithEmptyDirSizeLimit(t *testing.T) { } for i := range pod1.Spec.Volumes { volumeSpec := &volume.Spec{Volume: &pod1.Spec.Volumes[i]} - _, err := dsw.AddPodToVolume(pod1Name, pod1, volumeSpec, volumeSpec.Name(), "", nil /* seLinuxContainerContexts */) + _, err := dsw.AddPodToVolume(logger, pod1Name, pod1, volumeSpec, volumeSpec.Name(), "", nil /* seLinuxContainerContexts */) if err != nil { t.Fatalf("AddPodToVolume failed. Expected: Actual: <%v>", err) } } for i := range pod2.Spec.Volumes { volumeSpec := &volume.Spec{Volume: &pod2.Spec.Volumes[i]} - _, err := dsw.AddPodToVolume(pod2Name, pod2, volumeSpec, volumeSpec.Name(), "", nil /* seLinuxContainerContexts */) + _, err := dsw.AddPodToVolume(logger, pod2Name, pod2, volumeSpec, volumeSpec.Name(), "", nil /* seLinuxContainerContexts */) if err != nil { t.Fatalf("AddPodToVolume failed. Expected: Actual: <%v>", err) } @@ -616,6 +623,7 @@ func Test_AddPodToVolume_WithEmptyDirSizeLimit(t *testing.T) { // Calls AddPodToVolume() in an empty DSW with various SELinux settings / access modes. func Test_AddPodToVolume_SELinuxSinglePod(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) completeSELinuxOpts := v1.SELinuxOptions{ User: "system_u", Role: "object_r", @@ -872,7 +880,7 @@ func Test_AddPodToVolume_SELinuxSinglePod(t *testing.T) { // Act generatedVolumeName, err := dsw.AddPodToVolume( - podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, seLinuxContainerContexts) + logger, podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, seLinuxContainerContexts) // Assert if tc.expectError { @@ -896,6 +904,7 @@ func Test_AddPodToVolume_SELinuxSinglePod(t *testing.T) { // Calls AddPodToVolume() twice to add two pods with various SELinux settings and access modes. func Test_AddPodToVolume_SELinux_MultiplePods(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) completeSELinuxOpts := v1.SELinuxOptions{ User: "system_u", Role: "object_r", @@ -1221,7 +1230,7 @@ func Test_AddPodToVolume_SELinux_MultiplePods(t *testing.T) { // Act generatedVolumeName, err := dsw.AddPodToVolume( - podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, seLinuxContainerContexts) + logger, podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, seLinuxContainerContexts) // Assert if err != nil { @@ -1245,7 +1254,7 @@ func Test_AddPodToVolume_SELinux_MultiplePods(t *testing.T) { // Act generatedVolumeName2, err := dsw.AddPodToVolume( - pod2Name, pod2, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, seLinuxContainerContexts) + logger, pod2Name, pod2, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, seLinuxContainerContexts) // Assert if tc.expectError { if err == nil { diff --git a/pkg/kubelet/volumemanager/metrics/metrics_test.go b/pkg/kubelet/volumemanager/metrics/metrics_test.go index bd0f3fe55a4..d2df2e3a66e 100644 --- a/pkg/kubelet/volumemanager/metrics/metrics_test.go +++ b/pkg/kubelet/volumemanager/metrics/metrics_test.go @@ -33,6 +33,7 @@ import ( ) func TestMetricCollection(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t) seLinuxTranslator := util.NewFakeSELinuxLabelTranslator() dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator) @@ -59,7 +60,7 @@ func TestMetricCollection(t *testing.T) { podName := util.GetUniquePodName(pod) // Add one volume to DesiredStateOfWorld - generatedVolumeName, err := dsw.AddPodToVolume(podName, pod, volumeSpec, volumeSpec.Name(), "", nil /* seLinuxOptions */) + generatedVolumeName, err := dsw.AddPodToVolume(logger, podName, pod, volumeSpec, volumeSpec.Name(), "", nil /* seLinuxOptions */) if err != nil { t.Fatalf("AddPodToVolume failed. Expected: Actual: <%v>", err) } @@ -76,7 +77,6 @@ func TestMetricCollection(t *testing.T) { // Add one volume to ActualStateOfWorld devicePath := "fake/device/path" - logger, _ := ktesting.NewTestContext(t) err = asw.MarkVolumeAsAttached(logger, "", volumeSpec, "", devicePath) if err != nil { t.Fatalf("MarkVolumeAsAttached failed. Expected: Actual: <%v>", err) diff --git a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go index b9e9ef05d94..39793a8a144 100644 --- a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go +++ b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go @@ -170,8 +170,9 @@ func (dswp *desiredStateOfWorldPopulator) HasAddedPods() bool { } func (dswp *desiredStateOfWorldPopulator) populatorLoop(ctx context.Context) { + logger := klog.FromContext(ctx) dswp.findAndAddNewPods(ctx) - dswp.findAndRemoveDeletedPods() + dswp.findAndRemoveDeletedPods(logger) } // Iterate through all pods and add to desired state of world if they don't @@ -196,7 +197,7 @@ func (dswp *desiredStateOfWorldPopulator) findAndAddNewPods(ctx context.Context) // Iterate through all pods in desired state of world, and remove if they no // longer exist -func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() { +func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods(logger klog.Logger) { podsFromCache := make(map[volumetypes.UniquePodName]struct{}) for _, volumeToMount := range dswp.desiredStateOfWorld.GetVolumesToMount() { podsFromCache[volumetypes.UniquePodName(volumeToMount.Pod.UID)] = struct{}{} @@ -211,7 +212,7 @@ func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() { // It is not possible right now for a CSI plugin to be both attachable and non-deviceMountable // So the uniqueVolumeName should remain the same after the attachability change dswp.desiredStateOfWorld.MarkVolumeAttachability(volumeToMount.VolumeName, false) - klog.InfoS("Volume changes from attachable to non-attachable", "volumeName", volumeToMount.VolumeName) + logger.Info("Volume changes from attachable to non-attachable", "volumeName", volumeToMount.VolumeName) continue } } @@ -227,7 +228,7 @@ func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() { // pod state provider to verify that all containers in the pod have been // terminated. if !dswp.podStateProvider.ShouldPodRuntimeBeRemoved(volumeToMount.Pod.UID) { - klog.V(4).InfoS("Pod still has one or more containers in the non-exited state and will not be removed from desired state", "pod", klog.KObj(volumeToMount.Pod)) + logger.V(4).Info("Pod still has one or more containers in the non-exited state and will not be removed from desired state", "pod", klog.KObj(volumeToMount.Pod)) continue } var volumeToMountSpecName string @@ -236,10 +237,10 @@ func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() { } removed := dswp.actualStateOfWorld.PodRemovedFromVolume(volumeToMount.PodName, volumeToMount.VolumeName) if removed && podExists { - klog.V(4).InfoS("Actual state does not yet have volume mount information and pod still exists in pod manager, skip removing volume from desired state", "pod", klog.KObj(volumeToMount.Pod), "podUID", volumeToMount.Pod.UID, "volumeName", volumeToMountSpecName) + logger.V(4).Info("Actual state does not yet have volume mount information and pod still exists in pod manager, skip removing volume from desired state", "pod", klog.KObj(volumeToMount.Pod), "podUID", volumeToMount.Pod.UID, "volumeName", volumeToMountSpecName) continue } - klog.V(4).InfoS("Removing volume from desired state", "pod", klog.KObj(volumeToMount.Pod), "podUID", volumeToMount.Pod.UID, "volumeName", volumeToMountSpecName) + logger.V(4).Info("Removing volume from desired state", "pod", klog.KObj(volumeToMount.Pod), "podUID", volumeToMount.Pod.UID, "volumeName", volumeToMountSpecName) dswp.desiredStateOfWorld.DeletePodFromVolume( volumeToMount.PodName, volumeToMount.VolumeName) dswp.deleteProcessedPod(volumeToMount.PodName) @@ -296,7 +297,7 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(ctx context.Context, } pvc, volumeSpec, volumeGIDValue, err := - dswp.createVolumeSpec(logger, podVolume, pod, mounts, devices) + dswp.createVolumeSpec(ctx, podVolume, pod, mounts, devices) if err != nil { logger.Error(err, "Error processing volume", "pod", klog.KObj(pod), "volumeName", podVolume.Name) dswp.desiredStateOfWorld.AddErrorToPod(uniquePodName, err.Error()) @@ -306,7 +307,7 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(ctx context.Context, // Add volume to desired state of world uniqueVolumeName, err := dswp.desiredStateOfWorld.AddPodToVolume( - uniquePodName, pod, volumeSpec, podVolume.Name, volumeGIDValue, seLinuxContainerContexts[podVolume.Name]) + logger, uniquePodName, pod, volumeSpec, podVolume.Name, volumeGIDValue, seLinuxContainerContexts[podVolume.Name]) if err != nil { logger.Error(err, "Failed to add volume to desiredStateOfWorld", "pod", klog.KObj(pod), "volumeName", podVolume.Name, "volumeSpecName", volumeSpec.Name()) dswp.desiredStateOfWorld.AddErrorToPod(uniquePodName, err.Error()) @@ -315,7 +316,7 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(ctx context.Context, } logger.V(4).Info("Added volume to desired state", "pod", klog.KObj(pod), "volumeName", podVolume.Name, "volumeSpecName", volumeSpec.Name()) - dswp.checkVolumeFSResize(pod, podVolume, pvc, volumeSpec, uniqueVolumeName) + dswp.checkVolumeFSResize(logger, pod, podVolume, pvc, volumeSpec, uniqueVolumeName) } // some of the volume additions may have failed, should not mark this pod as fully processed @@ -323,7 +324,7 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(ctx context.Context, dswp.markPodProcessed(uniquePodName) // New pod has been synced. Re-mount all volumes that need it // (e.g. DownwardAPI) - dswp.actualStateOfWorld.MarkRemountRequired(uniquePodName) + dswp.actualStateOfWorld.MarkRemountRequired(logger, uniquePodName) // Remove any stored errors for the pod, everything went well in this processPodVolumes dswp.desiredStateOfWorld.PopPodErrors(uniquePodName) } else if dswp.podHasBeenSeenOnce(uniquePodName) { @@ -340,6 +341,7 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(ctx context.Context, // It is used for comparison with actual size(coming from pvc.Status.Capacity) and calling // volume expansion on the node if needed. func (dswp *desiredStateOfWorldPopulator) checkVolumeFSResize( + logger klog.Logger, pod *v1.Pod, podVolume v1.Volume, pvc *v1.PersistentVolumeClaim, @@ -358,16 +360,16 @@ func (dswp *desiredStateOfWorldPopulator) checkVolumeFSResize( // we should use it here. This value comes from Pod.spec.volumes.persistentVolumeClaim.readOnly. if volumeSpec.ReadOnly { // This volume is used as read only by this pod, we don't perform resize for read only volumes. - klog.V(5).InfoS("Skip file system resize check for the volume, as the volume is mounted as readonly", "pod", klog.KObj(pod), "volumeName", podVolume.Name) + logger.V(5).Info("Skip file system resize check for the volume, as the volume is mounted as readonly", "pod", klog.KObj(pod), "volumeName", podVolume.Name) return } pvCap := volumeSpec.PersistentVolume.Spec.Capacity.Storage().DeepCopy() pvcStatusCap := pvc.Status.Capacity.Storage().DeepCopy() dswp.desiredStateOfWorld.UpdatePersistentVolumeSize(uniqueVolumeName, pvCap) - klog.V(5).InfoS("NodeExpandVolume updating size", "actualSize", pvcStatusCap.String(), "desiredSize", pvCap.String(), "volumeName", uniqueVolumeName) + logger.V(5).Info("NodeExpandVolume updating size", "actualSize", pvcStatusCap.String(), "desiredSize", pvCap.String(), "volumeName", uniqueVolumeName) // in case the actualStateOfWorld was rebuild after kubelet restart ensure that claimSize is set to accurate value - dswp.actualStateOfWorld.InitializeClaimSize(klog.TODO(), uniqueVolumeName, pvcStatusCap) + dswp.actualStateOfWorld.InitializeClaimSize(logger, uniqueVolumeName, pvcStatusCap) } // podPreviouslyProcessed returns true if the volumes for this pod have already @@ -422,7 +424,8 @@ func (dswp *desiredStateOfWorldPopulator) deleteProcessedPod( // specified volume. It dereference any PVC to get PV objects, if needed. // Returns an error if unable to obtain the volume at this time. func (dswp *desiredStateOfWorldPopulator) createVolumeSpec( - logger klog.Logger, podVolume v1.Volume, pod *v1.Pod, mounts, devices sets.Set[string]) (*v1.PersistentVolumeClaim, *volume.Spec, string, error) { + ctx context.Context, podVolume v1.Volume, pod *v1.Pod, mounts, devices sets.Set[string]) (*v1.PersistentVolumeClaim, *volume.Spec, string, error) { + logger := klog.FromContext(ctx) pvcSource := podVolume.VolumeSource.PersistentVolumeClaim isEphemeral := pvcSource == nil && podVolume.VolumeSource.Ephemeral != nil if isEphemeral { @@ -438,7 +441,7 @@ func (dswp *desiredStateOfWorldPopulator) createVolumeSpec( logger.V(5).Info("Found PVC", "PVC", klog.KRef(pod.Namespace, pvcSource.ClaimName)) // If podVolume is a PVC, fetch the real PV behind the claim pvc, err := dswp.getPVCExtractPV( - pod.Namespace, pvcSource.ClaimName) + ctx, pod.Namespace, pvcSource.ClaimName) if err != nil { return nil, nil, "", fmt.Errorf( "error processing PVC %s/%s: %v", @@ -455,7 +458,7 @@ func (dswp *desiredStateOfWorldPopulator) createVolumeSpec( logger.V(5).Info("Found bound PV for PVC", "PVC", klog.KRef(pod.Namespace, pvcSource.ClaimName), "PVCUID", pvcUID, "PVName", pvName) // Fetch actual PV object volumeSpec, volumeGIDValue, err := - dswp.getPVSpec(pvName, pvcSource.ReadOnly, pvcUID) + dswp.getPVSpec(ctx, pvName, pvcSource.ReadOnly, pvcUID) if err != nil { return nil, nil, "", fmt.Errorf( "error processing PVC %s/%s: %v", @@ -518,9 +521,9 @@ func (dswp *desiredStateOfWorldPopulator) createVolumeSpec( // it is pointing to and returns it. // An error is returned if the PVC object's phase is not "Bound". func (dswp *desiredStateOfWorldPopulator) getPVCExtractPV( - namespace string, claimName string) (*v1.PersistentVolumeClaim, error) { + ctx context.Context, namespace string, claimName string) (*v1.PersistentVolumeClaim, error) { pvc, err := - dswp.kubeClient.CoreV1().PersistentVolumeClaims(namespace).Get(context.TODO(), claimName, metav1.GetOptions{}) + dswp.kubeClient.CoreV1().PersistentVolumeClaims(namespace).Get(ctx, claimName, metav1.GetOptions{}) if err != nil || pvc == nil { return nil, fmt.Errorf("failed to fetch PVC from API server: %v", err) } @@ -557,10 +560,11 @@ func (dswp *desiredStateOfWorldPopulator) getPVCExtractPV( // and returns a volume.Spec representing it. // An error is returned if the call to fetch the PV object fails. func (dswp *desiredStateOfWorldPopulator) getPVSpec( + ctx context.Context, name string, pvcReadOnly bool, expectedClaimUID types.UID) (*volume.Spec, string, error) { - pv, err := dswp.kubeClient.CoreV1().PersistentVolumes().Get(context.TODO(), name, metav1.GetOptions{}) + pv, err := dswp.kubeClient.CoreV1().PersistentVolumes().Get(ctx, name, metav1.GetOptions{}) if err != nil || pv == nil { return nil, "", fmt.Errorf( "failed to fetch PV %s from API server: %v", name, err) diff --git a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go index da7708c3417..f3404b0c43f 100644 --- a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go +++ b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go @@ -33,6 +33,7 @@ import ( core "k8s.io/client-go/testing" featuregatetesting "k8s.io/component-base/featuregate/testing" csitrans "k8s.io/csi-translation-lib" + "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/features" containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" kubepod "k8s.io/kubernetes/pkg/kubelet/pod" @@ -261,6 +262,7 @@ type mutablePodManager interface { } func TestFindAndAddNewPods_FindAndRemoveDeletedPods(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) dswp, fakePodState, pod, expectedVolumeName, _ := prepareDSWPWithPodPV(t) podName := util.GetUniquePodName(pod) @@ -272,7 +274,7 @@ func TestFindAndAddNewPods_FindAndRemoveDeletedPods(t *testing.T) { podGet.Status.Phase = v1.PodFailed dswp.podManager.(mutablePodManager).RemovePod(pod) - dswp.findAndRemoveDeletedPods() + dswp.findAndRemoveDeletedPods(logger) if !dswp.pods.processedPods[podName] { t.Fatalf("Pod should not been removed from desired state of world since pod state still thinks it exists") @@ -281,14 +283,14 @@ func TestFindAndAddNewPods_FindAndRemoveDeletedPods(t *testing.T) { fakePodState.removed = map[kubetypes.UID]struct{}{pod.UID: {}} // the pod state is marked as removed, so here findAndRemoveDeletedPods() will remove the pod and volumes it is mounted - dswp.findAndRemoveDeletedPods() + dswp.findAndRemoveDeletedPods(logger) if dswp.pods.processedPods[podName] { t.Fatalf("Failed to remove pods from desired state of world since they no longer exist") } // podWorker may call volume_manager WaitForUnmount() after we processed the pod in findAndRemoveDeletedPods() dswp.ReprocessPod(podName) - dswp.findAndRemoveDeletedPods() + dswp.findAndRemoveDeletedPods(logger) // findAndRemoveDeletedPods() above must detect orphaned pod and delete it from the map if _, ok := dswp.pods.processedPods[podName]; ok { @@ -321,6 +323,7 @@ func TestFindAndAddNewPods_FindAndRemoveDeletedPods(t *testing.T) { } func TestFindAndRemoveDeletedPodsWithActualState(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) dswp, fakePodState, pod, expectedVolumeName, _ := prepareDSWPWithPodPV(t) fakeASW := dswp.actualStateOfWorld podName := util.GetUniquePodName(pod) @@ -332,7 +335,7 @@ func TestFindAndRemoveDeletedPodsWithActualState(t *testing.T) { } podGet.Status.Phase = v1.PodFailed - dswp.findAndRemoveDeletedPods() + dswp.findAndRemoveDeletedPods(logger) // Although Pod status is terminated, pod still exists in pod manager and actual state does not has this pod and volume information // desired state populator will fail to delete this pod and volume first volumeExists := dswp.desiredStateOfWorld.VolumeExists(expectedVolumeName, "" /* SELinuxContext */) @@ -353,7 +356,7 @@ func TestFindAndRemoveDeletedPodsWithActualState(t *testing.T) { // reconcile with actual state so that volume is added into the actual state // desired state populator now can successfully delete the pod and volume reconcileASW(fakeASW, dswp.desiredStateOfWorld, t) - dswp.findAndRemoveDeletedPods() + dswp.findAndRemoveDeletedPods(logger) if !dswp.desiredStateOfWorld.VolumeExists(expectedVolumeName, "" /* SELinuxContext */) { t.Fatalf( "VolumeExists(%q) failed. Expected: Actual: <%v>", @@ -366,7 +369,7 @@ func TestFindAndRemoveDeletedPodsWithActualState(t *testing.T) { // reconcile with actual state so that volume is added into the actual state // desired state populator now can successfully delete the pod and volume reconcileASW(fakeASW, dswp.desiredStateOfWorld, t) - dswp.findAndRemoveDeletedPods() + dswp.findAndRemoveDeletedPods(logger) volumeExists = dswp.desiredStateOfWorld.VolumeExists(expectedVolumeName, "" /* SELinuxContext */) if volumeExists { t.Fatalf( @@ -384,6 +387,7 @@ func TestFindAndRemoveDeletedPodsWithActualState(t *testing.T) { } func TestFindAndRemoveDeletedPodsWithUncertain(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) dswp, fakePodState, pod, expectedVolumeName, pv := prepareDSWPWithPodPV(t) podName := util.GetUniquePodName(pod) @@ -416,7 +420,7 @@ func TestFindAndRemoveDeletedPodsWithUncertain(t *testing.T) { } // the pod state now lists the pod as removed, so here findAndRemoveDeletedPods() will remove the pod and volumes it is mounted - dswp.findAndRemoveDeletedPods() + dswp.findAndRemoveDeletedPods(logger) if dswp.pods.processedPods[podName] { t.Fatalf("Failed to remove pods from desired state of world since they no longer exist") } @@ -517,6 +521,7 @@ func prepareDSWPWithPodPV(t *testing.T) (*desiredStateOfWorldPopulator, *fakePod } func TestFindAndRemoveNonattachableVolumes(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) // create dswp mode := v1.PersistentVolumeFilesystem pv := &v1.PersistentVolume{ @@ -583,7 +588,7 @@ func TestFindAndRemoveNonattachableVolumes(t *testing.T) { verifyVolumeExistsInVolumesToMount( t, v1.UniqueVolumeName(generatedVolumeName), false /* expectReportedInUse */, fakesDSW) - dswp.findAndRemoveDeletedPods() + dswp.findAndRemoveDeletedPods(logger) // After the volume plugin changes to nonattachable, the corresponding volume attachable field should change. volumesToMount := fakesDSW.GetVolumesToMount() for _, volume := range volumesToMount { @@ -621,6 +626,7 @@ func TestEphemeralVolumeOwnerCheck(t *testing.T) { } func TestFindAndAddNewPods_FindAndRemoveDeletedPods_Valid_Block_VolumeDevices(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) // create dswp mode := v1.PersistentVolumeBlock pv := &v1.PersistentVolume{ @@ -698,7 +704,7 @@ func TestFindAndAddNewPods_FindAndRemoveDeletedPods_Valid_Block_VolumeDevices(t fakePodState.removed = map[kubetypes.UID]struct{}{pod.UID: {}} //pod is added to fakePodManager but pod state knows the pod is removed, so here findAndRemoveDeletedPods() will remove the pod and volumes it is mounted - dswp.findAndRemoveDeletedPods() + dswp.findAndRemoveDeletedPods(logger) if dswp.pods.processedPods[podName] { t.Fatalf("Failed to remove pods from desired state of world since they no longer exist") } @@ -729,6 +735,7 @@ func TestFindAndAddNewPods_FindAndRemoveDeletedPods_Valid_Block_VolumeDevices(t } func TestCreateVolumeSpec_Valid_File_VolumeMounts(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) // create dswp mode := v1.PersistentVolumeFilesystem pv := &v1.PersistentVolume{ @@ -764,11 +771,10 @@ func TestCreateVolumeSpec_Valid_File_VolumeMounts(t *testing.T) { } pod := createPodWithVolume("dswp-test-pod", "dswp-test-volume-name", "file-bound", containers) - logger, _ := ktesting.NewTestContext(t) fakePodManager.AddPod(pod) mountsMap, devicesMap, _ := util.GetPodVolumeNames(pod, false /* collectSELinuxOptions */) _, volumeSpec, _, err := - dswp.createVolumeSpec(logger, pod.Spec.Volumes[0], pod, mountsMap, devicesMap) + dswp.createVolumeSpec(ctx, pod.Spec.Volumes[0], pod, mountsMap, devicesMap) // Assert if volumeSpec == nil || err != nil { @@ -777,6 +783,7 @@ func TestCreateVolumeSpec_Valid_File_VolumeMounts(t *testing.T) { } func TestCreateVolumeSpec_Valid_Nil_VolumeMounts(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) // create dswp pv := &v1.PersistentVolume{ ObjectMeta: metav1.ObjectMeta{ @@ -811,11 +818,10 @@ func TestCreateVolumeSpec_Valid_Nil_VolumeMounts(t *testing.T) { } pod := createPodWithVolume("dswp-test-pod", "dswp-test-volume-name", "file-bound", containers) - logger, _ := ktesting.NewTestContext(t) fakePodManager.AddPod(pod) mountsMap, devicesMap, _ := util.GetPodVolumeNames(pod, false /* collectSELinuxOptions */) _, volumeSpec, _, err := - dswp.createVolumeSpec(logger, pod.Spec.Volumes[0], pod, mountsMap, devicesMap) + dswp.createVolumeSpec(ctx, pod.Spec.Volumes[0], pod, mountsMap, devicesMap) // Assert if volumeSpec == nil || err != nil { @@ -824,6 +830,7 @@ func TestCreateVolumeSpec_Valid_Nil_VolumeMounts(t *testing.T) { } func TestCreateVolumeSpec_Valid_Block_VolumeDevices(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) // create dswp mode := v1.PersistentVolumeBlock pv := &v1.PersistentVolume{ @@ -858,11 +865,10 @@ func TestCreateVolumeSpec_Valid_Block_VolumeDevices(t *testing.T) { } pod := createPodWithVolume("dswp-test-pod", "dswp-test-volume-name", "block-bound", containers) - logger, _ := ktesting.NewTestContext(t) fakePodManager.AddPod(pod) mountsMap, devicesMap, _ := util.GetPodVolumeNames(pod, false /* collectSELinuxOptions */) _, volumeSpec, _, err := - dswp.createVolumeSpec(logger, pod.Spec.Volumes[0], pod, mountsMap, devicesMap) + dswp.createVolumeSpec(ctx, pod.Spec.Volumes[0], pod, mountsMap, devicesMap) // Assert if volumeSpec == nil || err != nil { @@ -871,6 +877,7 @@ func TestCreateVolumeSpec_Valid_Block_VolumeDevices(t *testing.T) { } func TestCreateVolumeSpec_Invalid_File_VolumeDevices(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) // create dswp mode := v1.PersistentVolumeFilesystem pv := &v1.PersistentVolume{ @@ -905,11 +912,10 @@ func TestCreateVolumeSpec_Invalid_File_VolumeDevices(t *testing.T) { } pod := createPodWithVolume("dswp-test-pod", "dswp-test-volume-name", "file-bound", containers) - logger, _ := ktesting.NewTestContext(t) fakePodManager.AddPod(pod) mountsMap, devicesMap, _ := util.GetPodVolumeNames(pod, false /* collectSELinuxOptions */) _, volumeSpec, _, err := - dswp.createVolumeSpec(logger, pod.Spec.Volumes[0], pod, mountsMap, devicesMap) + dswp.createVolumeSpec(ctx, pod.Spec.Volumes[0], pod, mountsMap, devicesMap) // Assert if volumeSpec != nil || err == nil { @@ -918,6 +924,7 @@ func TestCreateVolumeSpec_Invalid_File_VolumeDevices(t *testing.T) { } func TestCreateVolumeSpec_Invalid_Block_VolumeMounts(t *testing.T) { + tCtx := ktesting.Init(t) // create dswp mode := v1.PersistentVolumeBlock pv := &v1.PersistentVolume{ @@ -952,11 +959,10 @@ func TestCreateVolumeSpec_Invalid_Block_VolumeMounts(t *testing.T) { } pod := createPodWithVolume("dswp-test-pod", "dswp-test-volume-name", "block-bound", containers) - logger, _ := ktesting.NewTestContext(t) fakePodManager.AddPod(pod) mountsMap, devicesMap, _ := util.GetPodVolumeNames(pod, false /* collectSELinuxOptions */) _, volumeSpec, _, err := - dswp.createVolumeSpec(logger, pod.Spec.Volumes[0], pod, mountsMap, devicesMap) + dswp.createVolumeSpec(tCtx, pod.Spec.Volumes[0], pod, mountsMap, devicesMap) // Assert if volumeSpec != nil || err == nil { @@ -1417,15 +1423,16 @@ func clearASW(asw cache.ActualStateOfWorld, dsw cache.DesiredStateOfWorld, t *te func reprocess(ctx context.Context, dswp *desiredStateOfWorldPopulator, uniquePodName types.UniquePodName, dsw cache.DesiredStateOfWorld, asw cache.ActualStateOfWorld, newSize resource.Quantity) []v1.UniqueVolumeName { + logger := klog.FromContext(ctx) dswp.ReprocessPod(uniquePodName) dswp.findAndAddNewPods(ctx) - return getResizeRequiredVolumes(dsw, asw, newSize) + return getResizeRequiredVolumes(logger, dsw, asw, newSize) } -func getResizeRequiredVolumes(dsw cache.DesiredStateOfWorld, asw cache.ActualStateOfWorld, newSize resource.Quantity) []v1.UniqueVolumeName { +func getResizeRequiredVolumes(logger klog.Logger, dsw cache.DesiredStateOfWorld, asw cache.ActualStateOfWorld, newSize resource.Quantity) []v1.UniqueVolumeName { resizeRequiredVolumes := []v1.UniqueVolumeName{} for _, volumeToMount := range dsw.GetVolumesToMount() { - _, _, err := asw.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName, newSize, "" /* SELinuxContext */) + _, _, err := asw.PodExistsInVolume(logger, volumeToMount.PodName, volumeToMount.VolumeName, newSize, "" /* SELinuxContext */) if cache.IsFSResizeRequiredError(err) { resizeRequiredVolumes = append(resizeRequiredVolumes, volumeToMount.VolumeName) } diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler.go b/pkg/kubelet/volumemanager/reconciler/reconciler.go index e675d2b953a..f1ca5fd26e5 100644 --- a/pkg/kubelet/volumemanager/reconciler/reconciler.go +++ b/pkg/kubelet/volumemanager/reconciler/reconciler.go @@ -17,44 +17,48 @@ limitations under the License. package reconciler import ( + "context" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" ) -func (rc *reconciler) Run(stopCh <-chan struct{}) { - rc.reconstructVolumes() - klog.InfoS("Reconciler: start to sync state") - wait.Until(rc.reconcile, rc.loopSleepDuration, stopCh) +func (rc *reconciler) Run(ctx context.Context, stopCh <-chan struct{}) { + logger := klog.FromContext(ctx) + rc.reconstructVolumes(logger) + logger.Info("Reconciler: start to sync state") + wait.Until(func() { rc.reconcile(ctx) }, rc.loopSleepDuration, stopCh) } -func (rc *reconciler) reconcile() { +func (rc *reconciler) reconcile(ctx context.Context) { + logger := klog.FromContext(ctx) readyToUnmount := rc.readyToUnmount() if readyToUnmount { // Unmounts are triggered before mounts so that a volume that was // referenced by a pod that was deleted and is now referenced by another // pod is unmounted from the first pod before being mounted to the new // pod. - rc.unmountVolumes() + rc.unmountVolumes(logger) } // Next we mount required volumes. This function could also trigger // attach if kubelet is responsible for attaching volumes. // If underlying PVC was resized while in-use then this function also handles volume // resizing. - rc.mountOrAttachVolumes() + rc.mountOrAttachVolumes(logger) // Unmount volumes only when DSW and ASW are fully populated to prevent unmounting a volume // that is still needed, but it did not reach DSW yet. if readyToUnmount { // Ensure devices that should be detached/unmounted are detached/unmounted. - rc.unmountDetachDevices() + rc.unmountDetachDevices(logger) // Clean up any orphan volumes that failed reconstruction. - rc.cleanOrphanVolumes() + rc.cleanOrphanVolumes(logger) } if len(rc.volumesNeedUpdateFromNodeStatus) != 0 { - rc.updateReconstructedFromNodeStatus() + rc.updateReconstructedFromNodeStatus(ctx) } if len(rc.volumesNeedUpdateFromNodeStatus) == 0 { // ASW is fully populated only after both devicePaths and uncertain volume attach-ability diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler_common.go b/pkg/kubelet/volumemanager/reconciler/reconciler_common.go index 628fffad8a2..fbe0a29a2b2 100644 --- a/pkg/kubelet/volumemanager/reconciler/reconciler_common.go +++ b/pkg/kubelet/volumemanager/reconciler/reconciler_common.go @@ -17,6 +17,7 @@ limitations under the License. package reconciler import ( + "context" "fmt" "sync" "time" @@ -49,7 +50,7 @@ type Reconciler interface { // If attach/detach management is enabled, the manager will also check if // volumes that should be attached are attached and volumes that should // be detached are detached and trigger attach/detach operations as needed. - Run(stopCh <-chan struct{}) + Run(ctx context.Context, stopCh <-chan struct{}) // StatesHasBeenSynced returns true only after syncStates process starts to sync // states at least once after kubelet starts @@ -144,7 +145,7 @@ type reconciler struct { volumesNeedUpdateFromNodeStatus []v1.UniqueVolumeName } -func (rc *reconciler) unmountVolumes() { +func (rc *reconciler) unmountVolumes(logger klog.Logger) { // Ensure volumes that should be unmounted are unmounted. for _, mountedVolume := range rc.actualStateOfWorld.GetAllMountedVolumes() { if rc.operationExecutor.IsOperationPending(mountedVolume.VolumeName, mountedVolume.PodName, nestedpendingoperations.EmptyNodeName) { @@ -152,26 +153,26 @@ func (rc *reconciler) unmountVolumes() { } if !rc.desiredStateOfWorld.PodExistsInVolume(mountedVolume.PodName, mountedVolume.VolumeName, mountedVolume.SELinuxMountContext) { // Volume is mounted, unmount it - klog.V(5).InfoS(mountedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountVolume", "")) + logger.V(5).Info(mountedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountVolume", "")) err := rc.operationExecutor.UnmountVolume( mountedVolume.MountedVolume, rc.actualStateOfWorld, rc.kubeletPodsDir) if err != nil && !isExpectedError(err) { - klog.ErrorS(err, mountedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.UnmountVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error()) + logger.Error(err, mountedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.UnmountVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error()) } if err == nil { - klog.InfoS(mountedVolume.GenerateMsgDetailed("operationExecutor.UnmountVolume started", "")) + logger.Info(mountedVolume.GenerateMsgDetailed("operationExecutor.UnmountVolume started", "")) } } } } -func (rc *reconciler) mountOrAttachVolumes() { +func (rc *reconciler) mountOrAttachVolumes(logger klog.Logger) { // Ensure volumes that should be attached/mounted are attached/mounted. for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() { if rc.operationExecutor.IsOperationPending(volumeToMount.VolumeName, nestedpendingoperations.EmptyUniquePodName, nestedpendingoperations.EmptyNodeName) { continue } - volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName, volumeToMount.DesiredPersistentVolumeSize, volumeToMount.SELinuxLabel) + volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(logger, volumeToMount.PodName, volumeToMount.VolumeName, volumeToMount.DesiredPersistentVolumeSize, volumeToMount.SELinuxLabel) volumeToMount.DevicePath = devicePath if cache.IsSELinuxMountMismatchError(err) { // The volume is mounted, but with an unexpected SELinux context. @@ -180,75 +181,74 @@ func (rc *reconciler) mountOrAttachVolumes() { rc.desiredStateOfWorld.AddErrorToPod(volumeToMount.PodName, err.Error()) continue } else if cache.IsVolumeNotAttachedError(err) { - rc.waitForVolumeAttach(volumeToMount) + rc.waitForVolumeAttach(logger, volumeToMount) } else if !volMounted || cache.IsRemountRequiredError(err) { - rc.mountAttachedVolumes(volumeToMount, err) + rc.mountAttachedVolumes(logger, volumeToMount, err) } else if cache.IsFSResizeRequiredError(err) { fsResizeRequiredErr, _ := err.(cache.FsResizeRequiredError) - rc.expandVolume(volumeToMount, fsResizeRequiredErr.CurrentSize) + rc.expandVolume(logger, volumeToMount, fsResizeRequiredErr.CurrentSize) } } } -func (rc *reconciler) expandVolume(volumeToMount cache.VolumeToMount, currentSize resource.Quantity) { - klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.ExpandInUseVolume", ""), "pod", klog.KObj(volumeToMount.Pod)) +func (rc *reconciler) expandVolume(logger klog.Logger, volumeToMount cache.VolumeToMount, currentSize resource.Quantity) { + logger.V(4).Info(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.ExpandInUseVolume", ""), "pod", klog.KObj(volumeToMount.Pod)) err := rc.operationExecutor.ExpandInUseVolume(volumeToMount.VolumeToMount, rc.actualStateOfWorld, currentSize) if err != nil && !isExpectedError(err) { - klog.ErrorS(err, volumeToMount.GenerateErrorDetailed("operationExecutor.ExpandInUseVolume failed", err).Error(), "pod", klog.KObj(volumeToMount.Pod)) + logger.Error(err, volumeToMount.GenerateErrorDetailed("operationExecutor.ExpandInUseVolume failed", err).Error(), "pod", klog.KObj(volumeToMount.Pod)) } if err == nil { - klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.ExpandInUseVolume started", ""), "pod", klog.KObj(volumeToMount.Pod)) + logger.V(4).Info(volumeToMount.GenerateMsgDetailed("operationExecutor.ExpandInUseVolume started", ""), "pod", klog.KObj(volumeToMount.Pod)) } } -func (rc *reconciler) mountAttachedVolumes(volumeToMount cache.VolumeToMount, podExistError error) { +func (rc *reconciler) mountAttachedVolumes(logger klog.Logger, volumeToMount cache.VolumeToMount, podExistError error) { // Volume is not mounted, or is already mounted, but requires remounting remountingLogStr := "" isRemount := cache.IsRemountRequiredError(podExistError) if isRemount { remountingLogStr = "Volume is already mounted to pod, but remount was requested." } - klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.MountVolume", remountingLogStr), "pod", klog.KObj(volumeToMount.Pod)) + logger.V(4).Info(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.MountVolume", remountingLogStr), "pod", klog.KObj(volumeToMount.Pod)) err := rc.operationExecutor.MountVolume( rc.waitForAttachTimeout, volumeToMount.VolumeToMount, rc.actualStateOfWorld, isRemount) if err != nil && !isExpectedError(err) { - klog.ErrorS(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.MountVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error(), "pod", klog.KObj(volumeToMount.Pod)) + logger.Error(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.MountVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error(), "pod", klog.KObj(volumeToMount.Pod)) } if err == nil { if remountingLogStr == "" { - klog.V(1).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr), "pod", klog.KObj(volumeToMount.Pod)) + logger.V(1).Info(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr), "pod", klog.KObj(volumeToMount.Pod)) } else { - klog.V(5).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr), "pod", klog.KObj(volumeToMount.Pod)) + logger.V(5).Info(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr), "pod", klog.KObj(volumeToMount.Pod)) } } } -func (rc *reconciler) waitForVolumeAttach(volumeToMount cache.VolumeToMount) { - logger := klog.TODO() +func (rc *reconciler) waitForVolumeAttach(logger klog.Logger, volumeToMount cache.VolumeToMount) { if rc.controllerAttachDetachEnabled || !volumeToMount.PluginIsAttachable { //// lets not spin a goroutine and unnecessarily trigger exponential backoff if this happens if volumeToMount.PluginIsAttachable && !volumeToMount.ReportedInUse { - klog.V(5).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.VerifyControllerAttachedVolume failed", " volume not marked in-use"), "pod", klog.KObj(volumeToMount.Pod)) + logger.V(5).Info(volumeToMount.GenerateMsgDetailed("operationExecutor.VerifyControllerAttachedVolume failed", " volume not marked in-use"), "pod", klog.KObj(volumeToMount.Pod)) return } // Volume is not attached (or doesn't implement attacher), kubelet attach is disabled, wait // for controller to finish attaching volume. - klog.V(5).InfoS(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.VerifyControllerAttachedVolume", ""), "pod", klog.KObj(volumeToMount.Pod)) + logger.V(5).Info(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.VerifyControllerAttachedVolume", ""), "pod", klog.KObj(volumeToMount.Pod)) err := rc.operationExecutor.VerifyControllerAttachedVolume( logger, volumeToMount.VolumeToMount, rc.nodeName, rc.actualStateOfWorld) if err != nil && !isExpectedError(err) { - klog.ErrorS(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.VerifyControllerAttachedVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error(), "pod", klog.KObj(volumeToMount.Pod)) + logger.Error(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.VerifyControllerAttachedVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error(), "pod", klog.KObj(volumeToMount.Pod)) } if err == nil { - klog.InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.VerifyControllerAttachedVolume started", ""), "pod", klog.KObj(volumeToMount.Pod)) + logger.Info(volumeToMount.GenerateMsgDetailed("operationExecutor.VerifyControllerAttachedVolume started", ""), "pod", klog.KObj(volumeToMount.Pod)) } } else { // Volume is not attached to node, kubelet attach is enabled, volume implements an attacher, @@ -259,18 +259,18 @@ func (rc *reconciler) waitForVolumeAttach(volumeToMount cache.VolumeToMount) { NodeName: rc.nodeName, ScheduledPods: []*v1.Pod{volumeToMount.Pod}, } - klog.V(5).InfoS(volumeToAttach.GenerateMsgDetailed("Starting operationExecutor.AttachVolume", ""), "pod", klog.KObj(volumeToMount.Pod)) + logger.V(5).Info(volumeToAttach.GenerateMsgDetailed("Starting operationExecutor.AttachVolume", ""), "pod", klog.KObj(volumeToMount.Pod)) err := rc.operationExecutor.AttachVolume(logger, volumeToAttach, rc.actualStateOfWorld) if err != nil && !isExpectedError(err) { - klog.ErrorS(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.AttachVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error(), "pod", klog.KObj(volumeToMount.Pod)) + logger.Error(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.AttachVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error(), "pod", klog.KObj(volumeToMount.Pod)) } if err == nil { - klog.InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.AttachVolume started", ""), "pod", klog.KObj(volumeToMount.Pod)) + logger.Info(volumeToMount.GenerateMsgDetailed("operationExecutor.AttachVolume started", ""), "pod", klog.KObj(volumeToMount.Pod)) } } } -func (rc *reconciler) unmountDetachDevices() { +func (rc *reconciler) unmountDetachDevices(logger klog.Logger) { for _, attachedVolume := range rc.actualStateOfWorld.GetUnmountedVolumes() { // Check IsOperationPending to avoid marking a volume as detached if it's in the process of mounting. if !rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName, attachedVolume.SELinuxMountContext) && @@ -282,31 +282,31 @@ func (rc *reconciler) unmountDetachDevices() { attachedVolume, _ = rc.actualStateOfWorld.GetAttachedVolume(attachedVolume.VolumeName) if attachedVolume.DeviceMayBeMounted() { // Volume is globally mounted to device, unmount it - klog.V(5).InfoS(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountDevice", "")) + logger.V(5).Info(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountDevice", "")) err := rc.operationExecutor.UnmountDevice( attachedVolume.AttachedVolume, rc.actualStateOfWorld, rc.hostutil) if err != nil && !isExpectedError(err) { - klog.ErrorS(err, attachedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.UnmountDevice failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error()) + logger.Error(err, attachedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.UnmountDevice failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error()) } if err == nil { - klog.InfoS(attachedVolume.GenerateMsgDetailed("operationExecutor.UnmountDevice started", "")) + logger.Info(attachedVolume.GenerateMsgDetailed("operationExecutor.UnmountDevice started", "")) } } else { // Volume is attached to node, detach it // Kubelet not responsible for detaching or this volume has a non-attachable volume plugin. if rc.controllerAttachDetachEnabled || !attachedVolume.PluginIsAttachable { rc.actualStateOfWorld.MarkVolumeAsDetached(attachedVolume.VolumeName, attachedVolume.NodeName) - klog.InfoS(attachedVolume.GenerateMsgDetailed("Volume detached", fmt.Sprintf("DevicePath %q", attachedVolume.DevicePath))) + logger.Info(attachedVolume.GenerateMsgDetailed("Volume detached", fmt.Sprintf("DevicePath %q", attachedVolume.DevicePath))) } else { // Only detach if kubelet detach is enabled - klog.V(5).InfoS(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.DetachVolume", "")) + logger.V(5).Info(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.DetachVolume", "")) err := rc.operationExecutor.DetachVolume( - klog.TODO(), attachedVolume.AttachedVolume, false /* verifySafeToDetach */, rc.actualStateOfWorld) + logger, attachedVolume.AttachedVolume, false /* verifySafeToDetach */, rc.actualStateOfWorld) if err != nil && !isExpectedError(err) { - klog.ErrorS(err, attachedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.DetachVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error()) + logger.Error(err, attachedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.DetachVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error()) } if err == nil { - klog.InfoS(attachedVolume.GenerateMsgDetailed("operationExecutor.DetachVolume started", "")) + logger.Info(attachedVolume.GenerateMsgDetailed("operationExecutor.DetachVolume started", "")) } } } diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler_test.go b/pkg/kubelet/volumemanager/reconciler/reconciler_test.go index 9ba8463dbce..fc898038e48 100644 --- a/pkg/kubelet/volumemanager/reconciler/reconciler_test.go +++ b/pkg/kubelet/volumemanager/reconciler/reconciler_test.go @@ -17,6 +17,7 @@ limitations under the License. package reconciler import ( + "context" "crypto/md5" "fmt" "path/filepath" @@ -39,7 +40,6 @@ import ( "k8s.io/client-go/kubernetes/fake" core "k8s.io/client-go/testing" "k8s.io/client-go/tools/record" - "k8s.io/klog/v2" "k8s.io/klog/v2/ktesting" _ "k8s.io/klog/v2/ktesting/init" "k8s.io/kubernetes/pkg/kubelet/volumemanager/cache" @@ -69,6 +69,7 @@ func hasAddedPods() bool { return true } // Calls Run() // Verifies there are no calls to attach, detach, mount, unmount, etc. func Test_Run_Positive_DoNothing(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) // Arrange volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t) seLinuxTranslator := util.NewFakeSELinuxLabelTranslator() @@ -99,7 +100,7 @@ func Test_Run_Positive_DoNothing(t *testing.T) { kubeletPodsDir) // Act - runReconciler(reconciler) + runReconciler(ctx, reconciler) // Assert assert.NoError(t, volumetesting.VerifyZeroAttachCalls(fakePlugin)) @@ -114,6 +115,7 @@ func Test_Run_Positive_DoNothing(t *testing.T) { // Calls Run() // Verifies there is are attach/mount/etc calls and no detach/unmount calls. func Test_Run_Positive_VolumeAttachAndMount(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) // Arrange volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t) seLinuxTranslator := util.NewFakeSELinuxLabelTranslator() @@ -163,7 +165,7 @@ func Test_Run_Positive_VolumeAttachAndMount(t *testing.T) { volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]} podName := util.GetUniquePodName(pod) generatedVolumeName, err := dsw.AddPodToVolume( - podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */) + logger, podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */) // Assert if err != nil { @@ -171,7 +173,7 @@ func Test_Run_Positive_VolumeAttachAndMount(t *testing.T) { } // Act - runReconciler(reconciler) + runReconciler(ctx, reconciler) waitForMount(t, fakePlugin, generatedVolumeName, asw) // Assert assert.NoError(t, volumetesting.VerifyAttachCallCount( @@ -191,7 +193,7 @@ func Test_Run_Positive_VolumeAttachAndMount(t *testing.T) { // Verifies there is are attach/mount/etc calls and no detach/unmount calls. func Test_Run_Positive_VolumeAttachAndMountMigrationEnabled(t *testing.T) { // Arrange - logger, _ := ktesting.NewTestContext(t) + logger, ctx := ktesting.NewTestContext(t) intreeToCSITranslator := csitrans.New() node := &v1.Node{ ObjectMeta: metav1.ObjectMeta{ @@ -265,6 +267,7 @@ func Test_Run_Positive_VolumeAttachAndMountMigrationEnabled(t *testing.T) { podName := util.GetUniquePodName(pod) generatedVolumeName, err := dsw.AddPodToVolume( + logger, podName, pod, migratedSpec, @@ -280,7 +283,7 @@ func Test_Run_Positive_VolumeAttachAndMountMigrationEnabled(t *testing.T) { dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeName}) // Act - runReconciler(reconciler) + runReconciler(ctx, reconciler) waitForMount(t, fakePlugin, generatedVolumeName, asw) // Assert assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount( @@ -299,6 +302,7 @@ func Test_Run_Positive_VolumeAttachAndMountMigrationEnabled(t *testing.T) { // Verifies there is one mount call and no unmount calls. // Verifies there are no attach/detach calls. func Test_Run_Positive_VolumeMountControllerAttachEnabled(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) // Arrange node := &v1.Node{ ObjectMeta: metav1.ObjectMeta{ @@ -361,7 +365,7 @@ func Test_Run_Positive_VolumeMountControllerAttachEnabled(t *testing.T) { volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]} podName := util.GetUniquePodName(pod) generatedVolumeName, err := dsw.AddPodToVolume( - podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */) + logger, podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */) dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeName}) // Assert @@ -370,7 +374,7 @@ func Test_Run_Positive_VolumeMountControllerAttachEnabled(t *testing.T) { } // Act - runReconciler(reconciler) + runReconciler(ctx, reconciler) waitForMount(t, fakePlugin, generatedVolumeName, asw) // Assert @@ -387,11 +391,12 @@ func Test_Run_Positive_VolumeMountControllerAttachEnabled(t *testing.T) { // Populates desiredStateOfWorld cache with one volume/pod. // Enables controllerAttachDetachEnabled. -// volume is not repored-in-use +// volume is not reported-in-use // Calls Run() // Verifies that there is not wait-for-mount call // Verifies that there is no exponential-backoff triggered func Test_Run_Negative_VolumeMountControllerAttachEnabled(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) // Arrange volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t) seLinuxTranslator := util.NewFakeSELinuxLabelTranslator() @@ -441,7 +446,7 @@ func Test_Run_Negative_VolumeMountControllerAttachEnabled(t *testing.T) { volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]} podName := util.GetUniquePodName(pod) generatedVolumeName, err := dsw.AddPodToVolume( - podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */) + logger, podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */) // Assert if err != nil { @@ -449,7 +454,7 @@ func Test_Run_Negative_VolumeMountControllerAttachEnabled(t *testing.T) { } // Act - runReconciler(reconciler) + runReconciler(ctx, reconciler) time.Sleep(reconcilerSyncWaitDuration) ok := oex.IsOperationSafeToRetry(generatedVolumeName, podName, nodeName, operationexecutor.VerifyControllerAttachedVolumeOpName) @@ -471,6 +476,7 @@ func Test_Run_Negative_VolumeMountControllerAttachEnabled(t *testing.T) { // Deletes volume/pod from desired state of world. // Verifies detach/unmount calls are issued. func Test_Run_Positive_VolumeAttachMountUnmountDetach(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) // Arrange volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t) seLinuxTranslator := util.NewFakeSELinuxLabelTranslator() @@ -520,7 +526,7 @@ func Test_Run_Positive_VolumeAttachMountUnmountDetach(t *testing.T) { volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]} podName := util.GetUniquePodName(pod) generatedVolumeName, err := dsw.AddPodToVolume( - podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */) + logger, podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */) // Assert if err != nil { @@ -528,7 +534,7 @@ func Test_Run_Positive_VolumeAttachMountUnmountDetach(t *testing.T) { } // Act - runReconciler(reconciler) + runReconciler(ctx, reconciler) waitForMount(t, fakePlugin, generatedVolumeName, asw) // Assert assert.NoError(t, volumetesting.VerifyAttachCallCount( @@ -561,6 +567,7 @@ func Test_Run_Positive_VolumeAttachMountUnmountDetach(t *testing.T) { // Verifies one unmount call is made. // Verifies there are no attach/detach calls made. func Test_Run_Positive_VolumeUnmountControllerAttachEnabled(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) // Arrange node := &v1.Node{ ObjectMeta: metav1.ObjectMeta{ @@ -623,7 +630,7 @@ func Test_Run_Positive_VolumeUnmountControllerAttachEnabled(t *testing.T) { volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]} podName := util.GetUniquePodName(pod) generatedVolumeName, err := dsw.AddPodToVolume( - podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */) + logger, podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */) // Assert if err != nil { @@ -631,7 +638,7 @@ func Test_Run_Positive_VolumeUnmountControllerAttachEnabled(t *testing.T) { } // Act - runReconciler(reconciler) + runReconciler(ctx, reconciler) dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeName}) waitForMount(t, fakePlugin, generatedVolumeName, asw) @@ -662,6 +669,7 @@ func Test_Run_Positive_VolumeUnmountControllerAttachEnabled(t *testing.T) { // Verifies there are attach/get map paths/setupDevice calls and // no detach/teardownDevice calls. func Test_Run_Positive_VolumeAttachAndMap(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) pod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "pod1", @@ -731,7 +739,7 @@ func Test_Run_Positive_VolumeAttachAndMap(t *testing.T) { } podName := util.GetUniquePodName(pod) generatedVolumeName, err := dsw.AddPodToVolume( - podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */) + logger, podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */) // Assert if err != nil { @@ -739,7 +747,7 @@ func Test_Run_Positive_VolumeAttachAndMap(t *testing.T) { } // Act - runReconciler(reconciler) + runReconciler(ctx, reconciler) waitForMount(t, fakePlugin, generatedVolumeName, asw) // Assert assert.NoError(t, volumetesting.VerifyAttachCallCount( @@ -759,6 +767,7 @@ func Test_Run_Positive_VolumeAttachAndMap(t *testing.T) { // and no teardownDevice call. // Verifies there are no attach/detach calls. func Test_Run_Positive_BlockVolumeMapControllerAttachEnabled(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) pod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "pod1", @@ -844,7 +853,7 @@ func Test_Run_Positive_BlockVolumeMapControllerAttachEnabled(t *testing.T) { podName := util.GetUniquePodName(pod) generatedVolumeName, err := dsw.AddPodToVolume( - podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */) + logger, podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */) dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeName}) // Assert @@ -853,7 +862,7 @@ func Test_Run_Positive_BlockVolumeMapControllerAttachEnabled(t *testing.T) { } // Act - runReconciler(reconciler) + runReconciler(ctx, reconciler) waitForMount(t, fakePlugin, generatedVolumeName, asw) // Assert @@ -873,6 +882,7 @@ func Test_Run_Positive_BlockVolumeMapControllerAttachEnabled(t *testing.T) { // Deletes volume/pod from desired state of world. // Verifies one detach/teardownDevice calls are issued. func Test_Run_Positive_BlockVolumeAttachMapUnmapDetach(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) pod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "pod1", @@ -942,7 +952,7 @@ func Test_Run_Positive_BlockVolumeAttachMapUnmapDetach(t *testing.T) { podName := util.GetUniquePodName(pod) generatedVolumeName, err := dsw.AddPodToVolume( - podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */) + logger, podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */) // Assert if err != nil { @@ -950,7 +960,7 @@ func Test_Run_Positive_BlockVolumeAttachMapUnmapDetach(t *testing.T) { } // Act - runReconciler(reconciler) + runReconciler(ctx, reconciler) waitForMount(t, fakePlugin, generatedVolumeName, asw) // Assert assert.NoError(t, volumetesting.VerifyAttachCallCount( @@ -981,6 +991,7 @@ func Test_Run_Positive_BlockVolumeAttachMapUnmapDetach(t *testing.T) { // Verifies one teardownDevice call is made. // Verifies there are no attach/detach calls made. func Test_Run_Positive_VolumeUnmapControllerAttachEnabled(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) pod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "pod1", @@ -1067,7 +1078,7 @@ func Test_Run_Positive_VolumeUnmapControllerAttachEnabled(t *testing.T) { podName := util.GetUniquePodName(pod) generatedVolumeName, err := dsw.AddPodToVolume( - podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */) + logger, podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */) // Assert if err != nil { @@ -1075,7 +1086,7 @@ func Test_Run_Positive_VolumeUnmapControllerAttachEnabled(t *testing.T) { } // Act - runReconciler(reconciler) + runReconciler(ctx, reconciler) dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeName}) waitForMount(t, fakePlugin, generatedVolumeName, asw) @@ -1239,6 +1250,7 @@ func Test_GenerateUnmapDeviceFunc_Plugin_Not_Found(t *testing.T) { // Mark volume as fsResizeRequired in ASW. // Verifies volume's fsResizeRequired flag is cleared later. func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) blockMode := v1.PersistentVolumeBlock fsMode := v1.PersistentVolumeFilesystem @@ -1349,7 +1361,7 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) { volumeSpec := &volume.Spec{PersistentVolume: pv} podName := util.GetUniquePodName(pod) volumeName, err := dsw.AddPodToVolume( - podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */) + logger, podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */) // Assert if err != nil { t.Fatalf("AddPodToVolume failed. Expected: Actual: <%v>", err) @@ -1360,7 +1372,7 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) { stopChan, stoppedChan := make(chan struct{}), make(chan struct{}) go func() { defer close(stoppedChan) - reconciler.Run(stopChan) + reconciler.Run(ctx, stopChan) }() waitForMount(t, fakePlugin, volumeName, asw) // Stop the reconciler. @@ -1370,7 +1382,7 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) { // Simulate what DSOWP does pvWithSize.Spec.Capacity[v1.ResourceStorage] = tc.newPVSize volumeSpec = &volume.Spec{PersistentVolume: pvWithSize} - _, err = dsw.AddPodToVolume(podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxContexts */) + _, err = dsw.AddPodToVolume(logger, podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxContexts */) if err != nil { t.Fatalf("AddPodToVolume failed. Expected: Actual: <%v>", err) } @@ -1379,7 +1391,7 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) { newSize := tc.newPVSize.DeepCopy() dsw.UpdatePersistentVolumeSize(volumeName, newSize) - _, _, podExistErr := asw.PodExistsInVolume(podName, volumeName, newSize, "" /* SELinuxLabel */) + _, _, podExistErr := asw.PodExistsInVolume(logger, podName, volumeName, newSize, "" /* SELinuxLabel */) if tc.expansionFailed { if cache.IsFSResizeRequiredError(podExistErr) { t.Fatalf("volume %s should not throw fsResizeRequired error: %v", volumeName, podExistErr) @@ -1388,10 +1400,10 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) { if !cache.IsFSResizeRequiredError(podExistErr) { t.Fatalf("Volume should be marked as fsResizeRequired, but receive unexpected error: %v", podExistErr) } - go reconciler.Run(wait.NeverStop) + go reconciler.Run(ctx, wait.NeverStop) waitErr := retryWithExponentialBackOff(testOperationBackOffDuration, func() (done bool, err error) { - mounted, _, err := asw.PodExistsInVolume(podName, volumeName, newSize, "" /* SELinuxContext */) + mounted, _, err := asw.PodExistsInVolume(logger, podName, volumeName, newSize, "" /* SELinuxContext */) return mounted && err == nil, nil }) if waitErr != nil { @@ -1467,6 +1479,7 @@ func getTestPod(claimName string) *v1.Pod { } func Test_UncertainDeviceGlobalMounts(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) var tests = []struct { name string deviceState operationexecutor.DeviceMountState @@ -1607,7 +1620,7 @@ func Test_UncertainDeviceGlobalMounts(t *testing.T) { volumeSpec := &volume.Spec{PersistentVolume: pv} podName := util.GetUniquePodName(pod) volumeName, err := dsw.AddPodToVolume( - podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */) + logger, podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */) // Assert if err != nil { t.Fatalf("AddPodToVolume failed. Expected: Actual: <%v>", err) @@ -1617,7 +1630,7 @@ func Test_UncertainDeviceGlobalMounts(t *testing.T) { // Start the reconciler to fill ASW. stopChan, stoppedChan := make(chan struct{}), make(chan struct{}) go func() { - reconciler.Run(stopChan) + reconciler.Run(ctx, stopChan) close(stoppedChan) }() waitForVolumeToExistInASW(t, volumeName, asw) @@ -1630,7 +1643,7 @@ func Test_UncertainDeviceGlobalMounts(t *testing.T) { tc.volumeName == volumetesting.SuccessAndTimeoutDeviceName { // wait for mount and then break it via remount waitForMount(t, fakePlugin, volumeName, asw) - asw.MarkRemountRequired(podName) + asw.MarkRemountRequired(logger, podName) time.Sleep(reconcilerSyncWaitDuration) } @@ -1662,6 +1675,7 @@ func Test_UncertainDeviceGlobalMounts(t *testing.T) { } func Test_UncertainVolumeMountState(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) var tests = []struct { name string volumeState operationexecutor.VolumeMountState @@ -1830,7 +1844,7 @@ func Test_UncertainVolumeMountState(t *testing.T) { volumeSpec := &volume.Spec{PersistentVolume: pv} podName := util.GetUniquePodName(pod) volumeName, err := dsw.AddPodToVolume( - podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */) + logger, podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */) // Assert if err != nil { t.Fatalf("AddPodToVolume failed. Expected: Actual: <%v>", err) @@ -1840,7 +1854,7 @@ func Test_UncertainVolumeMountState(t *testing.T) { // Start the reconciler to fill ASW. stopChan, stoppedChan := make(chan struct{}), make(chan struct{}) go func() { - reconciler.Run(stopChan) + reconciler.Run(ctx, stopChan) close(stoppedChan) }() waitForVolumeToExistInASW(t, volumeName, asw) @@ -1856,7 +1870,7 @@ func Test_UncertainVolumeMountState(t *testing.T) { tc.volumeName == volumetesting.SuccessAndTimeoutSetupVolumeName { // wait for mount and then break it via remount waitForMount(t, fakePlugin, volumeName, asw) - asw.MarkRemountRequired(podName) + asw.MarkRemountRequired(logger, podName) time.Sleep(reconcilerSyncWaitDuration) } @@ -1944,11 +1958,12 @@ func waitForGlobalMount(t *testing.T, volumeName v1.UniqueVolumeName, asw cache. } func waitForUncertainPodMount(t *testing.T, volumeName v1.UniqueVolumeName, podName types.UniquePodName, asw cache.ActualStateOfWorld) { + logger, _ := ktesting.NewTestContext(t) // check if volume is locally pod mounted in uncertain state err := retryWithExponentialBackOff( testOperationBackOffDuration, func() (bool, error) { - mounted, _, err := asw.PodExistsInVolume(podName, volumeName, resource.Quantity{}, "" /* SELinuxContext */) + mounted, _, err := asw.PodExistsInVolume(logger, podName, volumeName, resource.Quantity{}, "" /* SELinuxContext */) if mounted || err != nil { return false, nil } @@ -2061,8 +2076,8 @@ func createTestClient(attachedVolumes ...v1.AttachedVolume) *fake.Clientset { return fakeClient } -func runReconciler(reconciler Reconciler) { - go reconciler.Run(wait.NeverStop) +func runReconciler(ctx context.Context, reconciler Reconciler) { + go reconciler.Run(ctx, wait.NeverStop) } func createtestClientWithPVPVC(pv *v1.PersistentVolume, pvc *v1.PersistentVolumeClaim, attachedVolumes ...v1.AttachedVolume) *fake.Clientset { @@ -2101,6 +2116,7 @@ func createtestClientWithPVPVC(pv *v1.PersistentVolume, pvc *v1.PersistentVolume } func Test_Run_Positive_VolumeMountControllerAttachEnabledRace(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) // Arrange node := &v1.Node{ ObjectMeta: metav1.ObjectMeta{ @@ -2169,7 +2185,7 @@ func Test_Run_Positive_VolumeMountControllerAttachEnabledRace(t *testing.T) { volumeSpecCopy := &volume.Spec{Volume: &pod.Spec.Volumes[0]} podName := util.GetUniquePodName(pod) generatedVolumeName, err := dsw.AddPodToVolume( - podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */) + logger, podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */) dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeName}) if err != nil { @@ -2178,7 +2194,7 @@ func Test_Run_Positive_VolumeMountControllerAttachEnabledRace(t *testing.T) { // Start the reconciler to fill ASW. stopChan, stoppedChan := make(chan struct{}), make(chan struct{}) go func() { - reconciler.Run(stopChan) + reconciler.Run(ctx, stopChan) close(stoppedChan) }() waitForMount(t, fakePlugin, generatedVolumeName, asw) @@ -2193,10 +2209,10 @@ func Test_Run_Positive_VolumeMountControllerAttachEnabledRace(t *testing.T) { fakePlugin.UnmountDeviceHook = func(mountPath string) error { // Act: // 3. While a volume is being unmounted, add it back to the desired state of world - klog.InfoS("UnmountDevice called") + logger.Info("UnmountDevice called") var generatedVolumeNameCopy v1.UniqueVolumeName generatedVolumeNameCopy, err = dsw.AddPodToVolume( - podName, pod, volumeSpecCopy, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */) + logger, podName, pod, volumeSpecCopy, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */) dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeNameCopy}) return nil } @@ -2208,7 +2224,7 @@ func Test_Run_Positive_VolumeMountControllerAttachEnabledRace(t *testing.T) { // Assert // 4. When the volume is mounted again, expect that UnmountDevice operation did not clear devicePath if devicePath == "" { - klog.ErrorS(nil, "Expected WaitForAttach called with devicePath from Node.Status") + logger.Error(nil, "Expected WaitForAttach called with devicePath from Node.Status") return "", fmt.Errorf("Expected devicePath from Node.Status") } return devicePath, nil @@ -2216,7 +2232,7 @@ func Test_Run_Positive_VolumeMountControllerAttachEnabledRace(t *testing.T) { fakePlugin.Unlock() // Start the reconciler again. - go reconciler.Run(wait.NeverStop) + go reconciler.Run(ctx, wait.NeverStop) // 2. Delete the volume from DSW (and wait for callbacks) dsw.DeletePodFromVolume(podName, generatedVolumeName) @@ -2304,6 +2320,7 @@ func getReconciler(kubeletDir string, t *testing.T, volumePaths []string, kubeCl } func TestReconcileWithUpdateReconstructedFromAPIServer(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) // Calls Run() with two reconstructed volumes. // Verifies the devicePaths + volume attachability are reconstructed from node.status. @@ -2382,9 +2399,9 @@ func TestReconcileWithUpdateReconstructedFromAPIServer(t *testing.T) { volumeSpec2 := &volume.Spec{Volume: &pod.Spec.Volumes[1]} volumeName2 := util.GetUniqueVolumeName(fakePlugin.GetPluginName(), "fake-device2") - assert.NoError(t, asw.AddAttachUncertainReconstructedVolume(volumeName1, volumeSpec1, nodeName, "")) + assert.NoError(t, asw.AddAttachUncertainReconstructedVolume(logger, volumeName1, volumeSpec1, nodeName, "")) assert.NoError(t, asw.MarkDeviceAsUncertain(volumeName1, "/dev/badly/reconstructed", "/var/lib/kubelet/plugins/global1", "")) - assert.NoError(t, asw.AddAttachUncertainReconstructedVolume(volumeName2, volumeSpec2, nodeName, "")) + assert.NoError(t, asw.AddAttachUncertainReconstructedVolume(logger, volumeName2, volumeSpec2, nodeName, "")) assert.NoError(t, asw.MarkDeviceAsUncertain(volumeName2, "/dev/reconstructed", "/var/lib/kubelet/plugins/global2", "")) assert.False(t, reconciler.StatesHasBeenSynced()) @@ -2392,7 +2409,7 @@ func TestReconcileWithUpdateReconstructedFromAPIServer(t *testing.T) { reconciler.volumesNeedUpdateFromNodeStatus = append(reconciler.volumesNeedUpdateFromNodeStatus, volumeName1, volumeName2) // Act - run reconcile loop just once. // "volumesNeedUpdateFromNodeStatus" is not empty, so no unmount will be triggered. - reconciler.reconcile() + reconciler.reconcile(ctx) // Assert assert.True(t, reconciler.StatesHasBeenSynced()) diff --git a/pkg/kubelet/volumemanager/reconciler/reconstruct.go b/pkg/kubelet/volumemanager/reconciler/reconstruct.go index 53a6b963260..44bbf6cd517 100644 --- a/pkg/kubelet/volumemanager/reconciler/reconstruct.go +++ b/pkg/kubelet/volumemanager/reconciler/reconstruct.go @@ -47,29 +47,29 @@ func (rc *reconciler) readyToUnmount() bool { // directories from the disk. For the volumes that cannot support or fail reconstruction, it will // put the volumes to volumesFailedReconstruction to be cleaned up later when DesiredStateOfWorld // is populated. -func (rc *reconciler) reconstructVolumes() { +func (rc *reconciler) reconstructVolumes(logger klog.Logger) { // Get volumes information by reading the pod's directory - podVolumes, err := getVolumesFromPodDir(rc.kubeletPodsDir) + podVolumes, err := getVolumesFromPodDir(logger, rc.kubeletPodsDir) if err != nil { - klog.ErrorS(err, "Cannot get volumes from disk, skip sync states for volume reconstruction") + logger.Error(err, "Cannot get volumes from disk, skip sync states for volume reconstruction") return } reconstructedVolumes := make(map[v1.UniqueVolumeName]*globalVolumeInfo) reconstructedVolumeNames := []v1.UniqueVolumeName{} for _, volume := range podVolumes { if rc.actualStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName) { - klog.V(4).InfoS("Volume exists in actual state, skip cleaning up mounts", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName) + logger.V(4).Info("Volume exists in actual state, skip cleaning up mounts", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName) // There is nothing to reconstruct continue } reconstructedVolume, err := rc.reconstructVolume(volume) if err != nil { - klog.InfoS("Could not construct volume information", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName, "err", err) + logger.Info("Could not construct volume information", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName, "err", err) // We can't reconstruct the volume. Remember to check DSW after it's fully populated and force unmount the volume when it's orphaned. rc.volumesFailedReconstruction = append(rc.volumesFailedReconstruction, volume) continue } - klog.V(4).InfoS("Adding reconstructed volume to actual state and node status", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName) + logger.V(4).Info("Adding reconstructed volume to actual state and node status", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName) gvl := &globalVolumeInfo{ volumeName: reconstructedVolume.volumeName, volumeSpec: reconstructedVolume.volumeSpec, @@ -89,21 +89,21 @@ func (rc *reconciler) reconstructVolumes() { if len(reconstructedVolumes) > 0 { // Add the volumes to ASW - rc.updateStates(reconstructedVolumes) + rc.updateStates(logger, reconstructedVolumes) // Remember to update devicePath from node.status.volumesAttached rc.volumesNeedUpdateFromNodeStatus = reconstructedVolumeNames } - klog.V(2).InfoS("Volume reconstruction finished") + logger.V(2).Info("Volume reconstruction finished") } -func (rc *reconciler) updateStates(reconstructedVolumes map[v1.UniqueVolumeName]*globalVolumeInfo) { +func (rc *reconciler) updateStates(logger klog.Logger, reconstructedVolumes map[v1.UniqueVolumeName]*globalVolumeInfo) { for _, gvl := range reconstructedVolumes { err := rc.actualStateOfWorld.AddAttachUncertainReconstructedVolume( //TODO: the devicePath might not be correct for some volume plugins: see issue #54108 - gvl.volumeName, gvl.volumeSpec, rc.nodeName, gvl.devicePath) + logger, gvl.volumeName, gvl.volumeSpec, rc.nodeName, gvl.devicePath) if err != nil { - klog.ErrorS(err, "Could not add volume information to actual state of world", "volumeName", gvl.volumeName) + logger.Error(err, "Could not add volume information to actual state of world", "volumeName", gvl.volumeName) continue } var seLinuxMountContext string @@ -123,31 +123,31 @@ func (rc *reconciler) updateStates(reconstructedVolumes map[v1.UniqueVolumeName] _, err = rc.actualStateOfWorld.CheckAndMarkVolumeAsUncertainViaReconstruction(markVolumeOpts) if err != nil { - klog.ErrorS(err, "Could not add pod to volume information to actual state of world", "pod", klog.KObj(volume.pod)) + logger.Error(err, "Could not add pod to volume information to actual state of world", "pod", klog.KObj(volume.pod)) continue } seLinuxMountContext = volume.seLinuxMountContext - klog.V(2).InfoS("Volume is marked as uncertain and added into the actual state", "pod", klog.KObj(volume.pod), "podName", volume.podName, "volumeName", volume.volumeName, "seLinuxMountContext", volume.seLinuxMountContext) + logger.V(2).Info("Volume is marked as uncertain and added into the actual state", "pod", klog.KObj(volume.pod), "podName", volume.podName, "volumeName", volume.volumeName, "seLinuxMountContext", volume.seLinuxMountContext) } // If the volume has device to mount, we mark its device as uncertain. if gvl.deviceMounter != nil || gvl.blockVolumeMapper != nil { deviceMountPath, err := getDeviceMountPath(gvl) if err != nil { - klog.ErrorS(err, "Could not find device mount path for volume", "volumeName", gvl.volumeName) + logger.Error(err, "Could not find device mount path for volume", "volumeName", gvl.volumeName) continue } err = rc.actualStateOfWorld.MarkDeviceAsUncertain(gvl.volumeName, gvl.devicePath, deviceMountPath, seLinuxMountContext) if err != nil { - klog.ErrorS(err, "Could not mark device is uncertain to actual state of world", "volumeName", gvl.volumeName, "deviceMountPath", deviceMountPath) + logger.Error(err, "Could not mark device is uncertain to actual state of world", "volumeName", gvl.volumeName, "deviceMountPath", deviceMountPath) continue } - klog.V(2).InfoS("Volume is marked device as uncertain and added into the actual state", "volumeName", gvl.volumeName, "deviceMountPath", deviceMountPath) + logger.V(2).Info("Volume is marked device as uncertain and added into the actual state", "volumeName", gvl.volumeName, "deviceMountPath", deviceMountPath) } } } // cleanOrphanVolumes tries to clean up all volumes that failed reconstruction. -func (rc *reconciler) cleanOrphanVolumes() { +func (rc *reconciler) cleanOrphanVolumes(logger klog.Logger) { if len(rc.volumesFailedReconstruction) == 0 { return } @@ -156,14 +156,14 @@ func (rc *reconciler) cleanOrphanVolumes() { if rc.desiredStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName) { // Some pod needs the volume, don't clean it up and hope that // reconcile() calls SetUp and reconstructs the volume in ASW. - klog.V(4).InfoS("Volume exists in desired state, skip cleaning up mounts", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName) + logger.V(4).Info("Volume exists in desired state, skip cleaning up mounts", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName) continue } - klog.InfoS("Cleaning up mounts for volume that could not be reconstructed", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName) - rc.cleanupMounts(volume) + logger.Info("Cleaning up mounts for volume that could not be reconstructed", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName) + rc.cleanupMounts(logger, volume) } - klog.V(2).InfoS("Orphan volume cleanup finished") + logger.V(2).Info("Orphan volume cleanup finished") // Clean the cache, cleanup is one shot operation. rc.volumesFailedReconstruction = make([]podVolume, 0) } @@ -171,21 +171,22 @@ func (rc *reconciler) cleanOrphanVolumes() { // updateReconstructedFromNodeStatus tries to file devicePaths of reconstructed volumes from // node.Status.VolumesAttached. This can be done only after connection to the API // server is established, i.e. it can't be part of reconstructVolumes(). -func (rc *reconciler) updateReconstructedFromNodeStatus() { - klog.V(4).InfoS("Updating reconstructed devicePaths") +func (rc *reconciler) updateReconstructedFromNodeStatus(ctx context.Context) { + logger := klog.FromContext(ctx) + logger.V(4).Info("Updating reconstructed devicePaths") if rc.kubeClient == nil { // Skip reconstructing devicePath from node objects if kubelet is in standalone mode. // Such kubelet is not expected to mount any attachable volume or Secrets / ConfigMap. - klog.V(2).InfoS("Skipped reconstruction of DevicePaths from node.status in standalone mode") + logger.V(2).Info("Skipped reconstruction of DevicePaths from node.status in standalone mode") rc.volumesNeedUpdateFromNodeStatus = nil return } - node, fetchErr := rc.kubeClient.CoreV1().Nodes().Get(context.TODO(), string(rc.nodeName), metav1.GetOptions{}) + node, fetchErr := rc.kubeClient.CoreV1().Nodes().Get(ctx, string(rc.nodeName), metav1.GetOptions{}) if fetchErr != nil { // This may repeat few times per second until kubelet is able to read its own status for the first time. - klog.V(4).ErrorS(fetchErr, "Failed to get Node status to reconstruct device paths") + logger.V(4).Error(fetchErr, "Failed to get Node status to reconstruct device paths") return } @@ -197,11 +198,11 @@ func (rc *reconciler) updateReconstructedFromNodeStatus() { } rc.actualStateOfWorld.UpdateReconstructedDevicePath(volumeID, attachedVolume.DevicePath) attachable = true - klog.V(4).InfoS("Updated devicePath from node status for volume", "volumeName", attachedVolume.Name, "path", attachedVolume.DevicePath) + logger.V(4).Info("Updated devicePath from node status for volume", "volumeName", attachedVolume.Name, "path", attachedVolume.DevicePath) } rc.actualStateOfWorld.UpdateReconstructedVolumeAttachability(volumeID, attachable) } - klog.V(2).InfoS("DevicePaths of reconstructed volumes updated") + logger.V(2).Info("DevicePaths of reconstructed volumes updated") rc.volumesNeedUpdateFromNodeStatus = nil } diff --git a/pkg/kubelet/volumemanager/reconciler/reconstruct_common.go b/pkg/kubelet/volumemanager/reconciler/reconstruct_common.go index b6815dfd515..d685a3de004 100644 --- a/pkg/kubelet/volumemanager/reconciler/reconstruct_common.go +++ b/pkg/kubelet/volumemanager/reconciler/reconstruct_common.go @@ -154,8 +154,8 @@ func (gvi *globalVolumeInfo) addPodVolume(rcv *reconstructedVolume) { gvi.podVolumes[rcv.podName] = rcv } -func (rc *reconciler) cleanupMounts(volume podVolume) { - klog.V(2).InfoS("Reconciler sync states: could not find volume information in desired state, clean up the mount points", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName) +func (rc *reconciler) cleanupMounts(logger klog.Logger, volume podVolume) { + logger.V(2).Info("Reconciler sync states: could not find volume information in desired state, clean up the mount points", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName) mountedVolume := operationexecutor.MountedVolume{ PodName: volume.podName, // VolumeName should be generated by `GetUniqueVolumeNameFromSpec` or `GetUniqueVolumeNameFromSpecWithPod`. @@ -171,7 +171,7 @@ func (rc *reconciler) cleanupMounts(volume podVolume) { err := rc.operationExecutor.UnmountVolume(mountedVolume, rc.actualStateOfWorld, rc.kubeletPodsDir) if err != nil { metrics.ForceCleanedFailedVolumeOperationsErrorsTotal.Inc() - klog.ErrorS(err, mountedVolume.GenerateErrorDetailed("volumeHandler.UnmountVolumeHandler for UnmountVolume failed", err).Error()) + logger.Error(err, mountedVolume.GenerateErrorDetailed("volumeHandler.UnmountVolumeHandler for UnmountVolume failed", err).Error()) return } } @@ -194,7 +194,7 @@ func getDeviceMountPath(gvi *globalVolumeInfo) (string, error) { // getVolumesFromPodDir scans through the volumes directories under the given pod directory. // It returns a list of pod volume information including pod's uid, volume's plugin name, mount path, // and volume spec name. -func getVolumesFromPodDir(podDir string) ([]podVolume, error) { +func getVolumesFromPodDir(logger klog.Logger, podDir string) ([]podVolume, error) { podsDirInfo, err := os.ReadDir(podDir) if err != nil { return nil, err @@ -227,13 +227,13 @@ func getVolumesFromPodDir(podDir string) ([]podVolume, error) { volumePluginPath := filepath.Join(volumesDir, pluginName) volumePluginDirs, err := utilpath.ReadDirNoStat(volumePluginPath) if err != nil { - klog.ErrorS(err, "Could not read volume plugin directory", "volumePluginPath", volumePluginPath) + logger.Error(err, "Could not read volume plugin directory", "volumePluginPath", volumePluginPath) continue } unescapePluginName := utilstrings.UnescapeQualifiedName(pluginName) for _, volumeName := range volumePluginDirs { volumePath := filepath.Join(volumePluginPath, volumeName) - klog.V(5).InfoS("Volume path from volume plugin directory", "podName", podName, "volumePath", volumePath) + logger.V(5).Info("Volume path from volume plugin directory", "podName", podName, "volumePath", volumePath) volumes = append(volumes, podVolume{ podName: volumetypes.UniquePodName(podName), volumeSpecName: volumeName, @@ -246,7 +246,7 @@ func getVolumesFromPodDir(podDir string) ([]podVolume, error) { } } for _, volume := range volumes { - klog.V(4).InfoS("Get volume from pod directory", "path", podDir, "volume", volume) + logger.V(4).Info("Get volume from pod directory", "path", podDir, "volume", volume) } return volumes, nil } diff --git a/pkg/kubelet/volumemanager/reconciler/reconstruct_test.go b/pkg/kubelet/volumemanager/reconciler/reconstruct_test.go index eb1d1b28aa4..34e73328607 100644 --- a/pkg/kubelet/volumemanager/reconciler/reconstruct_test.go +++ b/pkg/kubelet/volumemanager/reconciler/reconstruct_test.go @@ -33,6 +33,7 @@ import ( ) func TestReconstructVolumes(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) tests := []struct { name string volumePaths []string @@ -102,7 +103,7 @@ func TestReconstructVolumes(t *testing.T) { rcInstance, _ := rc.(*reconciler) // Act - rcInstance.reconstructVolumes() + rcInstance.reconstructVolumes(logger) // Assert // Convert to []UniqueVolumeName @@ -195,7 +196,7 @@ func TestCleanOrphanVolumes(t *testing.T) { volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]} podName := util.GetUniquePodName(pod) volumeName, err := rcInstance.desiredStateOfWorld.AddPodToVolume( - podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* SELinuxContext */) + logger, podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* SELinuxContext */) if err != nil { t.Fatalf("Error adding volume %s to dsow: %v", volumeSpec.Name(), err) } @@ -203,7 +204,7 @@ func TestCleanOrphanVolumes(t *testing.T) { } // Act - rcInstance.cleanOrphanVolumes() + rcInstance.cleanOrphanVolumes(logger) // Assert if len(rcInstance.volumesFailedReconstruction) != 0 { @@ -245,6 +246,7 @@ func TestReconstructVolumesMount(t *testing.T) { // Since the volume is reconstructed, it must be marked as uncertain // even after a final SetUp error, see https://github.com/kubernetes/kubernetes/issues/96635 // and https://github.com/kubernetes/kubernetes/pull/110670. + logger, ctx := ktesting.NewTestContext(t) tests := []struct { name string @@ -304,7 +306,7 @@ func TestReconstructVolumesMount(t *testing.T) { rcInstance, _ := rc.(*reconciler) // Act 1 - reconstruction - rcInstance.reconstructVolumes() + rcInstance.reconstructVolumes(logger) // Assert 1 - the volume is Uncertain mountedPods := rcInstance.actualStateOfWorld.GetMountedVolumes() @@ -318,7 +320,7 @@ func TestReconstructVolumesMount(t *testing.T) { podName := util.GetUniquePodName(pod) volumeName, err := rcInstance.desiredStateOfWorld.AddPodToVolume( - podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* SELinuxContext */) + logger, podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* SELinuxContext */) if err != nil { t.Fatalf("Error adding volume %s to dsow: %v", volumeSpec.Name(), err) } @@ -333,7 +335,7 @@ func TestReconstructVolumesMount(t *testing.T) { rcInstance.volumesNeedUpdateFromNodeStatus = nil // Act 2 - reconcile once - rcInstance.reconcile() + rcInstance.reconcile(ctx) // Assert 2 // MountDevice was attempted diff --git a/pkg/kubelet/volumemanager/volume_manager.go b/pkg/kubelet/volumemanager/volume_manager.go index cd956f44d70..e38ab1ede6a 100644 --- a/pkg/kubelet/volumemanager/volume_manager.go +++ b/pkg/kubelet/volumemanager/volume_manager.go @@ -282,6 +282,7 @@ type volumeManager struct { } func (vm *volumeManager) Run(ctx context.Context, sourcesReady config.SourcesReady) { + logger := klog.FromContext(ctx) defer runtime.HandleCrash() if vm.kubeClient != nil { @@ -290,15 +291,15 @@ func (vm *volumeManager) Run(ctx context.Context, sourcesReady config.SourcesRea } go vm.desiredStateOfWorldPopulator.Run(ctx, sourcesReady) - klog.V(2).InfoS("The desired_state_of_world populator starts") + logger.V(2).Info("The desired_state_of_world populator starts") - klog.InfoS("Starting Kubelet Volume Manager") - go vm.reconciler.Run(ctx.Done()) + logger.Info("Starting Kubelet Volume Manager") + go vm.reconciler.Run(ctx, ctx.Done()) metrics.Register(vm.actualStateOfWorld, vm.desiredStateOfWorld, vm.volumePluginMgr) <-ctx.Done() - klog.InfoS("Shutting down Kubelet Volume Manager") + logger.Info("Shutting down Kubelet Volume Manager") } func (vm *volumeManager) GetMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap { @@ -389,6 +390,7 @@ func (vm *volumeManager) MarkVolumesAsReportedInUse( } func (vm *volumeManager) WaitForAttachAndMount(ctx context.Context, pod *v1.Pod) error { + logger := klog.FromContext(ctx) if pod == nil { return nil } @@ -399,7 +401,7 @@ func (vm *volumeManager) WaitForAttachAndMount(ctx context.Context, pod *v1.Pod) return nil } - klog.V(3).InfoS("Waiting for volumes to attach and mount for pod", "pod", klog.KObj(pod)) + logger.V(3).Info("Waiting for volumes to attach and mount for pod", "pod", klog.KObj(pod)) uniquePodName := util.GetUniquePodName(pod) // Some pods expect to have Setup called over and over again to update. @@ -435,16 +437,17 @@ func (vm *volumeManager) WaitForAttachAndMount(ctx context.Context, pod *v1.Pod) err) } - klog.V(3).InfoS("All volumes are attached and mounted for pod", "pod", klog.KObj(pod)) + logger.V(3).Info("All volumes are attached and mounted for pod", "pod", klog.KObj(pod)) return nil } func (vm *volumeManager) WaitForUnmount(ctx context.Context, pod *v1.Pod) error { + logger := klog.FromContext(ctx) if pod == nil { return nil } - klog.V(3).InfoS("Waiting for volumes to unmount for pod", "pod", klog.KObj(pod)) + logger.V(3).Info("Waiting for volumes to unmount for pod", "pod", klog.KObj(pod)) uniquePodName := util.GetUniquePodName(pod) vm.desiredStateOfWorldPopulator.ReprocessPod(uniquePodName) @@ -472,7 +475,7 @@ func (vm *volumeManager) WaitForUnmount(ctx context.Context, pod *v1.Pod) error err) } - klog.V(3).InfoS("All volumes are unmounted for pod", "pod", klog.KObj(pod)) + logger.V(3).Info("All volumes are unmounted for pod", "pod", klog.KObj(pod)) return nil } diff --git a/pkg/kubelet/volumemanager/volume_manager_test.go b/pkg/kubelet/volumemanager/volume_manager_test.go index 6e278e2983a..797c22ce50d 100644 --- a/pkg/kubelet/volumemanager/volume_manager_test.go +++ b/pkg/kubelet/volumemanager/volume_manager_test.go @@ -51,6 +51,7 @@ const ( ) func TestGetMountedVolumesForPodAndGetVolumesInUse(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) tests := []struct { name string pvMode, podMode v1.PersistentVolumeMode @@ -111,7 +112,7 @@ func TestGetMountedVolumesForPodAndGetVolumesInUse(t *testing.T) { tCtx.Done(), manager) - err = manager.WaitForAttachAndMount(context.Background(), pod) + err = manager.WaitForAttachAndMount(ctx, pod) if err != nil && !test.expectError { t.Errorf("Expected success: %v", err) } @@ -144,6 +145,7 @@ func TestGetMountedVolumesForPodAndGetVolumesInUse(t *testing.T) { } func TestWaitForAttachAndMountError(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) tmpDir, err := utiltesting.MkTmpdir("volumeManagerTest") if err != nil { t.Fatalf("can't make a temp dir: %v", err) @@ -231,7 +233,7 @@ func TestWaitForAttachAndMountError(t *testing.T) { podManager.SetPods([]*v1.Pod{pod}) - err = manager.WaitForAttachAndMount(context.Background(), pod) + err = manager.WaitForAttachAndMount(ctx, pod) if err == nil { t.Errorf("Expected error, got none") } @@ -242,6 +244,7 @@ func TestWaitForAttachAndMountError(t *testing.T) { } func TestInitialPendingVolumesForPodAndGetVolumesInUse(t *testing.T) { + tCtx := ktesting.Init(t) tmpDir, err := utiltesting.MkTmpdir("volumeManagerTest") if err != nil { t.Fatalf("can't make a temp dir: %v", err) @@ -258,7 +261,6 @@ func TestInitialPendingVolumesForPodAndGetVolumesInUse(t *testing.T) { manager := newTestVolumeManager(t, tmpDir, podManager, kubeClient, node) - tCtx := ktesting.Init(t) defer tCtx.Cancel("test has completed") sourcesReady := config.NewSourcesReady(func(_ sets.Set[string]) bool { return true }) go manager.Run(tCtx, sourcesReady) @@ -272,10 +274,10 @@ func TestInitialPendingVolumesForPodAndGetVolumesInUse(t *testing.T) { manager) // delayed claim binding - go delayClaimBecomesBound(kubeClient, claim.GetNamespace(), claim.ObjectMeta.Name) + go delayClaimBecomesBound(t, kubeClient, claim.GetNamespace(), claim.Name) err = wait.Poll(100*time.Millisecond, 1*time.Second, func() (bool, error) { - err = manager.WaitForAttachAndMount(context.Background(), pod) + err = manager.WaitForAttachAndMount(tCtx, pod) if err != nil { // Few "PVC not bound" errors are expected return false, nil @@ -289,6 +291,7 @@ func TestInitialPendingVolumesForPodAndGetVolumesInUse(t *testing.T) { } func TestGetExtraSupplementalGroupsForPod(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) tmpDir, err := utiltesting.MkTmpdir("volumeManagerTest") if err != nil { t.Fatalf("can't make a temp dir: %v", err) @@ -361,7 +364,7 @@ func TestGetExtraSupplementalGroupsForPod(t *testing.T) { tCtx.Done(), manager) - err = manager.WaitForAttachAndMount(context.Background(), pod) + err = manager.WaitForAttachAndMount(ctx, pod) if err != nil { t.Errorf("Expected success: %v", err) continue @@ -537,19 +540,28 @@ func simulateVolumeInUseUpdate(volumeName v1.UniqueVolumeName, stopCh <-chan str } func delayClaimBecomesBound( + t *testing.T, kubeClient clientset.Interface, namespace, claimName string, ) { + tCtx := ktesting.Init(t) time.Sleep(500 * time.Millisecond) - volumeClaim, _ := - kubeClient.CoreV1().PersistentVolumeClaims(namespace).Get(context.TODO(), claimName, metav1.GetOptions{}) + volumeClaim, err := + kubeClient.CoreV1().PersistentVolumeClaims(namespace).Get(tCtx, claimName, metav1.GetOptions{}) + if err != nil { + t.Errorf("Failed to get PVC: %v", err) + } volumeClaim.Status = v1.PersistentVolumeClaimStatus{ Phase: v1.ClaimBound, } - kubeClient.CoreV1().PersistentVolumeClaims(namespace).Update(context.TODO(), volumeClaim, metav1.UpdateOptions{}) + _, err = kubeClient.CoreV1().PersistentVolumeClaims(namespace).Update(tCtx, volumeClaim, metav1.UpdateOptions{}) + if err != nil { + t.Errorf("Failed to update PVC: %v", err) + } } func TestWaitForAllPodsUnmount(t *testing.T) { + tCtx := ktesting.Init(t) tmpDir, err := utiltesting.MkTmpdir("volumeManagerTest") require.NoError(t, err, "Failed to create temp directory") defer func() { @@ -584,7 +596,7 @@ func TestWaitForAllPodsUnmount(t *testing.T) { manager := newTestVolumeManager(t, tmpDir, podManager, kubeClient, node) - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + ctx, cancel := context.WithTimeout(tCtx, 1*time.Second) defer cancel() sourcesReady := config.NewSourcesReady(func(_ sets.Set[string]) bool { return true }) go manager.Run(ctx, sourcesReady) @@ -596,7 +608,7 @@ func TestWaitForAllPodsUnmount(t *testing.T) { ctx.Done(), manager) - err := manager.WaitForAttachAndMount(context.Background(), pod) + err := manager.WaitForAttachAndMount(ctx, pod) require.NoError(t, err, "Failed to wait for attach and mount") err = manager.WaitForAllPodsUnmount(ctx, []*v1.Pod{pod})