mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-04 04:08:16 +00:00 
			
		
		
		
	The provided DialContext wraps existing clients' DialContext in an attempt to preserve any existing timeout configuration. In some cases, we may replace infinite timeouts with golang defaults. - scaleio: tcp connect/keepalive values changed from 0/15 to 30/30 - storageos: no change
		
			
				
	
	
		
			338 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			338 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
/*
 | 
						|
Copyright 2016 The Kubernetes Authors.
 | 
						|
 | 
						|
Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
you may not use this file except in compliance with the License.
 | 
						|
You may obtain a copy of the License at
 | 
						|
 | 
						|
    http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
 | 
						|
Unless required by applicable law or agreed to in writing, software
 | 
						|
distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
See the License for the specific language governing permissions and
 | 
						|
limitations under the License.
 | 
						|
*/
 | 
						|
 | 
						|
package attachdetach
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"fmt"
 | 
						|
	"testing"
 | 
						|
	"time"
 | 
						|
 | 
						|
	v1 "k8s.io/api/core/v1"
 | 
						|
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
						|
	"k8s.io/apimachinery/pkg/labels"
 | 
						|
	"k8s.io/apimachinery/pkg/types"
 | 
						|
	"k8s.io/client-go/informers"
 | 
						|
	kcache "k8s.io/client-go/tools/cache"
 | 
						|
	"k8s.io/kubernetes/pkg/controller"
 | 
						|
	"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
 | 
						|
	controllervolumetesting "k8s.io/kubernetes/pkg/controller/volume/attachdetach/testing"
 | 
						|
	"k8s.io/kubernetes/pkg/volume"
 | 
						|
)
 | 
						|
 | 
						|
