Migrate pkg/kubelet/volumemanager to contextual logging

Remove context.TODO and context.Background

Fix linter error in volume_manager_test

Fix QF1008: Could remove embedded field "ObjectMeta" from selector

Remove the extra code change

Remove the extra change

Update the NewTestContext
This commit is contained in:
Chulong Li
2025-04-13 19:07:58 +08:00
parent ce9ba81802
commit bf98e45afb
18 changed files with 311 additions and 240 deletions

View File

@@ -218,6 +218,7 @@ linters:
contextual k8s.io/kubernetes/pkg/kubelet/nodeshutdown/.*
contextual k8s.io/kubernetes/pkg/kubelet/pod/.*
contextual k8s.io/kubernetes/pkg/kubelet/preemption/.*
contextual k8s.io/kubernetes/pkg/kubelet/volumemanager/.*
# As long as contextual logging is alpha or beta, all WithName, WithValues,
# NewContext calls have to go through klog. Once it is GA, we can lift

View File

@@ -232,6 +232,7 @@ linters:
contextual k8s.io/kubernetes/pkg/kubelet/nodeshutdown/.*
contextual k8s.io/kubernetes/pkg/kubelet/pod/.*
contextual k8s.io/kubernetes/pkg/kubelet/preemption/.*
contextual k8s.io/kubernetes/pkg/kubelet/volumemanager/.*
# As long as contextual logging is alpha or beta, all WithName, WithValues,
# NewContext calls have to go through klog. Once it is GA, we can lift

View File

@@ -64,6 +64,7 @@ contextual k8s.io/kubernetes/pkg/kubelet/kuberuntime/.*
contextual k8s.io/kubernetes/pkg/kubelet/nodeshutdown/.*
contextual k8s.io/kubernetes/pkg/kubelet/pod/.*
contextual k8s.io/kubernetes/pkg/kubelet/preemption/.*
contextual k8s.io/kubernetes/pkg/kubelet/volumemanager/.*
# As long as contextual logging is alpha or beta, all WithName, WithValues,
# NewContext calls have to go through klog. Once it is GA, we can lift

View File

