e2e for cluster-autoscaler unhealthy cluster handling

Moved the testUnderTemporaryNetworkFailure function to the framework package so it can be reused.
@@ -20,8 +20,10 @@ import (
 	"bytes"
 	"fmt"
 	"io/ioutil"
+	"math"
 	"net/http"
 	"os/exec"
+	"regexp"
 	"strconv"
 	"strings"
 	"time"
@@ -51,12 +53,14 @@ const (
 	scaleUpTimeout      = 5 * time.Minute
 	scaleDownTimeout    = 15 * time.Minute
 	podTimeout          = 2 * time.Minute
+	nodesRecoverTimeout = 5 * time.Minute
 
 	gkeEndpoint      = "https://test-container.sandbox.googleapis.com"
 	gkeUpdateTimeout = 15 * time.Minute
 
 	disabledTaint             = "DisabledForAutoscalingTest"
 	newNodesForScaledownTests = 2
+	unhealthyClusterThreshold = 4
 )
 
 var _ = framework.KubeDescribe("Cluster size autoscaling [Slow]", func() {
@@ -354,6 +358,48 @@ var _ = framework.KubeDescribe("Cluster size autoscaling [Slow]", func() {
 		})
 	})
 
+	It("Shouldn't perform scale up operation and should list unhealthy status if most of the cluster is broken[Feature:ClusterSizeAutoscalingScaleUp]", func() {
+		clusterSize := nodeCount
+		for clusterSize < unhealthyClusterThreshold+1 {
+			clusterSize = manuallyIncreaseClusterSize(f, originalSizes)
+		}
+
+		By("Block network connectivity to some nodes to simulate unhealthy cluster")
+		nodesToBreakCount := int(math.Floor(math.Max(float64(unhealthyClusterThreshold), 0.5*float64(clusterSize))))
+		nodes, err := f.ClientSet.Core().Nodes().List(metav1.ListOptions{FieldSelector: fields.Set{
+			"spec.unschedulable": "false",
+		}.AsSelector().String()})
+		framework.ExpectNoError(err)
+		Expect(nodesToBreakCount <= len(nodes.Items)).To(BeTrue())
+		nodesToBreak := nodes.Items[:nodesToBreakCount]
+
+		// TestUnderTemporaryNetworkFailure only removes connectivity to a single node,
+		// and accepts a func() callback. The loop is therefore expanded into recursive
+		// calls to avoid duplicating TestUnderTemporaryNetworkFailure.
+		var testFunction func()
+		testFunction = func() {
+			if len(nodesToBreak) > 0 {
+				ntb := &nodesToBreak[0]
+				nodesToBreak = nodesToBreak[1:]
+				framework.TestUnderTemporaryNetworkFailure(c, "default", ntb, testFunction)
+			} else {
+				ReserveMemory(f, "memory-reservation", 100, nodeCount*memCapacityMb, false)
+				defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.Namespace.Name, "memory-reservation")
+				time.Sleep(scaleUpTimeout)
+				currentNodes := framework.GetReadySchedulableNodesOrDie(f.ClientSet)
+				framework.Logf("Currently available nodes: %v, nodes available at the start of test: %v, disabled nodes: %v", len(currentNodes.Items), len(nodes.Items), nodesToBreakCount)
+				Expect(len(currentNodes.Items)).Should(Equal(len(nodes.Items) - nodesToBreakCount))
+				status, err := getClusterwideStatus(c)
+				framework.Logf("Clusterwide status: %v", status)
+				framework.ExpectNoError(err)
+				Expect(status).Should(Equal("Unhealthy"))
+			}
+		}
+		testFunction()
+		// Give nodes time to recover from network failure
+		framework.ExpectNoError(framework.WaitForClusterSize(c, len(nodes.Items), nodesRecoverTimeout))
+	})
+
 })
 
 func runDrainTest(f *framework.Framework, migSizes map[string]int, podsPerNode, pdbSize int, verifyFunction func(int)) {
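The recursion in the new test case works around TestUnderTemporaryNetworkFailure operating on a single node at a time: each call keeps one node cut off while the callback breaks the next one, so all selected nodes are unreachable only when the innermost callback finally runs. Below is a minimal, self-contained sketch of that pattern; withNodeOffline, breakNext, and the node names are illustrative stand-ins, not the framework helpers.

package main

import "fmt"

// withNodeOffline stands in for framework.TestUnderTemporaryNetworkFailure:
// it cuts one node off, runs fn, then restores connectivity.
func withNodeOffline(node string, fn func()) {
	fmt.Println("breaking connectivity to", node)
	defer fmt.Println("restoring connectivity to", node)
	fn()
}

func main() {
	nodesToBreak := []string{"node-1", "node-2", "node-3"}

	var breakNext func()
	breakNext = func() {
		if len(nodesToBreak) > 0 {
			node := nodesToBreak[0]
			nodesToBreak = nodesToBreak[1:]
			// Keep this node broken while recursing to break the rest.
			withNodeOffline(node, breakNext)
		} else {
			// All selected nodes are unreachable here; this is where the e2e test
			// reserves memory and asserts that the autoscaler reports "Unhealthy".
			fmt.Println("all nodes broken; run assertions here")
		}
	}
	breakNext()
}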
@@ -828,3 +874,25 @@ func manuallyIncreaseClusterSize(f *framework.Framework, originalSizes map[strin
 		func(size int) bool { return size >= increasedSize }, scaleUpTimeout))
 	return increasedSize
 }
+
+// Try to get clusterwide health from CA status configmap.
+// Status configmap is not parsing-friendly, so evil regexpery follows.
+func getClusterwideStatus(c clientset.Interface) (string, error) {
+	configMap, err := c.CoreV1().ConfigMaps("kube-system").Get("cluster-autoscaler-status", metav1.GetOptions{})
+	if err != nil {
+		return "", err
+	}
+	status, ok := configMap.Data["status"]
+	if !ok {
+		return "", fmt.Errorf("Status information not found in configmap")
+	}
+	matcher, err := regexp.Compile("Cluster-wide:\\s*\n\\s*Health:\\s*([A-Za-z]+)")
+	if err != nil {
+		return "", err
+	}
+	result := matcher.FindStringSubmatch(status)
+	if len(result) < 2 {
+		return "", fmt.Errorf("Failed to parse CA status configmap")
+	}
+	return result[1], nil
+}
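The regexp in getClusterwideStatus pulls the cluster-wide health field out of the cluster-autoscaler status text. A small runnable sketch of the same match follows; the sample string is an assumed excerpt for illustration, not a verbatim copy of what the autoscaler writes to the configmap.

package main

import (
	"fmt"
	"regexp"
)

func main() {
	// Assumed shape of the relevant part of the cluster-autoscaler-status data.
	sample := "Cluster-autoscaler status\nCluster-wide:\n  Health: Unhealthy (ready=2 unready=4)\n"

	matcher := regexp.MustCompile(`Cluster-wide:\s*\n\s*Health:\s*([A-Za-z]+)`)
	result := matcher.FindStringSubmatch(sample)
	if len(result) < 2 {
		fmt.Println("failed to parse status")
		return
	}
	fmt.Println(result[1]) // prints "Unhealthy"
}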
@@ -39,6 +39,7 @@ import (
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/v1"
+	"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
 	coreclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/core/v1"
 )
 
@@ -826,3 +827,35 @@ func TestHitNodesFromOutsideWithCount(externalIP string, httpPort int32, timeout
 	}
 	return nil
 }
+
+// Blocks outgoing network traffic on 'node'. Then runs testFunc and returns its status.
+// At the end (even in case of errors), the network traffic is brought back to normal.
+// This function executes commands on a node so it will work only for some
+// environments.
+func TestUnderTemporaryNetworkFailure(c clientset.Interface, ns string, node *v1.Node, testFunc func()) {
+	host := GetNodeExternalIP(node)
+	master := GetMasterAddress(c)
+	By(fmt.Sprintf("block network traffic from node %s to the master", node.Name))
+	defer func() {
+		// This code will execute even if setting the iptables rule failed.
+		// It is on purpose because we may have an error even if the new rule
+		// had been inserted. (yes, we could look at the error code and ssh error
+		// separately, but I prefer to stay on the safe side).
+		By(fmt.Sprintf("Unblock network traffic from node %s to the master", node.Name))
+		UnblockNetwork(host, master)
+	}()
+
+	Logf("Waiting %v to ensure node %s is ready before beginning test...", resizeNodeReadyTimeout, node.Name)
+	if !WaitForNodeToBe(c, node.Name, v1.NodeReady, true, resizeNodeReadyTimeout) {
+		Failf("Node %s did not become ready within %v", node.Name, resizeNodeReadyTimeout)
+	}
+	BlockNetwork(host, master)
+
+	Logf("Waiting %v for node %s to be not ready after simulated network failure", resizeNodeNotReadyTimeout, node.Name)
+	if !WaitForNodeToBe(c, node.Name, v1.NodeReady, false, resizeNodeNotReadyTimeout) {
+		Failf("Node %s did not become not-ready within %v", node.Name, resizeNodeNotReadyTimeout)
+	}
+
+	testFunc()
+	// network traffic is unblocked in a deferred function
+}
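The defer in TestUnderTemporaryNetworkFailure is what backs the "even in case of errors" guarantee in its comment: the unblock step runs even if the callback fails. A minimal, self-contained sketch of that guarantee is below, with placeholder block/unblock prints and a made-up host rather than the framework's BlockNetwork/UnblockNetwork.

package main

import "fmt"

// withTrafficBlocked stands in for TestUnderTemporaryNetworkFailure: it "blocks"
// traffic, and the deferred "unblock" runs even if testFunc panics.
func withTrafficBlocked(host string, testFunc func()) {
	fmt.Println("blocking traffic to", host)
	defer fmt.Println("unblocking traffic to", host) // always runs, even on failure
	testFunc()
}

func main() {
	defer func() {
		if r := recover(); r != nil {
			fmt.Println("test failed:", r) // the unblock above has already run
		}
	}()
	withTrafficBlocked("10.240.0.5", func() {
		panic("simulated test failure")
	})
}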
@@ -41,38 +41,6 @@ import (
 	. "github.com/onsi/gomega"
 )
 
-// Blocks outgoing network traffic on 'node'. Then runs testFunc and returns its status.
-// At the end (even in case of errors), the network traffic is brought back to normal.
-// This function executes commands on a node so it will work only for some
-// environments.
-func testUnderTemporaryNetworkFailure(c clientset.Interface, ns string, node *v1.Node, testFunc func()) {
-	host := framework.GetNodeExternalIP(node)
-	master := framework.GetMasterAddress(c)
-	By(fmt.Sprintf("block network traffic from node %s to the master", node.Name))
-	defer func() {
-		// This code will execute even if setting the iptables rule failed.
-		// It is on purpose because we may have an error even if the new rule
-		// had been inserted. (yes, we could look at the error code and ssh error
-		// separately, but I prefer to stay on the safe side).
-		By(fmt.Sprintf("Unblock network traffic from node %s to the master", node.Name))
-		framework.UnblockNetwork(host, master)
-	}()
-
-	framework.Logf("Waiting %v to ensure node %s is ready before beginning test...", resizeNodeReadyTimeout, node.Name)
-	if !framework.WaitForNodeToBe(c, node.Name, v1.NodeReady, true, resizeNodeReadyTimeout) {
-		framework.Failf("Node %s did not become ready within %v", node.Name, resizeNodeReadyTimeout)
-	}
-	framework.BlockNetwork(host, master)
-
-	framework.Logf("Waiting %v for node %s to be not ready after simulated network failure", resizeNodeNotReadyTimeout, node.Name)
-	if !framework.WaitForNodeToBe(c, node.Name, v1.NodeReady, false, resizeNodeNotReadyTimeout) {
-		framework.Failf("Node %s did not become not-ready within %v", node.Name, resizeNodeNotReadyTimeout)
-	}
-
-	testFunc()
-	// network traffic is unblocked in a deferred function
-}
-
 func expectNodeReadiness(isReady bool, newNode chan *v1.Node) {
 	timeout := false
 	expected := false
@@ -281,7 +249,7 @@ var _ = framework.KubeDescribe("Network Partition [Disruptive] [Slow]", func() {
 			// Finally, it checks that the replication controller recreates the
 			// pods on another node and that now the number of replicas is equal 'replicas'.
 			By(fmt.Sprintf("blocking network traffic from node %s", node.Name))
-			testUnderTemporaryNetworkFailure(c, ns, node, func() {
+			framework.TestUnderTemporaryNetworkFailure(c, ns, node, func() {
 				framework.Logf("Waiting for pod %s to be removed", pods.Items[0].Name)
 				err := framework.WaitForRCPodToDisappear(c, ns, name, pods.Items[0].Name)
 				Expect(err).NotTo(HaveOccurred())
@@ -346,7 +314,7 @@ var _ = framework.KubeDescribe("Network Partition [Disruptive] [Slow]", func() {
 			// Finally, it checks that the replication controller recreates the
 			// pods on another node and that now the number of replicas is equal 'replicas + 1'.
 			By(fmt.Sprintf("blocking network traffic from node %s", node.Name))
-			testUnderTemporaryNetworkFailure(c, ns, node, func() {
+			framework.TestUnderTemporaryNetworkFailure(c, ns, node, func() {
 				framework.Logf("Waiting for pod %s to be removed", pods.Items[0].Name)
 				err := framework.WaitForRCPodToDisappear(c, ns, name, pods.Items[0].Name)
 				Expect(err).To(Equal(wait.ErrWaitTimeout), "Pod was not deleted during network partition.")
@@ -421,7 +389,7 @@ var _ = framework.KubeDescribe("Network Partition [Disruptive] [Slow]", func() {
 			// Blocks outgoing network traffic on 'node'. Then verifies that 'podNameToDisappear',
 			// that belongs to StatefulSet 'statefulSetName', **does not** disappear due to forced deletion from the apiserver.
 			// The grace period on the stateful pods is set to a value > 0.
-			testUnderTemporaryNetworkFailure(c, ns, node, func() {
+			framework.TestUnderTemporaryNetworkFailure(c, ns, node, func() {
 				framework.Logf("Checking that the NodeController does not force delete stateful pods %v", pod.Name)
 				err := framework.WaitTimeoutForPodNoLongerRunningInNamespace(c, pod.Name, ns, 10*time.Minute)
 				Expect(err).To(Equal(wait.ErrWaitTimeout), "Pod was not deleted during network partition.")
@@ -464,7 +432,7 @@ var _ = framework.KubeDescribe("Network Partition [Disruptive] [Slow]", func() {
 			// This creates a temporary network partition, verifies that the job has 'parallelism' number of
 			// running pods after the node-controller detects node unreachable.
 			By(fmt.Sprintf("blocking network traffic from node %s", node.Name))
-			testUnderTemporaryNetworkFailure(c, ns, node, func() {
+			framework.TestUnderTemporaryNetworkFailure(c, ns, node, func() {
 				framework.Logf("Waiting for pod %s to be removed", pods.Items[0].Name)
 				err := framework.WaitForPodToDisappear(c, ns, pods.Items[0].Name, label, 20*time.Second, 10*time.Minute)
 				Expect(err).To(Equal(wait.ErrWaitTimeout), "Pod was not deleted during network partition.")