func Test_NewAttachDetachController_Positive(t *testing.T) {
 | 
						|
	// Arrange
 | 
						|
	fakeKubeClient := controllervolumetesting.CreateTestClient()
 | 
						|
	informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
 | 
						|
 | 
						|
	// Act
 | 
						|
	_, err := NewAttachDetachController(
 | 
						|
		fakeKubeClient,
 | 
						|
		informerFactory.Core().V1().Pods(),
 | 
						|
		informerFactory.Core().V1().Nodes(),
 | 
						|
		informerFactory.Core().V1().PersistentVolumeClaims(),
 | 
						|
		informerFactory.Core().V1().PersistentVolumes(),
 | 
						|
		informerFactory.Storage().V1().CSINodes(),
 | 
						|
		informerFactory.Storage().V1().CSIDrivers(),
 | 
						|
		informerFactory.Storage().V1().VolumeAttachments(),
 | 
						|
		nil, /* cloud */
 | 
						|
		nil, /* plugins */
 | 
						|
		nil, /* prober */
 | 
						|
		false,
 | 
						|
		5*time.Second,
 | 
						|
		DefaultTimerConfig,
 | 
						|
		nil, /* filteredDialOptions */
 | 
						|
	)
 | 
						|
 | 
						|
	// Assert
 | 
						|
	if err != nil {
 | 
						|
		t.Fatalf("Run failed with error. Expected: <no error> Actual: <%v>", err)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func Test_AttachDetachControllerStateOfWolrdPopulators_Positive(t *testing.T) {
 | 
						|
	// Arrange
 | 
						|
	fakeKubeClient := controllervolumetesting.CreateTestClient()
 | 
						|
	informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
 | 
						|
	podInformer := informerFactory.Core().V1().Pods()
 | 
						|
	nodeInformer := informerFactory.Core().V1().Nodes()
 | 
						|
	pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims()
 | 
						|
	pvInformer := informerFactory.Core().V1().PersistentVolumes()
 | 
						|
 | 
						|
	adc := &attachDetachController{
 | 
						|
		kubeClient:  fakeKubeClient,
 | 
						|
		pvcLister:   pvcInformer.Lister(),
 | 
						|
		pvcsSynced:  pvcInformer.Informer().HasSynced,
 | 
						|
		pvLister:    pvInformer.Lister(),
 | 
						|
		pvsSynced:   pvInformer.Informer().HasSynced,
 | 
						|
		podLister:   podInformer.Lister(),
 | 
						|
		podsSynced:  podInformer.Informer().HasSynced,
 | 
						|
		nodeLister:  nodeInformer.Lister(),
 | 
						|
		nodesSynced: nodeInformer.Informer().HasSynced,
 | 
						|
		cloud:       nil,
 | 
						|
	}
 | 
						|
 | 
						|
	// Act
 | 
						|
	plugins := controllervolumetesting.CreateTestPlugin()
 | 
						|
	var prober volume.DynamicPluginProber = nil // TODO (#51147) inject mock
 | 
						|
 | 
						|
	if err := adc.volumePluginMgr.InitPlugins(plugins, prober, adc); err != nil {
 | 
						|
		t.Fatalf("Could not initialize volume plugins for Attach/Detach Controller: %+v", err)
 | 
						|
	}
 | 
						|
 | 
						|
	adc.actualStateOfWorld = cache.NewActualStateOfWorld(&adc.volumePluginMgr)
 | 
						|
	adc.desiredStateOfWorld = cache.NewDesiredStateOfWorld(&adc.volumePluginMgr)
 | 
						|
 | 
						|
	err := adc.populateActualStateOfWorld()
 | 
						|
	if err != nil {
 | 
						|
		t.Fatalf("Run failed with error. Expected: <no error> Actual: <%v>", err)
 | 
						|
	}
 | 
						|
 | 
						|
	err = adc.populateDesiredStateOfWorld()
 | 
						|
	if err != nil {
 | 
						|
		t.Fatalf("Run failed with error. Expected: <no error> Actual: %v", err)
 | 
						|
	}
 | 
						|
 | 
						|
	// Test the ActualStateOfWorld contains all the node volumes
 | 
						|
	nodes, err := adc.nodeLister.List(labels.Everything())
 | 
						|
	if err != nil {
 | 
						|
		t.Fatalf("Failed to list nodes in indexer. Expected: <no error> Actual: %v", err)
 | 
						|
	}
 | 
						|
 | 
						|
	for _, node := range nodes {
 | 
						|
		nodeName := types.NodeName(node.Name)
 | 
						|
		for _, attachedVolume := range node.Status.VolumesAttached {
 | 
						|
			found := adc.actualStateOfWorld.IsVolumeAttachedToNode(attachedVolume.Name, nodeName)
 | 
						|
			if !found {
 | 
						|
				t.Fatalf("Run failed with error. Node %s, volume %s not found", nodeName, attachedVolume.Name)
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	pods, err := adc.podLister.List(labels.Everything())
 | 
						|
	if err != nil {
 | 
						|
		t.Fatalf("Run failed with error. Expected: <no error> Actual: %v", err)
 | 
						|
	}
 | 
						|
	for _, pod := range pods {
 | 
						|
		uniqueName := fmt.Sprintf("%s/%s", controllervolumetesting.TestPluginName, pod.Spec.Volumes[0].Name)
 | 
						|
		nodeName := types.NodeName(pod.Spec.NodeName)
 | 
						|
		found := adc.desiredStateOfWorld.VolumeExists(v1.UniqueVolumeName(uniqueName), nodeName)
 | 
						|
		if !found {
 | 
						|
			t.Fatalf("Run failed with error. Volume %s, node %s not found in DesiredStateOfWorld",
 | 
						|
				pod.Spec.Volumes[0].Name,
 | 
						|
				pod.Spec.NodeName)
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func Test_AttachDetachControllerRecovery(t *testing.T) {
 | 
						|
	attachDetachRecoveryTestCase(t, []*v1.Pod{}, []*v1.Pod{})
 | 
						|
	newPod1 := controllervolumetesting.NewPodWithVolume("newpod-1", "volumeName2", "mynode-1")
 | 
						|
	attachDetachRecoveryTestCase(t, []*v1.Pod{newPod1}, []*v1.Pod{})
 | 
						|
	newPod1 = controllervolumetesting.NewPodWithVolume("newpod-1", "volumeName2", "mynode-1")
 | 
						|
	attachDetachRecoveryTestCase(t, []*v1.Pod{}, []*v1.Pod{newPod1})
 | 
						|
	newPod1 = controllervolumetesting.NewPodWithVolume("newpod-1", "volumeName2", "mynode-1")
 | 
						|
	newPod2 := controllervolumetesting.NewPodWithVolume("newpod-2", "volumeName3", "mynode-1")
 | 
						|
	attachDetachRecoveryTestCase(t, []*v1.Pod{newPod1}, []*v1.Pod{newPod2})
 | 
						|
}
 | 
						|
 | 
						|
func attachDetachRecoveryTestCase(t *testing.T, extraPods1 []*v1.Pod, extraPods2 []*v1.Pod) {
 | 
						|
	fakeKubeClient := controllervolumetesting.CreateTestClient()
 | 
						|
	informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, time.Second*1)
 | 
						|
	//informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, time.Second*1)
 | 
						|
	plugins := controllervolumetesting.CreateTestPlugin()
 | 
						|
	var prober volume.DynamicPluginProber = nil // TODO (#51147) inject mock
 | 
						|
	nodeInformer := informerFactory.Core().V1().Nodes().Informer()
 | 
						|
	csiNodeInformer := informerFactory.Storage().V1().CSINodes().Informer()
 | 
						|
	podInformer := informerFactory.Core().V1().Pods().Informer()
 | 
						|
	var podsNum, extraPodsNum, nodesNum, i int
 | 
						|
 | 
						|
	// Create the controller
 | 
						|
	adcObj, err := NewAttachDetachController(
 | 
						|
		fakeKubeClient,
 | 
						|
		informerFactory.Core().V1().Pods(),
 | 
						|
		informerFactory.Core().V1().Nodes(),
 | 
						|
		informerFactory.Core().V1().PersistentVolumeClaims(),
 | 
						|
		informerFactory.Core().V1().PersistentVolumes(),
 | 
						|
		informerFactory.Storage().V1().CSINodes(),
 | 
						|
		informerFactory.Storage().V1().CSIDrivers(),
 | 
						|
		informerFactory.Storage().V1().VolumeAttachments(),
 | 
						|
		nil, /* cloud */
 | 
						|
		plugins,
 | 
						|
		prober,
 | 
						|
		false,
 | 
						|
		1*time.Second,
 | 
						|
		DefaultTimerConfig,
 | 
						|
		nil, /* filteredDialOptions */
 | 
						|
	)
 | 
						|
 | 
						|
	if err != nil {
 | 
						|
		t.Fatalf("Run failed with error. Expected: <no error> Actual: <%v>", err)
 | 
						|
	}
 | 
						|
 | 
						|
	adc := adcObj.(*attachDetachController)
 | 
						|
 | 
						|
	stopCh := make(chan struct{})
 | 
						|
 | 
						|
	pods, err := fakeKubeClient.CoreV1().Pods(v1.NamespaceAll).List(context.TODO(), metav1.ListOptions{})
 | 
						|
	if err != nil {
 | 
						|
		t.Fatalf("Run failed with error. Expected: <no error> Actual: %v", err)
 | 
						|
	}
 | 
						|
 | 
						|
	for _, pod := range pods.Items {
 | 
						|
		podToAdd := pod
 | 
						|
		podInformer.GetIndexer().Add(&podToAdd)
 | 
						|
		podsNum++
 | 
						|
	}
 | 
						|
	nodes, err := fakeKubeClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
 | 
						|
	if err != nil {
 | 
						|
		t.Fatalf("Run failed with error. Expected: <no error> Actual: %v", err)
 | 
						|
	}
 | 
						|
	for _, node := range nodes.Items {
 | 
						|
		nodeToAdd := node
 | 
						|
		nodeInformer.GetIndexer().Add(&nodeToAdd)
 | 
						|
		nodesNum++
 | 
						|
	}
 | 
						|
 | 
						|
	csiNodes, err := fakeKubeClient.StorageV1().CSINodes().List(context.TODO(), metav1.ListOptions{})
 | 
						|
	if err != nil {
 | 
						|
		t.Fatalf("Run failed with error. Expected: <no error> Actual: %v", err)
 | 
						|
	}
 | 
						|
	for _, csiNode := range csiNodes.Items {
 | 
						|
		csiNodeToAdd := csiNode
 | 
						|
		csiNodeInformer.GetIndexer().Add(&csiNodeToAdd)
 | 
						|
	}
 | 
						|
 | 
						|
	informerFactory.Start(stopCh)
 | 
						|
 | 
						|
	if !kcache.WaitForNamedCacheSync("attach detach", stopCh,
 | 
						|
		informerFactory.Core().V1().Pods().Informer().HasSynced,
 | 
						|
		informerFactory.Core().V1().Nodes().Informer().HasSynced,
 | 
						|
		informerFactory.Storage().V1().CSINodes().Informer().HasSynced) {
 | 
						|
		t.Fatalf("Error waiting for the informer caches to sync")
 | 
						|
	}
 | 
						|
 | 
						|
	// Make sure the nodes and pods are in the informer cache
 | 
						|
	i = 0
 | 
						|
	nodeList, err := informerFactory.Core().V1().Nodes().Lister().List(labels.Everything())
 | 
						|
	for len(nodeList) < nodesNum {
 | 
						|
		if err != nil {
 | 
						|
			t.Fatalf("Error getting list of nodes %v", err)
 | 
						|
		}
 | 
						|
		if i > 100 {
 | 
						|
			t.Fatalf("Time out while waiting for the node informer sync: found %d nodes, expected %d nodes", len(nodeList), nodesNum)
 | 
						|
		}
 | 
						|
		time.Sleep(100 * time.Millisecond)
 | 
						|
		nodeList, err = informerFactory.Core().V1().Nodes().Lister().List(labels.Everything())
 | 
						|
		i++
 | 
						|
	}
 | 
						|
	i = 0
 | 
						|
	podList, err := informerFactory.Core().V1().Pods().Lister().List(labels.Everything())
 | 
						|
	for len(podList) < podsNum {
 | 
						|
		if err != nil {
 | 
						|
			t.Fatalf("Error getting list of nodes %v", err)
 | 
						|
		}
 | 
						|
		if i > 100 {
 | 
						|
			t.Fatalf("Time out while waiting for the pod informer sync: found %d pods, expected %d pods", len(podList), podsNum)
 | 
						|
		}
 | 
						|
		time.Sleep(100 * time.Millisecond)
 | 
						|
		podList, err = informerFactory.Core().V1().Pods().Lister().List(labels.Everything())
 | 
						|
		i++
 | 
						|
	}
 | 
						|
	i = 0
 | 
						|
	csiNodesList, err := informerFactory.Storage().V1().CSINodes().Lister().List(labels.Everything())
 | 
						|
	for len(csiNodesList) < nodesNum {
 | 
						|
		if err != nil {
 | 
						|
			t.Fatalf("Error getting list of csi nodes %v", err)
 | 
						|
		}
 | 
						|
		if i > 100 {
 | 
						|
			t.Fatalf("Time out while waiting for the csinodes informer sync: found %d csinodes, expected %d csinodes", len(csiNodesList), nodesNum)
 | 
						|
		}
 | 
						|
		time.Sleep(100 * time.Millisecond)
 | 
						|
		csiNodesList, err = informerFactory.Storage().V1().CSINodes().Lister().List(labels.Everything())
 | 
						|
		i++
 | 
						|
	}
 | 
						|
 | 
						|
	// Populate ASW
 | 
						|
	err = adc.populateActualStateOfWorld()
 | 
						|
	if err != nil {
 | 
						|
		t.Fatalf("Run failed with error. Expected: <no error> Actual: <%v>", err)
 | 
						|
	}
 | 
						|
 | 
						|
	for _, newPod := range extraPods1 {
 | 
						|
		// Add a new pod between ASW and DSW ppoulators
 | 
						|
		_, err = adc.kubeClient.CoreV1().Pods(newPod.ObjectMeta.Namespace).Create(context.TODO(), newPod, metav1.CreateOptions{})
 | 
						|
		if err != nil {
 | 
						|
			t.Fatalf("Run failed with error. Failed to create a new pod: <%v>", err)
 | 
						|
		}
 | 
						|
		extraPodsNum++
 | 
						|
		podInformer.GetIndexer().Add(newPod)
 | 
						|
 | 
						|
	}
 | 
						|
 | 
						|
	// Populate DSW
 | 
						|
	err = adc.populateDesiredStateOfWorld()
 | 
						|
	if err != nil {
 | 
						|
		t.Fatalf("Run failed with error. Expected: <no error> Actual: %v", err)
 | 
						|
	}
 | 
						|
 | 
						|
	for _, newPod := range extraPods2 {
 | 
						|
		// Add a new pod between DSW ppoulator and reconciler run
 | 
						|
		_, err = adc.kubeClient.CoreV1().Pods(newPod.ObjectMeta.Namespace).Create(context.TODO(), newPod, metav1.CreateOptions{})
 | 
						|
		if err != nil {
 | 
						|
			t.Fatalf("Run failed with error. Failed to create a new pod: <%v>", err)
 | 
						|
		}
 | 
						|
		extraPodsNum++
 | 
						|
		podInformer.GetIndexer().Add(newPod)
 | 
						|
	}
 | 
						|
 | 
						|
	go adc.reconciler.Run(stopCh)
 | 
						|
	go adc.desiredStateOfWorldPopulator.Run(stopCh)
 | 
						|
	defer close(stopCh)
 | 
						|
 | 
						|
	time.Sleep(time.Second * 1) // Wait so the reconciler calls sync at least once
 | 
						|
 | 
						|
	testPlugin := plugins[0].(*controllervolumetesting.TestPlugin)
 | 
						|
	for i = 0; i <= 10; i++ {
 | 
						|
		var attachedVolumesNum int = 0
 | 
						|
		var detachedVolumesNum int = 0
 | 
						|
 | 
						|
		time.Sleep(time.Second * 1) // Wait for a second
 | 
						|
		for _, volumeList := range testPlugin.GetAttachedVolumes() {
 | 
						|
			attachedVolumesNum += len(volumeList)
 | 
						|
		}
 | 
						|
		for _, volumeList := range testPlugin.GetDetachedVolumes() {
 | 
						|
			detachedVolumesNum += len(volumeList)
 | 
						|
		}
 | 
						|
 | 
						|
		// All the "extra pods" should result in volume to be attached, the pods all share one volume
 | 
						|
		// which should be attached (+1), the volumes found only in the nodes status should be detached
 | 
						|
		if attachedVolumesNum == 1+extraPodsNum && detachedVolumesNum == nodesNum {
 | 
						|
			break
 | 
						|
		}
 | 
						|
		if i == 10 { // 10 seconds time out
 | 
						|
			t.Fatalf("Waiting for the volumes to attach/detach timed out: attached %d (expected %d); detached %d (%d)",
 | 
						|
				attachedVolumesNum, 1+extraPodsNum, detachedVolumesNum, nodesNum)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	if testPlugin.GetErrorEncountered() {
 | 
						|
		t.Fatalf("Fatal error encountered in the testing volume plugin")
 | 
						|
	}
 | 
						|
 | 
						|
}
 |