@@ -68,7 +68,7 @@ type ActualStateOfWorld interface {
// volume indicates it requires remounting on pod updates). Atomically
// updating volumes depend on this to update the contents of the volume on
// pod update.
MarkRemountRequired(podName volumetypes.UniquePodName)
MarkRemountRequired(logger klog.Logger, podName volumetypes.UniquePodName)
// SetDeviceMountState sets device mount state for the given volume. When deviceMountState is set to DeviceGloballyMounted
// then device is mounted at a global mount point. When it is set to DeviceMountUncertain then also it means volume
@@ -110,7 +110,7 @@ type ActualStateOfWorld interface {
// volumes, depend on this to update the contents of the volume.
// All volume mounting calls should be idempotent so a second mount call for
// volumes that do not need to update contents should not fail.
PodExistsInVolume(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName, desiredVolumeSize resource.Quantity, seLinuxLabel string) (bool, string, error)
PodExistsInVolume(logger klog.Logger, podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName, desiredVolumeSize resource.Quantity, seLinuxLabel string) (bool, string, error)
// PodRemovedFromVolume returns true if the given pod does not exist in the list of
// mountedPods for the given volume in the cache, indicating that the pod has
@@ -182,7 +182,7 @@ type ActualStateOfWorld interface {
GetAttachedVolume(volumeName v1.UniqueVolumeName) (AttachedVolume, bool)
// Add the specified volume to ASW as uncertainly attached.
AddAttachUncertainReconstructedVolume(volumeName v1.UniqueVolumeName, volumeSpec *volume.Spec, nodeName types.NodeName, devicePath string) error
AddAttachUncertainReconstructedVolume(logger klog.Logger, volumeName v1.UniqueVolumeName, volumeSpec *volume.Spec, nodeName types.NodeName, devicePath string) error
// UpdateReconstructedDevicePath updates devicePath of a reconstructed volume
// from Node.Status.VolumesAttached. The ASW is updated only when the volume is still
@@ -393,13 +393,14 @@ func (asw *actualStateOfWorld) MarkVolumeAsAttached(
pluginIsAttachable = volumeAttachabilityTrue
}
return asw.addVolume(volumeName, volumeSpec, devicePath, pluginIsAttachable)
return asw.addVolume(logger, volumeName, volumeSpec, devicePath, pluginIsAttachable)
}
func (asw *actualStateOfWorld) AddAttachUncertainReconstructedVolume(
logger klog.Logger,
volumeName v1.UniqueVolumeName, volumeSpec *volume.Spec, _ types.NodeName, devicePath string) error {
return asw.addVolume(volumeName, volumeSpec, devicePath, volumeAttachabilityUncertain)
return asw.addVolume(logger, volumeName, volumeSpec, devicePath, volumeAttachabilityUncertain)
}
func (asw *actualStateOfWorld) MarkVolumeAsUncertain(
@@ -679,7 +680,7 @@ func (asw *actualStateOfWorld) IsVolumeMountedElsewhere(volumeName v1.UniqueVolu
// volume with the same generated name already exists, this is a noop. If no
// volume plugin can support the given volumeSpec or more than one plugin can
// support it, an error is returned.
func (asw *actualStateOfWorld) addVolume(
func (asw *actualStateOfWorld) addVolume(logger klog.Logger,
volumeName v1.UniqueVolumeName, volumeSpec *volume.Spec, devicePath string, attachability volumeAttachability) error {
asw.Lock()
defer asw.Unlock()
@@ -717,7 +718,7 @@ func (asw *actualStateOfWorld) addVolume(
} else {
// If volume object already exists, update the fields such as device path
volumeObj.devicePath = devicePath
klog.V(2).InfoS("Volume is already added to attachedVolume list, update device path", "volumeName", volumeName, "path", devicePath)
logger.V(2).Info("Volume is already added to attachedVolume list, update device path", "volumeName", volumeName, "path", devicePath)
}
asw.attachedVolumes[volumeName] = volumeObj
@@ -807,6 +808,7 @@ func (asw *actualStateOfWorld) MarkVolumeAsResized(volumeName v1.UniqueVolumeNam
}
func (asw *actualStateOfWorld) MarkRemountRequired(
logger klog.Logger,
podName volumetypes.UniquePodName) {
asw.Lock()
defer asw.Unlock()
@@ -816,7 +818,7 @@ func (asw *actualStateOfWorld) MarkRemountRequired(
asw.volumePluginMgr.FindPluginBySpec(podObj.volumeSpec)
if err != nil || volumePlugin == nil {
// Log and continue processing
klog.ErrorS(nil, "MarkRemountRequired failed to FindPluginBySpec for volume", "uniquePodName", podObj.podName, "podUID", podObj.podUID, "volumeName", volumeName, "volumeSpecName", podObj.volumeSpec.Name())
logger.Error(nil, "MarkRemountRequired failed to FindPluginBySpec for volume", "uniquePodName", podObj.podName, "podUID", podObj.podUID, "volumeName", volumeName, "volumeSpecName", podObj.volumeSpec.Name())
continue
}
@@ -926,7 +928,7 @@ func (asw *actualStateOfWorld) DeleteVolume(volumeName v1.UniqueVolumeName) erro
return nil
}
func (asw *actualStateOfWorld) PodExistsInVolume(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName, desiredVolumeSize resource.Quantity, seLinuxLabel string) (bool, string, error) {
func (asw *actualStateOfWorld) PodExistsInVolume(logger klog.Logger, podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName, desiredVolumeSize resource.Quantity, seLinuxLabel string) (bool, string, error) {
asw.RLock()
defer asw.RUnlock()
@@ -952,7 +954,7 @@ func (asw *actualStateOfWorld) PodExistsInVolume(podName volumetypes.UniquePodNa
if podObj.remountRequired {
return true, volumeObj.devicePath, newRemountRequiredError(volumeObj.volumeName, podObj.podName)
}
if currentSize, expandVolume := asw.volumeNeedsExpansion(volumeObj, desiredVolumeSize); expandVolume {
if currentSize, expandVolume := asw.volumeNeedsExpansion(logger, volumeObj, desiredVolumeSize); expandVolume {
return true, volumeObj.devicePath, newFsResizeRequiredError(volumeObj.volumeName, podObj.podName, currentSize)
}
}
@@ -974,7 +976,7 @@ func (asw *actualStateOfWorld) PodHasMountedVolumes(podName volumetypes.UniquePo
return false
}
func (asw *actualStateOfWorld) volumeNeedsExpansion(volumeObj attachedVolume, desiredVolumeSize resource.Quantity) (resource.Quantity, bool) {
func (asw *actualStateOfWorld) volumeNeedsExpansion(logger klog.Logger, volumeObj attachedVolume, desiredVolumeSize resource.Quantity) (resource.Quantity, bool) {
currentSize := volumeObj.persistentVolumeSize.DeepCopy()
if volumeObj.volumeInUseErrorForExpansion {
return currentSize, false
@@ -983,13 +985,13 @@ func (asw *actualStateOfWorld) volumeNeedsExpansion(volumeObj attachedVolume, de
return currentSize, false
}
klog.V(5).InfoS("NodeExpandVolume checking size", "actualSize", volumeObj.persistentVolumeSize.String(), "desiredSize", desiredVolumeSize.String(), "volume", volumeObj.volumeName)
logger.V(5).Info("NodeExpandVolume checking size", "actualSize", volumeObj.persistentVolumeSize.String(), "desiredSize", desiredVolumeSize.String(), "volume", volumeObj.volumeName)
if desiredVolumeSize.Cmp(volumeObj.persistentVolumeSize) > 0 {
volumePlugin, err := asw.volumePluginMgr.FindNodeExpandablePluginBySpec(volumeObj.spec)
if err != nil || volumePlugin == nil {
// Log and continue processing
klog.InfoS("PodExistsInVolume failed to find expandable plugin",
logger.Info("PodExistsInVolume failed to find expandable plugin",
"volume", volumeObj.volumeName,
"volumeSpecName", volumeObj.spec.Name())
return currentSize, false

View File

@@ -470,6 +470,7 @@ func Test_AddTwoPodsToVolume_Positive(t *testing.T) {
// Test if volumes that were recorded to be read from disk during reconstruction
// are handled correctly by the ASOW.
func TestActualStateOfWorld_FoundDuringReconstruction(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
tests := []struct {
name string
opCallback func(asw ActualStateOfWorld, volumeOpts operationexecutor.MarkVolumeOpts) error
@@ -585,7 +586,7 @@ func TestActualStateOfWorld_FoundDuringReconstruction(t *testing.T) {
generatedVolumeName1, err := util.GetUniqueVolumeNameFromSpec(
plugin, volumeSpec1)
require.NoError(t, err)
err = asw.AddAttachUncertainReconstructedVolume(generatedVolumeName1, volumeSpec1, "" /* nodeName */, devicePath)
err = asw.AddAttachUncertainReconstructedVolume(logger, generatedVolumeName1, volumeSpec1, "" /* nodeName */, devicePath)
if err != nil {
t.Fatalf("MarkVolumeAsAttached failed. Expected: <no error> Actual: <%v>", err)
}
@@ -1077,7 +1078,7 @@ func TestUncertainVolumeMounts(t *testing.T) {
t.Fatalf("expected volume %s to be found in aws.GetPossiblyMountedVolumesForPod", volumeSpec1.Name())
}
volExists, _, _ := asw.PodExistsInVolume(podName1, generatedVolumeName1, resource.Quantity{}, "")
volExists, _, _ := asw.PodExistsInVolume(logger, podName1, generatedVolumeName1, resource.Quantity{}, "")
if volExists {
t.Fatalf("expected volume %s to not exist in asw", generatedVolumeName1)
}
@@ -1215,8 +1216,9 @@ func verifyPodExistsInVolumeAswWithSELinux(
expectedDevicePath string,
expectedSELinuxLabel string,
asw ActualStateOfWorld) {
logger, _ := ktesting.NewTestContext(t)
podExistsInVolume, devicePath, err :=
asw.PodExistsInVolume(expectedPodName, expectedVolumeName, resource.Quantity{}, expectedSELinuxLabel)
asw.PodExistsInVolume(logger, expectedPodName, expectedVolumeName, resource.Quantity{}, expectedSELinuxLabel)
if err != nil {
t.Fatalf(
"ASW PodExistsInVolume failed. Expected: <no error> Actual: <%v>", err)
@@ -1257,8 +1259,9 @@ func verifyPodDoesntExistInVolumeAsw(
volumeToCheck v1.UniqueVolumeName,
expectVolumeToExist bool,
asw ActualStateOfWorld) {
logger, _ := ktesting.NewTestContext(t)
podExistsInVolume, devicePath, err :=
asw.PodExistsInVolume(podToCheck, volumeToCheck, resource.Quantity{}, "")
asw.PodExistsInVolume(logger, podToCheck, volumeToCheck, resource.Quantity{}, "")
if !expectVolumeToExist && err == nil {
t.Fatalf(
"ASW PodExistsInVolume did not return error. Expected: <error indicating volume does not exist> Actual: <%v>", err)
@@ -1288,8 +1291,8 @@ func verifyPodExistsInVolumeSELinuxMismatch(
volumeToCheck v1.UniqueVolumeName,
unexpectedSELinuxLabel string,
asw ActualStateOfWorld) {
podExistsInVolume, _, err := asw.PodExistsInVolume(podToCheck, volumeToCheck, resource.Quantity{}, unexpectedSELinuxLabel)
logger, _ := ktesting.NewTestContext(t)
podExistsInVolume, _, err := asw.PodExistsInVolume(logger, podToCheck, volumeToCheck, resource.Quantity{}, unexpectedSELinuxLabel)
if podExistsInVolume {
t.Errorf("expected Pod %s not to exists, but it does", podToCheck)
}

View File

@@ -61,7 +61,7 @@ type DesiredStateOfWorld interface {
// added.
// If a pod with the same unique name already exists under the specified
// volume, this is a no-op.
AddPodToVolume(podName types.UniquePodName, pod *v1.Pod, volumeSpec *volume.Spec, outerVolumeSpecName string, volumeGIDValue string, seLinuxContainerContexts []*v1.SELinuxOptions) (v1.UniqueVolumeName, error)
AddPodToVolume(logger klog.Logger, podName types.UniquePodName, pod *v1.Pod, volumeSpec *volume.Spec, outerVolumeSpecName string, volumeGIDValue string, seLinuxContainerContexts []*v1.SELinuxOptions) (v1.UniqueVolumeName, error)
// MarkVolumesReportedInUse sets the ReportedInUse value to true for the
// reportedVolumes. For volumes not in the reportedVolumes list, the
@@ -258,6 +258,7 @@ const (
)
func (dsw *desiredStateOfWorld) AddPodToVolume(
logger klog.Logger,
podName types.UniquePodName,
pod *v1.Pod,
volumeSpec *volume.Spec,
@@ -274,7 +275,7 @@ func (dsw *desiredStateOfWorld) AddPodToVolume(
volumeSpec.Name(),
err)
}
volumePluginName := getVolumePluginNameWithDriver(volumePlugin, volumeSpec)
volumePluginName := getVolumePluginNameWithDriver(logger, volumePlugin, volumeSpec)
accessMode := getVolumeAccessMode(volumeSpec)
var volumeName v1.UniqueVolumeName
@@ -301,11 +302,11 @@ func (dsw *desiredStateOfWorld) AddPodToVolume(
volumeName = util.GetUniqueVolumeNameFromSpecWithPod(podName, volumePlugin, volumeSpec)
}
seLinuxFileLabel, pluginSupportsSELinuxContextMount, err := dsw.getSELinuxLabel(volumeSpec, seLinuxContainerContexts, pod.Spec.SecurityContext)
seLinuxFileLabel, pluginSupportsSELinuxContextMount, err := dsw.getSELinuxLabel(logger, volumeSpec, seLinuxContainerContexts, pod.Spec.SecurityContext)
if err != nil {
return "", err
}
klog.V(4).InfoS("expected volume SELinux label context", "volume", volumeSpec.Name(), "label", seLinuxFileLabel)
logger.V(4).Info("expected volume SELinux label context", "volume", volumeSpec.Name(), "label", seLinuxFileLabel)
if _, volumeExists := dsw.volumesToMount[volumeName]; !volumeExists {
var sizeLimit *resource.Quantity
@@ -325,7 +326,7 @@ func (dsw *desiredStateOfWorld) AddPodToVolume(
effectiveSELinuxMountLabel := seLinuxFileLabel
if !util.VolumeSupportsSELinuxMount(volumeSpec) {
// Clear SELinux label for the volume with unsupported access modes.
klog.V(4).InfoS("volume does not support SELinux context mount, clearing the expected label", "volume", volumeSpec.Name())
logger.V(4).Info("volume does not support SELinux context mount, clearing the expected label", "volume", volumeSpec.Name())
effectiveSELinuxMountLabel = ""
}
if seLinuxFileLabel != "" {
@@ -368,6 +369,7 @@ func (dsw *desiredStateOfWorld) AddPodToVolume(
fullErr := fmt.Errorf("conflicting SELinux labels of volume %s: %q and %q", volumeSpec.Name(), existingVolume.originalSELinuxLabel, seLinuxFileLabel)
supported := util.VolumeSupportsSELinuxMount(volumeSpec)
err := handleSELinuxMetricError(
logger,
fullErr,
supported,
seLinuxVolumeContextMismatchWarnings.WithLabelValues(volumePluginName, accessMode),
@@ -396,7 +398,7 @@ func (dsw *desiredStateOfWorld) AddPodToVolume(
// if the plugin supports mounting the volume with SELinux context.
// It returns error if the SELinux label cannot be constructed or when the volume is used with multiple SELinux
// labels.
func (dsw *desiredStateOfWorld) getSELinuxLabel(volumeSpec *volume.Spec, seLinuxContainerContexts []*v1.SELinuxOptions, podSecurityContext *v1.PodSecurityContext) (seLinuxFileLabel string, pluginSupportsSELinuxContextMount bool, err error) {
func (dsw *desiredStateOfWorld) getSELinuxLabel(logger klog.Logger, volumeSpec *volume.Spec, seLinuxContainerContexts []*v1.SELinuxOptions, podSecurityContext *v1.PodSecurityContext) (seLinuxFileLabel string, pluginSupportsSELinuxContextMount bool, err error) {
labelInfo, err := util.GetMountSELinuxLabel(volumeSpec, seLinuxContainerContexts, podSecurityContext, dsw.volumePluginMgr, dsw.seLinuxTranslator)
if err != nil {
accessMode := getVolumeAccessMode(volumeSpec)
@@ -404,6 +406,7 @@ func (dsw *desiredStateOfWorld) getSELinuxLabel(volumeSpec *volume.Spec, seLinux
if util.IsSELinuxLabelTranslationError(err) {
err := handleSELinuxMetricError(
logger,
err,
seLinuxSupported,
seLinuxContainerContextWarnings.WithLabelValues(accessMode),
@@ -412,6 +415,7 @@ func (dsw *desiredStateOfWorld) getSELinuxLabel(volumeSpec *volume.Spec, seLinux
}
if util.IsMultipleSELinuxLabelsError(err) {
err := handleSELinuxMetricError(
logger,
err,
seLinuxSupported,
seLinuxPodContextMismatchWarnings.WithLabelValues(accessMode),
@@ -637,7 +641,7 @@ func (dsw *desiredStateOfWorld) MarkVolumeAttachability(volumeName v1.UniqueVolu
}
// Based on isRWOP, bump the right warning / error metric and either consume the error or return it.
func handleSELinuxMetricError(err error, seLinuxSupported bool, warningMetric, errorMetric metrics.GaugeMetric) error {
func handleSELinuxMetricError(logger klog.Logger, err error, seLinuxSupported bool, warningMetric, errorMetric metrics.GaugeMetric) error {
if seLinuxSupported {
errorMetric.Add(1.0)
return err
@@ -645,12 +649,12 @@ func handleSELinuxMetricError(err error, seLinuxSupported bool, warningMetric, e
// This is not an error yet, but it will be when support for other access modes is added.
warningMetric.Add(1.0)
klog.V(4).ErrorS(err, "Please report this error in https://github.com/kubernetes/enhancements/issues/1710, together with full Pod yaml file")
logger.V(4).Error(err, "Please report this error in https://github.com/kubernetes/enhancements/issues/1710, together with full Pod yaml file")
return nil
}
// Return the volume plugin name, together with the CSI driver name if it's a CSI volume.
func getVolumePluginNameWithDriver(plugin volume.VolumePlugin, spec *volume.Spec) string {
func getVolumePluginNameWithDriver(logger klog.Logger, plugin volume.VolumePlugin, spec *volume.Spec) string {
pluginName := plugin.GetPluginName()
if pluginName != csi.CSIPluginName {
return pluginName
@@ -660,7 +664,7 @@ func getVolumePluginNameWithDriver(plugin volume.VolumePlugin, spec *volume.Spec
driverName, err := csi.GetCSIDriverName(spec)
if err != nil {
// In theory this is unreachable - such volume would not pass validation.
klog.V(4).ErrorS(err, "failed to get CSI driver name from volume spec")
logger.V(4).Error(err, "failed to get CSI driver name from volume spec")
driverName = "unknown"
}
// `/` is used to separate plugin + CSI driver in util.GetUniqueVolumeName() too

View File

@@ -25,6 +25,7 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/component-base/featuregate"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/klog/v2/ktesting"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume"
volumetesting "k8s.io/kubernetes/pkg/volume/testing"
@@ -37,6 +38,7 @@ import (
// Verifies newly added pod/volume exists via
// PodExistsInVolume() VolumeExists() and GetVolumesToMount()
func Test_AddPodToVolume_Positive_NewPodNewVolume(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
// Arrange
volumePluginMgr, _ := volumetesting.GetTestKubeletVolumePluginMgr(t)
seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
@@ -65,7 +67,7 @@ func Test_AddPodToVolume_Positive_NewPodNewVolume(t *testing.T) {
// Act
generatedVolumeName, err := dsw.AddPodToVolume(
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxContainerContexts */)
logger, podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxContainerContexts */)
// Assert
if err != nil {
@@ -83,6 +85,7 @@ func Test_AddPodToVolume_Positive_NewPodNewVolume(t *testing.T) {
// Verifies newly added pod/volume exists via
// PodExistsInVolume() VolumeExists() and GetVolumesToMount() and no errors.
func Test_AddPodToVolume_Positive_ExistingPodExistingVolume(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
// Arrange
volumePluginMgr, _ := volumetesting.GetTestKubeletVolumePluginMgr(t)
seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
@@ -111,12 +114,12 @@ func Test_AddPodToVolume_Positive_ExistingPodExistingVolume(t *testing.T) {
// Act
generatedVolumeName, err := dsw.AddPodToVolume(
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxContainerContexts */)
logger, podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxContainerContexts */)
if err != nil {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
}
generatedVolumeName2, err := dsw.AddPodToVolume(
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxContainerContexts */)
logger, podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxContainerContexts */)
if err != nil {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
}
@@ -136,6 +139,7 @@ func Test_AddPodToVolume_Positive_ExistingPodExistingVolume(t *testing.T) {
// Verities generated names are same for different pods if volume is device mountable or attachable
// Verities generated names are different for different pods if volume is not device mountble and attachable
func Test_AddPodToVolume_Positive_NamesForDifferentPodsAndDifferentVolumes(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
// Arrange
fakeVolumeHost := volumetesting.NewFakeVolumeHost(t,
"", /* rootDir */
@@ -274,8 +278,8 @@ func Test_AddPodToVolume_Positive_NamesForDifferentPodsAndDifferentVolumes(t *te
for name, v := range testcases {
volumeSpec1 := &volume.Spec{Volume: &v.pod1.Spec.Volumes[0]}
volumeSpec2 := &volume.Spec{Volume: &v.pod2.Spec.Volumes[0]}
generatedVolumeName1, err1 := dsw.AddPodToVolume(util.GetUniquePodName(v.pod1), v.pod1, volumeSpec1, volumeSpec1.Name(), "", nil)
generatedVolumeName2, err2 := dsw.AddPodToVolume(util.GetUniquePodName(v.pod2), v.pod2, volumeSpec2, volumeSpec2.Name(), "", nil)
generatedVolumeName1, err1 := dsw.AddPodToVolume(logger, util.GetUniquePodName(v.pod1), v.pod1, volumeSpec1, volumeSpec1.Name(), "", nil)
generatedVolumeName2, err2 := dsw.AddPodToVolume(logger, util.GetUniquePodName(v.pod2), v.pod2, volumeSpec2, volumeSpec2.Name(), "", nil)
if err1 != nil {
t.Fatalf("test %q: AddPodToVolume failed. Expected: <no error> Actual: <%v>", name, err1)
}
@@ -299,6 +303,7 @@ func Test_AddPodToVolume_Positive_NamesForDifferentPodsAndDifferentVolumes(t *te
// Calls DeletePodFromVolume() to removes the pod
// Verifies newly added pod/volume are deleted
func Test_DeletePodFromVolume_Positive_PodExistsVolumeExists(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
// Arrange
volumePluginMgr, _ := volumetesting.GetTestKubeletVolumePluginMgr(t)
seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
@@ -325,7 +330,7 @@ func Test_DeletePodFromVolume_Positive_PodExistsVolumeExists(t *testing.T) {
volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
podName := util.GetUniquePodName(pod)
generatedVolumeName, err := dsw.AddPodToVolume(
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxContainerContexts */)
logger, podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxContainerContexts */)
if err != nil {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
}
@@ -352,6 +357,7 @@ func Test_DeletePodFromVolume_Positive_PodExistsVolumeExists(t *testing.T) {
// Marks only first volume as reported in use.
// Verifies only that volume is marked reported in use
func Test_MarkVolumesReportedInUse_Positive_NewPodNewVolume(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
// Arrange
volumePluginMgr, _ := volumetesting.GetTestKubeletVolumePluginMgr(t)
seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
@@ -424,19 +430,19 @@ func Test_MarkVolumesReportedInUse_Positive_NewPodNewVolume(t *testing.T) {
pod3Name := util.GetUniquePodName(pod3)
generatedVolume1Name, err := dsw.AddPodToVolume(
pod1Name, pod1, volume1Spec, volume1Spec.Name(), "" /* volumeGIDValue */, nil /* seLinuxContainerContexts */)
logger, pod1Name, pod1, volume1Spec, volume1Spec.Name(), "" /* volumeGIDValue */, nil /* seLinuxContainerContexts */)
if err != nil {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
}
generatedVolume2Name, err := dsw.AddPodToVolume(
pod2Name, pod2, volume2Spec, volume2Spec.Name(), "" /* volumeGIDValue */, nil /* seLinuxContainerContexts */)
logger, pod2Name, pod2, volume2Spec, volume2Spec.Name(), "" /* volumeGIDValue */, nil /* seLinuxContainerContexts */)
if err != nil {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
}
generatedVolume3Name, err := dsw.AddPodToVolume(
pod3Name, pod3, volume3Spec, volume3Spec.Name(), "" /* volumeGIDValue */, nil /* seLinuxContainerContexts */)
logger, pod3Name, pod3, volume3Spec, volume3Spec.Name(), "" /* volumeGIDValue */, nil /* seLinuxContainerContexts */)
if err != nil {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
}
@@ -479,6 +485,7 @@ func Test_MarkVolumesReportedInUse_Positive_NewPodNewVolume(t *testing.T) {
}
func Test_AddPodToVolume_WithEmptyDirSizeLimit(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
volumePluginMgr, _ := volumetesting.GetTestKubeletVolumePluginMgr(t)
seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
dsw := NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
@@ -598,14 +605,14 @@ func Test_AddPodToVolume_WithEmptyDirSizeLimit(t *testing.T) {
}
for i := range pod1.Spec.Volumes {
volumeSpec := &volume.Spec{Volume: &pod1.Spec.Volumes[i]}
_, err := dsw.AddPodToVolume(pod1Name, pod1, volumeSpec, volumeSpec.Name(), "", nil /* seLinuxContainerContexts */)
_, err := dsw.AddPodToVolume(logger, pod1Name, pod1, volumeSpec, volumeSpec.Name(), "", nil /* seLinuxContainerContexts */)
if err != nil {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
}
}
for i := range pod2.Spec.Volumes {
volumeSpec := &volume.Spec{Volume: &pod2.Spec.Volumes[i]}
_, err := dsw.AddPodToVolume(pod2Name, pod2, volumeSpec, volumeSpec.Name(), "", nil /* seLinuxContainerContexts */)
_, err := dsw.AddPodToVolume(logger, pod2Name, pod2, volumeSpec, volumeSpec.Name(), "", nil /* seLinuxContainerContexts */)
if err != nil {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
}
@@ -616,6 +623,7 @@ func Test_AddPodToVolume_WithEmptyDirSizeLimit(t *testing.T) {
// Calls AddPodToVolume() in an empty DSW with various SELinux settings / access modes.
func Test_AddPodToVolume_SELinuxSinglePod(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
completeSELinuxOpts := v1.SELinuxOptions{
User: "system_u",
Role: "object_r",
@@ -872,7 +880,7 @@ func Test_AddPodToVolume_SELinuxSinglePod(t *testing.T) {
// Act
generatedVolumeName, err := dsw.AddPodToVolume(
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, seLinuxContainerContexts)
logger, podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, seLinuxContainerContexts)
// Assert
if tc.expectError {
@@ -896,6 +904,7 @@ func Test_AddPodToVolume_SELinuxSinglePod(t *testing.T) {
// Calls AddPodToVolume() twice to add two pods with various SELinux settings and access modes.
func Test_AddPodToVolume_SELinux_MultiplePods(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
completeSELinuxOpts := v1.SELinuxOptions{
User: "system_u",
Role: "object_r",
@@ -1221,7 +1230,7 @@ func Test_AddPodToVolume_SELinux_MultiplePods(t *testing.T) {
// Act
generatedVolumeName, err := dsw.AddPodToVolume(
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, seLinuxContainerContexts)
logger, podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, seLinuxContainerContexts)
// Assert
if err != nil {
@@ -1245,7 +1254,7 @@ func Test_AddPodToVolume_SELinux_MultiplePods(t *testing.T) {
// Act
generatedVolumeName2, err := dsw.AddPodToVolume(
pod2Name, pod2, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, seLinuxContainerContexts)
logger, pod2Name, pod2, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, seLinuxContainerContexts)
// Assert
if tc.expectError {
if err == nil {

View File

@@ -33,6 +33,7 @@ import (
)
func TestMetricCollection(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
@@ -59,7 +60,7 @@ func TestMetricCollection(t *testing.T) {
podName := util.GetUniquePodName(pod)
// Add one volume to DesiredStateOfWorld
generatedVolumeName, err := dsw.AddPodToVolume(podName, pod, volumeSpec, volumeSpec.Name(), "", nil /* seLinuxOptions */)
generatedVolumeName, err := dsw.AddPodToVolume(logger, podName, pod, volumeSpec, volumeSpec.Name(), "", nil /* seLinuxOptions */)
if err != nil {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
}
@@ -76,7 +77,6 @@ func TestMetricCollection(t *testing.T) {
// Add one volume to ActualStateOfWorld
devicePath := "fake/device/path"
logger, _ := ktesting.NewTestContext(t)
err = asw.MarkVolumeAsAttached(logger, "", volumeSpec, "", devicePath)
if err != nil {
t.Fatalf("MarkVolumeAsAttached failed. Expected: <no error> Actual: <%v>", err)

View File

@@ -170,8 +170,9 @@ func (dswp *desiredStateOfWorldPopulator) HasAddedPods() bool {
}
func (dswp *desiredStateOfWorldPopulator) populatorLoop(ctx context.Context) {
logger := klog.FromContext(ctx)
dswp.findAndAddNewPods(ctx)
dswp.findAndRemoveDeletedPods()
dswp.findAndRemoveDeletedPods(logger)
}
// Iterate through all pods and add to desired state of world if they don't
@@ -196,7 +197,7 @@ func (dswp *desiredStateOfWorldPopulator) findAndAddNewPods(ctx context.Context)
// Iterate through all pods in desired state of world, and remove if they no
// longer exist
func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() {
func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods(logger klog.Logger) {
podsFromCache := make(map[volumetypes.UniquePodName]struct{})
for _, volumeToMount := range dswp.desiredStateOfWorld.GetVolumesToMount() {
podsFromCache[volumetypes.UniquePodName(volumeToMount.Pod.UID)] = struct{}{}
@@ -211,7 +212,7 @@ func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() {
// It is not possible right now for a CSI plugin to be both attachable and non-deviceMountable
// So the uniqueVolumeName should remain the same after the attachability change
dswp.desiredStateOfWorld.MarkVolumeAttachability(volumeToMount.VolumeName, false)
klog.InfoS("Volume changes from attachable to non-attachable", "volumeName", volumeToMount.VolumeName)
logger.Info("Volume changes from attachable to non-attachable", "volumeName", volumeToMount.VolumeName)
continue
}
}
@@ -227,7 +228,7 @@ func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() {
// pod state provider to verify that all containers in the pod have been
// terminated.
if !dswp.podStateProvider.ShouldPodRuntimeBeRemoved(volumeToMount.Pod.UID) {
klog.V(4).InfoS("Pod still has one or more containers in the non-exited state and will not be removed from desired state", "pod", klog.KObj(volumeToMount.Pod))
logger.V(4).Info("Pod still has one or more containers in the non-exited state and will not be removed from desired state", "pod", klog.KObj(volumeToMount.Pod))
continue
}
var volumeToMountSpecName string
@@ -236,10 +237,10 @@ func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() {
}
removed := dswp.actualStateOfWorld.PodRemovedFromVolume(volumeToMount.PodName, volumeToMount.VolumeName)
if removed && podExists {
klog.V(4).InfoS("Actual state does not yet have volume mount information and pod still exists in pod manager, skip removing volume from desired state", "pod", klog.KObj(volumeToMount.Pod), "podUID", volumeToMount.Pod.UID, "volumeName", volumeToMountSpecName)
logger.V(4).Info("Actual state does not yet have volume mount information and pod still exists in pod manager, skip removing volume from desired state", "pod", klog.KObj(volumeToMount.Pod), "podUID", volumeToMount.Pod.UID, "volumeName", volumeToMountSpecName)
continue
}
klog.V(4).InfoS("Removing volume from desired state", "pod", klog.KObj(volumeToMount.Pod), "podUID", volumeToMount.Pod.UID, "volumeName", volumeToMountSpecName)
logger.V(4).Info("Removing volume from desired state", "pod", klog.KObj(volumeToMount.Pod), "podUID", volumeToMount.Pod.UID, "volumeName", volumeToMountSpecName)
dswp.desiredStateOfWorld.DeletePodFromVolume(
volumeToMount.PodName, volumeToMount.VolumeName)
dswp.deleteProcessedPod(volumeToMount.PodName)
@@ -296,7 +297,7 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(ctx context.Context,
}
pvc, volumeSpec, volumeGIDValue, err :=
dswp.createVolumeSpec(logger, podVolume, pod, mounts, devices)
dswp.createVolumeSpec(ctx, podVolume, pod, mounts, devices)
if err != nil {
logger.Error(err, "Error processing volume", "pod", klog.KObj(pod), "volumeName", podVolume.Name)
dswp.desiredStateOfWorld.AddErrorToPod(uniquePodName, err.Error())
@@ -306,7 +307,7 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(ctx context.Context,
// Add volume to desired state of world
uniqueVolumeName, err := dswp.desiredStateOfWorld.AddPodToVolume(
uniquePodName, pod, volumeSpec, podVolume.Name, volumeGIDValue, seLinuxContainerContexts[podVolume.Name])
logger, uniquePodName, pod, volumeSpec, podVolume.Name, volumeGIDValue, seLinuxContainerContexts[podVolume.Name])
if err != nil {
logger.Error(err, "Failed to add volume to desiredStateOfWorld", "pod", klog.KObj(pod), "volumeName", podVolume.Name, "volumeSpecName", volumeSpec.Name())
dswp.desiredStateOfWorld.AddErrorToPod(uniquePodName, err.Error())
@@ -315,7 +316,7 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(ctx context.Context,
}
logger.V(4).Info("Added volume to desired state", "pod", klog.KObj(pod), "volumeName", podVolume.Name, "volumeSpecName", volumeSpec.Name())
dswp.checkVolumeFSResize(pod, podVolume, pvc, volumeSpec, uniqueVolumeName)
dswp.checkVolumeFSResize(logger, pod, podVolume, pvc, volumeSpec, uniqueVolumeName)
}
// some of the volume additions may have failed, should not mark this pod as fully processed
@@ -323,7 +324,7 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(ctx context.Context,
dswp.markPodProcessed(uniquePodName)
// New pod has been synced. Re-mount all volumes that need it
// (e.g. DownwardAPI)
dswp.actualStateOfWorld.MarkRemountRequired(uniquePodName)
dswp.actualStateOfWorld.MarkRemountRequired(logger, uniquePodName)
// Remove any stored errors for the pod, everything went well in this processPodVolumes
dswp.desiredStateOfWorld.PopPodErrors(uniquePodName)
} else if dswp.podHasBeenSeenOnce(uniquePodName) {
@@ -340,6 +341,7 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(ctx context.Context,
// It is used for comparison with actual size(coming from pvc.Status.Capacity) and calling
// volume expansion on the node if needed.
func (dswp *desiredStateOfWorldPopulator) checkVolumeFSResize(
logger klog.Logger,
pod *v1.Pod,
podVolume v1.Volume,
pvc *v1.PersistentVolumeClaim,
@@ -358,16 +360,16 @@ func (dswp *desiredStateOfWorldPopulator) checkVolumeFSResize(
// we should use it here. This value comes from Pod.spec.volumes.persistentVolumeClaim.readOnly.
if volumeSpec.ReadOnly {
// This volume is used as read only by this pod, we don't perform resize for read only volumes.
klog.V(5).InfoS("Skip file system resize check for the volume, as the volume is mounted as readonly", "pod", klog.KObj(pod), "volumeName", podVolume.Name)
logger.V(5).Info("Skip file system resize check for the volume, as the volume is mounted as readonly", "pod", klog.KObj(pod), "volumeName", podVolume.Name)
return
}
pvCap := volumeSpec.PersistentVolume.Spec.Capacity.Storage().DeepCopy()
pvcStatusCap := pvc.Status.Capacity.Storage().DeepCopy()
dswp.desiredStateOfWorld.UpdatePersistentVolumeSize(uniqueVolumeName, pvCap)
klog.V(5).InfoS("NodeExpandVolume updating size", "actualSize", pvcStatusCap.String(), "desiredSize", pvCap.String(), "volumeName", uniqueVolumeName)
logger.V(5).Info("NodeExpandVolume updating size", "actualSize", pvcStatusCap.String(), "desiredSize", pvCap.String(), "volumeName", uniqueVolumeName)
// in case the actualStateOfWorld was rebuild after kubelet restart ensure that claimSize is set to accurate value
dswp.actualStateOfWorld.InitializeClaimSize(klog.TODO(), uniqueVolumeName, pvcStatusCap)
dswp.actualStateOfWorld.InitializeClaimSize(logger, uniqueVolumeName, pvcStatusCap)
}
// podPreviouslyProcessed returns true if the volumes for this pod have already
@@ -422,7 +424,8 @@ func (dswp *desiredStateOfWorldPopulator) deleteProcessedPod(
// specified volume. It dereference any PVC to get PV objects, if needed.
// Returns an error if unable to obtain the volume at this time.
func (dswp *desiredStateOfWorldPopulator) createVolumeSpec(
logger klog.Logger, podVolume v1.Volume, pod *v1.Pod, mounts, devices sets.Set[string]) (*v1.PersistentVolumeClaim, *volume.Spec, string, error) {
ctx context.Context, podVolume v1.Volume, pod *v1.Pod, mounts, devices sets.Set[string]) (*v1.PersistentVolumeClaim, *volume.Spec, string, error) {
logger := klog.FromContext(ctx)
pvcSource := podVolume.VolumeSource.PersistentVolumeClaim
isEphemeral := pvcSource == nil && podVolume.VolumeSource.Ephemeral != nil
if isEphemeral {
@@ -438,7 +441,7 @@ func (dswp *desiredStateOfWorldPopulator) createVolumeSpec(
logger.V(5).Info("Found PVC", "PVC", klog.KRef(pod.Namespace, pvcSource.ClaimName))
// If podVolume is a PVC, fetch the real PV behind the claim
pvc, err := dswp.getPVCExtractPV(
pod.Namespace, pvcSource.ClaimName)
ctx, pod.Namespace, pvcSource.ClaimName)
if err != nil {
return nil, nil, "", fmt.Errorf(
"error processing PVC %s/%s: %v",
@@ -455,7 +458,7 @@ func (dswp *desiredStateOfWorldPopulator) createVolumeSpec(
logger.V(5).Info("Found bound PV for PVC", "PVC", klog.KRef(pod.Namespace, pvcSource.ClaimName), "PVCUID", pvcUID, "PVName", pvName)
// Fetch actual PV object
volumeSpec, volumeGIDValue, err :=
dswp.getPVSpec(pvName, pvcSource.ReadOnly, pvcUID)
dswp.getPVSpec(ctx, pvName, pvcSource.ReadOnly, pvcUID)
if err != nil {
return nil, nil, "", fmt.Errorf(
"error processing PVC %s/%s: %v",
@@ -518,9 +521,9 @@ func (dswp *desiredStateOfWorldPopulator) createVolumeSpec(
// it is pointing to and returns it.
// An error is returned if the PVC object's phase is not "Bound".
func (dswp *desiredStateOfWorldPopulator) getPVCExtractPV(
namespace string, claimName string) (*v1.PersistentVolumeClaim, error) {
ctx context.Context, namespace string, claimName string) (*v1.PersistentVolumeClaim, error) {
pvc, err :=
dswp.kubeClient.CoreV1().PersistentVolumeClaims(namespace).Get(context.TODO(), claimName, metav1.GetOptions{})
dswp.kubeClient.CoreV1().PersistentVolumeClaims(namespace).Get(ctx, claimName, metav1.GetOptions{})
if err != nil || pvc == nil {
return nil, fmt.Errorf("failed to fetch PVC from API server: %v", err)
}
@@ -557,10 +560,11 @@ func (dswp *desiredStateOfWorldPopulator) getPVCExtractPV(
// and returns a volume.Spec representing it.
// An error is returned if the call to fetch the PV object fails.
func (dswp *desiredStateOfWorldPopulator) getPVSpec(
ctx context.Context,
name string,
pvcReadOnly bool,
expectedClaimUID types.UID) (*volume.Spec, string, error) {
pv, err := dswp.kubeClient.CoreV1().PersistentVolumes().Get(context.TODO(), name, metav1.GetOptions{})
pv, err := dswp.kubeClient.CoreV1().PersistentVolumes().Get(ctx, name, metav1.GetOptions{})
if err != nil || pv == nil {
return nil, "", fmt.Errorf(
"failed to fetch PV %s from API server: %v", name, err)

View File

@@ -33,6 +33,7 @@ import (
core "k8s.io/client-go/testing"
featuregatetesting "k8s.io/component-base/featuregate/testing"
csitrans "k8s.io/csi-translation-lib"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
@@ -261,6 +262,7 @@ type mutablePodManager interface {
}
func TestFindAndAddNewPods_FindAndRemoveDeletedPods(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
dswp, fakePodState, pod, expectedVolumeName, _ := prepareDSWPWithPodPV(t)
podName := util.GetUniquePodName(pod)
@@ -272,7 +274,7 @@ func TestFindAndAddNewPods_FindAndRemoveDeletedPods(t *testing.T) {
podGet.Status.Phase = v1.PodFailed
dswp.podManager.(mutablePodManager).RemovePod(pod)
dswp.findAndRemoveDeletedPods()
dswp.findAndRemoveDeletedPods(logger)
if !dswp.pods.processedPods[podName] {
t.Fatalf("Pod should not been removed from desired state of world since pod state still thinks it exists")
@@ -281,14 +283,14 @@ func TestFindAndAddNewPods_FindAndRemoveDeletedPods(t *testing.T) {
fakePodState.removed = map[kubetypes.UID]struct{}{pod.UID: {}}
// the pod state is marked as removed, so here findAndRemoveDeletedPods() will remove the pod and volumes it is mounted
dswp.findAndRemoveDeletedPods()
dswp.findAndRemoveDeletedPods(logger)
if dswp.pods.processedPods[podName] {
t.Fatalf("Failed to remove pods from desired state of world since they no longer exist")
}
// podWorker may call volume_manager WaitForUnmount() after we processed the pod in findAndRemoveDeletedPods()
dswp.ReprocessPod(podName)
dswp.findAndRemoveDeletedPods()
dswp.findAndRemoveDeletedPods(logger)
// findAndRemoveDeletedPods() above must detect orphaned pod and delete it from the map
if _, ok := dswp.pods.processedPods[podName]; ok {
@@ -321,6 +323,7 @@ func TestFindAndAddNewPods_FindAndRemoveDeletedPods(t *testing.T) {
}
func TestFindAndRemoveDeletedPodsWithActualState(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
dswp, fakePodState, pod, expectedVolumeName, _ := prepareDSWPWithPodPV(t)
fakeASW := dswp.actualStateOfWorld
podName := util.GetUniquePodName(pod)
@@ -332,7 +335,7 @@ func TestFindAndRemoveDeletedPodsWithActualState(t *testing.T) {
}
podGet.Status.Phase = v1.PodFailed
dswp.findAndRemoveDeletedPods()
dswp.findAndRemoveDeletedPods(logger)
// Although Pod status is terminated, pod still exists in pod manager and actual state does not has this pod and volume information
// desired state populator will fail to delete this pod and volume first
volumeExists := dswp.desiredStateOfWorld.VolumeExists(expectedVolumeName, "" /* SELinuxContext */)
@@ -353,7 +356,7 @@ func TestFindAndRemoveDeletedPodsWithActualState(t *testing.T) {
// reconcile with actual state so that volume is added into the actual state
// desired state populator now can successfully delete the pod and volume
reconcileASW(fakeASW, dswp.desiredStateOfWorld, t)
dswp.findAndRemoveDeletedPods()
dswp.findAndRemoveDeletedPods(logger)
if !dswp.desiredStateOfWorld.VolumeExists(expectedVolumeName, "" /* SELinuxContext */) {
t.Fatalf(
"VolumeExists(%q) failed. Expected: <false> Actual: <%v>",
@@ -366,7 +369,7 @@ func TestFindAndRemoveDeletedPodsWithActualState(t *testing.T) {
// reconcile with actual state so that volume is added into the actual state
// desired state populator now can successfully delete the pod and volume
reconcileASW(fakeASW, dswp.desiredStateOfWorld, t)
dswp.findAndRemoveDeletedPods()
dswp.findAndRemoveDeletedPods(logger)
volumeExists = dswp.desiredStateOfWorld.VolumeExists(expectedVolumeName, "" /* SELinuxContext */)
if volumeExists {
t.Fatalf(
@@ -384,6 +387,7 @@ func TestFindAndRemoveDeletedPodsWithActualState(t *testing.T) {
}
func TestFindAndRemoveDeletedPodsWithUncertain(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
dswp, fakePodState, pod, expectedVolumeName, pv := prepareDSWPWithPodPV(t)
podName := util.GetUniquePodName(pod)
@@ -416,7 +420,7 @@ func TestFindAndRemoveDeletedPodsWithUncertain(t *testing.T) {
}
// the pod state now lists the pod as removed, so here findAndRemoveDeletedPods() will remove the pod and volumes it is mounted
dswp.findAndRemoveDeletedPods()
dswp.findAndRemoveDeletedPods(logger)
if dswp.pods.processedPods[podName] {
t.Fatalf("Failed to remove pods from desired state of world since they no longer exist")
}
@@ -517,6 +521,7 @@ func prepareDSWPWithPodPV(t *testing.T) (*desiredStateOfWorldPopulator, *fakePod
}
func TestFindAndRemoveNonattachableVolumes(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
// create dswp
mode := v1.PersistentVolumeFilesystem
pv := &v1.PersistentVolume{
@@ -583,7 +588,7 @@ func TestFindAndRemoveNonattachableVolumes(t *testing.T) {
verifyVolumeExistsInVolumesToMount(
t, v1.UniqueVolumeName(generatedVolumeName), false /* expectReportedInUse */, fakesDSW)
dswp.findAndRemoveDeletedPods()
dswp.findAndRemoveDeletedPods(logger)
// After the volume plugin changes to nonattachable, the corresponding volume attachable field should change.
volumesToMount := fakesDSW.GetVolumesToMount()
for _, volume := range volumesToMount {
@@ -621,6 +626,7 @@ func TestEphemeralVolumeOwnerCheck(t *testing.T) {
}
func TestFindAndAddNewPods_FindAndRemoveDeletedPods_Valid_Block_VolumeDevices(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
// create dswp
mode := v1.PersistentVolumeBlock
pv := &v1.PersistentVolume{
@@ -698,7 +704,7 @@ func TestFindAndAddNewPods_FindAndRemoveDeletedPods_Valid_Block_VolumeDevices(t
fakePodState.removed = map[kubetypes.UID]struct{}{pod.UID: {}}
//pod is added to fakePodManager but pod state knows the pod is removed, so here findAndRemoveDeletedPods() will remove the pod and volumes it is mounted
dswp.findAndRemoveDeletedPods()
dswp.findAndRemoveDeletedPods(logger)
if dswp.pods.processedPods[podName] {
t.Fatalf("Failed to remove pods from desired state of world since they no longer exist")
}
@@ -729,6 +735,7 @@ func TestFindAndAddNewPods_FindAndRemoveDeletedPods_Valid_Block_VolumeDevices(t
}
func TestCreateVolumeSpec_Valid_File_VolumeMounts(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
// create dswp
mode := v1.PersistentVolumeFilesystem
pv := &v1.PersistentVolume{
@@ -764,11 +771,10 @@ func TestCreateVolumeSpec_Valid_File_VolumeMounts(t *testing.T) {
}
pod := createPodWithVolume("dswp-test-pod", "dswp-test-volume-name", "file-bound", containers)
logger, _ := ktesting.NewTestContext(t)
fakePodManager.AddPod(pod)
mountsMap, devicesMap, _ := util.GetPodVolumeNames(pod, false /* collectSELinuxOptions */)
_, volumeSpec, _, err :=
dswp.createVolumeSpec(logger, pod.Spec.Volumes[0], pod, mountsMap, devicesMap)
dswp.createVolumeSpec(ctx, pod.Spec.Volumes[0], pod, mountsMap, devicesMap)
// Assert
if volumeSpec == nil || err != nil {
@@ -777,6 +783,7 @@ func TestCreateVolumeSpec_Valid_File_VolumeMounts(t *testing.T) {
}
func TestCreateVolumeSpec_Valid_Nil_VolumeMounts(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
// create dswp
pv := &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
@@ -811,11 +818,10 @@ func TestCreateVolumeSpec_Valid_Nil_VolumeMounts(t *testing.T) {
}
pod := createPodWithVolume("dswp-test-pod", "dswp-test-volume-name", "file-bound", containers)
logger, _ := ktesting.NewTestContext(t)
fakePodManager.AddPod(pod)
mountsMap, devicesMap, _ := util.GetPodVolumeNames(pod, false /* collectSELinuxOptions */)
_, volumeSpec, _, err :=
dswp.createVolumeSpec(logger, pod.Spec.Volumes[0], pod, mountsMap, devicesMap)
dswp.createVolumeSpec(ctx, pod.Spec.Volumes[0], pod, mountsMap, devicesMap)
// Assert
if volumeSpec == nil || err != nil {
@@ -824,6 +830,7 @@ func TestCreateVolumeSpec_Valid_Nil_VolumeMounts(t *testing.T) {
}
func TestCreateVolumeSpec_Valid_Block_VolumeDevices(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
// create dswp
mode := v1.PersistentVolumeBlock
pv := &v1.PersistentVolume{
@@ -858,11 +865,10 @@ func TestCreateVolumeSpec_Valid_Block_VolumeDevices(t *testing.T) {
}
pod := createPodWithVolume("dswp-test-pod", "dswp-test-volume-name", "block-bound", containers)
logger, _ := ktesting.NewTestContext(t)
fakePodManager.AddPod(pod)
mountsMap, devicesMap, _ := util.GetPodVolumeNames(pod, false /* collectSELinuxOptions */)
_, volumeSpec, _, err :=
dswp.createVolumeSpec(logger, pod.Spec.Volumes[0], pod, mountsMap, devicesMap)
dswp.createVolumeSpec(ctx, pod.Spec.Volumes[0], pod, mountsMap, devicesMap)
// Assert
if volumeSpec == nil || err != nil {
@@ -871,6 +877,7 @@ func TestCreateVolumeSpec_Valid_Block_VolumeDevices(t *testing.T) {
}
func TestCreateVolumeSpec_Invalid_File_VolumeDevices(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
// create dswp
mode := v1.PersistentVolumeFilesystem
pv := &v1.PersistentVolume{
@@ -905,11 +912,10 @@ func TestCreateVolumeSpec_Invalid_File_VolumeDevices(t *testing.T) {
}
pod := createPodWithVolume("dswp-test-pod", "dswp-test-volume-name", "file-bound", containers)
logger, _ := ktesting.NewTestContext(t)
fakePodManager.AddPod(pod)
mountsMap, devicesMap, _ := util.GetPodVolumeNames(pod, false /* collectSELinuxOptions */)
_, volumeSpec, _, err :=
dswp.createVolumeSpec(logger, pod.Spec.Volumes[0], pod, mountsMap, devicesMap)
dswp.createVolumeSpec(ctx, pod.Spec.Volumes[0], pod, mountsMap, devicesMap)
// Assert
if volumeSpec != nil || err == nil {
@@ -918,6 +924,7 @@ func TestCreateVolumeSpec_Invalid_File_VolumeDevices(t *testing.T) {
}
func TestCreateVolumeSpec_Invalid_Block_VolumeMounts(t *testing.T) {
tCtx := ktesting.Init(t)
// create dswp
mode := v1.PersistentVolumeBlock
pv := &v1.PersistentVolume{
@@ -952,11 +959,10 @@ func TestCreateVolumeSpec_Invalid_Block_VolumeMounts(t *testing.T) {
}
pod := createPodWithVolume("dswp-test-pod", "dswp-test-volume-name", "block-bound", containers)
logger, _ := ktesting.NewTestContext(t)
fakePodManager.AddPod(pod)
mountsMap, devicesMap, _ := util.GetPodVolumeNames(pod, false /* collectSELinuxOptions */)
_, volumeSpec, _, err :=
dswp.createVolumeSpec(logger, pod.Spec.Volumes[0], pod, mountsMap, devicesMap)
dswp.createVolumeSpec(tCtx, pod.Spec.Volumes[0], pod, mountsMap, devicesMap)
// Assert
if volumeSpec != nil || err == nil {
@@ -1417,15 +1423,16 @@ func clearASW(asw cache.ActualStateOfWorld, dsw cache.DesiredStateOfWorld, t *te
func reprocess(ctx context.Context, dswp *desiredStateOfWorldPopulator, uniquePodName types.UniquePodName,
dsw cache.DesiredStateOfWorld, asw cache.ActualStateOfWorld, newSize resource.Quantity) []v1.UniqueVolumeName {
logger := klog.FromContext(ctx)
dswp.ReprocessPod(uniquePodName)
dswp.findAndAddNewPods(ctx)
return getResizeRequiredVolumes(dsw, asw, newSize)
return getResizeRequiredVolumes(logger, dsw, asw, newSize)
}
func getResizeRequiredVolumes(dsw cache.DesiredStateOfWorld, asw cache.ActualStateOfWorld, newSize resource.Quantity) []v1.UniqueVolumeName {
func getResizeRequiredVolumes(logger klog.Logger, dsw cache.DesiredStateOfWorld, asw cache.ActualStateOfWorld, newSize resource.Quantity) []v1.UniqueVolumeName {
resizeRequiredVolumes := []v1.UniqueVolumeName{}
for _, volumeToMount := range dsw.GetVolumesToMount() {
_, _, err := asw.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName, newSize, "" /* SELinuxContext */)
_, _, err := asw.PodExistsInVolume(logger, volumeToMount.PodName, volumeToMount.VolumeName, newSize, "" /* SELinuxContext */)
if cache.IsFSResizeRequiredError(err) {
resizeRequiredVolumes = append(resizeRequiredVolumes, volumeToMount.VolumeName)
}

View File

@@ -17,44 +17,48 @@ limitations under the License.
package reconciler
import (
"context"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
)
func (rc *reconciler) Run(stopCh <-chan struct{}) {
rc.reconstructVolumes()
klog.InfoS("Reconciler: start to sync state")
wait.Until(rc.reconcile, rc.loopSleepDuration, stopCh)
func (rc *reconciler) Run(ctx context.Context, stopCh <-chan struct{}) {
logger := klog.FromContext(ctx)
rc.reconstructVolumes(logger)
logger.Info("Reconciler: start to sync state")
wait.Until(func() { rc.reconcile(ctx) }, rc.loopSleepDuration, stopCh)
}
func (rc *reconciler) reconcile() {
func (rc *reconciler) reconcile(ctx context.Context) {
logger := klog.FromContext(ctx)
readyToUnmount := rc.readyToUnmount()
if readyToUnmount {
// Unmounts are triggered before mounts so that a volume that was
// referenced by a pod that was deleted and is now referenced by another
// pod is unmounted from the first pod before being mounted to the new
// pod.
rc.unmountVolumes()
rc.unmountVolumes(logger)
}
// Next we mount required volumes. This function could also trigger
// attach if kubelet is responsible for attaching volumes.
// If underlying PVC was resized while in-use then this function also handles volume
// resizing.
rc.mountOrAttachVolumes()
rc.mountOrAttachVolumes(logger)
// Unmount volumes only when DSW and ASW are fully populated to prevent unmounting a volume
// that is still needed, but it did not reach DSW yet.
if readyToUnmount {
// Ensure devices that should be detached/unmounted are detached/unmounted.
rc.unmountDetachDevices()
rc.unmountDetachDevices(logger)
// Clean up any orphan volumes that failed reconstruction.
rc.cleanOrphanVolumes()
rc.cleanOrphanVolumes(logger)
}
if len(rc.volumesNeedUpdateFromNodeStatus) != 0 {
rc.updateReconstructedFromNodeStatus()
rc.updateReconstructedFromNodeStatus(ctx)
}
if len(rc.volumesNeedUpdateFromNodeStatus) == 0 {
// ASW is fully populated only after both devicePaths and uncertain volume attach-ability

View File

@@ -17,6 +17,7 @@ limitations under the License.
package reconciler
import (
"context"
"fmt"
"sync"
"time"
@@ -49,7 +50,7 @@ type Reconciler interface {
// If attach/detach management is enabled, the manager will also check if
// volumes that should be attached are attached and volumes that should
// be detached are detached and trigger attach/detach operations as needed.
Run(stopCh <-chan struct{})
Run(ctx context.Context, stopCh <-chan struct{})
// StatesHasBeenSynced returns true only after syncStates process starts to sync
// states at least once after kubelet starts
@@ -144,7 +145,7 @@ type reconciler struct {
volumesNeedUpdateFromNodeStatus []v1.UniqueVolumeName
}
func (rc *reconciler) unmountVolumes() {
func (rc *reconciler) unmountVolumes(logger klog.Logger) {
// Ensure volumes that should be unmounted are unmounted.
for _, mountedVolume := range rc.actualStateOfWorld.GetAllMountedVolumes() {
if rc.operationExecutor.IsOperationPending(mountedVolume.VolumeName, mountedVolume.PodName, nestedpendingoperations.EmptyNodeName) {
@@ -152,26 +153,26 @@ func (rc *reconciler) unmountVolumes() {
}
if !rc.desiredStateOfWorld.PodExistsInVolume(mountedVolume.PodName, mountedVolume.VolumeName, mountedVolume.SELinuxMountContext) {
// Volume is mounted, unmount it
klog.V(5).InfoS(mountedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountVolume", ""))
logger.V(5).Info(mountedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountVolume", ""))
err := rc.operationExecutor.UnmountVolume(
mountedVolume.MountedVolume, rc.actualStateOfWorld, rc.kubeletPodsDir)
if err != nil && !isExpectedError(err) {
klog.ErrorS(err, mountedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.UnmountVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
logger.Error(err, mountedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.UnmountVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
}
if err == nil {
klog.InfoS(mountedVolume.GenerateMsgDetailed("operationExecutor.UnmountVolume started", ""))
logger.Info(mountedVolume.GenerateMsgDetailed("operationExecutor.UnmountVolume started", ""))
}
}
}
}
func (rc *reconciler) mountOrAttachVolumes() {
func (rc *reconciler) mountOrAttachVolumes(logger klog.Logger) {
// Ensure volumes that should be attached/mounted are attached/mounted.
for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {
if rc.operationExecutor.IsOperationPending(volumeToMount.VolumeName, nestedpendingoperations.EmptyUniquePodName, nestedpendingoperations.EmptyNodeName) {
continue
}
volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName, volumeToMount.DesiredPersistentVolumeSize, volumeToMount.SELinuxLabel)
volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(logger, volumeToMount.PodName, volumeToMount.VolumeName, volumeToMount.DesiredPersistentVolumeSize, volumeToMount.SELinuxLabel)
volumeToMount.DevicePath = devicePath
if cache.IsSELinuxMountMismatchError(err) {
// The volume is mounted, but with an unexpected SELinux context.
@@ -180,75 +181,74 @@ func (rc *reconciler) mountOrAttachVolumes() {
rc.desiredStateOfWorld.AddErrorToPod(volumeToMount.PodName, err.Error())
continue
} else if cache.IsVolumeNotAttachedError(err) {
rc.waitForVolumeAttach(volumeToMount)
rc.waitForVolumeAttach(logger, volumeToMount)
} else if !volMounted || cache.IsRemountRequiredError(err) {
rc.mountAttachedVolumes(volumeToMount, err)
rc.mountAttachedVolumes(logger, volumeToMount, err)
} else if cache.IsFSResizeRequiredError(err) {
fsResizeRequiredErr, _ := err.(cache.FsResizeRequiredError)
rc.expandVolume(volumeToMount, fsResizeRequiredErr.CurrentSize)
rc.expandVolume(logger, volumeToMount, fsResizeRequiredErr.CurrentSize)
}
}
}
func (rc *reconciler) expandVolume(volumeToMount cache.VolumeToMount, currentSize resource.Quantity) {
klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.ExpandInUseVolume", ""), "pod", klog.KObj(volumeToMount.Pod))
func (rc *reconciler) expandVolume(logger klog.Logger, volumeToMount cache.VolumeToMount, currentSize resource.Quantity) {
logger.V(4).Info(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.ExpandInUseVolume", ""), "pod", klog.KObj(volumeToMount.Pod))
err := rc.operationExecutor.ExpandInUseVolume(volumeToMount.VolumeToMount, rc.actualStateOfWorld, currentSize)
if err != nil && !isExpectedError(err) {
klog.ErrorS(err, volumeToMount.GenerateErrorDetailed("operationExecutor.ExpandInUseVolume failed", err).Error(), "pod", klog.KObj(volumeToMount.Pod))
logger.Error(err, volumeToMount.GenerateErrorDetailed("operationExecutor.ExpandInUseVolume failed", err).Error(), "pod", klog.KObj(volumeToMount.Pod))
}
if err == nil {
klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.ExpandInUseVolume started", ""), "pod", klog.KObj(volumeToMount.Pod))
logger.V(4).Info(volumeToMount.GenerateMsgDetailed("operationExecutor.ExpandInUseVolume started", ""), "pod", klog.KObj(volumeToMount.Pod))
}
}
func (rc *reconciler) mountAttachedVolumes(volumeToMount cache.VolumeToMount, podExistError error) {
func (rc *reconciler) mountAttachedVolumes(logger klog.Logger, volumeToMount cache.VolumeToMount, podExistError error) {
// Volume is not mounted, or is already mounted, but requires remounting
remountingLogStr := ""
isRemount := cache.IsRemountRequiredError(podExistError)
if isRemount {
remountingLogStr = "Volume is already mounted to pod, but remount was requested."
}
klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.MountVolume", remountingLogStr), "pod", klog.KObj(volumeToMount.Pod))
logger.V(4).Info(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.MountVolume", remountingLogStr), "pod", klog.KObj(volumeToMount.Pod))
err := rc.operationExecutor.MountVolume(
rc.waitForAttachTimeout,
volumeToMount.VolumeToMount,
rc.actualStateOfWorld,
isRemount)
if err != nil && !isExpectedError(err) {
klog.ErrorS(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.MountVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error(), "pod", klog.KObj(volumeToMount.Pod))
logger.Error(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.MountVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error(), "pod", klog.KObj(volumeToMount.Pod))
}
if err == nil {
if remountingLogStr == "" {
klog.V(1).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr), "pod", klog.KObj(volumeToMount.Pod))
logger.V(1).Info(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr), "pod", klog.KObj(volumeToMount.Pod))
} else {
klog.V(5).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr), "pod", klog.KObj(volumeToMount.Pod))
logger.V(5).Info(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr), "pod", klog.KObj(volumeToMount.Pod))
}
}
}
func (rc *reconciler) waitForVolumeAttach(volumeToMount cache.VolumeToMount) {
logger := klog.TODO()
func (rc *reconciler) waitForVolumeAttach(logger klog.Logger, volumeToMount cache.VolumeToMount) {
if rc.controllerAttachDetachEnabled || !volumeToMount.PluginIsAttachable {
//// lets not spin a goroutine and unnecessarily trigger exponential backoff if this happens
if volumeToMount.PluginIsAttachable && !volumeToMount.ReportedInUse {
klog.V(5).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.VerifyControllerAttachedVolume failed", " volume not marked in-use"), "pod", klog.KObj(volumeToMount.Pod))
logger.V(5).Info(volumeToMount.GenerateMsgDetailed("operationExecutor.VerifyControllerAttachedVolume failed", " volume not marked in-use"), "pod", klog.KObj(volumeToMount.Pod))
return
}
// Volume is not attached (or doesn't implement attacher), kubelet attach is disabled, wait
// for controller to finish attaching volume.
klog.V(5).InfoS(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.VerifyControllerAttachedVolume", ""), "pod", klog.KObj(volumeToMount.Pod))
logger.V(5).Info(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.VerifyControllerAttachedVolume", ""), "pod", klog.KObj(volumeToMount.Pod))
err := rc.operationExecutor.VerifyControllerAttachedVolume(
logger,
volumeToMount.VolumeToMount,
rc.nodeName,
rc.actualStateOfWorld)
if err != nil && !isExpectedError(err) {
klog.ErrorS(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.VerifyControllerAttachedVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error(), "pod", klog.KObj(volumeToMount.Pod))
logger.Error(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.VerifyControllerAttachedVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error(), "pod", klog.KObj(volumeToMount.Pod))
}
if err == nil {
klog.InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.VerifyControllerAttachedVolume started", ""), "pod", klog.KObj(volumeToMount.Pod))
logger.Info(volumeToMount.GenerateMsgDetailed("operationExecutor.VerifyControllerAttachedVolume started", ""), "pod", klog.KObj(volumeToMount.Pod))
}
} else {
// Volume is not attached to node, kubelet attach is enabled, volume implements an attacher,
@@ -259,18 +259,18 @@ func (rc *reconciler) waitForVolumeAttach(volumeToMount cache.VolumeToMount) {
NodeName: rc.nodeName,
ScheduledPods: []*v1.Pod{volumeToMount.Pod},
}
klog.V(5).InfoS(volumeToAttach.GenerateMsgDetailed("Starting operationExecutor.AttachVolume", ""), "pod", klog.KObj(volumeToMount.Pod))
logger.V(5).Info(volumeToAttach.GenerateMsgDetailed("Starting operationExecutor.AttachVolume", ""), "pod", klog.KObj(volumeToMount.Pod))
err := rc.operationExecutor.AttachVolume(logger, volumeToAttach, rc.actualStateOfWorld)
if err != nil && !isExpectedError(err) {
klog.ErrorS(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.AttachVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error(), "pod", klog.KObj(volumeToMount.Pod))
logger.Error(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.AttachVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error(), "pod", klog.KObj(volumeToMount.Pod))
}
if err == nil {
klog.InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.AttachVolume started", ""), "pod", klog.KObj(volumeToMount.Pod))
logger.Info(volumeToMount.GenerateMsgDetailed("operationExecutor.AttachVolume started", ""), "pod", klog.KObj(volumeToMount.Pod))
}
}
}
func (rc *reconciler) unmountDetachDevices() {
func (rc *reconciler) unmountDetachDevices(logger klog.Logger) {
for _, attachedVolume := range rc.actualStateOfWorld.GetUnmountedVolumes() {
// Check IsOperationPending to avoid marking a volume as detached if it's in the process of mounting.
if !rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName, attachedVolume.SELinuxMountContext) &&
@@ -282,31 +282,31 @@ func (rc *reconciler) unmountDetachDevices() {
attachedVolume, _ = rc.actualStateOfWorld.GetAttachedVolume(attachedVolume.VolumeName)
if attachedVolume.DeviceMayBeMounted() {
// Volume is globally mounted to device, unmount it
klog.V(5).InfoS(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountDevice", ""))
logger.V(5).Info(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountDevice", ""))
err := rc.operationExecutor.UnmountDevice(
attachedVolume.AttachedVolume, rc.actualStateOfWorld, rc.hostutil)
if err != nil && !isExpectedError(err) {
klog.ErrorS(err, attachedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.UnmountDevice failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
logger.Error(err, attachedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.UnmountDevice failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
}
if err == nil {
klog.InfoS(attachedVolume.GenerateMsgDetailed("operationExecutor.UnmountDevice started", ""))
logger.Info(attachedVolume.GenerateMsgDetailed("operationExecutor.UnmountDevice started", ""))
}
} else {
// Volume is attached to node, detach it
// Kubelet not responsible for detaching or this volume has a non-attachable volume plugin.
if rc.controllerAttachDetachEnabled || !attachedVolume.PluginIsAttachable {
rc.actualStateOfWorld.MarkVolumeAsDetached(attachedVolume.VolumeName, attachedVolume.NodeName)
klog.InfoS(attachedVolume.GenerateMsgDetailed("Volume detached", fmt.Sprintf("DevicePath %q", attachedVolume.DevicePath)))
logger.Info(attachedVolume.GenerateMsgDetailed("Volume detached", fmt.Sprintf("DevicePath %q", attachedVolume.DevicePath)))
} else {
// Only detach if kubelet detach is enabled
klog.V(5).InfoS(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.DetachVolume", ""))
logger.V(5).Info(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.DetachVolume", ""))
err := rc.operationExecutor.DetachVolume(
klog.TODO(), attachedVolume.AttachedVolume, false /* verifySafeToDetach */, rc.actualStateOfWorld)
logger, attachedVolume.AttachedVolume, false /* verifySafeToDetach */, rc.actualStateOfWorld)
if err != nil && !isExpectedError(err) {
klog.ErrorS(err, attachedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.DetachVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
logger.Error(err, attachedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.DetachVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
}
if err == nil {
klog.InfoS(attachedVolume.GenerateMsgDetailed("operationExecutor.DetachVolume started", ""))
logger.Info(attachedVolume.GenerateMsgDetailed("operationExecutor.DetachVolume started", ""))
}
}
}

View File

@@ -17,6 +17,7 @@ limitations under the License.
package reconciler
import (
"context"
"crypto/md5"
"fmt"
"path/filepath"
@@ -39,7 +40,6 @@ import (
"k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting"
_ "k8s.io/klog/v2/ktesting/init"
"k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
@@ -69,6 +69,7 @@ func hasAddedPods() bool { return true }
// Calls Run()
// Verifies there are no calls to attach, detach, mount, unmount, etc.
func Test_Run_Positive_DoNothing(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
// Arrange
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
@@ -99,7 +100,7 @@ func Test_Run_Positive_DoNothing(t *testing.T) {
kubeletPodsDir)
// Act
runReconciler(reconciler)
runReconciler(ctx, reconciler)
// Assert
assert.NoError(t, volumetesting.VerifyZeroAttachCalls(fakePlugin))
@@ -114,6 +115,7 @@ func Test_Run_Positive_DoNothing(t *testing.T) {
// Calls Run()
// Verifies there is are attach/mount/etc calls and no detach/unmount calls.
func Test_Run_Positive_VolumeAttachAndMount(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
// Arrange
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
@@ -163,7 +165,7 @@ func Test_Run_Positive_VolumeAttachAndMount(t *testing.T) {
volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
podName := util.GetUniquePodName(pod)
generatedVolumeName, err := dsw.AddPodToVolume(
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */)
logger, podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */)
// Assert
if err != nil {
@@ -171,7 +173,7 @@ func Test_Run_Positive_VolumeAttachAndMount(t *testing.T) {
}
// Act
runReconciler(reconciler)
runReconciler(ctx, reconciler)
waitForMount(t, fakePlugin, generatedVolumeName, asw)
// Assert
assert.NoError(t, volumetesting.VerifyAttachCallCount(
@@ -191,7 +193,7 @@ func Test_Run_Positive_VolumeAttachAndMount(t *testing.T) {
// Verifies there is are attach/mount/etc calls and no detach/unmount calls.
func Test_Run_Positive_VolumeAttachAndMountMigrationEnabled(t *testing.T) {
// Arrange
logger, _ := ktesting.NewTestContext(t)
logger, ctx := ktesting.NewTestContext(t)
intreeToCSITranslator := csitrans.New()
node := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
@@ -265,6 +267,7 @@ func Test_Run_Positive_VolumeAttachAndMountMigrationEnabled(t *testing.T) {
podName := util.GetUniquePodName(pod)
generatedVolumeName, err := dsw.AddPodToVolume(
logger,
podName,
pod,
migratedSpec,
@@ -280,7 +283,7 @@ func Test_Run_Positive_VolumeAttachAndMountMigrationEnabled(t *testing.T) {
dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeName})
// Act
runReconciler(reconciler)
runReconciler(ctx, reconciler)
waitForMount(t, fakePlugin, generatedVolumeName, asw)
// Assert
assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(
@@ -299,6 +302,7 @@ func Test_Run_Positive_VolumeAttachAndMountMigrationEnabled(t *testing.T) {
// Verifies there is one mount call and no unmount calls.
// Verifies there are no attach/detach calls.
func Test_Run_Positive_VolumeMountControllerAttachEnabled(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
// Arrange
node := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
@@ -361,7 +365,7 @@ func Test_Run_Positive_VolumeMountControllerAttachEnabled(t *testing.T) {
volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
podName := util.GetUniquePodName(pod)
generatedVolumeName, err := dsw.AddPodToVolume(
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */)
logger, podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */)
dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeName})
// Assert
@@ -370,7 +374,7 @@ func Test_Run_Positive_VolumeMountControllerAttachEnabled(t *testing.T) {
}
// Act
runReconciler(reconciler)
runReconciler(ctx, reconciler)
waitForMount(t, fakePlugin, generatedVolumeName, asw)
// Assert
@@ -387,11 +391,12 @@ func Test_Run_Positive_VolumeMountControllerAttachEnabled(t *testing.T) {
// Populates desiredStateOfWorld cache with one volume/pod.
// Enables controllerAttachDetachEnabled.
// volume is not repored-in-use
// volume is not reported-in-use
// Calls Run()
// Verifies that there is not wait-for-mount call
// Verifies that there is no exponential-backoff triggered
func Test_Run_Negative_VolumeMountControllerAttachEnabled(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
// Arrange
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
@@ -441,7 +446,7 @@ func Test_Run_Negative_VolumeMountControllerAttachEnabled(t *testing.T) {
volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
podName := util.GetUniquePodName(pod)
generatedVolumeName, err := dsw.AddPodToVolume(
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */)
logger, podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */)
// Assert
if err != nil {
@@ -449,7 +454,7 @@ func Test_Run_Negative_VolumeMountControllerAttachEnabled(t *testing.T) {
}
// Act
runReconciler(reconciler)
runReconciler(ctx, reconciler)
time.Sleep(reconcilerSyncWaitDuration)
ok := oex.IsOperationSafeToRetry(generatedVolumeName, podName, nodeName, operationexecutor.VerifyControllerAttachedVolumeOpName)
@@ -471,6 +476,7 @@ func Test_Run_Negative_VolumeMountControllerAttachEnabled(t *testing.T) {
// Deletes volume/pod from desired state of world.
// Verifies detach/unmount calls are issued.
func Test_Run_Positive_VolumeAttachMountUnmountDetach(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
// Arrange
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
@@ -520,7 +526,7 @@ func Test_Run_Positive_VolumeAttachMountUnmountDetach(t *testing.T) {
volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
podName := util.GetUniquePodName(pod)
generatedVolumeName, err := dsw.AddPodToVolume(
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */)
logger, podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */)
// Assert
if err != nil {
@@ -528,7 +534,7 @@ func Test_Run_Positive_VolumeAttachMountUnmountDetach(t *testing.T) {
}
// Act
runReconciler(reconciler)
runReconciler(ctx, reconciler)
waitForMount(t, fakePlugin, generatedVolumeName, asw)
// Assert
assert.NoError(t, volumetesting.VerifyAttachCallCount(
@@ -561,6 +567,7 @@ func Test_Run_Positive_VolumeAttachMountUnmountDetach(t *testing.T) {
// Verifies one unmount call is made.
// Verifies there are no attach/detach calls made.
func Test_Run_Positive_VolumeUnmountControllerAttachEnabled(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
// Arrange
node := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
@@ -623,7 +630,7 @@ func Test_Run_Positive_VolumeUnmountControllerAttachEnabled(t *testing.T) {
volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
podName := util.GetUniquePodName(pod)
generatedVolumeName, err := dsw.AddPodToVolume(
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */)
logger, podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */)
// Assert
if err != nil {
@@ -631,7 +638,7 @@ func Test_Run_Positive_VolumeUnmountControllerAttachEnabled(t *testing.T) {
}
// Act
runReconciler(reconciler)
runReconciler(ctx, reconciler)
dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeName})
waitForMount(t, fakePlugin, generatedVolumeName, asw)
@@ -662,6 +669,7 @@ func Test_Run_Positive_VolumeUnmountControllerAttachEnabled(t *testing.T) {
// Verifies there are attach/get map paths/setupDevice calls and
// no detach/teardownDevice calls.
func Test_Run_Positive_VolumeAttachAndMap(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod1",
@@ -731,7 +739,7 @@ func Test_Run_Positive_VolumeAttachAndMap(t *testing.T) {
}
podName := util.GetUniquePodName(pod)
generatedVolumeName, err := dsw.AddPodToVolume(
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */)
logger, podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */)
// Assert
if err != nil {
@@ -739,7 +747,7 @@ func Test_Run_Positive_VolumeAttachAndMap(t *testing.T) {
}
// Act
runReconciler(reconciler)
runReconciler(ctx, reconciler)
waitForMount(t, fakePlugin, generatedVolumeName, asw)
// Assert
assert.NoError(t, volumetesting.VerifyAttachCallCount(
@@ -759,6 +767,7 @@ func Test_Run_Positive_VolumeAttachAndMap(t *testing.T) {
// and no teardownDevice call.
// Verifies there are no attach/detach calls.
func Test_Run_Positive_BlockVolumeMapControllerAttachEnabled(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod1",
@@ -844,7 +853,7 @@ func Test_Run_Positive_BlockVolumeMapControllerAttachEnabled(t *testing.T) {
podName := util.GetUniquePodName(pod)
generatedVolumeName, err := dsw.AddPodToVolume(
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */)
logger, podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */)
dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeName})
// Assert
@@ -853,7 +862,7 @@ func Test_Run_Positive_BlockVolumeMapControllerAttachEnabled(t *testing.T) {
}
// Act
runReconciler(reconciler)
runReconciler(ctx, reconciler)
waitForMount(t, fakePlugin, generatedVolumeName, asw)
// Assert
@@ -873,6 +882,7 @@ func Test_Run_Positive_BlockVolumeMapControllerAttachEnabled(t *testing.T) {
// Deletes volume/pod from desired state of world.
// Verifies one detach/teardownDevice calls are issued.
func Test_Run_Positive_BlockVolumeAttachMapUnmapDetach(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod1",
@@ -942,7 +952,7 @@ func Test_Run_Positive_BlockVolumeAttachMapUnmapDetach(t *testing.T) {
podName := util.GetUniquePodName(pod)
generatedVolumeName, err := dsw.AddPodToVolume(
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */)
logger, podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */)
// Assert
if err != nil {
@@ -950,7 +960,7 @@ func Test_Run_Positive_BlockVolumeAttachMapUnmapDetach(t *testing.T) {
}
// Act
runReconciler(reconciler)
runReconciler(ctx, reconciler)
waitForMount(t, fakePlugin, generatedVolumeName, asw)
// Assert
assert.NoError(t, volumetesting.VerifyAttachCallCount(
@@ -981,6 +991,7 @@ func Test_Run_Positive_BlockVolumeAttachMapUnmapDetach(t *testing.T) {
// Verifies one teardownDevice call is made.
// Verifies there are no attach/detach calls made.
func Test_Run_Positive_VolumeUnmapControllerAttachEnabled(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod1",
@@ -1067,7 +1078,7 @@ func Test_Run_Positive_VolumeUnmapControllerAttachEnabled(t *testing.T) {
podName := util.GetUniquePodName(pod)
generatedVolumeName, err := dsw.AddPodToVolume(
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */)
logger, podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */)
// Assert
if err != nil {
@@ -1075,7 +1086,7 @@ func Test_Run_Positive_VolumeUnmapControllerAttachEnabled(t *testing.T) {
}
// Act
runReconciler(reconciler)
runReconciler(ctx, reconciler)
dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeName})
waitForMount(t, fakePlugin, generatedVolumeName, asw)
@@ -1239,6 +1250,7 @@ func Test_GenerateUnmapDeviceFunc_Plugin_Not_Found(t *testing.T) {
// Mark volume as fsResizeRequired in ASW.
// Verifies volume's fsResizeRequired flag is cleared later.
func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
blockMode := v1.PersistentVolumeBlock
fsMode := v1.PersistentVolumeFilesystem
@@ -1349,7 +1361,7 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) {
volumeSpec := &volume.Spec{PersistentVolume: pv}
podName := util.GetUniquePodName(pod)
volumeName, err := dsw.AddPodToVolume(
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */)
logger, podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */)
// Assert
if err != nil {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
@@ -1360,7 +1372,7 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) {
stopChan, stoppedChan := make(chan struct{}), make(chan struct{})
go func() {
defer close(stoppedChan)
reconciler.Run(stopChan)
reconciler.Run(ctx, stopChan)
}()
waitForMount(t, fakePlugin, volumeName, asw)
// Stop the reconciler.
@@ -1370,7 +1382,7 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) {
// Simulate what DSOWP does
pvWithSize.Spec.Capacity[v1.ResourceStorage] = tc.newPVSize
volumeSpec = &volume.Spec{PersistentVolume: pvWithSize}
_, err = dsw.AddPodToVolume(podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxContexts */)
_, err = dsw.AddPodToVolume(logger, podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxContexts */)
if err != nil {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
}
@@ -1379,7 +1391,7 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) {
newSize := tc.newPVSize.DeepCopy()
dsw.UpdatePersistentVolumeSize(volumeName, newSize)
_, _, podExistErr := asw.PodExistsInVolume(podName, volumeName, newSize, "" /* SELinuxLabel */)
_, _, podExistErr := asw.PodExistsInVolume(logger, podName, volumeName, newSize, "" /* SELinuxLabel */)
if tc.expansionFailed {
if cache.IsFSResizeRequiredError(podExistErr) {
t.Fatalf("volume %s should not throw fsResizeRequired error: %v", volumeName, podExistErr)
@@ -1388,10 +1400,10 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) {
if !cache.IsFSResizeRequiredError(podExistErr) {
t.Fatalf("Volume should be marked as fsResizeRequired, but receive unexpected error: %v", podExistErr)
}
go reconciler.Run(wait.NeverStop)
go reconciler.Run(ctx, wait.NeverStop)
waitErr := retryWithExponentialBackOff(testOperationBackOffDuration, func() (done bool, err error) {
mounted, _, err := asw.PodExistsInVolume(podName, volumeName, newSize, "" /* SELinuxContext */)
mounted, _, err := asw.PodExistsInVolume(logger, podName, volumeName, newSize, "" /* SELinuxContext */)
return mounted && err == nil, nil
})
if waitErr != nil {
@@ -1467,6 +1479,7 @@ func getTestPod(claimName string) *v1.Pod {
}
func Test_UncertainDeviceGlobalMounts(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
var tests = []struct {
name string
deviceState operationexecutor.DeviceMountState
@@ -1607,7 +1620,7 @@ func Test_UncertainDeviceGlobalMounts(t *testing.T) {
volumeSpec := &volume.Spec{PersistentVolume: pv}
podName := util.GetUniquePodName(pod)
volumeName, err := dsw.AddPodToVolume(
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */)
logger, podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */)
// Assert
if err != nil {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
@@ -1617,7 +1630,7 @@ func Test_UncertainDeviceGlobalMounts(t *testing.T) {
// Start the reconciler to fill ASW.
stopChan, stoppedChan := make(chan struct{}), make(chan struct{})
go func() {
reconciler.Run(stopChan)
reconciler.Run(ctx, stopChan)
close(stoppedChan)
}()
waitForVolumeToExistInASW(t, volumeName, asw)
@@ -1630,7 +1643,7 @@ func Test_UncertainDeviceGlobalMounts(t *testing.T) {
tc.volumeName == volumetesting.SuccessAndTimeoutDeviceName {
// wait for mount and then break it via remount
waitForMount(t, fakePlugin, volumeName, asw)
asw.MarkRemountRequired(podName)
asw.MarkRemountRequired(logger, podName)
time.Sleep(reconcilerSyncWaitDuration)
}
@@ -1662,6 +1675,7 @@ func Test_UncertainDeviceGlobalMounts(t *testing.T) {
}
func Test_UncertainVolumeMountState(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
var tests = []struct {
name string
volumeState operationexecutor.VolumeMountState
@@ -1830,7 +1844,7 @@ func Test_UncertainVolumeMountState(t *testing.T) {
volumeSpec := &volume.Spec{PersistentVolume: pv}
podName := util.GetUniquePodName(pod)
volumeName, err := dsw.AddPodToVolume(
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */)
logger, podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */)
// Assert
if err != nil {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
@@ -1840,7 +1854,7 @@ func Test_UncertainVolumeMountState(t *testing.T) {
// Start the reconciler to fill ASW.
stopChan, stoppedChan := make(chan struct{}), make(chan struct{})
go func() {
reconciler.Run(stopChan)
reconciler.Run(ctx, stopChan)
close(stoppedChan)
}()
waitForVolumeToExistInASW(t, volumeName, asw)
@@ -1856,7 +1870,7 @@ func Test_UncertainVolumeMountState(t *testing.T) {
tc.volumeName == volumetesting.SuccessAndTimeoutSetupVolumeName {
// wait for mount and then break it via remount
waitForMount(t, fakePlugin, volumeName, asw)
asw.MarkRemountRequired(podName)
asw.MarkRemountRequired(logger, podName)
time.Sleep(reconcilerSyncWaitDuration)
}
@@ -1944,11 +1958,12 @@ func waitForGlobalMount(t *testing.T, volumeName v1.UniqueVolumeName, asw cache.
}
func waitForUncertainPodMount(t *testing.T, volumeName v1.UniqueVolumeName, podName types.UniquePodName, asw cache.ActualStateOfWorld) {
logger, _ := ktesting.NewTestContext(t)
// check if volume is locally pod mounted in uncertain state
err := retryWithExponentialBackOff(
testOperationBackOffDuration,
func() (bool, error) {
mounted, _, err := asw.PodExistsInVolume(podName, volumeName, resource.Quantity{}, "" /* SELinuxContext */)
mounted, _, err := asw.PodExistsInVolume(logger, podName, volumeName, resource.Quantity{}, "" /* SELinuxContext */)
if mounted || err != nil {
return false, nil
}
@@ -2061,8 +2076,8 @@ func createTestClient(attachedVolumes ...v1.AttachedVolume) *fake.Clientset {
return fakeClient
}
func runReconciler(reconciler Reconciler) {
go reconciler.Run(wait.NeverStop)
func runReconciler(ctx context.Context, reconciler Reconciler) {
go reconciler.Run(ctx, wait.NeverStop)
}
func createtestClientWithPVPVC(pv *v1.PersistentVolume, pvc *v1.PersistentVolumeClaim, attachedVolumes ...v1.AttachedVolume) *fake.Clientset {
@@ -2101,6 +2116,7 @@ func createtestClientWithPVPVC(pv *v1.PersistentVolume, pvc *v1.PersistentVolume
}
func Test_Run_Positive_VolumeMountControllerAttachEnabledRace(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
// Arrange
node := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
@@ -2169,7 +2185,7 @@ func Test_Run_Positive_VolumeMountControllerAttachEnabledRace(t *testing.T) {
volumeSpecCopy := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
podName := util.GetUniquePodName(pod)
generatedVolumeName, err := dsw.AddPodToVolume(
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */)
logger, podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */)
dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeName})
if err != nil {
@@ -2178,7 +2194,7 @@ func Test_Run_Positive_VolumeMountControllerAttachEnabledRace(t *testing.T) {
// Start the reconciler to fill ASW.
stopChan, stoppedChan := make(chan struct{}), make(chan struct{})
go func() {
reconciler.Run(stopChan)
reconciler.Run(ctx, stopChan)
close(stoppedChan)
}()
waitForMount(t, fakePlugin, generatedVolumeName, asw)
@@ -2193,10 +2209,10 @@ func Test_Run_Positive_VolumeMountControllerAttachEnabledRace(t *testing.T) {
fakePlugin.UnmountDeviceHook = func(mountPath string) error {
// Act:
// 3. While a volume is being unmounted, add it back to the desired state of world
klog.InfoS("UnmountDevice called")
logger.Info("UnmountDevice called")
var generatedVolumeNameCopy v1.UniqueVolumeName
generatedVolumeNameCopy, err = dsw.AddPodToVolume(
podName, pod, volumeSpecCopy, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */)
logger, podName, pod, volumeSpecCopy, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* seLinuxLabel */)
dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeNameCopy})
return nil
}
@@ -2208,7 +2224,7 @@ func Test_Run_Positive_VolumeMountControllerAttachEnabledRace(t *testing.T) {
// Assert
// 4. When the volume is mounted again, expect that UnmountDevice operation did not clear devicePath
if devicePath == "" {
klog.ErrorS(nil, "Expected WaitForAttach called with devicePath from Node.Status")
logger.Error(nil, "Expected WaitForAttach called with devicePath from Node.Status")
return "", fmt.Errorf("Expected devicePath from Node.Status")
}
return devicePath, nil
@@ -2216,7 +2232,7 @@ func Test_Run_Positive_VolumeMountControllerAttachEnabledRace(t *testing.T) {
fakePlugin.Unlock()
// Start the reconciler again.
go reconciler.Run(wait.NeverStop)
go reconciler.Run(ctx, wait.NeverStop)
// 2. Delete the volume from DSW (and wait for callbacks)
dsw.DeletePodFromVolume(podName, generatedVolumeName)
@@ -2304,6 +2320,7 @@ func getReconciler(kubeletDir string, t *testing.T, volumePaths []string, kubeCl
}
func TestReconcileWithUpdateReconstructedFromAPIServer(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
// Calls Run() with two reconstructed volumes.
// Verifies the devicePaths + volume attachability are reconstructed from node.status.
@@ -2382,9 +2399,9 @@ func TestReconcileWithUpdateReconstructedFromAPIServer(t *testing.T) {
volumeSpec2 := &volume.Spec{Volume: &pod.Spec.Volumes[1]}
volumeName2 := util.GetUniqueVolumeName(fakePlugin.GetPluginName(), "fake-device2")
assert.NoError(t, asw.AddAttachUncertainReconstructedVolume(volumeName1, volumeSpec1, nodeName, ""))
assert.NoError(t, asw.AddAttachUncertainReconstructedVolume(logger, volumeName1, volumeSpec1, nodeName, ""))
assert.NoError(t, asw.MarkDeviceAsUncertain(volumeName1, "/dev/badly/reconstructed", "/var/lib/kubelet/plugins/global1", ""))
assert.NoError(t, asw.AddAttachUncertainReconstructedVolume(volumeName2, volumeSpec2, nodeName, ""))
assert.NoError(t, asw.AddAttachUncertainReconstructedVolume(logger, volumeName2, volumeSpec2, nodeName, ""))
assert.NoError(t, asw.MarkDeviceAsUncertain(volumeName2, "/dev/reconstructed", "/var/lib/kubelet/plugins/global2", ""))
assert.False(t, reconciler.StatesHasBeenSynced())
@@ -2392,7 +2409,7 @@ func TestReconcileWithUpdateReconstructedFromAPIServer(t *testing.T) {
reconciler.volumesNeedUpdateFromNodeStatus = append(reconciler.volumesNeedUpdateFromNodeStatus, volumeName1, volumeName2)
// Act - run reconcile loop just once.
// "volumesNeedUpdateFromNodeStatus" is not empty, so no unmount will be triggered.
reconciler.reconcile()
reconciler.reconcile(ctx)
// Assert
assert.True(t, reconciler.StatesHasBeenSynced())

View File

@@ -47,29 +47,29 @@ func (rc *reconciler) readyToUnmount() bool {
// directories from the disk. For the volumes that cannot support or fail reconstruction, it will
// put the volumes to volumesFailedReconstruction to be cleaned up later when DesiredStateOfWorld
// is populated.
func (rc *reconciler) reconstructVolumes() {
func (rc *reconciler) reconstructVolumes(logger klog.Logger) {
// Get volumes information by reading the pod's directory
podVolumes, err := getVolumesFromPodDir(rc.kubeletPodsDir)
podVolumes, err := getVolumesFromPodDir(logger, rc.kubeletPodsDir)
if err != nil {
klog.ErrorS(err, "Cannot get volumes from disk, skip sync states for volume reconstruction")
logger.Error(err, "Cannot get volumes from disk, skip sync states for volume reconstruction")
return
}
reconstructedVolumes := make(map[v1.UniqueVolumeName]*globalVolumeInfo)
reconstructedVolumeNames := []v1.UniqueVolumeName{}
for _, volume := range podVolumes {
if rc.actualStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName) {
klog.V(4).InfoS("Volume exists in actual state, skip cleaning up mounts", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
logger.V(4).Info("Volume exists in actual state, skip cleaning up mounts", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
// There is nothing to reconstruct
continue
}
reconstructedVolume, err := rc.reconstructVolume(volume)
if err != nil {
klog.InfoS("Could not construct volume information", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName, "err", err)
logger.Info("Could not construct volume information", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName, "err", err)
// We can't reconstruct the volume. Remember to check DSW after it's fully populated and force unmount the volume when it's orphaned.
rc.volumesFailedReconstruction = append(rc.volumesFailedReconstruction, volume)
continue
}
klog.V(4).InfoS("Adding reconstructed volume to actual state and node status", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
logger.V(4).Info("Adding reconstructed volume to actual state and node status", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
gvl := &globalVolumeInfo{
volumeName: reconstructedVolume.volumeName,
volumeSpec: reconstructedVolume.volumeSpec,
@@ -89,21 +89,21 @@ func (rc *reconciler) reconstructVolumes() {
if len(reconstructedVolumes) > 0 {
// Add the volumes to ASW
rc.updateStates(reconstructedVolumes)
rc.updateStates(logger, reconstructedVolumes)
// Remember to update devicePath from node.status.volumesAttached
rc.volumesNeedUpdateFromNodeStatus = reconstructedVolumeNames
}
klog.V(2).InfoS("Volume reconstruction finished")
logger.V(2).Info("Volume reconstruction finished")
}
func (rc *reconciler) updateStates(reconstructedVolumes map[v1.UniqueVolumeName]*globalVolumeInfo) {
func (rc *reconciler) updateStates(logger klog.Logger, reconstructedVolumes map[v1.UniqueVolumeName]*globalVolumeInfo) {
for _, gvl := range reconstructedVolumes {
err := rc.actualStateOfWorld.AddAttachUncertainReconstructedVolume(
//TODO: the devicePath might not be correct for some volume plugins: see issue #54108
gvl.volumeName, gvl.volumeSpec, rc.nodeName, gvl.devicePath)
logger, gvl.volumeName, gvl.volumeSpec, rc.nodeName, gvl.devicePath)
if err != nil {
klog.ErrorS(err, "Could not add volume information to actual state of world", "volumeName", gvl.volumeName)
logger.Error(err, "Could not add volume information to actual state of world", "volumeName", gvl.volumeName)
continue
}
var seLinuxMountContext string
@@ -123,31 +123,31 @@ func (rc *reconciler) updateStates(reconstructedVolumes map[v1.UniqueVolumeName]
_, err = rc.actualStateOfWorld.CheckAndMarkVolumeAsUncertainViaReconstruction(markVolumeOpts)
if err != nil {
klog.ErrorS(err, "Could not add pod to volume information to actual state of world", "pod", klog.KObj(volume.pod))
logger.Error(err, "Could not add pod to volume information to actual state of world", "pod", klog.KObj(volume.pod))
continue
}
seLinuxMountContext = volume.seLinuxMountContext
klog.V(2).InfoS("Volume is marked as uncertain and added into the actual state", "pod", klog.KObj(volume.pod), "podName", volume.podName, "volumeName", volume.volumeName, "seLinuxMountContext", volume.seLinuxMountContext)
logger.V(2).Info("Volume is marked as uncertain and added into the actual state", "pod", klog.KObj(volume.pod), "podName", volume.podName, "volumeName", volume.volumeName, "seLinuxMountContext", volume.seLinuxMountContext)
}
// If the volume has device to mount, we mark its device as uncertain.
if gvl.deviceMounter != nil || gvl.blockVolumeMapper != nil {
deviceMountPath, err := getDeviceMountPath(gvl)
if err != nil {
klog.ErrorS(err, "Could not find device mount path for volume", "volumeName", gvl.volumeName)
logger.Error(err, "Could not find device mount path for volume", "volumeName", gvl.volumeName)
continue
}
err = rc.actualStateOfWorld.MarkDeviceAsUncertain(gvl.volumeName, gvl.devicePath, deviceMountPath, seLinuxMountContext)
if err != nil {
klog.ErrorS(err, "Could not mark device is uncertain to actual state of world", "volumeName", gvl.volumeName, "deviceMountPath", deviceMountPath)
logger.Error(err, "Could not mark device is uncertain to actual state of world", "volumeName", gvl.volumeName, "deviceMountPath", deviceMountPath)
continue
}
klog.V(2).InfoS("Volume is marked device as uncertain and added into the actual state", "volumeName", gvl.volumeName, "deviceMountPath", deviceMountPath)
logger.V(2).Info("Volume is marked device as uncertain and added into the actual state", "volumeName", gvl.volumeName, "deviceMountPath", deviceMountPath)
}
}
}
// cleanOrphanVolumes tries to clean up all volumes that failed reconstruction.
func (rc *reconciler) cleanOrphanVolumes() {
func (rc *reconciler) cleanOrphanVolumes(logger klog.Logger) {
if len(rc.volumesFailedReconstruction) == 0 {
return
}
@@ -156,14 +156,14 @@ func (rc *reconciler) cleanOrphanVolumes() {
if rc.desiredStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName) {
// Some pod needs the volume, don't clean it up and hope that
// reconcile() calls SetUp and reconstructs the volume in ASW.
klog.V(4).InfoS("Volume exists in desired state, skip cleaning up mounts", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
logger.V(4).Info("Volume exists in desired state, skip cleaning up mounts", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
continue
}
klog.InfoS("Cleaning up mounts for volume that could not be reconstructed", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
rc.cleanupMounts(volume)
logger.Info("Cleaning up mounts for volume that could not be reconstructed", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
rc.cleanupMounts(logger, volume)
}
klog.V(2).InfoS("Orphan volume cleanup finished")
logger.V(2).Info("Orphan volume cleanup finished")
// Clean the cache, cleanup is one shot operation.
rc.volumesFailedReconstruction = make([]podVolume, 0)
}
@@ -171,21 +171,22 @@ func (rc *reconciler) cleanOrphanVolumes() {
// updateReconstructedFromNodeStatus tries to file devicePaths of reconstructed volumes from
// node.Status.VolumesAttached. This can be done only after connection to the API
// server is established, i.e. it can't be part of reconstructVolumes().
func (rc *reconciler) updateReconstructedFromNodeStatus() {
klog.V(4).InfoS("Updating reconstructed devicePaths")
func (rc *reconciler) updateReconstructedFromNodeStatus(ctx context.Context) {
logger := klog.FromContext(ctx)
logger.V(4).Info("Updating reconstructed devicePaths")
if rc.kubeClient == nil {
// Skip reconstructing devicePath from node objects if kubelet is in standalone mode.
// Such kubelet is not expected to mount any attachable volume or Secrets / ConfigMap.
klog.V(2).InfoS("Skipped reconstruction of DevicePaths from node.status in standalone mode")
logger.V(2).Info("Skipped reconstruction of DevicePaths from node.status in standalone mode")
rc.volumesNeedUpdateFromNodeStatus = nil
return
}
node, fetchErr := rc.kubeClient.CoreV1().Nodes().Get(context.TODO(), string(rc.nodeName), metav1.GetOptions{})
node, fetchErr := rc.kubeClient.CoreV1().Nodes().Get(ctx, string(rc.nodeName), metav1.GetOptions{})
if fetchErr != nil {
// This may repeat few times per second until kubelet is able to read its own status for the first time.
klog.V(4).ErrorS(fetchErr, "Failed to get Node status to reconstruct device paths")
logger.V(4).Error(fetchErr, "Failed to get Node status to reconstruct device paths")
return
}
@@ -197,11 +198,11 @@ func (rc *reconciler) updateReconstructedFromNodeStatus() {
}
rc.actualStateOfWorld.UpdateReconstructedDevicePath(volumeID, attachedVolume.DevicePath)
attachable = true
klog.V(4).InfoS("Updated devicePath from node status for volume", "volumeName", attachedVolume.Name, "path", attachedVolume.DevicePath)
logger.V(4).Info("Updated devicePath from node status for volume", "volumeName", attachedVolume.Name, "path", attachedVolume.DevicePath)
}
rc.actualStateOfWorld.UpdateReconstructedVolumeAttachability(volumeID, attachable)
}
klog.V(2).InfoS("DevicePaths of reconstructed volumes updated")
logger.V(2).Info("DevicePaths of reconstructed volumes updated")
rc.volumesNeedUpdateFromNodeStatus = nil
}

View File

@@ -154,8 +154,8 @@ func (gvi *globalVolumeInfo) addPodVolume(rcv *reconstructedVolume) {
gvi.podVolumes[rcv.podName] = rcv
}
func (rc *reconciler) cleanupMounts(volume podVolume) {
klog.V(2).InfoS("Reconciler sync states: could not find volume information in desired state, clean up the mount points", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
func (rc *reconciler) cleanupMounts(logger klog.Logger, volume podVolume) {
logger.V(2).Info("Reconciler sync states: could not find volume information in desired state, clean up the mount points", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
mountedVolume := operationexecutor.MountedVolume{
PodName: volume.podName,
// VolumeName should be generated by `GetUniqueVolumeNameFromSpec` or `GetUniqueVolumeNameFromSpecWithPod`.
@@ -171,7 +171,7 @@ func (rc *reconciler) cleanupMounts(volume podVolume) {
err := rc.operationExecutor.UnmountVolume(mountedVolume, rc.actualStateOfWorld, rc.kubeletPodsDir)
if err != nil {
metrics.ForceCleanedFailedVolumeOperationsErrorsTotal.Inc()
klog.ErrorS(err, mountedVolume.GenerateErrorDetailed("volumeHandler.UnmountVolumeHandler for UnmountVolume failed", err).Error())
logger.Error(err, mountedVolume.GenerateErrorDetailed("volumeHandler.UnmountVolumeHandler for UnmountVolume failed", err).Error())
return
}
}
@@ -194,7 +194,7 @@ func getDeviceMountPath(gvi *globalVolumeInfo) (string, error) {
// getVolumesFromPodDir scans through the volumes directories under the given pod directory.
// It returns a list of pod volume information including pod's uid, volume's plugin name, mount path,
// and volume spec name.
func getVolumesFromPodDir(podDir string) ([]podVolume, error) {
func getVolumesFromPodDir(logger klog.Logger, podDir string) ([]podVolume, error) {
podsDirInfo, err := os.ReadDir(podDir)
if err != nil {
return nil, err
@@ -227,13 +227,13 @@ func getVolumesFromPodDir(podDir string) ([]podVolume, error) {
volumePluginPath := filepath.Join(volumesDir, pluginName)
volumePluginDirs, err := utilpath.ReadDirNoStat(volumePluginPath)
if err != nil {
klog.ErrorS(err, "Could not read volume plugin directory", "volumePluginPath", volumePluginPath)
logger.Error(err, "Could not read volume plugin directory", "volumePluginPath", volumePluginPath)
continue
}
unescapePluginName := utilstrings.UnescapeQualifiedName(pluginName)
for _, volumeName := range volumePluginDirs {
volumePath := filepath.Join(volumePluginPath, volumeName)
klog.V(5).InfoS("Volume path from volume plugin directory", "podName", podName, "volumePath", volumePath)
logger.V(5).Info("Volume path from volume plugin directory", "podName", podName, "volumePath", volumePath)
volumes = append(volumes, podVolume{
podName: volumetypes.UniquePodName(podName),
volumeSpecName: volumeName,
@@ -246,7 +246,7 @@ func getVolumesFromPodDir(podDir string) ([]podVolume, error) {
}
}
for _, volume := range volumes {
klog.V(4).InfoS("Get volume from pod directory", "path", podDir, "volume", volume)
logger.V(4).Info("Get volume from pod directory", "path", podDir, "volume", volume)
}
return volumes, nil
}

View File

@@ -33,6 +33,7 @@ import (
)
func TestReconstructVolumes(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
tests := []struct {
name string
volumePaths []string
@@ -102,7 +103,7 @@ func TestReconstructVolumes(t *testing.T) {
rcInstance, _ := rc.(*reconciler)
// Act
rcInstance.reconstructVolumes()
rcInstance.reconstructVolumes(logger)
// Assert
// Convert to []UniqueVolumeName
@@ -195,7 +196,7 @@ func TestCleanOrphanVolumes(t *testing.T) {
volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
podName := util.GetUniquePodName(pod)
volumeName, err := rcInstance.desiredStateOfWorld.AddPodToVolume(
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* SELinuxContext */)
logger, podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* SELinuxContext */)
if err != nil {
t.Fatalf("Error adding volume %s to dsow: %v", volumeSpec.Name(), err)
}
@@ -203,7 +204,7 @@ func TestCleanOrphanVolumes(t *testing.T) {
}
// Act
rcInstance.cleanOrphanVolumes()
rcInstance.cleanOrphanVolumes(logger)
// Assert
if len(rcInstance.volumesFailedReconstruction) != 0 {
@@ -245,6 +246,7 @@ func TestReconstructVolumesMount(t *testing.T) {
// Since the volume is reconstructed, it must be marked as uncertain
// even after a final SetUp error, see https://github.com/kubernetes/kubernetes/issues/96635
// and https://github.com/kubernetes/kubernetes/pull/110670.
logger, ctx := ktesting.NewTestContext(t)
tests := []struct {
name string
@@ -304,7 +306,7 @@ func TestReconstructVolumesMount(t *testing.T) {
rcInstance, _ := rc.(*reconciler)
// Act 1 - reconstruction
rcInstance.reconstructVolumes()
rcInstance.reconstructVolumes(logger)
// Assert 1 - the volume is Uncertain
mountedPods := rcInstance.actualStateOfWorld.GetMountedVolumes()
@@ -318,7 +320,7 @@ func TestReconstructVolumesMount(t *testing.T) {
podName := util.GetUniquePodName(pod)
volumeName, err := rcInstance.desiredStateOfWorld.AddPodToVolume(
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* SELinuxContext */)
logger, podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGIDValue */, nil /* SELinuxContext */)
if err != nil {
t.Fatalf("Error adding volume %s to dsow: %v", volumeSpec.Name(), err)
}
@@ -333,7 +335,7 @@ func TestReconstructVolumesMount(t *testing.T) {
rcInstance.volumesNeedUpdateFromNodeStatus = nil
// Act 2 - reconcile once
rcInstance.reconcile()
rcInstance.reconcile(ctx)
// Assert 2
// MountDevice was attempted

View File

@@ -282,6 +282,7 @@ type volumeManager struct {
}
func (vm *volumeManager) Run(ctx context.Context, sourcesReady config.SourcesReady) {
logger := klog.FromContext(ctx)
defer runtime.HandleCrash()
if vm.kubeClient != nil {
@@ -290,15 +291,15 @@ func (vm *volumeManager) Run(ctx context.Context, sourcesReady config.SourcesRea
}
go vm.desiredStateOfWorldPopulator.Run(ctx, sourcesReady)
klog.V(2).InfoS("The desired_state_of_world populator starts")
logger.V(2).Info("The desired_state_of_world populator starts")
klog.InfoS("Starting Kubelet Volume Manager")
go vm.reconciler.Run(ctx.Done())
logger.Info("Starting Kubelet Volume Manager")
go vm.reconciler.Run(ctx, ctx.Done())
metrics.Register(vm.actualStateOfWorld, vm.desiredStateOfWorld, vm.volumePluginMgr)
<-ctx.Done()
klog.InfoS("Shutting down Kubelet Volume Manager")
logger.Info("Shutting down Kubelet Volume Manager")
}
func (vm *volumeManager) GetMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap {
@@ -389,6 +390,7 @@ func (vm *volumeManager) MarkVolumesAsReportedInUse(
}
func (vm *volumeManager) WaitForAttachAndMount(ctx context.Context, pod *v1.Pod) error {
logger := klog.FromContext(ctx)
if pod == nil {
return nil
}
@@ -399,7 +401,7 @@ func (vm *volumeManager) WaitForAttachAndMount(ctx context.Context, pod *v1.Pod)
return nil
}
klog.V(3).InfoS("Waiting for volumes to attach and mount for pod", "pod", klog.KObj(pod))
logger.V(3).Info("Waiting for volumes to attach and mount for pod", "pod", klog.KObj(pod))
uniquePodName := util.GetUniquePodName(pod)
// Some pods expect to have Setup called over and over again to update.
@@ -435,16 +437,17 @@ func (vm *volumeManager) WaitForAttachAndMount(ctx context.Context, pod *v1.Pod)
err)
}
klog.V(3).InfoS("All volumes are attached and mounted for pod", "pod", klog.KObj(pod))
logger.V(3).Info("All volumes are attached and mounted for pod", "pod", klog.KObj(pod))
return nil
}
func (vm *volumeManager) WaitForUnmount(ctx context.Context, pod *v1.Pod) error {
logger := klog.FromContext(ctx)
if pod == nil {
return nil
}
klog.V(3).InfoS("Waiting for volumes to unmount for pod", "pod", klog.KObj(pod))
logger.V(3).Info("Waiting for volumes to unmount for pod", "pod", klog.KObj(pod))
uniquePodName := util.GetUniquePodName(pod)
vm.desiredStateOfWorldPopulator.ReprocessPod(uniquePodName)
@@ -472,7 +475,7 @@ func (vm *volumeManager) WaitForUnmount(ctx context.Context, pod *v1.Pod) error
err)
}
klog.V(3).InfoS("All volumes are unmounted for pod", "pod", klog.KObj(pod))
logger.V(3).Info("All volumes are unmounted for pod", "pod", klog.KObj(pod))
return nil
}

View File

@@ -51,6 +51,7 @@ const (
)
func TestGetMountedVolumesForPodAndGetVolumesInUse(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
tests := []struct {
name string
pvMode, podMode v1.PersistentVolumeMode
@@ -111,7 +112,7 @@ func TestGetMountedVolumesForPodAndGetVolumesInUse(t *testing.T) {
tCtx.Done(),
manager)
err = manager.WaitForAttachAndMount(context.Background(), pod)
err = manager.WaitForAttachAndMount(ctx, pod)
if err != nil && !test.expectError {
t.Errorf("Expected success: %v", err)
}
@@ -144,6 +145,7 @@ func TestGetMountedVolumesForPodAndGetVolumesInUse(t *testing.T) {
}
func TestWaitForAttachAndMountError(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
tmpDir, err := utiltesting.MkTmpdir("volumeManagerTest")
if err != nil {
t.Fatalf("can't make a temp dir: %v", err)
@@ -231,7 +233,7 @@ func TestWaitForAttachAndMountError(t *testing.T) {
podManager.SetPods([]*v1.Pod{pod})
err = manager.WaitForAttachAndMount(context.Background(), pod)
err = manager.WaitForAttachAndMount(ctx, pod)
if err == nil {
t.Errorf("Expected error, got none")
}
@@ -242,6 +244,7 @@ func TestWaitForAttachAndMountError(t *testing.T) {
}
func TestInitialPendingVolumesForPodAndGetVolumesInUse(t *testing.T) {
tCtx := ktesting.Init(t)
tmpDir, err := utiltesting.MkTmpdir("volumeManagerTest")
if err != nil {
t.Fatalf("can't make a temp dir: %v", err)
@@ -258,7 +261,6 @@ func TestInitialPendingVolumesForPodAndGetVolumesInUse(t *testing.T) {
manager := newTestVolumeManager(t, tmpDir, podManager, kubeClient, node)
tCtx := ktesting.Init(t)
defer tCtx.Cancel("test has completed")
sourcesReady := config.NewSourcesReady(func(_ sets.Set[string]) bool { return true })
go manager.Run(tCtx, sourcesReady)
@@ -272,10 +274,10 @@ func TestInitialPendingVolumesForPodAndGetVolumesInUse(t *testing.T) {
manager)
// delayed claim binding
go delayClaimBecomesBound(kubeClient, claim.GetNamespace(), claim.ObjectMeta.Name)
go delayClaimBecomesBound(t, kubeClient, claim.GetNamespace(), claim.Name)
err = wait.Poll(100*time.Millisecond, 1*time.Second, func() (bool, error) {
err = manager.WaitForAttachAndMount(context.Background(), pod)
err = manager.WaitForAttachAndMount(tCtx, pod)
if err != nil {
// Few "PVC not bound" errors are expected
return false, nil
@@ -289,6 +291,7 @@ func TestInitialPendingVolumesForPodAndGetVolumesInUse(t *testing.T) {
}
func TestGetExtraSupplementalGroupsForPod(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
tmpDir, err := utiltesting.MkTmpdir("volumeManagerTest")
if err != nil {
t.Fatalf("can't make a temp dir: %v", err)
@@ -361,7 +364,7 @@ func TestGetExtraSupplementalGroupsForPod(t *testing.T) {
tCtx.Done(),
manager)
err = manager.WaitForAttachAndMount(context.Background(), pod)
err = manager.WaitForAttachAndMount(ctx, pod)
if err != nil {
t.Errorf("Expected success: %v", err)
continue
@@ -537,19 +540,28 @@ func simulateVolumeInUseUpdate(volumeName v1.UniqueVolumeName, stopCh <-chan str
}
func delayClaimBecomesBound(
t *testing.T,
kubeClient clientset.Interface,
namespace, claimName string,
) {
tCtx := ktesting.Init(t)
time.Sleep(500 * time.Millisecond)
volumeClaim, _ :=
kubeClient.CoreV1().PersistentVolumeClaims(namespace).Get(context.TODO(), claimName, metav1.GetOptions{})
volumeClaim, err :=
kubeClient.CoreV1().PersistentVolumeClaims(namespace).Get(tCtx, claimName, metav1.GetOptions{})
if err != nil {
t.Errorf("Failed to get PVC: %v", err)
}
volumeClaim.Status = v1.PersistentVolumeClaimStatus{
Phase: v1.ClaimBound,
}
kubeClient.CoreV1().PersistentVolumeClaims(namespace).Update(context.TODO(), volumeClaim, metav1.UpdateOptions{})
_, err = kubeClient.CoreV1().PersistentVolumeClaims(namespace).Update(tCtx, volumeClaim, metav1.UpdateOptions{})
if err != nil {
t.Errorf("Failed to update PVC: %v", err)
}
}
func TestWaitForAllPodsUnmount(t *testing.T) {
tCtx := ktesting.Init(t)
tmpDir, err := utiltesting.MkTmpdir("volumeManagerTest")
require.NoError(t, err, "Failed to create temp directory")
defer func() {
@@ -584,7 +596,7 @@ func TestWaitForAllPodsUnmount(t *testing.T) {
manager := newTestVolumeManager(t, tmpDir, podManager, kubeClient, node)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
ctx, cancel := context.WithTimeout(tCtx, 1*time.Second)
defer cancel()
sourcesReady := config.NewSourcesReady(func(_ sets.Set[string]) bool { return true })
go manager.Run(ctx, sourcesReady)
@@ -596,7 +608,7 @@ func TestWaitForAllPodsUnmount(t *testing.T) {
ctx.Done(),
manager)
err := manager.WaitForAttachAndMount(context.Background(), pod)
err := manager.WaitForAttachAndMount(ctx, pod)
require.NoError(t, err, "Failed to wait for attach and mount")
err = manager.WaitForAllPodsUnmount(ctx, []*v1.Pod{pod})