Mirror of https://github.com/optim-enterprises-bv/kubernetes.git, synced 2025-11-04 04:08:16 +00:00
Merge pull request #23771 from jayunit100/ClusterVerificationFramework

Automatic merge from submit-queue

Cluster Verification Framework

I've spent the last few days looking at the general patterns of verification we have that we tend to reuse in the e2es. Basically, we need:

- label filters
- forEach and WaitFor (where forEach doesn't necessarily waitFor anything)
- timeouts
- multiple phases (a reusable definition of state)
- an extensible way to define cluster state that can evolve over time in a data object, rather than as a set of parameters with magic semantics

This PR:

- implements the abstract functionality above declaratively, and without hidden semantics
- addresses the sprawling duplicate methods in #23540, so that we can phase out the wrapper methods and replace them with well-defined, extensible semantics for cluster state
- fixes the recently discovered #23730 issue (where kubectl.go relies on examples.go, which is obviously wacky) by using the new framework to implement forEachPod in just a couple of lines, and by migrating the wrapper function into framework.go

There is some cleanup left to do here, but this is working for a couple of important use cases (the spark, cassandra, ..., kubectl tests). I played with a few different ideas, and this wound up being the most natural implementation from a usability standpoint. In any case, I thought I'd push this up as a first iteration; open to feedback. A sketch of the intended usage pattern follows.

@kubernetes/sig-testing @timothysc
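For orientation, here is a minimal sketch (not taken verbatim from the diff) of how the API introduced below is meant to be consumed, based on the ClusterVerification, PodStateVerification, WaitFor, and ForEach definitions in this PR. It assumes an initialized *framework.Framework named f inside an e2e test; the "name=redis" selector is only an illustrative value:

```go
// Declare the desired cluster state once, as data rather than magic parameters.
vf := f.NewClusterVerification(
	framework.PodStateVerification{
		Selectors:   map[string]string{"name": "redis"}, // illustrative label
		ValidPhases: []api.PodPhase{api.PodRunning},
	})

// Wait (with a timeout) for at least one matching pod to verify...
if pods, err := vf.WaitFor(1, 90*time.Second); err != nil || len(pods) < 1 {
	framework.Failf("pod never verified: %v", err)
}

// ...then run an arbitrary check against every verified pod.
vf.ForEach(func(pod api.Pod) {
	framework.Logf("verified pod %v", pod.Name)
})
```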
examples.go:

@@ -27,7 +27,6 @@ import (
 	"k8s.io/kubernetes/pkg/api"
 	client "k8s.io/kubernetes/pkg/client/unversioned"
-	"k8s.io/kubernetes/pkg/labels"
 	"k8s.io/kubernetes/test/e2e/framework"
 
 	. "github.com/onsi/ginkgo"
@@ -40,6 +39,14 @@ const (
 
 var _ = framework.KubeDescribe("[Feature:Example]", func() {
 	f := framework.NewDefaultFramework("examples")
+	// Customized ForEach wrapper for this test.
+	forEachPod := func(selectorKey string, selectorValue string, fn func(api.Pod)) {
+		f.NewClusterVerification(
+			framework.PodStateVerification{
+				Selectors:   map[string]string{selectorKey: selectorValue},
+				ValidPhases: []api.PodPhase{api.PodRunning},
+			}).ForEach(fn)
+	}
 	var c *client.Client
 	var ns string
 	BeforeEach(func() {
@@ -85,13 +92,13 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {
 
 			By("checking up the services")
 			checkAllLogs := func() {
-				forEachPod(c, ns, "name", "redis", func(pod api.Pod) {
+				forEachPod("name", "redis", func(pod api.Pod) {
 					if pod.Name != bootstrapPodName {
 						_, err := framework.LookForStringInLog(ns, pod.Name, "redis", expectedOnServer, serverStartTimeout)
 						Expect(err).NotTo(HaveOccurred())
 					}
 				})
-				forEachPod(c, ns, "name", "redis-sentinel", func(pod api.Pod) {
+				forEachPod("name", "redis-sentinel", func(pod api.Pod) {
 					if pod.Name != bootstrapPodName {
 						_, err := framework.LookForStringInLog(ns, pod.Name, "sentinel", expectedOnSentinel, serverStartTimeout)
 						Expect(err).NotTo(HaveOccurred())
@@ -124,7 +131,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {
 			By("starting rabbitmq")
 			framework.RunKubectlOrDie("create", "-f", rabbitmqServiceYaml, nsFlag)
 			framework.RunKubectlOrDie("create", "-f", rabbitmqControllerYaml, nsFlag)
-			forEachPod(c, ns, "component", "rabbitmq", func(pod api.Pod) {
+			forEachPod("component", "rabbitmq", func(pod api.Pod) {
 				_, err := framework.LookForStringInLog(ns, pod.Name, "rabbitmq", "Server startup complete", serverStartTimeout)
 				Expect(err).NotTo(HaveOccurred())
 			})
@@ -133,7 +140,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {
 
 			By("starting celery")
 			framework.RunKubectlOrDie("create", "-f", celeryControllerYaml, nsFlag)
-			forEachPod(c, ns, "component", "celery", func(pod api.Pod) {
+			forEachPod("component", "celery", func(pod api.Pod) {
 				_, err := framework.LookForStringInFile(ns, pod.Name, "celery", "/data/celery.log", " ready.", serverStartTimeout)
 				Expect(err).NotTo(HaveOccurred())
 			})
@@ -141,14 +148,16 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {
 			By("starting flower")
 			framework.RunKubectlOrDie("create", "-f", flowerServiceYaml, nsFlag)
 			framework.RunKubectlOrDie("create", "-f", flowerControllerYaml, nsFlag)
-			forEachPod(c, ns, "component", "flower", func(pod api.Pod) {
-				// Do nothing. just wait for it to be up and running.
+			forEachPod("component", "flower", func(pod api.Pod) {
+
 			})
-			content, err := makeHttpRequestToService(c, ns, "flower-service", "/", framework.EndpointRegisterTimeout)
-			Expect(err).NotTo(HaveOccurred())
-			if !strings.Contains(content, "<title>Celery Flower</title>") {
-				framework.Failf("Flower HTTP request failed")
-			}
+			forEachPod("component", "flower", func(pod api.Pod) {
+				content, err := makeHttpRequestToService(c, ns, "flower-service", "/", framework.EndpointRegisterTimeout)
+				Expect(err).NotTo(HaveOccurred())
+				if !strings.Contains(content, "<title>Celery Flower</title>") {
+					framework.Failf("Flower HTTP request failed")
+				}
+			})
 		})
 	})
 
@@ -172,7 +181,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {
 				framework.Logf("Now polling for Master startup...")
 
 				// Only one master pod: But its a natural way to look up pod names.
-				forEachPod(c, ns, "component", "spark-master", func(pod api.Pod) {
+				forEachPod("component", "spark-master", func(pod api.Pod) {
 					framework.Logf("Now waiting for master to startup in %v", pod.Name)
 					_, err := framework.LookForStringInLog(ns, pod.Name, "spark-master", "Starting Spark master at", serverStartTimeout)
 					Expect(err).NotTo(HaveOccurred())
@@ -181,6 +190,12 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {
 				By("waiting for master endpoint")
 				err := framework.WaitForEndpoint(c, ns, "spark-master")
 				Expect(err).NotTo(HaveOccurred())
+				forEachPod("component", "spark-master", func(pod api.Pod) {
+					_, maErr := framework.LookForStringInLog(f.Namespace.Name, pod.Name, "spark-master", "Starting Spark master at", serverStartTimeout)
+					if maErr != nil {
+						framework.Failf("Didn't find target string. error: %v", maErr)
+					}
+				})
 			}
 			worker := func() {
 				By("starting workers")
@@ -191,10 +206,13 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {
 				// framework.ScaleRC(c, ns, "spark-worker-controller", 2, true)
 
 				framework.Logf("Now polling for worker startup...")
-				forEachPod(c, ns, "component", "spark-worker", func(pod api.Pod) {
-					_, err := framework.LookForStringInLog(ns, pod.Name, "spark-worker", "Successfully registered with master", serverStartTimeout)
-					Expect(err).NotTo(HaveOccurred())
-				})
+				// ScaleRC(c, ns, "spark-worker-controller", 2, true)
+				framework.Logf("Now polling for worker startup...")
+				forEachPod("component", "spark-worker",
+					func(pod api.Pod) {
+						_, slaveErr := framework.LookForStringInLog(ns, pod.Name, "spark-worker", "Successfully registered with master", serverStartTimeout)
+						Expect(slaveErr).NotTo(HaveOccurred())
+					})
 			}
 			// Run the worker verification after we turn up the master.
 			defer worker()
@@ -221,7 +239,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {
 			// Create an RC with n nodes in it.  Each node will then be verified.
 			By("Creating a Cassandra RC")
 			framework.RunKubectlOrDie("create", "-f", controllerYaml, nsFlag)
-			forEachPod(c, ns, "app", "cassandra", func(pod api.Pod) {
+			forEachPod("app", "cassandra", func(pod api.Pod) {
 				framework.Logf("Verifying pod %v ", pod.Name)
 				_, err = framework.LookForStringInLog(ns, pod.Name, "cassandra", "Listening for thrift clients", serverStartTimeout)
 				Expect(err).NotTo(HaveOccurred())
@@ -231,7 +249,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {
 
 			By("Finding each node in the nodetool status lines")
 			output := framework.RunKubectlOrDie("exec", "cassandra", nsFlag, "--", "nodetool", "status")
-			forEachPod(c, ns, "app", "cassandra", func(pod api.Pod) {
+			forEachPod("app", "cassandra", func(pod api.Pod) {
 				if !strings.Contains(output, pod.Status.PodIP) {
 					framework.Failf("Pod ip %s not found in nodetool status", pod.Status.PodIP)
 				}
@@ -275,7 +293,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {
 
 			By("starting workers")
 			framework.RunKubectlOrDie("create", "-f", workerControllerJson, nsFlag)
-			forEachPod(c, ns, "name", "storm-worker", func(pod api.Pod) {
+			forEachPod("name", "storm-worker", func(pod api.Pod) {
 				//do nothing, just wait for the pod to be running
 			})
 			// TODO: Add logging configuration to nimbus & workers images and then
@@ -402,7 +420,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {
 			framework.RunKubectlOrDie("create", "-f", driverServiceYaml, nsFlag)
 			framework.RunKubectlOrDie("create", "-f", rethinkDbControllerYaml, nsFlag)
 			checkDbInstances := func() {
-				forEachPod(c, ns, "db", "rethinkdb", func(pod api.Pod) {
+				forEachPod("db", "rethinkdb", func(pod api.Pod) {
 					_, err := framework.LookForStringInLog(ns, pod.Name, "rethinkdb", "Server ready", serverStartTimeout)
 					Expect(err).NotTo(HaveOccurred())
 				})
@@ -441,7 +459,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {
 			By("starting hazelcast")
 			framework.RunKubectlOrDie("create", "-f", serviceYaml, nsFlag)
 			framework.RunKubectlOrDie("create", "-f", controllerYaml, nsFlag)
-			forEachPod(c, ns, "name", "hazelcast", func(pod api.Pod) {
+			forEachPod("name", "hazelcast", func(pod api.Pod) {
 				_, err := framework.LookForStringInLog(ns, pod.Name, "hazelcast", "Members [1]", serverStartTimeout)
 				Expect(err).NotTo(HaveOccurred())
 				_, err = framework.LookForStringInLog(ns, pod.Name, "hazelcast", "is STARTED", serverStartTimeout)
@@ -453,7 +471,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {
 
 			By("scaling hazelcast")
 			framework.ScaleRC(c, ns, "hazelcast", 2, true)
-			forEachPod(c, ns, "name", "hazelcast", func(pod api.Pod) {
+			forEachPod("name", "hazelcast", func(pod api.Pod) {
 				_, err := framework.LookForStringInLog(ns, pod.Name, "hazelcast", "Members [2]", serverStartTimeout)
 				Expect(err).NotTo(HaveOccurred())
 			})
@@ -491,29 +509,3 @@ func prepareResourceWithReplacedString(inputFile, old, new string) string {
 	podYaml := strings.Replace(string(data), old, new, 1)
 	return podYaml
 }
-
-func forEachPod(c *client.Client, ns, selectorKey, selectorValue string, fn func(api.Pod)) {
-	pods := []*api.Pod{}
-	for t := time.Now(); time.Since(t) < framework.PodListTimeout; time.Sleep(framework.Poll) {
-		selector := labels.SelectorFromSet(labels.Set(map[string]string{selectorKey: selectorValue}))
-		options := api.ListOptions{LabelSelector: selector}
-		podList, err := c.Pods(ns).List(options)
-		Expect(err).NotTo(HaveOccurred())
-		for _, pod := range podList.Items {
-			if pod.Status.Phase == api.PodPending || pod.Status.Phase == api.PodRunning {
-				pods = append(pods, &pod)
-			}
-		}
-		if len(pods) > 0 {
-			break
-		}
-	}
-	if pods == nil || len(pods) == 0 {
-		framework.Failf("No pods found")
-	}
-	for _, pod := range pods {
-		err := framework.WaitForPodRunningInNamespace(c, pod.Name, ns)
-		Expect(err).NotTo(HaveOccurred())
-		fn(*pod)
-	}
-}
framework.go:

@@ -36,6 +36,8 @@ import (
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/gomega"
+	"k8s.io/kubernetes/pkg/labels"
+	"k8s.io/kubernetes/pkg/util/wait"
 )
 
 const (
@@ -423,3 +425,174 @@ func kubectlExec(namespace string, podName, containerName string, args ...string
 func KubeDescribe(text string, body func()) bool {
 	return Describe("[k8s.io] "+text, body)
 }
+
+// PodStateVerification represents a verification of pod state.
+// Any time you have a set of pods that you want to operate against or query,
+// this struct can be used to declaratively identify those pods.
+type PodStateVerification struct {
+	// Optional: only pods that have k=v labels will pass this filter.
+	Selectors map[string]string
+
+	// Required: the phases which are valid for your pod.
+	ValidPhases []api.PodPhase
+
+	// Optional: only pods passing this function will pass the filter.
+	// As an optimization, in addition to specifying a filter (boolean),
+	// this function allows specifying an error as well.
+	// The error indicates that the polling of the pod spectrum should stop.
+	Verify func(api.Pod) (bool, error)
+
+	// Optional: only pods with this name will pass the filter.
+	PodName string
+}
+
+type ClusterVerification struct {
+	client    *client.Client
+	namespace *api.Namespace // pointer rather than string, since ns isn't created until before each.
+	podState  PodStateVerification
+}
+
+func (f *Framework) NewClusterVerification(filter PodStateVerification) *ClusterVerification {
+	return &ClusterVerification{
+		f.Client,
+		f.Namespace,
+		filter,
+	}
+}
+
+func passesPodNameFilter(pod api.Pod, name string) bool {
+	return name == "" || strings.Contains(pod.Name, name)
+}
+
+func passesVerifyFilter(pod api.Pod, verify func(p api.Pod) (bool, error)) (bool, error) {
+	if verify == nil {
+		return true, nil
+	} else {
+		verified, err := verify(pod)
+		// If an error is returned, by definition, pod verification fails.
+		if err != nil {
+			return false, err
+		} else {
+			return verified, nil
+		}
+	}
+}
+
+func passesPhasesFilter(pod api.Pod, validPhases []api.PodPhase) bool {
+	passesPhaseFilter := false
+	for _, phase := range validPhases {
+		if pod.Status.Phase == phase {
+			passesPhaseFilter = true
+		}
+	}
+	return passesPhaseFilter
+}
+
+// filterLabels returns a list of pods which have labels.
+func filterLabels(selectors map[string]string, cli *client.Client, ns string) (*api.PodList, error) {
+	var err error
+	var selector labels.Selector
+	var pl *api.PodList
+	// List pods based on selectors.  This might be a tiny optimization rather than filtering
+	// everything manually.
+	if len(selectors) > 0 {
+		selector = labels.SelectorFromSet(labels.Set(selectors))
+		options := api.ListOptions{LabelSelector: selector}
+		pl, err = cli.Pods(ns).List(options)
+	} else {
+		pl, err = cli.Pods(ns).List(api.ListOptions{})
+	}
+	return pl, err
+}
+
+// filter filters pods which pass a filter.  It can be used to compose
+// the more useful abstractions like ForEach, WaitFor, and so on, which
+// can be used directly by tests.
+func (p *PodStateVerification) filter(c *client.Client, namespace *api.Namespace) ([]api.Pod, error) {
+	if len(p.ValidPhases) == 0 || namespace == nil {
+		panic(fmt.Errorf("Need to specify valid pod phases (%v) and namespace (%v).", p.ValidPhases, namespace))
+	}
+
+	ns := namespace.Name
+	pl, err := filterLabels(p.Selectors, c, ns) // Build an api.PodList to operate against.
+	Logf("Selector matched %v pods for %v", len(pl.Items), p.Selectors)
+	if len(pl.Items) == 0 || err != nil {
+		return pl.Items, err
+	}
+
+	unfilteredPods := pl.Items
+	filteredPods := []api.Pod{}
+ReturnPodsSoFar:
+	// Next: pod must match at least one of the states that the user specified.
+	for _, pod := range unfilteredPods {
+		if !(passesPhasesFilter(pod, p.ValidPhases) && passesPodNameFilter(pod, p.PodName)) {
+			continue
+		}
+		passesVerify, err := passesVerifyFilter(pod, p.Verify)
+		if err != nil {
+			Logf("Error detected on %v : %v !", pod.Name, err)
+			break ReturnPodsSoFar
+		}
+		if passesVerify {
+			filteredPods = append(filteredPods, pod)
+		}
+	}
+	return filteredPods, err
+}
+
+// WaitFor waits for some minimum number of pods to be verified, according to the PodStateVerification
+// definition.
+func (cl *ClusterVerification) WaitFor(atLeast int, timeout time.Duration) ([]api.Pod, error) {
+	pods := []api.Pod{}
+	var returnedErr error
+
+	err := wait.Poll(1*time.Second, timeout, func() (bool, error) {
+		pods, returnedErr = cl.podState.filter(cl.client, cl.namespace)
+
+		// Failure
+		if returnedErr != nil {
+			Logf("Cutting polling short: We got an error from the pod filtering layer.")
+			// stop polling if the pod filtering returns an error.  that should never happen.
+			// it indicates, for example, that the client is broken or something non-pod related.
+			return false, returnedErr
+		}
+		Logf("Found %v / %v", len(pods), atLeast)
+
+		// Success
+		if len(pods) >= atLeast {
+			return true, nil
+		}
+		// Keep trying...
+		return false, nil
+	})
+	Logf("WaitFor completed.  Pods found = %v out of %v", len(pods), atLeast)
+	return pods, err
+}
+
+// WaitForOrFail provides a shorthand WaitFor with failure as an option if anything goes wrong.
+func (cl *ClusterVerification) WaitForOrFail(atLeast int, timeout time.Duration) {
+	pods, err := cl.WaitFor(atLeast, timeout)
+	if err != nil || len(pods) < atLeast {
+		Failf("Verified %v of %v pods, error: %v", len(pods), atLeast, err)
+	}
+}
+
+// ForEach runs a function against every verifiable pod.  Be warned that this doesn't wait for "n" pods to verify,
+// so it may return very quickly if you have strict pod state requirements.
+//
+// For example, if you require at least 5 pods to be running before your test will pass,
+// it's smart to first call "clusterVerification.WaitFor(5)" before you call clusterVerification.ForEach.
+func (cl *ClusterVerification) ForEach(podFunc func(api.Pod)) error {
+	pods, err := cl.podState.filter(cl.client, cl.namespace)
+	if err == nil {
+		Logf("ForEach: Found %v pods from the filter.  Now looping through them.", len(pods))
+		for _, p := range pods {
+			podFunc(p)
+		}
+	} else {
+		Logf("ForEach: Something went wrong when filtering pods to execute against: %v", err)
+	}
+
+	return err
+}
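None of the call sites in this PR exercise the optional Verify and PodName fields, so, as a hedged sketch of how they compose per the struct definition above (the "app=guestbook" selector, the "frontend" name, and the IP check are all hypothetical):

```go
vf := f.NewClusterVerification(
	framework.PodStateVerification{
		Selectors:   map[string]string{"app": "guestbook"}, // hypothetical label
		ValidPhases: []api.PodPhase{api.PodRunning, api.PodPending},
		PodName:     "frontend", // substring match, per passesPodNameFilter

		// Returning (false, nil) merely skips the pod; returning a non-nil
		// error aborts the filtering pass (and hence any surrounding poll).
		Verify: func(p api.Pod) (bool, error) {
			return p.Status.PodIP != "", nil
		},
	})

// Shorthand that fails the test unless at least 2 such pods verify in time.
vf.WaitForOrFail(2, 2*time.Minute)
```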
kubectl.go:

@@ -115,6 +115,25 @@ var (
 var _ = framework.KubeDescribe("Kubectl client", func() {
 	defer GinkgoRecover()
 	f := framework.NewDefaultFramework("kubectl")
+
+	// Reusable cluster state function.  This won't be adversely affected by lazy initialization of framework.
+	clusterState := func() *framework.ClusterVerification {
+		return f.NewClusterVerification(
+			framework.PodStateVerification{
+				Selectors:   map[string]string{"app": "redis"},
+				ValidPhases: []api.PodPhase{api.PodRunning /*api.PodPending*/},
+			})
+	}
+	// Customized Wait / ForEach wrappers for this test.  These demonstrate the
+	// idiomatic way to wrap the ClusterVerification structs for syntactic sugar in large
+	// test files.
+	waitFor := func(atLeast int) {
+		// 60 seconds can be flaky for some of the containers.
+		clusterState().WaitFor(atLeast, 90*time.Second)
+	}
+	forEachPod := func(podFunc func(p api.Pod)) {
+		clusterState().ForEach(podFunc)
+	}
 	var c *client.Client
 	var ns string
 	BeforeEach(func() {
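Note the design choice here: clusterState is a function that builds a fresh ClusterVerification on each call, rather than a value computed at Describe time. The framework's namespace object is only created in BeforeEach, which is also why ClusterVerification stores a *api.Namespace pointer instead of a name string (see the field comment in framework.go above); constructing the verification eagerly would capture a nil namespace and trip the panic in filter.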
@@ -588,8 +607,10 @@ var _ = framework.KubeDescribe("Kubectl client", func() {
 			framework.RunKubectlOrDie("create", "-f", controllerJson, nsFlag)
 			framework.RunKubectlOrDie("create", "-f", serviceJson, nsFlag)
 
+			// Wait for the redis pods to come online...
+			waitFor(1)
 			// Pod
-			forEachPod(c, ns, "app", "redis", func(pod api.Pod) {
+			forEachPod(func(pod api.Pod) {
 				output := framework.RunKubectlOrDie("describe", "pod", pod.Name, nsFlag)
 				requiredStrings := [][]string{
 					{"Name:", "redis-master-"},
@@ -684,8 +705,11 @@ var _ = framework.KubeDescribe("Kubectl client", func() {
 			redisPort := 6379
 
 			By("creating Redis RC")
+
+			framework.Logf("namespace %v", ns)
 			framework.RunKubectlOrDie("create", "-f", controllerJson, nsFlag)
-			forEachPod(c, ns, "app", "redis", func(pod api.Pod) {
+			forEachPod(func(pod api.Pod) {
+				framework.Logf("wait on %v ", ns)
 				framework.LookForStringInLog(ns, pod.Name, "redis-master", "The server is now ready to accept connections", framework.PodStartTimeout)
 			})
 			validateService := func(name string, servicePort int, timeout time.Duration) {
@@ -799,7 +823,7 @@ var _ = framework.KubeDescribe("Kubectl client", func() {
 		It("should be able to retrieve and filter logs [Conformance]", func() {
 			framework.SkipUnlessServerVersionGTE(extendedPodLogFilterVersion, c)
 
-			forEachPod(c, ns, "app", "redis", func(pod api.Pod) {
+			forEachPod(func(pod api.Pod) {
 				By("checking for a matching strings")
 				_, err := framework.LookForStringInLog(ns, pod.Name, containerName, "The server is now ready to accept connections", framework.PodStartTimeout)
 				Expect(err).NotTo(HaveOccurred())
@@ -850,12 +874,12 @@ var _ = framework.KubeDescribe("Kubectl client", func() {
 			By("creating Redis RC")
 			framework.RunKubectlOrDie("create", "-f", controllerJson, nsFlag)
 			By("patching all pods")
-			forEachPod(c, ns, "app", "redis", func(pod api.Pod) {
+			forEachPod(func(pod api.Pod) {
 				framework.RunKubectlOrDie("patch", "pod", pod.Name, nsFlag, "-p", "{\"metadata\":{\"annotations\":{\"x\":\"y\"}}}")
 			})
 
 			By("checking annotations")
-			forEachPod(c, ns, "app", "redis", func(pod api.Pod) {
+			forEachPod(func(pod api.Pod) {
 				found := false
 				for key, val := range pod.Annotations {
 					if key == "x" && val == "y" {