Merge pull request #80004 from Miciah/prefer-to-delete-doubled-up-pods-of-a-replicaset

Prefer to delete doubled-up pods of a ReplicaSet
Kubernetes Prow Robot, 2019-10-17 15:09:58 -07:00 (committed by GitHub)
7 changed files with 698 additions and 62 deletions
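
Only the e2e test and test-framework portions of the change appear below. For context, "doubled-up" pods are active pods of a ReplicaSet (or of related ReplicaSets owned by the same Deployment) that share a node with another such pod; preferring them during scale-down keeps every node that had a pod, and hence a local service endpoint, serving for as long as possible. The following is a minimal illustrative sketch of that ranking idea only; it is not the controller code from this PR, and rankDoubledUp is a hypothetical helper:

package sketch

import v1 "k8s.io/api/core/v1"

// rankDoubledUp returns, for each candidate pod, how many other candidate
// pods run on the same node. A scale-down that deletes higher-ranked pods
// first removes "doubled-up" pods before lone ones, so nodes keep at least
// one pod (and therefore a local service endpoint) as long as possible.
func rankDoubledUp(pods []*v1.Pod) map[string]int {
    podsPerNode := map[string]int{}
    for _, p := range pods {
        podsPerNode[p.Spec.NodeName]++
    }
    ranks := map[string]int{}
    for _, p := range pods {
        ranks[p.Name] = podsPerNode[p.Spec.NodeName] - 1
    }
    return ranks
}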


@@ -42,6 +42,7 @@ import (
e2edeploy "k8s.io/kubernetes/test/e2e/framework/deployment"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
"k8s.io/kubernetes/test/e2e/framework/replicaset"
e2eservice "k8s.io/kubernetes/test/e2e/framework/service"
testutil "k8s.io/kubernetes/test/utils"
utilpointer "k8s.io/utils/pointer"
)
@@ -119,6 +120,10 @@ var _ = SIGDescribe("Deployment", func() {
framework.ConformanceIt("deployment should support proportional scaling", func() {
testProportionalScalingDeployment(f)
})
ginkgo.It("should not disrupt a cloud load-balancer's connectivity during rollout", func() {
framework.SkipUnlessProviderIs("aws", "azure", "gce", "gke")
testRollingUpdateDeploymentWithLocalTrafficLoadBalancer(f)
})
// TODO: add tests that cover deployment.Spec.MinReadySeconds once we solved clock-skew issues
// See https://github.com/kubernetes/kubernetes/issues/29229
})
@@ -856,3 +861,151 @@ func orphanDeploymentReplicaSets(c clientset.Interface, d *appsv1.Deployment) er
    deleteOptions.Preconditions = metav1.NewUIDPreconditions(string(d.UID))
    return c.AppsV1().Deployments(d.Namespace).Delete(d.Name, deleteOptions)
}
func testRollingUpdateDeploymentWithLocalTrafficLoadBalancer(f *framework.Framework) {
    ns := f.Namespace.Name
    c := f.ClientSet
    name := "test-rolling-update-with-lb"
    framework.Logf("Creating Deployment %q", name)
    podLabels := map[string]string{"name": name}
    replicas := int32(3)
    d := e2edeploy.NewDeployment(name, replicas, podLabels, AgnhostImageName, AgnhostImage, appsv1.RollingUpdateDeploymentStrategyType)
    // NewDeployment assigns the same map to both d.Spec.Selector and
    // d.Spec.Template.Labels, so mutating one would mutate the other.
    // Thus we need to set d.Spec.Template.Labels to a new value if we want
    // to mutate it alone.
    d.Spec.Template.Labels = map[string]string{
        "iteration": "0",
        "name":      name,
    }
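    // (For illustration: had we instead written to the shared map, e.g.
    // d.Spec.Template.Labels["iteration"] = "0", the selector would have
    // gained the iteration label too, and later iterations' pod templates
    // would no longer match it.)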
    d.Spec.Template.Spec.Containers[0].Args = []string{"netexec", "--http-port=80", "--udp-port=80"}
    // To ensure that a node that had a local endpoint prior to a rolling
    // update continues to have a local endpoint throughout the rollout, we
    // need an affinity policy that will cause pods to be scheduled on the
    // same nodes as old pods, and we need the deployment to scale up a new
    // pod before deleting an old pod. This affinity policy will define
    // inter-pod affinity for pods of different rollouts and anti-affinity
    // for pods of the same rollout, so it will need to be updated when
    // performing a rollout.
    setAffinity(d)
    d.Spec.Strategy.RollingUpdate = &appsv1.RollingUpdateDeployment{
        MaxSurge:       intOrStrP(1),
        MaxUnavailable: intOrStrP(0),
    }
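    // With MaxSurge=1 and MaxUnavailable=0, the deployment controller must
    // create each replacement pod (which the affinity above steers onto a
    // node that already runs an old pod) before it may delete an old pod.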
    deployment, err := c.AppsV1().Deployments(ns).Create(d)
    framework.ExpectNoError(err)
    err = e2edeploy.WaitForDeploymentComplete(c, deployment)
    framework.ExpectNoError(err)
    framework.Logf("Creating a service %s with type=LoadBalancer and externalTrafficPolicy=Local in namespace %s", name, ns)
    jig := e2eservice.NewTestJig(c, name)
    jig.Labels = podLabels
    service := jig.CreateLoadBalancerService(ns, name, e2eservice.LoadBalancerCreateTimeoutDefault, func(svc *v1.Service) {
        svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
    })
    lbNameOrAddress := e2eservice.GetIngressPoint(&service.Status.LoadBalancer.Ingress[0])
    svcPort := int(service.Spec.Ports[0].Port)
    framework.Logf("Hitting the replica set's pods through the service's load balancer")
    timeout := e2eservice.LoadBalancerLagTimeoutDefault
    if framework.ProviderIs("aws") {
        timeout = e2eservice.LoadBalancerLagTimeoutAWS
    }
    e2eservice.TestReachableHTTP(lbNameOrAddress, svcPort, timeout)
    framework.Logf("Starting a goroutine to watch the service's endpoints in the background")
    done := make(chan struct{})
    failed := make(chan struct{})
    defer close(done)
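    // done tells the background watcher to stop once the test returns;
    // failed is signalled if the watcher ever observes the set of endpoint
    // nodes changing.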
    go func() {
        defer ginkgo.GinkgoRecover()
        expectedNodes := jig.GetEndpointNodeNames(service)
        // The affinity policy should ensure that before an old pod is
        // deleted, a new pod will have been created on the same node.
        // Thus the set of nodes with local endpoints for the service
        // should remain unchanged.
        wait.Until(func() {
            actualNodes := jig.GetEndpointNodeNames(service)
            if !actualNodes.Equal(expectedNodes) {
                framework.Logf("The set of nodes with local endpoints changed; started with %v, now have %v", expectedNodes.List(), actualNodes.List())
                failed <- struct{}{}
            }
        }, framework.Poll, done)
    }()
    framework.Logf("Triggering a rolling deployment several times")
    for i := 1; i <= 3; i++ {
        framework.Logf("Updating deployment %q pod template labels (iteration #%d)", name, i)
        deployment, err = e2edeploy.UpdateDeploymentWithRetries(c, ns, d.Name, func(update *appsv1.Deployment) {
            update.Spec.Template.Labels["iteration"] = fmt.Sprintf("%d", i)
            setAffinity(update)
        })
        framework.ExpectNoError(err)
        framework.Logf("Waiting for observed generation %d", deployment.Generation)
        err = e2edeploy.WaitForObservedDeployment(c, ns, name, deployment.Generation)
        framework.ExpectNoError(err)
        framework.Logf("Making sure deployment %q is complete", name)
        err = e2edeploy.WaitForDeploymentCompleteAndCheckRolling(c, deployment)
        framework.ExpectNoError(err)
    }
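    // Give the background watcher another minute after the last rollout to
    // catch any late endpoint churn before declaring success.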
    select {
    case <-failed:
        framework.Failf("Connectivity to the load balancer was interrupted")
    case <-time.After(1 * time.Minute):
    }
}
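
// setAffinity gives the deployment's pod template a preferred inter-pod
// affinity toward pods of the same deployment from a different iteration (so
// new pods are scheduled onto nodes that already run old pods) and a required
// anti-affinity against pods of the same iteration (so two pods from the same
// rollout never share a node).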
func setAffinity(d *appsv1.Deployment) {
    d.Spec.Template.Spec.Affinity = &v1.Affinity{
        PodAffinity: &v1.PodAffinity{
            PreferredDuringSchedulingIgnoredDuringExecution: []v1.WeightedPodAffinityTerm{
                {
                    Weight: int32(100),
                    PodAffinityTerm: v1.PodAffinityTerm{
                        TopologyKey: "kubernetes.io/hostname",
                        LabelSelector: &metav1.LabelSelector{
                            MatchExpressions: []metav1.LabelSelectorRequirement{
                                {
                                    Key:      "name",
                                    Operator: metav1.LabelSelectorOpIn,
                                    Values:   []string{d.Spec.Template.Labels["name"]},
                                },
                                {
                                    Key:      "iteration",
                                    Operator: metav1.LabelSelectorOpNotIn,
                                    Values:   []string{d.Spec.Template.Labels["iteration"]},
                                },
                            },
                        },
                    },
                },
            },
        },
        PodAntiAffinity: &v1.PodAntiAffinity{
            RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
                {
                    TopologyKey: "kubernetes.io/hostname",
                    LabelSelector: &metav1.LabelSelector{
                        MatchExpressions: []metav1.LabelSelectorRequirement{
                            {
                                Key:      "name",
                                Operator: metav1.LabelSelectorOpIn,
                                Values:   []string{d.Spec.Template.Labels["name"]},
                            },
                            {
                                Key:      "iteration",
                                Operator: metav1.LabelSelectorOpIn,
                                Values:   []string{d.Spec.Template.Labels["iteration"]},
                            },
                        },
                    },
                },
            },
        },
    }
}


@@ -266,6 +266,19 @@ func (j *TestJig) CreateLoadBalancerService(namespace, serviceName string, timeo
func (j *TestJig) GetEndpointNodes(svc *v1.Service) map[string][]string {
    nodes, err := e2enode.GetBoundedReadySchedulableNodes(j.Client, MaxNodesForEndpointsTests)
    framework.ExpectNoError(err)
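    // Delegate to GetEndpointNodeNames for the set of node names, then map
    // each ready, schedulable node in that set to its external IPs.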
    epNodes := j.GetEndpointNodeNames(svc)
    nodeMap := map[string][]string{}
    for _, n := range nodes.Items {
        if epNodes.Has(n.Name) {
            nodeMap[n.Name] = e2enode.GetAddresses(&n, v1.NodeExternalIP)
        }
    }
    return nodeMap
}
// GetEndpointNodeNames returns a string set of node names on which the
// endpoints of the given Service are running.
func (j *TestJig) GetEndpointNodeNames(svc *v1.Service) sets.String {
    endpoints, err := j.Client.CoreV1().Endpoints(svc.Namespace).Get(svc.Name, metav1.GetOptions{})
    if err != nil {
        framework.Failf("Get endpoints for service %s/%s failed (%s)", svc.Namespace, svc.Name, err)
@@ -281,13 +294,7 @@ func (j *TestJig) GetEndpointNodes(svc *v1.Service) map[string][]string {
            }
        }
    }
    nodeMap := map[string][]string{}
    for _, n := range nodes.Items {
        if epNodes.Has(n.Name) {
            nodeMap[n.Name] = e2enode.GetAddresses(&n, v1.NodeExternalIP)
        }
    }
    return nodeMap
    return epNodes
}
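
For reference, a minimal usage sketch of GetEndpointNodeNames (assuming jig is an existing *TestJig and svc a *v1.Service, as in the deployment test above): snapshot the endpoint nodes, make a change, then compare the two sets.

    before := jig.GetEndpointNodeNames(svc) // sets.String of node names
    // ... trigger a rollout or other change here ...
    after := jig.GetEndpointNodeNames(svc)
    if !after.Equal(before) {
        framework.Failf("endpoint nodes changed from %v to %v", before.List(), after.List())
    }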
// WaitForEndpointOnNode waits for a service endpoint on the given node.