mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-04 04:08:16 +00:00 
			
		
		
		
	Fix: With TolerateUnready set, endpoints are still listed for a Pod in state terminating
* Otherwise it prevents long running task in a preStop hook to succeed, that require DNS resolution
This commit is contained in:
		@@ -59,9 +59,13 @@ const (
 | 
				
			|||||||
	// An annotation on the Service denoting if the endpoints controller should
 | 
						// An annotation on the Service denoting if the endpoints controller should
 | 
				
			||||||
	// go ahead and create endpoints for unready pods. This annotation is
 | 
						// go ahead and create endpoints for unready pods. This annotation is
 | 
				
			||||||
	// currently only used by StatefulSets, where we need the pod to be DNS
 | 
						// currently only used by StatefulSets, where we need the pod to be DNS
 | 
				
			||||||
	// resolvable during initialization. In this situation we create a headless
 | 
						// resolvable during initialization and termination. In this situation we
 | 
				
			||||||
	// service just for the StatefulSet, and clients shouldn't be using this Service
 | 
						// create a headless Service just for the StatefulSet, and clients shouldn't
 | 
				
			||||||
	// for anything so unready endpoints don't matter.
 | 
						// be using this Service for anything so unready endpoints don't matter.
 | 
				
			||||||
 | 
						// Endpoints of these Services retain their DNS records and continue
 | 
				
			||||||
 | 
						// receiving traffic for the Service from the moment the kubelet starts all
 | 
				
			||||||
 | 
						// containers in the pod and marks it "Running", till the kubelet stops all
 | 
				
			||||||
 | 
						// containers and deletes the pod from the apiserver.
 | 
				
			||||||
	TolerateUnreadyEndpointsAnnotation = "service.alpha.kubernetes.io/tolerate-unready-endpoints"
 | 
						TolerateUnreadyEndpointsAnnotation = "service.alpha.kubernetes.io/tolerate-unready-endpoints"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -403,7 +407,7 @@ func (e *EndpointController) syncService(key string) error {
 | 
				
			|||||||
				glog.V(5).Infof("Failed to find an IP for pod %s/%s", pod.Namespace, pod.Name)
 | 
									glog.V(5).Infof("Failed to find an IP for pod %s/%s", pod.Namespace, pod.Name)
 | 
				
			||||||
				continue
 | 
									continue
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
			if pod.DeletionTimestamp != nil {
 | 
								if !tolerateUnreadyEndpoints && pod.DeletionTimestamp != nil {
 | 
				
			||||||
				glog.V(5).Infof("Pod is being deleted %s/%s", pod.Namespace, pod.Name)
 | 
									glog.V(5).Infof("Pod is being deleted %s/%s", pod.Namespace, pod.Name)
 | 
				
			||||||
				continue
 | 
									continue
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -1048,7 +1048,7 @@ var _ = framework.KubeDescribe("Services", func() {
 | 
				
			|||||||
	})
 | 
						})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	It("should create endpoints for unready pods", func() {
 | 
						It("should create endpoints for unready pods", func() {
 | 
				
			||||||
		serviceName := "never-ready"
 | 
							serviceName := "tolerate-unready"
 | 
				
			||||||
		ns := f.Namespace.Name
 | 
							ns := f.Namespace.Name
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		t := NewServerTest(cs, ns, serviceName)
 | 
							t := NewServerTest(cs, ns, serviceName)
 | 
				
			||||||
@@ -1060,12 +1060,31 @@ var _ = framework.KubeDescribe("Services", func() {
 | 
				
			|||||||
			}
 | 
								}
 | 
				
			||||||
		}()
 | 
							}()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		service := t.BuildServiceSpec()
 | 
							t.name = "slow-terminating-unready-pod"
 | 
				
			||||||
		service.Annotations = map[string]string{endpoint.TolerateUnreadyEndpointsAnnotation: "true"}
 | 
							t.image = "gcr.io/google_containers/netexec:1.7"
 | 
				
			||||||
 | 
							port := 80
 | 
				
			||||||
 | 
							terminateSeconds := int64(600)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							service := &v1.Service{
 | 
				
			||||||
 | 
								ObjectMeta: v1.ObjectMeta{
 | 
				
			||||||
 | 
									Name:        t.ServiceName,
 | 
				
			||||||
 | 
									Namespace:   t.Namespace,
 | 
				
			||||||
 | 
									Annotations: map[string]string{endpoint.TolerateUnreadyEndpointsAnnotation: "true"},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								Spec: v1.ServiceSpec{
 | 
				
			||||||
 | 
									Selector: t.Labels,
 | 
				
			||||||
 | 
									Ports: []v1.ServicePort{{
 | 
				
			||||||
 | 
										Name:       "http",
 | 
				
			||||||
 | 
										Port:       int32(port),
 | 
				
			||||||
 | 
										TargetPort: intstr.FromInt(port),
 | 
				
			||||||
 | 
									}},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
		rcSpec := rcByNameContainer(t.name, 1, t.image, t.Labels, v1.Container{
 | 
							rcSpec := rcByNameContainer(t.name, 1, t.image, t.Labels, v1.Container{
 | 
				
			||||||
 | 
								Args:  []string{fmt.Sprintf("--http-port=%d", port)},
 | 
				
			||||||
			Name:  t.name,
 | 
								Name:  t.name,
 | 
				
			||||||
			Image: t.image,
 | 
								Image: t.image,
 | 
				
			||||||
			Ports: []v1.ContainerPort{{ContainerPort: int32(80), Protocol: v1.ProtocolTCP}},
 | 
								Ports: []v1.ContainerPort{{ContainerPort: int32(port), Protocol: v1.ProtocolTCP}},
 | 
				
			||||||
			ReadinessProbe: &v1.Probe{
 | 
								ReadinessProbe: &v1.Probe{
 | 
				
			||||||
				Handler: v1.Handler{
 | 
									Handler: v1.Handler{
 | 
				
			||||||
					Exec: &v1.ExecAction{
 | 
										Exec: &v1.ExecAction{
 | 
				
			||||||
@@ -1073,9 +1092,17 @@ var _ = framework.KubeDescribe("Services", func() {
 | 
				
			|||||||
					},
 | 
										},
 | 
				
			||||||
				},
 | 
									},
 | 
				
			||||||
			},
 | 
								},
 | 
				
			||||||
 | 
								Lifecycle: &v1.Lifecycle{
 | 
				
			||||||
 | 
									PreStop: &v1.Handler{
 | 
				
			||||||
 | 
										Exec: &v1.ExecAction{
 | 
				
			||||||
 | 
											Command: []string{"/bin/sleep", fmt.Sprintf("%d", terminateSeconds)},
 | 
				
			||||||
 | 
										},
 | 
				
			||||||
 | 
									},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
		}, nil)
 | 
							}, nil)
 | 
				
			||||||
 | 
							rcSpec.Spec.Template.Spec.TerminationGracePeriodSeconds = &terminateSeconds
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		By(fmt.Sprintf("createing RC %v with selectors %v", rcSpec.Name, rcSpec.Spec.Selector))
 | 
							By(fmt.Sprintf("creating RC %v with selectors %v", rcSpec.Name, rcSpec.Spec.Selector))
 | 
				
			||||||
		_, err := t.createRC(rcSpec)
 | 
							_, err := t.createRC(rcSpec)
 | 
				
			||||||
		framework.ExpectNoError(err)
 | 
							framework.ExpectNoError(err)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -1087,10 +1114,10 @@ var _ = framework.KubeDescribe("Services", func() {
 | 
				
			|||||||
		framework.ExpectNoError(framework.VerifyPods(t.Client, t.Namespace, t.name, false, 1))
 | 
							framework.ExpectNoError(framework.VerifyPods(t.Client, t.Namespace, t.name, false, 1))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		svcName := fmt.Sprintf("%v.%v", serviceName, f.Namespace.Name)
 | 
							svcName := fmt.Sprintf("%v.%v", serviceName, f.Namespace.Name)
 | 
				
			||||||
		By("waiting for endpoints of Service with DNS name " + svcName)
 | 
							By("Waiting for endpoints of Service with DNS name " + svcName)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		execPodName := createExecPodOrFail(f.ClientSet, f.Namespace.Name, "execpod-")
 | 
							execPodName := createExecPodOrFail(f.ClientSet, f.Namespace.Name, "execpod-")
 | 
				
			||||||
		cmd := fmt.Sprintf("wget -qO- %v", svcName)
 | 
							cmd := fmt.Sprintf("wget -qO- http://%s:%d/", svcName, port)
 | 
				
			||||||
		var stdout string
 | 
							var stdout string
 | 
				
			||||||
		if pollErr := wait.PollImmediate(framework.Poll, kubeProxyLagTimeout, func() (bool, error) {
 | 
							if pollErr := wait.PollImmediate(framework.Poll, kubeProxyLagTimeout, func() (bool, error) {
 | 
				
			||||||
			var err error
 | 
								var err error
 | 
				
			||||||
@@ -1103,6 +1130,66 @@ var _ = framework.KubeDescribe("Services", func() {
 | 
				
			|||||||
		}); pollErr != nil {
 | 
							}); pollErr != nil {
 | 
				
			||||||
			framework.Failf("expected un-ready endpoint for Service %v within %v, stdout: %v", t.name, kubeProxyLagTimeout, stdout)
 | 
								framework.Failf("expected un-ready endpoint for Service %v within %v, stdout: %v", t.name, kubeProxyLagTimeout, stdout)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							By("Scaling down replication controler to zero")
 | 
				
			||||||
 | 
							framework.ScaleRC(f.ClientSet, f.InternalClientset, t.Namespace, rcSpec.Name, 0, false)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							By("Update service to not tolerate unready services")
 | 
				
			||||||
 | 
							_, err = updateService(f.ClientSet, t.Namespace, t.ServiceName, func(s *v1.Service) {
 | 
				
			||||||
 | 
								s.ObjectMeta.Annotations[endpoint.TolerateUnreadyEndpointsAnnotation] = "false"
 | 
				
			||||||
 | 
							})
 | 
				
			||||||
 | 
							framework.ExpectNoError(err)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							By("Check if pod is unreachable")
 | 
				
			||||||
 | 
							cmd = fmt.Sprintf("wget -qO- -T 2 http://%s:%d/; test \"$?\" -eq \"1\"", svcName, port)
 | 
				
			||||||
 | 
							if pollErr := wait.PollImmediate(framework.Poll, kubeProxyLagTimeout, func() (bool, error) {
 | 
				
			||||||
 | 
								var err error
 | 
				
			||||||
 | 
								stdout, err = framework.RunHostCmd(f.Namespace.Name, execPodName, cmd)
 | 
				
			||||||
 | 
								if err != nil {
 | 
				
			||||||
 | 
									framework.Logf("expected un-ready endpoint for Service %v, stdout: %v, err %v", t.name, stdout, err)
 | 
				
			||||||
 | 
									return false, nil
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								return true, nil
 | 
				
			||||||
 | 
							}); pollErr != nil {
 | 
				
			||||||
 | 
								framework.Failf("expected un-ready endpoint for Service %v within %v, stdout: %v", t.name, kubeProxyLagTimeout, stdout)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							By("Update service to tolerate unready services again")
 | 
				
			||||||
 | 
							_, err = updateService(f.ClientSet, t.Namespace, t.ServiceName, func(s *v1.Service) {
 | 
				
			||||||
 | 
								s.ObjectMeta.Annotations[endpoint.TolerateUnreadyEndpointsAnnotation] = "true"
 | 
				
			||||||
 | 
							})
 | 
				
			||||||
 | 
							framework.ExpectNoError(err)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							By("Check if terminating pod is available through service")
 | 
				
			||||||
 | 
							cmd = fmt.Sprintf("wget -qO- http://%s:%d/", svcName, port)
 | 
				
			||||||
 | 
							if pollErr := wait.PollImmediate(framework.Poll, kubeProxyLagTimeout, func() (bool, error) {
 | 
				
			||||||
 | 
								var err error
 | 
				
			||||||
 | 
								stdout, err = framework.RunHostCmd(f.Namespace.Name, execPodName, cmd)
 | 
				
			||||||
 | 
								if err != nil {
 | 
				
			||||||
 | 
									framework.Logf("expected un-ready endpoint for Service %v, stdout: %v, err %v", t.name, stdout, err)
 | 
				
			||||||
 | 
									return false, nil
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								return true, nil
 | 
				
			||||||
 | 
							}); pollErr != nil {
 | 
				
			||||||
 | 
								framework.Failf("expected un-ready endpoint for Service %v within %v, stdout: %v", t.name, kubeProxyLagTimeout, stdout)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							By("Remove pods immediately")
 | 
				
			||||||
 | 
							label := labels.SelectorFromSet(labels.Set(t.Labels))
 | 
				
			||||||
 | 
							options := v1.ListOptions{LabelSelector: label.String()}
 | 
				
			||||||
 | 
							podClient := t.Client.Core().Pods(f.Namespace.Name)
 | 
				
			||||||
 | 
							pods, err := podClient.List(options)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								framework.Logf("warning: error retrieving pods: %s", err)
 | 
				
			||||||
 | 
							} else {
 | 
				
			||||||
 | 
								for _, pod := range pods.Items {
 | 
				
			||||||
 | 
									var gracePeriodSeconds int64 = 0
 | 
				
			||||||
 | 
									err := podClient.Delete(pod.Name, &v1.DeleteOptions{GracePeriodSeconds: &gracePeriodSeconds})
 | 
				
			||||||
 | 
									if err != nil {
 | 
				
			||||||
 | 
										framework.Logf("warning: error force deleting pod '%s': %s", pod.Name, err)
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
	})
 | 
						})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	It("should only allow access from service loadbalancer source ranges [Slow]", func() {
 | 
						It("should only allow access from service loadbalancer source ranges [Slow]", func() {
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user