mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-04 04:08:16 +00:00 
			
		
		
		
	Let controllers ignore initialization timeout error when creating a pod.
This commit is contained in:
		@@ -580,7 +580,7 @@ func (r RealPodControl) createPods(nodeName, namespace string, template *v1.PodT
 | 
			
		||||
	}
 | 
			
		||||
	if newPod, err := r.KubeClient.Core().Pods(namespace).Create(pod); err != nil {
 | 
			
		||||
		r.Recorder.Eventf(object, v1.EventTypeWarning, FailedCreatePodReason, "Error creating: %v", err)
 | 
			
		||||
		return fmt.Errorf("unable to create pods: %v", err)
 | 
			
		||||
		return err
 | 
			
		||||
	} else {
 | 
			
		||||
		accessor, err := meta.Accessor(object)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
 
 | 
			
		||||
@@ -816,7 +816,18 @@ func (dsc *DaemonSetsController) syncNodes(ds *extensions.DaemonSet, podsToDelet
 | 
			
		||||
	for i := 0; i < createDiff; i++ {
 | 
			
		||||
		go func(ix int) {
 | 
			
		||||
			defer createWait.Done()
 | 
			
		||||
			if err := dsc.podControl.CreatePodsOnNode(nodesNeedingDaemonPods[ix], ds.Namespace, &template, ds, newControllerRef(ds)); err != nil {
 | 
			
		||||
			err := dsc.podControl.CreatePodsOnNode(nodesNeedingDaemonPods[ix], ds.Namespace, &template, ds, newControllerRef(ds))
 | 
			
		||||
			if err != nil && errors.IsTimeout(err) {
 | 
			
		||||
				// Pod is created but its initialization has timed out.
 | 
			
		||||
				// If the initialization is successful eventually, the
 | 
			
		||||
				// controller will observe the creation via the informer.
 | 
			
		||||
				// If the initialization fails, or if the pod keeps
 | 
			
		||||
				// uninitialized for a long time, the informer will not
 | 
			
		||||
				// receive any update, and the controller will create a new
 | 
			
		||||
				// pod when the expectation expires.
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				glog.V(2).Infof("Failed creation, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
 | 
			
		||||
				dsc.expectations.CreationObserved(dsKey)
 | 
			
		||||
				errCh <- err
 | 
			
		||||
 
 | 
			
		||||
@@ -624,7 +624,18 @@ func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, job *b
 | 
			
		||||
		for i := int32(0); i < diff; i++ {
 | 
			
		||||
			go func() {
 | 
			
		||||
				defer wait.Done()
 | 
			
		||||
				if err := jm.podControl.CreatePodsWithControllerRef(job.Namespace, &job.Spec.Template, job, newControllerRef(job)); err != nil {
 | 
			
		||||
				err := jm.podControl.CreatePodsWithControllerRef(job.Namespace, &job.Spec.Template, job, newControllerRef(job))
 | 
			
		||||
				if err != nil && errors.IsTimeout(err) {
 | 
			
		||||
					// Pod is created but its initialization has timed out.
 | 
			
		||||
					// If the initialization is successful eventually, the
 | 
			
		||||
					// controller will observe the creation via the informer.
 | 
			
		||||
					// If the initialization fails, or if the pod keeps
 | 
			
		||||
					// uninitialized for a long time, the informer will not
 | 
			
		||||
					// receive any update, and the controller will create a new
 | 
			
		||||
					// pod when the expectation expires.
 | 
			
		||||
					return
 | 
			
		||||
				}
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					defer utilruntime.HandleError(err)
 | 
			
		||||
					// Decrement the expected number of creates because the informer won't observe this pod
 | 
			
		||||
					glog.V(2).Infof("Failed creation, decrementing expectations for job %q/%q", job.Namespace, job.Name)
 | 
			
		||||
 
 | 
			
		||||
@@ -466,6 +466,16 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *exte
 | 
			
		||||
					Controller:         boolPtr(true),
 | 
			
		||||
				}
 | 
			
		||||
				err = rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, controllerRef)
 | 
			
		||||
				if err != nil && errors.IsTimeout(err) {
 | 
			
		||||
					// Pod is created but its initialization has timed out.
 | 
			
		||||
					// If the initialization is successful eventually, the
 | 
			
		||||
					// controller will observe the creation via the informer.
 | 
			
		||||
					// If the initialization fails, or if the pod keeps
 | 
			
		||||
					// uninitialized for a long time, the informer will not
 | 
			
		||||
					// receive any update, and the controller will create a new
 | 
			
		||||
					// pod when the expectation expires.
 | 
			
		||||
					return
 | 
			
		||||
				}
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					// Decrement the expected number of creates because the informer won't observe this pod
 | 
			
		||||
					glog.V(2).Infof("Failed creation, decrementing expectations for replica set %q/%q", rs.Namespace, rs.Name)
 | 
			
		||||
 
 | 
			
		||||
@@ -462,6 +462,16 @@ func (rm *ReplicationManager) manageReplicas(filteredPods []*v1.Pod, rc *v1.Repl
 | 
			
		||||
					Controller:         boolPtr(true),
 | 
			
		||||
				}
 | 
			
		||||
				err = rm.podControl.CreatePodsWithControllerRef(rc.Namespace, rc.Spec.Template, rc, controllerRef)
 | 
			
		||||
				if err != nil && errors.IsTimeout(err) {
 | 
			
		||||
					// Pod is created but its initialization has timed out.
 | 
			
		||||
					// If the initialization is successful eventually, the
 | 
			
		||||
					// controller will observe the creation via the informer.
 | 
			
		||||
					// If the initialization fails, or if the pod keeps
 | 
			
		||||
					// uninitialized for a long time, the informer will not
 | 
			
		||||
					// receive any update, and the controller will create a new
 | 
			
		||||
					// pod when the expectation expires.
 | 
			
		||||
					return
 | 
			
		||||
				}
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					// Decrement the expected number of creates because the informer won't observe this pod
 | 
			
		||||
					glog.V(2).Infof("Failed creation, decrementing expectations for controller %q/%q", rc.Namespace, rc.Name)
 | 
			
		||||
 
 | 
			
		||||
@@ -362,8 +362,9 @@ func (e *Store) WaitForInitialized(ctx genericapirequest.Context, obj runtime.Ob
 | 
			
		||||
		select {
 | 
			
		||||
		case event, ok := <-ch:
 | 
			
		||||
			if !ok {
 | 
			
		||||
				// TODO: should we just expose the partially initialized object?
 | 
			
		||||
				return nil, kubeerr.NewServerTimeout(e.QualifiedResource, "create", 0)
 | 
			
		||||
				msg := fmt.Sprintf("server has timed out waiting for the initialization of %s %s",
 | 
			
		||||
					e.QualifiedResource.String(), accessor.GetName())
 | 
			
		||||
				return nil, kubeerr.NewTimeoutError(msg, 0)
 | 
			
		||||
			}
 | 
			
		||||
			switch event.Type {
 | 
			
		||||
			case watch.Deleted:
 | 
			
		||||
 
 | 
			
		||||
@@ -26,8 +26,10 @@ import (
 | 
			
		||||
 | 
			
		||||
	"k8s.io/api/admissionregistration/v1alpha1"
 | 
			
		||||
	"k8s.io/api/core/v1"
 | 
			
		||||
	"k8s.io/api/extensions/v1beta1"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/api/errors"
 | 
			
		||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/types"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/wait"
 | 
			
		||||
	clientset "k8s.io/client-go/kubernetes"
 | 
			
		||||
	clientretry "k8s.io/kubernetes/pkg/client/retry"
 | 
			
		||||
@@ -133,15 +135,7 @@ var _ = SIGDescribe("Initializers", func() {
 | 
			
		||||
		Expect(err).NotTo(HaveOccurred())
 | 
			
		||||
 | 
			
		||||
		// we must remove the initializer when the test is complete and ensure no pods are pending for that initializer
 | 
			
		||||
		defer func() {
 | 
			
		||||
			if err := c.AdmissionregistrationV1alpha1().InitializerConfigurations().Delete(initializerConfigName, nil); err != nil && !errors.IsNotFound(err) {
 | 
			
		||||
				framework.Logf("got error on deleting %s", initializerConfigName)
 | 
			
		||||
			}
 | 
			
		||||
			// poller configuration is 1 second, wait at least that long
 | 
			
		||||
			time.Sleep(3 * time.Second)
 | 
			
		||||
			// clear our initializer from anyone who got it
 | 
			
		||||
			removeInitializersFromAllPods(c, initializerName)
 | 
			
		||||
		}()
 | 
			
		||||
		defer cleanupInitializer(c, initializerConfigName, initializerName)
 | 
			
		||||
 | 
			
		||||
		// poller configuration is 1 second, wait at least that long
 | 
			
		||||
		time.Sleep(3 * time.Second)
 | 
			
		||||
@@ -207,6 +201,67 @@ var _ = SIGDescribe("Initializers", func() {
 | 
			
		||||
		Expect(pod.Initializers).To(BeNil())
 | 
			
		||||
		Expect(pod.Annotations[v1.MirrorPodAnnotationKey]).To(Equal("true"))
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	It("don't cause replicaset controller creating extra pods if the initializer is not handled [Serial]", func() {
 | 
			
		||||
		ns := f.Namespace.Name
 | 
			
		||||
		c := f.ClientSet
 | 
			
		||||
 | 
			
		||||
		podName := "uninitialized-pod"
 | 
			
		||||
		framework.Logf("Creating pod %s", podName)
 | 
			
		||||
 | 
			
		||||
		// create and register an initializer, without setting up a controller to handle it.
 | 
			
		||||
		initializerName := "pod.test.e2e.kubernetes.io"
 | 
			
		||||
		initializerConfigName := "e2e-test-initializer"
 | 
			
		||||
		_, err := c.AdmissionregistrationV1alpha1().InitializerConfigurations().Create(&v1alpha1.InitializerConfiguration{
 | 
			
		||||
			ObjectMeta: metav1.ObjectMeta{Name: initializerConfigName},
 | 
			
		||||
			Initializers: []v1alpha1.Initializer{
 | 
			
		||||
				{
 | 
			
		||||
					Name: initializerName,
 | 
			
		||||
					Rules: []v1alpha1.Rule{
 | 
			
		||||
						{APIGroups: []string{""}, APIVersions: []string{"*"}, Resources: []string{"pods"}},
 | 
			
		||||
					},
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
		})
 | 
			
		||||
		if errors.IsNotFound(err) {
 | 
			
		||||
			framework.Skipf("dynamic configuration of initializers requires the alpha admissionregistration.k8s.io group to be enabled")
 | 
			
		||||
		}
 | 
			
		||||
		Expect(err).NotTo(HaveOccurred())
 | 
			
		||||
 | 
			
		||||
		// we must remove the initializer when the test is complete and ensure no pods are pending for that initializer
 | 
			
		||||
		defer cleanupInitializer(c, initializerConfigName, initializerName)
 | 
			
		||||
 | 
			
		||||
		// poller configuration is 1 second, wait at least that long
 | 
			
		||||
		time.Sleep(3 * time.Second)
 | 
			
		||||
 | 
			
		||||
		// create a replicaset
 | 
			
		||||
		persistedRS, err := c.ExtensionsV1beta1().ReplicaSets(ns).Create(newReplicaset())
 | 
			
		||||
		Expect(err).NotTo(HaveOccurred())
 | 
			
		||||
		// wait for replicaset controller to confirm that it has handled the creation
 | 
			
		||||
		err = waitForRSObservedGeneration(c, persistedRS.Namespace, persistedRS.Name, persistedRS.Generation)
 | 
			
		||||
		Expect(err).NotTo(HaveOccurred())
 | 
			
		||||
 | 
			
		||||
		// update the replicaset spec to trigger a resync
 | 
			
		||||
		patch := []byte(`{"spec":{"minReadySeconds":5}}`)
 | 
			
		||||
		persistedRS, err = c.ExtensionsV1beta1().ReplicaSets(ns).Patch(persistedRS.Name, types.StrategicMergePatchType, patch)
 | 
			
		||||
		Expect(err).NotTo(HaveOccurred())
 | 
			
		||||
 | 
			
		||||
		// wait for replicaset controller to confirm that it has handle the spec update
 | 
			
		||||
		err = waitForRSObservedGeneration(c, persistedRS.Namespace, persistedRS.Name, persistedRS.Generation)
 | 
			
		||||
		Expect(err).NotTo(HaveOccurred())
 | 
			
		||||
 | 
			
		||||
		// verify that the replicaset controller doesn't create extra pod
 | 
			
		||||
		selector, err := metav1.LabelSelectorAsSelector(persistedRS.Spec.Selector)
 | 
			
		||||
		Expect(err).NotTo(HaveOccurred())
 | 
			
		||||
 | 
			
		||||
		listOptions := metav1.ListOptions{
 | 
			
		||||
			LabelSelector:        selector.String(),
 | 
			
		||||
			IncludeUninitialized: true,
 | 
			
		||||
		}
 | 
			
		||||
		pods, err := c.Core().Pods(ns).List(listOptions)
 | 
			
		||||
		Expect(err).NotTo(HaveOccurred())
 | 
			
		||||
		Expect(len(pods.Items)).Should(Equal(1))
 | 
			
		||||
	})
 | 
			
		||||
})
 | 
			
		||||
 | 
			
		||||
func newUninitializedPod(podName string) *v1.Pod {
 | 
			
		||||
@@ -217,6 +272,34 @@ func newUninitializedPod(podName string) *v1.Pod {
 | 
			
		||||
	return pod
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func newReplicaset() *v1beta1.ReplicaSet {
 | 
			
		||||
	name := "initializer-test-replicaset"
 | 
			
		||||
	replicas := int32(1)
 | 
			
		||||
	labels := map[string]string{"initializer-test": "single-replicaset"}
 | 
			
		||||
	return &v1beta1.ReplicaSet{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
			Name: name,
 | 
			
		||||
		},
 | 
			
		||||
		Spec: v1beta1.ReplicaSetSpec{
 | 
			
		||||
			Replicas: &replicas,
 | 
			
		||||
			Template: v1.PodTemplateSpec{
 | 
			
		||||
				ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
					Labels: labels,
 | 
			
		||||
				},
 | 
			
		||||
				Spec: v1.PodSpec{
 | 
			
		||||
					TerminationGracePeriodSeconds: &zero,
 | 
			
		||||
					Containers: []v1.Container{
 | 
			
		||||
						{
 | 
			
		||||
							Name:  name + "-container",
 | 
			
		||||
							Image: "gcr.io/google_containers/porter:4524579c0eb935c056c8e75563b4e1eda31587e0",
 | 
			
		||||
						},
 | 
			
		||||
					},
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func newInitPod(podName string) *v1.Pod {
 | 
			
		||||
	containerName := fmt.Sprintf("%s-container", podName)
 | 
			
		||||
	port := 8080
 | 
			
		||||
@@ -283,3 +366,28 @@ func removeInitializersFromAllPods(c clientset.Interface, initializerName string
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// remove the initializerConfig, and remove the initializer from all pods
 | 
			
		||||
func cleanupInitializer(c clientset.Interface, initializerConfigName, initializerName string) {
 | 
			
		||||
	if err := c.AdmissionregistrationV1alpha1().InitializerConfigurations().Delete(initializerConfigName, nil); err != nil && !errors.IsNotFound(err) {
 | 
			
		||||
		framework.Logf("got error on deleting %s", initializerConfigName)
 | 
			
		||||
	}
 | 
			
		||||
	// poller configuration is 1 second, wait at least that long
 | 
			
		||||
	time.Sleep(3 * time.Second)
 | 
			
		||||
	// clear our initializer from anyone who got it
 | 
			
		||||
	removeInitializersFromAllPods(c, initializerName)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// waits till the RS status.observedGeneration matches metadata.generation.
 | 
			
		||||
func waitForRSObservedGeneration(c clientset.Interface, ns, name string, generation int64) error {
 | 
			
		||||
	return wait.PollImmediate(1*time.Second, 1*time.Minute, func() (bool, error) {
 | 
			
		||||
		rs, err := c.Extensions().ReplicaSets(ns).Get(name, metav1.GetOptions{})
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return false, err
 | 
			
		||||
		}
 | 
			
		||||
		if generation > rs.Status.ObservedGeneration {
 | 
			
		||||
			return false, nil
 | 
			
		||||
		}
 | 
			
		||||
		return true, nil
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user