mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-10-31 10:18:13 +00:00 
			
		
		
		
	Break deployment controller into separate self-contained files
* rolling.go (has all the logic for rolling deployments) * recreate.go (has all the logic for recreate deployments) * sync.go (has all the logic for getting and scaling replica sets) * rollback.go (has all the logic for rolling back a deployment) * util.go (contains all the utilities used throughout the controller) Leave back at deployment_controller.go all the necessary bits for creating, setting up, and running the controller loop. Also add package documentation.
This commit is contained in:
		 Michail Kargakis
					Michail Kargakis
				
			
				
					committed by
					
						 kargakis
						kargakis
					
				
			
			
				
	
			
			
			 kargakis
						kargakis
					
				
			
						parent
						
							d06359d6a0
						
					
				
				
					commit
					332d151d61
				
			
										
											
												File diff suppressed because it is too large
												Load Diff
											
										
									
								
							| @@ -19,7 +19,6 @@ package deployment | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"testing" | ||||
| 	"time" | ||||
|  | ||||
| 	"k8s.io/kubernetes/pkg/api" | ||||
| 	"k8s.io/kubernetes/pkg/api/testapi" | ||||
| @@ -144,805 +143,6 @@ func newReplicaSet(d *exp.Deployment, name string, replicas int) *exp.ReplicaSet | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // TestScale tests proportional scaling of deployments. Note that fenceposts for | ||||
| // rolling out (maxUnavailable, maxSurge) have no meaning for simple scaling other | ||||
| // than recording maxSurge as part of the max-replicas annotation that is taken | ||||
| // into account in the next scale event (max-replicas is used for calculating the | ||||
| // proportion of a replica set). | ||||
| func TestScale(t *testing.T) { | ||||
| 	newTimestamp := unversioned.Date(2016, 5, 20, 2, 0, 0, 0, time.UTC) | ||||
| 	oldTimestamp := unversioned.Date(2016, 5, 20, 1, 0, 0, 0, time.UTC) | ||||
| 	olderTimestamp := unversioned.Date(2016, 5, 20, 0, 0, 0, 0, time.UTC) | ||||
|  | ||||
| 	tests := []struct { | ||||
| 		name          string | ||||
| 		deployment    *exp.Deployment | ||||
| 		oldDeployment *exp.Deployment | ||||
|  | ||||
| 		newRS  *exp.ReplicaSet | ||||
| 		oldRSs []*exp.ReplicaSet | ||||
|  | ||||
| 		expectedNew *exp.ReplicaSet | ||||
| 		expectedOld []*exp.ReplicaSet | ||||
|  | ||||
| 		desiredReplicasAnnotations map[string]int32 | ||||
| 	}{ | ||||
| 		{ | ||||
| 			name:          "normal scaling event: 10 -> 12", | ||||
| 			deployment:    newDeployment(12, nil), | ||||
| 			oldDeployment: newDeployment(10, nil), | ||||
|  | ||||
| 			newRS:  rs("foo-v1", 10, nil, newTimestamp), | ||||
| 			oldRSs: []*exp.ReplicaSet{}, | ||||
|  | ||||
| 			expectedNew: rs("foo-v1", 12, nil, newTimestamp), | ||||
| 			expectedOld: []*exp.ReplicaSet{}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:          "normal scaling event: 10 -> 5", | ||||
| 			deployment:    newDeployment(5, nil), | ||||
| 			oldDeployment: newDeployment(10, nil), | ||||
|  | ||||
| 			newRS:  rs("foo-v1", 10, nil, newTimestamp), | ||||
| 			oldRSs: []*exp.ReplicaSet{}, | ||||
|  | ||||
| 			expectedNew: rs("foo-v1", 5, nil, newTimestamp), | ||||
| 			expectedOld: []*exp.ReplicaSet{}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:          "proportional scaling: 5 -> 10", | ||||
| 			deployment:    newDeployment(10, nil), | ||||
| 			oldDeployment: newDeployment(5, nil), | ||||
|  | ||||
| 			newRS:  rs("foo-v2", 2, nil, newTimestamp), | ||||
| 			oldRSs: []*exp.ReplicaSet{rs("foo-v1", 3, nil, oldTimestamp)}, | ||||
|  | ||||
| 			expectedNew: rs("foo-v2", 4, nil, newTimestamp), | ||||
| 			expectedOld: []*exp.ReplicaSet{rs("foo-v1", 6, nil, oldTimestamp)}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:          "proportional scaling: 5 -> 3", | ||||
| 			deployment:    newDeployment(3, nil), | ||||
| 			oldDeployment: newDeployment(5, nil), | ||||
|  | ||||
| 			newRS:  rs("foo-v2", 2, nil, newTimestamp), | ||||
| 			oldRSs: []*exp.ReplicaSet{rs("foo-v1", 3, nil, oldTimestamp)}, | ||||
|  | ||||
| 			expectedNew: rs("foo-v2", 1, nil, newTimestamp), | ||||
| 			expectedOld: []*exp.ReplicaSet{rs("foo-v1", 2, nil, oldTimestamp)}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:          "proportional scaling: 9 -> 4", | ||||
| 			deployment:    newDeployment(4, nil), | ||||
| 			oldDeployment: newDeployment(9, nil), | ||||
|  | ||||
| 			newRS:  rs("foo-v2", 8, nil, newTimestamp), | ||||
| 			oldRSs: []*exp.ReplicaSet{rs("foo-v1", 1, nil, oldTimestamp)}, | ||||
|  | ||||
| 			expectedNew: rs("foo-v2", 4, nil, newTimestamp), | ||||
| 			expectedOld: []*exp.ReplicaSet{rs("foo-v1", 0, nil, oldTimestamp)}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:          "proportional scaling: 7 -> 10", | ||||
| 			deployment:    newDeployment(10, nil), | ||||
| 			oldDeployment: newDeployment(7, nil), | ||||
|  | ||||
| 			newRS:  rs("foo-v3", 2, nil, newTimestamp), | ||||
| 			oldRSs: []*exp.ReplicaSet{rs("foo-v2", 3, nil, oldTimestamp), rs("foo-v1", 2, nil, olderTimestamp)}, | ||||
|  | ||||
| 			expectedNew: rs("foo-v3", 3, nil, newTimestamp), | ||||
| 			expectedOld: []*exp.ReplicaSet{rs("foo-v2", 4, nil, oldTimestamp), rs("foo-v1", 3, nil, olderTimestamp)}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:          "proportional scaling: 13 -> 8", | ||||
| 			deployment:    newDeployment(8, nil), | ||||
| 			oldDeployment: newDeployment(13, nil), | ||||
|  | ||||
| 			newRS:  rs("foo-v3", 2, nil, newTimestamp), | ||||
| 			oldRSs: []*exp.ReplicaSet{rs("foo-v2", 8, nil, oldTimestamp), rs("foo-v1", 3, nil, olderTimestamp)}, | ||||
|  | ||||
| 			expectedNew: rs("foo-v3", 1, nil, newTimestamp), | ||||
| 			expectedOld: []*exp.ReplicaSet{rs("foo-v2", 5, nil, oldTimestamp), rs("foo-v1", 2, nil, olderTimestamp)}, | ||||
| 		}, | ||||
| 		// Scales up the new replica set. | ||||
| 		{ | ||||
| 			name:          "leftover distribution: 3 -> 4", | ||||
| 			deployment:    newDeployment(4, nil), | ||||
| 			oldDeployment: newDeployment(3, nil), | ||||
|  | ||||
| 			newRS:  rs("foo-v3", 1, nil, newTimestamp), | ||||
| 			oldRSs: []*exp.ReplicaSet{rs("foo-v2", 1, nil, oldTimestamp), rs("foo-v1", 1, nil, olderTimestamp)}, | ||||
|  | ||||
| 			expectedNew: rs("foo-v3", 2, nil, newTimestamp), | ||||
| 			expectedOld: []*exp.ReplicaSet{rs("foo-v2", 1, nil, oldTimestamp), rs("foo-v1", 1, nil, olderTimestamp)}, | ||||
| 		}, | ||||
| 		// Scales down the older replica set. | ||||
| 		{ | ||||
| 			name:          "leftover distribution: 3 -> 2", | ||||
| 			deployment:    newDeployment(2, nil), | ||||
| 			oldDeployment: newDeployment(3, nil), | ||||
|  | ||||
| 			newRS:  rs("foo-v3", 1, nil, newTimestamp), | ||||
| 			oldRSs: []*exp.ReplicaSet{rs("foo-v2", 1, nil, oldTimestamp), rs("foo-v1", 1, nil, olderTimestamp)}, | ||||
|  | ||||
| 			expectedNew: rs("foo-v3", 1, nil, newTimestamp), | ||||
| 			expectedOld: []*exp.ReplicaSet{rs("foo-v2", 1, nil, oldTimestamp), rs("foo-v1", 0, nil, olderTimestamp)}, | ||||
| 		}, | ||||
| 		// Scales up the latest replica set first. | ||||
| 		{ | ||||
| 			name:          "proportional scaling (no new rs): 4 -> 5", | ||||
| 			deployment:    newDeployment(5, nil), | ||||
| 			oldDeployment: newDeployment(4, nil), | ||||
|  | ||||
| 			newRS:  nil, | ||||
| 			oldRSs: []*exp.ReplicaSet{rs("foo-v2", 2, nil, oldTimestamp), rs("foo-v1", 2, nil, olderTimestamp)}, | ||||
|  | ||||
| 			expectedNew: nil, | ||||
| 			expectedOld: []*exp.ReplicaSet{rs("foo-v2", 3, nil, oldTimestamp), rs("foo-v1", 2, nil, olderTimestamp)}, | ||||
| 		}, | ||||
| 		// Scales down to zero | ||||
| 		{ | ||||
| 			name:          "proportional scaling: 6 -> 0", | ||||
| 			deployment:    newDeployment(0, nil), | ||||
| 			oldDeployment: newDeployment(6, nil), | ||||
|  | ||||
| 			newRS:  rs("foo-v3", 3, nil, newTimestamp), | ||||
| 			oldRSs: []*exp.ReplicaSet{rs("foo-v2", 2, nil, oldTimestamp), rs("foo-v1", 1, nil, olderTimestamp)}, | ||||
|  | ||||
| 			expectedNew: rs("foo-v3", 0, nil, newTimestamp), | ||||
| 			expectedOld: []*exp.ReplicaSet{rs("foo-v2", 0, nil, oldTimestamp), rs("foo-v1", 0, nil, olderTimestamp)}, | ||||
| 		}, | ||||
| 		// Scales up from zero | ||||
| 		{ | ||||
| 			name:          "proportional scaling: 0 -> 6", | ||||
| 			deployment:    newDeployment(6, nil), | ||||
| 			oldDeployment: newDeployment(0, nil), | ||||
|  | ||||
| 			newRS:  rs("foo-v3", 0, nil, newTimestamp), | ||||
| 			oldRSs: []*exp.ReplicaSet{rs("foo-v2", 0, nil, oldTimestamp), rs("foo-v1", 0, nil, olderTimestamp)}, | ||||
|  | ||||
| 			expectedNew: rs("foo-v3", 6, nil, newTimestamp), | ||||
| 			expectedOld: []*exp.ReplicaSet{rs("foo-v2", 0, nil, oldTimestamp), rs("foo-v1", 0, nil, olderTimestamp)}, | ||||
| 		}, | ||||
| 		// Scenario: deployment.spec.replicas == 3 ( foo-v1.spec.replicas == foo-v2.spec.replicas == foo-v3.spec.replicas == 1 ) | ||||
| 		// Deployment is scaled to 5. foo-v3.spec.replicas and foo-v2.spec.replicas should increment by 1 but foo-v2 fails to | ||||
| 		// update. | ||||
| 		{ | ||||
| 			name:          "failed rs update", | ||||
| 			deployment:    newDeployment(5, nil), | ||||
| 			oldDeployment: newDeployment(5, nil), | ||||
|  | ||||
| 			newRS:  rs("foo-v3", 2, nil, newTimestamp), | ||||
| 			oldRSs: []*exp.ReplicaSet{rs("foo-v2", 1, nil, oldTimestamp), rs("foo-v1", 1, nil, olderTimestamp)}, | ||||
|  | ||||
| 			expectedNew: rs("foo-v3", 2, nil, newTimestamp), | ||||
| 			expectedOld: []*exp.ReplicaSet{rs("foo-v2", 2, nil, oldTimestamp), rs("foo-v1", 1, nil, olderTimestamp)}, | ||||
|  | ||||
| 			desiredReplicasAnnotations: map[string]int32{"foo-v2": int32(3)}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:          "deployment with surge pods", | ||||
| 			deployment:    newDeploymentEnhanced(20, intstr.FromInt(2)), | ||||
| 			oldDeployment: newDeploymentEnhanced(10, intstr.FromInt(2)), | ||||
|  | ||||
| 			newRS:  rs("foo-v2", 6, nil, newTimestamp), | ||||
| 			oldRSs: []*exp.ReplicaSet{rs("foo-v1", 6, nil, oldTimestamp)}, | ||||
|  | ||||
| 			expectedNew: rs("foo-v2", 11, nil, newTimestamp), | ||||
| 			expectedOld: []*exp.ReplicaSet{rs("foo-v1", 11, nil, oldTimestamp)}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:          "change both surge and size", | ||||
| 			deployment:    newDeploymentEnhanced(50, intstr.FromInt(6)), | ||||
| 			oldDeployment: newDeploymentEnhanced(10, intstr.FromInt(3)), | ||||
|  | ||||
| 			newRS:  rs("foo-v2", 5, nil, newTimestamp), | ||||
| 			oldRSs: []*exp.ReplicaSet{rs("foo-v1", 8, nil, oldTimestamp)}, | ||||
|  | ||||
| 			expectedNew: rs("foo-v2", 22, nil, newTimestamp), | ||||
| 			expectedOld: []*exp.ReplicaSet{rs("foo-v1", 34, nil, oldTimestamp)}, | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
| 	for _, test := range tests { | ||||
| 		_ = olderTimestamp | ||||
| 		t.Log(test.name) | ||||
| 		fake := fake.Clientset{} | ||||
| 		dc := &DeploymentController{ | ||||
| 			client:        &fake, | ||||
| 			eventRecorder: &record.FakeRecorder{}, | ||||
| 		} | ||||
|  | ||||
| 		if test.newRS != nil { | ||||
| 			desiredReplicas := test.oldDeployment.Spec.Replicas | ||||
| 			if desired, ok := test.desiredReplicasAnnotations[test.newRS.Name]; ok { | ||||
| 				desiredReplicas = desired | ||||
| 			} | ||||
| 			setReplicasAnnotations(test.newRS, desiredReplicas, desiredReplicas+maxSurge(*test.oldDeployment)) | ||||
| 		} | ||||
| 		for i := range test.oldRSs { | ||||
| 			rs := test.oldRSs[i] | ||||
| 			if rs == nil { | ||||
| 				continue | ||||
| 			} | ||||
| 			desiredReplicas := test.oldDeployment.Spec.Replicas | ||||
| 			if desired, ok := test.desiredReplicasAnnotations[rs.Name]; ok { | ||||
| 				desiredReplicas = desired | ||||
| 			} | ||||
| 			setReplicasAnnotations(rs, desiredReplicas, desiredReplicas+maxSurge(*test.oldDeployment)) | ||||
| 		} | ||||
|  | ||||
| 		if err := dc.scale(test.deployment, test.newRS, test.oldRSs); err != nil { | ||||
| 			t.Errorf("%s: unexpected error: %v", test.name, err) | ||||
| 			continue | ||||
| 		} | ||||
| 		if test.expectedNew != nil && test.newRS != nil && test.expectedNew.Spec.Replicas != test.newRS.Spec.Replicas { | ||||
| 			t.Errorf("%s: expected new replicas: %d, got: %d", test.name, test.expectedNew.Spec.Replicas, test.newRS.Spec.Replicas) | ||||
| 			continue | ||||
| 		} | ||||
| 		if len(test.expectedOld) != len(test.oldRSs) { | ||||
| 			t.Errorf("%s: expected %d old replica sets, got %d", test.name, len(test.expectedOld), len(test.oldRSs)) | ||||
| 			continue | ||||
| 		} | ||||
| 		for n := range test.oldRSs { | ||||
| 			rs := test.oldRSs[n] | ||||
| 			exp := test.expectedOld[n] | ||||
| 			if exp.Spec.Replicas != rs.Spec.Replicas { | ||||
| 				t.Errorf("%s: expected old (%s) replicas: %d, got: %d", test.name, rs.Name, exp.Spec.Replicas, rs.Spec.Replicas) | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestDeploymentController_reconcileNewReplicaSet(t *testing.T) { | ||||
| 	tests := []struct { | ||||
| 		deploymentReplicas  int | ||||
| 		maxSurge            intstr.IntOrString | ||||
| 		oldReplicas         int | ||||
| 		newReplicas         int | ||||
| 		scaleExpected       bool | ||||
| 		expectedNewReplicas int | ||||
| 	}{ | ||||
| 		{ | ||||
| 			// Should not scale up. | ||||
| 			deploymentReplicas: 10, | ||||
| 			maxSurge:           intstr.FromInt(0), | ||||
| 			oldReplicas:        10, | ||||
| 			newReplicas:        0, | ||||
| 			scaleExpected:      false, | ||||
| 		}, | ||||
| 		{ | ||||
| 			deploymentReplicas:  10, | ||||
| 			maxSurge:            intstr.FromInt(2), | ||||
| 			oldReplicas:         10, | ||||
| 			newReplicas:         0, | ||||
| 			scaleExpected:       true, | ||||
| 			expectedNewReplicas: 2, | ||||
| 		}, | ||||
| 		{ | ||||
| 			deploymentReplicas:  10, | ||||
| 			maxSurge:            intstr.FromInt(2), | ||||
| 			oldReplicas:         5, | ||||
| 			newReplicas:         0, | ||||
| 			scaleExpected:       true, | ||||
| 			expectedNewReplicas: 7, | ||||
| 		}, | ||||
| 		{ | ||||
| 			deploymentReplicas: 10, | ||||
| 			maxSurge:           intstr.FromInt(2), | ||||
| 			oldReplicas:        10, | ||||
| 			newReplicas:        2, | ||||
| 			scaleExpected:      false, | ||||
| 		}, | ||||
| 		{ | ||||
| 			// Should scale down. | ||||
| 			deploymentReplicas:  10, | ||||
| 			maxSurge:            intstr.FromInt(2), | ||||
| 			oldReplicas:         2, | ||||
| 			newReplicas:         11, | ||||
| 			scaleExpected:       true, | ||||
| 			expectedNewReplicas: 10, | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
| 	for i, test := range tests { | ||||
| 		t.Logf("executing scenario %d", i) | ||||
| 		newRS := rs("foo-v2", test.newReplicas, nil, noTimestamp) | ||||
| 		oldRS := rs("foo-v2", test.oldReplicas, nil, noTimestamp) | ||||
| 		allRSs := []*exp.ReplicaSet{newRS, oldRS} | ||||
| 		deployment := deployment("foo", test.deploymentReplicas, test.maxSurge, intstr.FromInt(0), nil) | ||||
| 		fake := fake.Clientset{} | ||||
| 		controller := &DeploymentController{ | ||||
| 			client:        &fake, | ||||
| 			eventRecorder: &record.FakeRecorder{}, | ||||
| 		} | ||||
| 		scaled, err := controller.reconcileNewReplicaSet(allRSs, newRS, &deployment) | ||||
| 		if err != nil { | ||||
| 			t.Errorf("unexpected error: %v", err) | ||||
| 			continue | ||||
| 		} | ||||
| 		if !test.scaleExpected { | ||||
| 			if scaled || len(fake.Actions()) > 0 { | ||||
| 				t.Errorf("unexpected scaling: %v", fake.Actions()) | ||||
| 			} | ||||
| 			continue | ||||
| 		} | ||||
| 		if test.scaleExpected && !scaled { | ||||
| 			t.Errorf("expected scaling to occur") | ||||
| 			continue | ||||
| 		} | ||||
| 		if len(fake.Actions()) != 1 { | ||||
| 			t.Errorf("expected 1 action during scale, got: %v", fake.Actions()) | ||||
| 			continue | ||||
| 		} | ||||
| 		updated := fake.Actions()[0].(core.UpdateAction).GetObject().(*exp.ReplicaSet) | ||||
| 		if e, a := test.expectedNewReplicas, int(updated.Spec.Replicas); e != a { | ||||
| 			t.Errorf("expected update to %d replicas, got %d", e, a) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestDeploymentController_reconcileOldReplicaSets(t *testing.T) { | ||||
| 	tests := []struct { | ||||
| 		deploymentReplicas  int | ||||
| 		maxUnavailable      intstr.IntOrString | ||||
| 		oldReplicas         int | ||||
| 		newReplicas         int | ||||
| 		readyPodsFromOldRS  int | ||||
| 		readyPodsFromNewRS  int | ||||
| 		scaleExpected       bool | ||||
| 		expectedOldReplicas int | ||||
| 	}{ | ||||
| 		{ | ||||
| 			deploymentReplicas:  10, | ||||
| 			maxUnavailable:      intstr.FromInt(0), | ||||
| 			oldReplicas:         10, | ||||
| 			newReplicas:         0, | ||||
| 			readyPodsFromOldRS:  10, | ||||
| 			readyPodsFromNewRS:  0, | ||||
| 			scaleExpected:       true, | ||||
| 			expectedOldReplicas: 9, | ||||
| 		}, | ||||
| 		{ | ||||
| 			deploymentReplicas:  10, | ||||
| 			maxUnavailable:      intstr.FromInt(2), | ||||
| 			oldReplicas:         10, | ||||
| 			newReplicas:         0, | ||||
| 			readyPodsFromOldRS:  10, | ||||
| 			readyPodsFromNewRS:  0, | ||||
| 			scaleExpected:       true, | ||||
| 			expectedOldReplicas: 8, | ||||
| 		}, | ||||
| 		{ // expect unhealthy replicas from old replica sets been cleaned up | ||||
| 			deploymentReplicas:  10, | ||||
| 			maxUnavailable:      intstr.FromInt(2), | ||||
| 			oldReplicas:         10, | ||||
| 			newReplicas:         0, | ||||
| 			readyPodsFromOldRS:  8, | ||||
| 			readyPodsFromNewRS:  0, | ||||
| 			scaleExpected:       true, | ||||
| 			expectedOldReplicas: 8, | ||||
| 		}, | ||||
| 		{ // expect 1 unhealthy replica from old replica sets been cleaned up, and 1 ready pod been scaled down | ||||
| 			deploymentReplicas:  10, | ||||
| 			maxUnavailable:      intstr.FromInt(2), | ||||
| 			oldReplicas:         10, | ||||
| 			newReplicas:         0, | ||||
| 			readyPodsFromOldRS:  9, | ||||
| 			readyPodsFromNewRS:  0, | ||||
| 			scaleExpected:       true, | ||||
| 			expectedOldReplicas: 8, | ||||
| 		}, | ||||
| 		{ // the unavailable pods from the newRS would not make us scale down old RSs in a further step | ||||
| 			deploymentReplicas: 10, | ||||
| 			maxUnavailable:     intstr.FromInt(2), | ||||
| 			oldReplicas:        8, | ||||
| 			newReplicas:        2, | ||||
| 			readyPodsFromOldRS: 8, | ||||
| 			readyPodsFromNewRS: 0, | ||||
| 			scaleExpected:      false, | ||||
| 		}, | ||||
| 	} | ||||
| 	for i, test := range tests { | ||||
| 		t.Logf("executing scenario %d", i) | ||||
|  | ||||
| 		newSelector := map[string]string{"foo": "new"} | ||||
| 		oldSelector := map[string]string{"foo": "old"} | ||||
| 		newRS := rs("foo-new", test.newReplicas, newSelector, noTimestamp) | ||||
| 		oldRS := rs("foo-old", test.oldReplicas, oldSelector, noTimestamp) | ||||
| 		oldRSs := []*exp.ReplicaSet{oldRS} | ||||
| 		allRSs := []*exp.ReplicaSet{oldRS, newRS} | ||||
|  | ||||
| 		deployment := deployment("foo", test.deploymentReplicas, intstr.FromInt(0), test.maxUnavailable, newSelector) | ||||
| 		fakeClientset := fake.Clientset{} | ||||
| 		fakeClientset.AddReactor("list", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) { | ||||
| 			switch action.(type) { | ||||
| 			case core.ListAction: | ||||
| 				podList := &api.PodList{} | ||||
| 				for podIndex := 0; podIndex < test.readyPodsFromOldRS; podIndex++ { | ||||
| 					podList.Items = append(podList.Items, api.Pod{ | ||||
| 						ObjectMeta: api.ObjectMeta{ | ||||
| 							Name:   fmt.Sprintf("%s-oldReadyPod-%d", oldRS.Name, podIndex), | ||||
| 							Labels: oldSelector, | ||||
| 						}, | ||||
| 						Status: api.PodStatus{ | ||||
| 							Conditions: []api.PodCondition{ | ||||
| 								{ | ||||
| 									Type:   api.PodReady, | ||||
| 									Status: api.ConditionTrue, | ||||
| 								}, | ||||
| 							}, | ||||
| 						}, | ||||
| 					}) | ||||
| 				} | ||||
| 				for podIndex := 0; podIndex < test.oldReplicas-test.readyPodsFromOldRS; podIndex++ { | ||||
| 					podList.Items = append(podList.Items, api.Pod{ | ||||
| 						ObjectMeta: api.ObjectMeta{ | ||||
| 							Name:   fmt.Sprintf("%s-oldUnhealthyPod-%d", oldRS.Name, podIndex), | ||||
| 							Labels: oldSelector, | ||||
| 						}, | ||||
| 						Status: api.PodStatus{ | ||||
| 							Conditions: []api.PodCondition{ | ||||
| 								{ | ||||
| 									Type:   api.PodReady, | ||||
| 									Status: api.ConditionFalse, | ||||
| 								}, | ||||
| 							}, | ||||
| 						}, | ||||
| 					}) | ||||
| 				} | ||||
| 				for podIndex := 0; podIndex < test.readyPodsFromNewRS; podIndex++ { | ||||
| 					podList.Items = append(podList.Items, api.Pod{ | ||||
| 						ObjectMeta: api.ObjectMeta{ | ||||
| 							Name:   fmt.Sprintf("%s-newReadyPod-%d", oldRS.Name, podIndex), | ||||
| 							Labels: newSelector, | ||||
| 						}, | ||||
| 						Status: api.PodStatus{ | ||||
| 							Conditions: []api.PodCondition{ | ||||
| 								{ | ||||
| 									Type:   api.PodReady, | ||||
| 									Status: api.ConditionTrue, | ||||
| 								}, | ||||
| 							}, | ||||
| 						}, | ||||
| 					}) | ||||
| 				} | ||||
| 				for podIndex := 0; podIndex < test.oldReplicas-test.readyPodsFromOldRS; podIndex++ { | ||||
| 					podList.Items = append(podList.Items, api.Pod{ | ||||
| 						ObjectMeta: api.ObjectMeta{ | ||||
| 							Name:   fmt.Sprintf("%s-newUnhealthyPod-%d", oldRS.Name, podIndex), | ||||
| 							Labels: newSelector, | ||||
| 						}, | ||||
| 						Status: api.PodStatus{ | ||||
| 							Conditions: []api.PodCondition{ | ||||
| 								{ | ||||
| 									Type:   api.PodReady, | ||||
| 									Status: api.ConditionFalse, | ||||
| 								}, | ||||
| 							}, | ||||
| 						}, | ||||
| 					}) | ||||
| 				} | ||||
| 				return true, podList, nil | ||||
| 			} | ||||
| 			return false, nil, nil | ||||
| 		}) | ||||
| 		controller := &DeploymentController{ | ||||
| 			client:        &fakeClientset, | ||||
| 			eventRecorder: &record.FakeRecorder{}, | ||||
| 		} | ||||
|  | ||||
| 		scaled, err := controller.reconcileOldReplicaSets(allRSs, oldRSs, newRS, &deployment) | ||||
| 		if err != nil { | ||||
| 			t.Errorf("unexpected error: %v", err) | ||||
| 			continue | ||||
| 		} | ||||
| 		if !test.scaleExpected && scaled { | ||||
| 			t.Errorf("unexpected scaling: %v", fakeClientset.Actions()) | ||||
| 		} | ||||
| 		if test.scaleExpected && !scaled { | ||||
| 			t.Errorf("expected scaling to occur") | ||||
| 			continue | ||||
| 		} | ||||
| 		continue | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestDeploymentController_cleanupUnhealthyReplicas(t *testing.T) { | ||||
| 	tests := []struct { | ||||
| 		oldReplicas          int | ||||
| 		readyPods            int | ||||
| 		unHealthyPods        int | ||||
| 		maxCleanupCount      int | ||||
| 		cleanupCountExpected int | ||||
| 	}{ | ||||
| 		{ | ||||
| 			oldReplicas:          10, | ||||
| 			readyPods:            8, | ||||
| 			unHealthyPods:        2, | ||||
| 			maxCleanupCount:      1, | ||||
| 			cleanupCountExpected: 1, | ||||
| 		}, | ||||
| 		{ | ||||
| 			oldReplicas:          10, | ||||
| 			readyPods:            8, | ||||
| 			unHealthyPods:        2, | ||||
| 			maxCleanupCount:      3, | ||||
| 			cleanupCountExpected: 2, | ||||
| 		}, | ||||
| 		{ | ||||
| 			oldReplicas:          10, | ||||
| 			readyPods:            8, | ||||
| 			unHealthyPods:        2, | ||||
| 			maxCleanupCount:      0, | ||||
| 			cleanupCountExpected: 0, | ||||
| 		}, | ||||
| 		{ | ||||
| 			oldReplicas:          10, | ||||
| 			readyPods:            10, | ||||
| 			unHealthyPods:        0, | ||||
| 			maxCleanupCount:      3, | ||||
| 			cleanupCountExpected: 0, | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
| 	for i, test := range tests { | ||||
| 		t.Logf("executing scenario %d", i) | ||||
| 		oldRS := rs("foo-v2", test.oldReplicas, nil, noTimestamp) | ||||
| 		oldRSs := []*exp.ReplicaSet{oldRS} | ||||
| 		deployment := deployment("foo", 10, intstr.FromInt(2), intstr.FromInt(2), nil) | ||||
| 		fakeClientset := fake.Clientset{} | ||||
| 		fakeClientset.AddReactor("list", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) { | ||||
| 			switch action.(type) { | ||||
| 			case core.ListAction: | ||||
| 				podList := &api.PodList{} | ||||
| 				for podIndex := 0; podIndex < test.readyPods; podIndex++ { | ||||
| 					podList.Items = append(podList.Items, api.Pod{ | ||||
| 						ObjectMeta: api.ObjectMeta{ | ||||
| 							Name: fmt.Sprintf("%s-readyPod-%d", oldRS.Name, podIndex), | ||||
| 						}, | ||||
| 						Status: api.PodStatus{ | ||||
| 							Conditions: []api.PodCondition{ | ||||
| 								{ | ||||
| 									Type:   api.PodReady, | ||||
| 									Status: api.ConditionTrue, | ||||
| 								}, | ||||
| 							}, | ||||
| 						}, | ||||
| 					}) | ||||
| 				} | ||||
| 				for podIndex := 0; podIndex < test.unHealthyPods; podIndex++ { | ||||
| 					podList.Items = append(podList.Items, api.Pod{ | ||||
| 						ObjectMeta: api.ObjectMeta{ | ||||
| 							Name: fmt.Sprintf("%s-unHealthyPod-%d", oldRS.Name, podIndex), | ||||
| 						}, | ||||
| 						Status: api.PodStatus{ | ||||
| 							Conditions: []api.PodCondition{ | ||||
| 								{ | ||||
| 									Type:   api.PodReady, | ||||
| 									Status: api.ConditionFalse, | ||||
| 								}, | ||||
| 							}, | ||||
| 						}, | ||||
| 					}) | ||||
| 				} | ||||
| 				return true, podList, nil | ||||
| 			} | ||||
| 			return false, nil, nil | ||||
| 		}) | ||||
|  | ||||
| 		controller := &DeploymentController{ | ||||
| 			client:        &fakeClientset, | ||||
| 			eventRecorder: &record.FakeRecorder{}, | ||||
| 		} | ||||
| 		_, cleanupCount, err := controller.cleanupUnhealthyReplicas(oldRSs, &deployment, 0, int32(test.maxCleanupCount)) | ||||
| 		if err != nil { | ||||
| 			t.Errorf("unexpected error: %v", err) | ||||
| 			continue | ||||
| 		} | ||||
| 		if int(cleanupCount) != test.cleanupCountExpected { | ||||
| 			t.Errorf("expected %v unhealthy replicas been cleaned up, got %v", test.cleanupCountExpected, cleanupCount) | ||||
| 			continue | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestDeploymentController_scaleDownOldReplicaSetsForRollingUpdate(t *testing.T) { | ||||
| 	tests := []struct { | ||||
| 		deploymentReplicas  int | ||||
| 		maxUnavailable      intstr.IntOrString | ||||
| 		readyPods           int | ||||
| 		oldReplicas         int | ||||
| 		scaleExpected       bool | ||||
| 		expectedOldReplicas int | ||||
| 	}{ | ||||
| 		{ | ||||
| 			deploymentReplicas:  10, | ||||
| 			maxUnavailable:      intstr.FromInt(0), | ||||
| 			readyPods:           10, | ||||
| 			oldReplicas:         10, | ||||
| 			scaleExpected:       true, | ||||
| 			expectedOldReplicas: 9, | ||||
| 		}, | ||||
| 		{ | ||||
| 			deploymentReplicas:  10, | ||||
| 			maxUnavailable:      intstr.FromInt(2), | ||||
| 			readyPods:           10, | ||||
| 			oldReplicas:         10, | ||||
| 			scaleExpected:       true, | ||||
| 			expectedOldReplicas: 8, | ||||
| 		}, | ||||
| 		{ | ||||
| 			deploymentReplicas: 10, | ||||
| 			maxUnavailable:     intstr.FromInt(2), | ||||
| 			readyPods:          8, | ||||
| 			oldReplicas:        10, | ||||
| 			scaleExpected:      false, | ||||
| 		}, | ||||
| 		{ | ||||
| 			deploymentReplicas: 10, | ||||
| 			maxUnavailable:     intstr.FromInt(2), | ||||
| 			readyPods:          10, | ||||
| 			oldReplicas:        0, | ||||
| 			scaleExpected:      false, | ||||
| 		}, | ||||
| 		{ | ||||
| 			deploymentReplicas: 10, | ||||
| 			maxUnavailable:     intstr.FromInt(2), | ||||
| 			readyPods:          1, | ||||
| 			oldReplicas:        10, | ||||
| 			scaleExpected:      false, | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
| 	for i, test := range tests { | ||||
| 		t.Logf("executing scenario %d", i) | ||||
| 		oldRS := rs("foo-v2", test.oldReplicas, nil, noTimestamp) | ||||
| 		allRSs := []*exp.ReplicaSet{oldRS} | ||||
| 		oldRSs := []*exp.ReplicaSet{oldRS} | ||||
| 		deployment := deployment("foo", test.deploymentReplicas, intstr.FromInt(0), test.maxUnavailable, map[string]string{"foo": "bar"}) | ||||
| 		fakeClientset := fake.Clientset{} | ||||
| 		fakeClientset.AddReactor("list", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) { | ||||
| 			switch action.(type) { | ||||
| 			case core.ListAction: | ||||
| 				podList := &api.PodList{} | ||||
| 				for podIndex := 0; podIndex < test.readyPods; podIndex++ { | ||||
| 					podList.Items = append(podList.Items, api.Pod{ | ||||
| 						ObjectMeta: api.ObjectMeta{ | ||||
| 							Name:   fmt.Sprintf("%s-pod-%d", oldRS.Name, podIndex), | ||||
| 							Labels: map[string]string{"foo": "bar"}, | ||||
| 						}, | ||||
| 						Status: api.PodStatus{ | ||||
| 							Conditions: []api.PodCondition{ | ||||
| 								{ | ||||
| 									Type:   api.PodReady, | ||||
| 									Status: api.ConditionTrue, | ||||
| 								}, | ||||
| 							}, | ||||
| 						}, | ||||
| 					}) | ||||
| 				} | ||||
| 				return true, podList, nil | ||||
| 			} | ||||
| 			return false, nil, nil | ||||
| 		}) | ||||
| 		controller := &DeploymentController{ | ||||
| 			client:        &fakeClientset, | ||||
| 			eventRecorder: &record.FakeRecorder{}, | ||||
| 		} | ||||
| 		scaled, err := controller.scaleDownOldReplicaSetsForRollingUpdate(allRSs, oldRSs, &deployment) | ||||
| 		if err != nil { | ||||
| 			t.Errorf("unexpected error: %v", err) | ||||
| 			continue | ||||
| 		} | ||||
| 		if !test.scaleExpected { | ||||
| 			if scaled != 0 { | ||||
| 				t.Errorf("unexpected scaling: %v", fakeClientset.Actions()) | ||||
| 			} | ||||
| 			continue | ||||
| 		} | ||||
| 		if test.scaleExpected && scaled == 0 { | ||||
| 			t.Errorf("expected scaling to occur; actions: %v", fakeClientset.Actions()) | ||||
| 			continue | ||||
| 		} | ||||
| 		// There are both list and update actions logged, so extract the update | ||||
| 		// action for verification. | ||||
| 		var updateAction core.UpdateAction | ||||
| 		for _, action := range fakeClientset.Actions() { | ||||
| 			switch a := action.(type) { | ||||
| 			case core.UpdateAction: | ||||
| 				if updateAction != nil { | ||||
| 					t.Errorf("expected only 1 update action; had %v and found %v", updateAction, a) | ||||
| 				} else { | ||||
| 					updateAction = a | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| 		if updateAction == nil { | ||||
| 			t.Errorf("expected an update action") | ||||
| 			continue | ||||
| 		} | ||||
| 		updated := updateAction.GetObject().(*exp.ReplicaSet) | ||||
| 		if e, a := test.expectedOldReplicas, int(updated.Spec.Replicas); e != a { | ||||
| 			t.Errorf("expected update to %d replicas, got %d", e, a) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestDeploymentController_cleanupDeployment(t *testing.T) { | ||||
| 	selector := map[string]string{"foo": "bar"} | ||||
|  | ||||
| 	tests := []struct { | ||||
| 		oldRSs               []*exp.ReplicaSet | ||||
| 		revisionHistoryLimit int | ||||
| 		expectedDeletions    int | ||||
| 	}{ | ||||
| 		{ | ||||
| 			oldRSs: []*exp.ReplicaSet{ | ||||
| 				newRSWithStatus("foo-1", 0, 0, selector), | ||||
| 				newRSWithStatus("foo-2", 0, 0, selector), | ||||
| 				newRSWithStatus("foo-3", 0, 0, selector), | ||||
| 			}, | ||||
| 			revisionHistoryLimit: 1, | ||||
| 			expectedDeletions:    2, | ||||
| 		}, | ||||
| 		{ | ||||
| 			// Only delete the replica set with Spec.Replicas = Status.Replicas = 0. | ||||
| 			oldRSs: []*exp.ReplicaSet{ | ||||
| 				newRSWithStatus("foo-1", 0, 0, selector), | ||||
| 				newRSWithStatus("foo-2", 0, 1, selector), | ||||
| 				newRSWithStatus("foo-3", 1, 0, selector), | ||||
| 				newRSWithStatus("foo-4", 1, 1, selector), | ||||
| 			}, | ||||
| 			revisionHistoryLimit: 0, | ||||
| 			expectedDeletions:    1, | ||||
| 		}, | ||||
|  | ||||
| 		{ | ||||
| 			oldRSs: []*exp.ReplicaSet{ | ||||
| 				newRSWithStatus("foo-1", 0, 0, selector), | ||||
| 				newRSWithStatus("foo-2", 0, 0, selector), | ||||
| 			}, | ||||
| 			revisionHistoryLimit: 0, | ||||
| 			expectedDeletions:    2, | ||||
| 		}, | ||||
| 		{ | ||||
| 			oldRSs: []*exp.ReplicaSet{ | ||||
| 				newRSWithStatus("foo-1", 1, 1, selector), | ||||
| 				newRSWithStatus("foo-2", 1, 1, selector), | ||||
| 			}, | ||||
| 			revisionHistoryLimit: 0, | ||||
| 			expectedDeletions:    0, | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
| 	for i, test := range tests { | ||||
| 		fake := &fake.Clientset{} | ||||
| 		controller := NewDeploymentController(fake, controller.NoResyncPeriodFunc) | ||||
|  | ||||
| 		controller.eventRecorder = &record.FakeRecorder{} | ||||
| 		controller.rsStoreSynced = alwaysReady | ||||
| 		controller.podStoreSynced = alwaysReady | ||||
| 		for _, rs := range test.oldRSs { | ||||
| 			controller.rsStore.Add(rs) | ||||
| 		} | ||||
|  | ||||
| 		d := newDeployment(1, &tests[i].revisionHistoryLimit) | ||||
| 		controller.cleanupDeployment(test.oldRSs, d) | ||||
|  | ||||
| 		gotDeletions := 0 | ||||
| 		for _, action := range fake.Actions() { | ||||
| 			if "delete" == action.GetVerb() { | ||||
| 				gotDeletions++ | ||||
| 			} | ||||
| 		} | ||||
| 		if gotDeletions != test.expectedDeletions { | ||||
| 			t.Errorf("expect %v old replica sets been deleted, but got %v", test.expectedDeletions, gotDeletions) | ||||
| 			continue | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func getKey(d *exp.Deployment, t *testing.T) string { | ||||
| 	if key, err := controller.KeyFunc(d); err != nil { | ||||
| 		t.Errorf("Unexpected error getting key for deployment %v: %v", d.Name, err) | ||||
|   | ||||
							
								
								
									
										92
									
								
								pkg/controller/deployment/recreate.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										92
									
								
								pkg/controller/deployment/recreate.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,92 @@ | ||||
| /* | ||||
| Copyright 2016 The Kubernetes Authors. | ||||
|  | ||||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| you may not use this file except in compliance with the License. | ||||
| You may obtain a copy of the License at | ||||
|  | ||||
|     http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  | ||||
| Unless required by applicable law or agreed to in writing, software | ||||
| distributed under the License is distributed on an "AS IS" BASIS, | ||||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| See the License for the specific language governing permissions and | ||||
| limitations under the License. | ||||
| */ | ||||
|  | ||||
| package deployment | ||||
|  | ||||
| import ( | ||||
| 	"k8s.io/kubernetes/pkg/apis/extensions" | ||||
| 	"k8s.io/kubernetes/pkg/controller" | ||||
| ) | ||||
|  | ||||
| // rolloutRecreate implements the logic for recreating a replica set. | ||||
| func (dc *DeploymentController) rolloutRecreate(deployment *extensions.Deployment) error { | ||||
| 	// Don't create a new RS if not already existed, so that we avoid scaling up before scaling down | ||||
| 	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(deployment, false) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	allRSs := append(oldRSs, newRS) | ||||
|  | ||||
| 	// scale down old replica sets | ||||
| 	scaledDown, err := dc.scaleDownOldReplicaSetsForRecreate(controller.FilterActiveReplicaSets(oldRSs), deployment) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	if scaledDown { | ||||
| 		// Update DeploymentStatus | ||||
| 		return dc.updateDeploymentStatus(allRSs, newRS, deployment) | ||||
| 	} | ||||
|  | ||||
| 	// If we need to create a new RS, create it now | ||||
| 	// TODO: Create a new RS without re-listing all RSs. | ||||
| 	if newRS == nil { | ||||
| 		newRS, oldRSs, err = dc.getAllReplicaSetsAndSyncRevision(deployment, true) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 		allRSs = append(oldRSs, newRS) | ||||
| 	} | ||||
|  | ||||
| 	// scale up new replica set | ||||
| 	scaledUp, err := dc.scaleUpNewReplicaSetForRecreate(newRS, deployment) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	if scaledUp { | ||||
| 		// Update DeploymentStatus | ||||
| 		return dc.updateDeploymentStatus(allRSs, newRS, deployment) | ||||
| 	} | ||||
|  | ||||
| 	dc.cleanupDeployment(oldRSs, deployment) | ||||
|  | ||||
| 	// Sync deployment status | ||||
| 	return dc.syncDeploymentStatus(allRSs, newRS, deployment) | ||||
| } | ||||
|  | ||||
| // scaleDownOldReplicaSetsForRecreate scales down old replica sets when deployment strategy is "Recreate" | ||||
| func (dc *DeploymentController) scaleDownOldReplicaSetsForRecreate(oldRSs []*extensions.ReplicaSet, deployment *extensions.Deployment) (bool, error) { | ||||
| 	scaled := false | ||||
| 	for _, rs := range oldRSs { | ||||
| 		// Scaling not required. | ||||
| 		if rs.Spec.Replicas == 0 { | ||||
| 			continue | ||||
| 		} | ||||
| 		scaledRS, _, err := dc.scaleReplicaSetAndRecordEvent(rs, 0, deployment) | ||||
| 		if err != nil { | ||||
| 			return false, err | ||||
| 		} | ||||
| 		if scaledRS { | ||||
| 			scaled = true | ||||
| 		} | ||||
| 	} | ||||
| 	return scaled, nil | ||||
| } | ||||
|  | ||||
| // scaleUpNewReplicaSetForRecreate scales up new replica set when deployment strategy is "Recreate" | ||||
| func (dc *DeploymentController) scaleUpNewReplicaSetForRecreate(newRS *extensions.ReplicaSet, deployment *extensions.Deployment) (bool, error) { | ||||
| 	scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, deployment.Spec.Replicas, deployment) | ||||
| 	return scaled, err | ||||
| } | ||||
							
								
								
									
										105
									
								
								pkg/controller/deployment/rollback.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										105
									
								
								pkg/controller/deployment/rollback.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,105 @@ | ||||
| /* | ||||
| Copyright 2016 The Kubernetes Authors. | ||||
|  | ||||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| you may not use this file except in compliance with the License. | ||||
| You may obtain a copy of the License at | ||||
|  | ||||
|     http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  | ||||
| Unless required by applicable law or agreed to in writing, software | ||||
| distributed under the License is distributed on an "AS IS" BASIS, | ||||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| See the License for the specific language governing permissions and | ||||
| limitations under the License. | ||||
| */ | ||||
|  | ||||
| package deployment | ||||
|  | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"reflect" | ||||
|  | ||||
| 	"github.com/golang/glog" | ||||
| 	"k8s.io/kubernetes/pkg/api" | ||||
| 	"k8s.io/kubernetes/pkg/apis/extensions" | ||||
| 	deploymentutil "k8s.io/kubernetes/pkg/util/deployment" | ||||
| ) | ||||
|  | ||||
| // Rolling back to a revision; no-op if the toRevision is deployment's current revision | ||||
| func (dc *DeploymentController) rollback(deployment *extensions.Deployment, toRevision *int64) (*extensions.Deployment, error) { | ||||
| 	newRS, allOldRSs, err := dc.getAllReplicaSetsAndSyncRevision(deployment, true) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	allRSs := append(allOldRSs, newRS) | ||||
| 	// If rollback revision is 0, rollback to the last revision | ||||
| 	if *toRevision == 0 { | ||||
| 		if *toRevision = lastRevision(allRSs); *toRevision == 0 { | ||||
| 			// If we still can't find the last revision, gives up rollback | ||||
| 			dc.emitRollbackWarningEvent(deployment, deploymentutil.RollbackRevisionNotFound, "Unable to find last revision.") | ||||
| 			// Gives up rollback | ||||
| 			return dc.updateDeploymentAndClearRollbackTo(deployment) | ||||
| 		} | ||||
| 	} | ||||
| 	for _, rs := range allRSs { | ||||
| 		v, err := deploymentutil.Revision(rs) | ||||
| 		if err != nil { | ||||
| 			glog.V(4).Infof("Unable to extract revision from deployment's replica set %q: %v", rs.Name, err) | ||||
| 			continue | ||||
| 		} | ||||
| 		if v == *toRevision { | ||||
| 			glog.V(4).Infof("Found replica set %q with desired revision %d", rs.Name, v) | ||||
| 			// rollback by copying podTemplate.Spec from the replica set, and increment revision number by 1 | ||||
| 			// no-op if the the spec matches current deployment's podTemplate.Spec | ||||
| 			deployment, performedRollback, err := dc.rollbackToTemplate(deployment, rs) | ||||
| 			if performedRollback && err == nil { | ||||
| 				dc.emitRollbackNormalEvent(deployment, fmt.Sprintf("Rolled back deployment %q to revision %d", deployment.Name, *toRevision)) | ||||
| 			} | ||||
| 			return deployment, err | ||||
| 		} | ||||
| 	} | ||||
| 	dc.emitRollbackWarningEvent(deployment, deploymentutil.RollbackRevisionNotFound, "Unable to find the revision to rollback to.") | ||||
| 	// Gives up rollback | ||||
| 	return dc.updateDeploymentAndClearRollbackTo(deployment) | ||||
| } | ||||
|  | ||||
| func (dc *DeploymentController) rollbackToTemplate(deployment *extensions.Deployment, rs *extensions.ReplicaSet) (d *extensions.Deployment, performedRollback bool, err error) { | ||||
| 	if !reflect.DeepEqual(deploymentutil.GetNewReplicaSetTemplate(deployment), rs.Spec.Template) { | ||||
| 		glog.Infof("Rolling back deployment %s to template spec %+v", deployment.Name, rs.Spec.Template.Spec) | ||||
| 		deploymentutil.SetFromReplicaSetTemplate(deployment, rs.Spec.Template) | ||||
| 		// set RS (the old RS we'll rolling back to) annotations back to the deployment; | ||||
| 		// otherwise, the deployment's current annotations (should be the same as current new RS) will be copied to the RS after the rollback. | ||||
| 		// | ||||
| 		// For example, | ||||
| 		// A Deployment has old RS1 with annotation {change-cause:create}, and new RS2 {change-cause:edit}. | ||||
| 		// Note that both annotations are copied from Deployment, and the Deployment should be annotated {change-cause:edit} as well. | ||||
| 		// Now, rollback Deployment to RS1, we should update Deployment's pod-template and also copy annotation from RS1. | ||||
| 		// Deployment is now annotated {change-cause:create}, and we have new RS1 {change-cause:create}, old RS2 {change-cause:edit}. | ||||
| 		// | ||||
| 		// If we don't copy the annotations back from RS to deployment on rollback, the Deployment will stay as {change-cause:edit}, | ||||
| 		// and new RS1 becomes {change-cause:edit} (copied from deployment after rollback), old RS2 {change-cause:edit}, which is not correct. | ||||
| 		setDeploymentAnnotationsTo(deployment, rs) | ||||
| 		performedRollback = true | ||||
| 	} else { | ||||
| 		glog.V(4).Infof("Rolling back to a revision that contains the same template as current deployment %s, skipping rollback...", deployment.Name) | ||||
| 		dc.emitRollbackWarningEvent(deployment, deploymentutil.RollbackTemplateUnchanged, fmt.Sprintf("The rollback revision contains the same template as current deployment %q", deployment.Name)) | ||||
| 	} | ||||
| 	d, err = dc.updateDeploymentAndClearRollbackTo(deployment) | ||||
| 	return | ||||
| } | ||||
|  | ||||
| func (dc *DeploymentController) emitRollbackWarningEvent(deployment *extensions.Deployment, reason, message string) { | ||||
| 	dc.eventRecorder.Eventf(deployment, api.EventTypeWarning, reason, message) | ||||
| } | ||||
|  | ||||
| func (dc *DeploymentController) emitRollbackNormalEvent(deployment *extensions.Deployment, message string) { | ||||
| 	dc.eventRecorder.Eventf(deployment, api.EventTypeNormal, deploymentutil.RollbackDone, message) | ||||
| } | ||||
|  | ||||
| // updateDeploymentAndClearRollbackTo sets .spec.rollbackTo to nil and update the input deployment | ||||
| func (dc *DeploymentController) updateDeploymentAndClearRollbackTo(deployment *extensions.Deployment) (*extensions.Deployment, error) { | ||||
| 	glog.V(4).Infof("Cleans up rollbackTo of deployment %s", deployment.Name) | ||||
| 	deployment.Spec.RollbackTo = nil | ||||
| 	return dc.client.Extensions().Deployments(deployment.ObjectMeta.Namespace).Update(deployment) | ||||
| } | ||||
							
								
								
									
										243
									
								
								pkg/controller/deployment/rolling.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										243
									
								
								pkg/controller/deployment/rolling.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,243 @@ | ||||
| /* | ||||
| Copyright 2016 The Kubernetes Authors. | ||||
|  | ||||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| you may not use this file except in compliance with the License. | ||||
| You may obtain a copy of the License at | ||||
|  | ||||
|     http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  | ||||
| Unless required by applicable law or agreed to in writing, software | ||||
| distributed under the License is distributed on an "AS IS" BASIS, | ||||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| See the License for the specific language governing permissions and | ||||
| limitations under the License. | ||||
| */ | ||||
|  | ||||
| package deployment | ||||
|  | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"sort" | ||||
|  | ||||
| 	"github.com/golang/glog" | ||||
| 	"k8s.io/kubernetes/pkg/apis/extensions" | ||||
| 	"k8s.io/kubernetes/pkg/controller" | ||||
| 	deploymentutil "k8s.io/kubernetes/pkg/util/deployment" | ||||
| 	"k8s.io/kubernetes/pkg/util/integer" | ||||
| ) | ||||
|  | ||||
| // rolloutRolling implements the logic for rolling a new replica set. | ||||
| func (dc *DeploymentController) rolloutRolling(deployment *extensions.Deployment) error { | ||||
| 	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(deployment, true) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	allRSs := append(oldRSs, newRS) | ||||
|  | ||||
| 	// Scale up, if we can. | ||||
| 	scaledUp, err := dc.reconcileNewReplicaSet(allRSs, newRS, deployment) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	if scaledUp { | ||||
| 		// Update DeploymentStatus | ||||
| 		return dc.updateDeploymentStatus(allRSs, newRS, deployment) | ||||
| 	} | ||||
|  | ||||
| 	// Scale down, if we can. | ||||
| 	scaledDown, err := dc.reconcileOldReplicaSets(allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, deployment) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	if scaledDown { | ||||
| 		// Update DeploymentStatus | ||||
| 		return dc.updateDeploymentStatus(allRSs, newRS, deployment) | ||||
| 	} | ||||
|  | ||||
| 	dc.cleanupDeployment(oldRSs, deployment) | ||||
|  | ||||
| 	// Sync deployment status | ||||
| 	return dc.syncDeploymentStatus(allRSs, newRS, deployment) | ||||
| } | ||||
|  | ||||
| func (dc *DeploymentController) reconcileNewReplicaSet(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment *extensions.Deployment) (bool, error) { | ||||
| 	if newRS.Spec.Replicas == deployment.Spec.Replicas { | ||||
| 		// Scaling not required. | ||||
| 		return false, nil | ||||
| 	} | ||||
| 	if newRS.Spec.Replicas > deployment.Spec.Replicas { | ||||
| 		// Scale down. | ||||
| 		scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, deployment.Spec.Replicas, deployment) | ||||
| 		return scaled, err | ||||
| 	} | ||||
| 	newReplicasCount, err := deploymentutil.NewRSNewReplicas(deployment, allRSs, newRS) | ||||
| 	if err != nil { | ||||
| 		return false, err | ||||
| 	} | ||||
| 	scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, newReplicasCount, deployment) | ||||
| 	return scaled, err | ||||
| } | ||||
|  | ||||
| func (dc *DeploymentController) reconcileOldReplicaSets(allRSs []*extensions.ReplicaSet, oldRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment *extensions.Deployment) (bool, error) { | ||||
| 	oldPodsCount := deploymentutil.GetReplicaCountForReplicaSets(oldRSs) | ||||
| 	if oldPodsCount == 0 { | ||||
| 		// Can't scale down further | ||||
| 		return false, nil | ||||
| 	} | ||||
|  | ||||
| 	minReadySeconds := deployment.Spec.MinReadySeconds | ||||
| 	allPodsCount := deploymentutil.GetReplicaCountForReplicaSets(allRSs) | ||||
| 	// TODO: use dc.getAvailablePodsForReplicaSets instead | ||||
| 	newRSAvailablePodCount, err := deploymentutil.GetAvailablePodsForReplicaSets(dc.client, deployment, []*extensions.ReplicaSet{newRS}, minReadySeconds) | ||||
| 	if err != nil { | ||||
| 		return false, fmt.Errorf("could not find available pods: %v", err) | ||||
| 	} | ||||
| 	maxUnavailable := maxUnavailable(*deployment) | ||||
|  | ||||
| 	// Check if we can scale down. We can scale down in the following 2 cases: | ||||
| 	// * Some old replica sets have unhealthy replicas, we could safely scale down those unhealthy replicas since that won't further | ||||
| 	//  increase unavailability. | ||||
| 	// * New replica set has scaled up and it's replicas becomes ready, then we can scale down old replica sets in a further step. | ||||
| 	// | ||||
| 	// maxScaledDown := allPodsCount - minAvailable - newReplicaSetPodsUnavailable | ||||
| 	// take into account not only maxUnavailable and any surge pods that have been created, but also unavailable pods from | ||||
| 	// the newRS, so that the unavailable pods from the newRS would not make us scale down old replica sets in a further | ||||
| 	// step(that will increase unavailability). | ||||
| 	// | ||||
| 	// Concrete example: | ||||
| 	// | ||||
| 	// * 10 replicas | ||||
| 	// * 2 maxUnavailable (absolute number, not percent) | ||||
| 	// * 3 maxSurge (absolute number, not percent) | ||||
| 	// | ||||
| 	// case 1: | ||||
| 	// * Deployment is updated, newRS is created with 3 replicas, oldRS is scaled down to 8, and newRS is scaled up to 5. | ||||
| 	// * The new replica set pods crashloop and never become available. | ||||
| 	// * allPodsCount is 13. minAvailable is 8. newRSPodsUnavailable is 5. | ||||
| 	// * A node fails and causes one of the oldRS pods to become unavailable. However, 13 - 8 - 5 = 0, so the oldRS won't be scaled down. | ||||
| 	// * The user notices the crashloop and does kubectl rollout undo to rollback. | ||||
| 	// * newRSPodsUnavailable is 1, since we rolled back to the good replica set, so maxScaledDown = 13 - 8 - 1 = 4. 4 of the crashlooping pods will be scaled down. | ||||
| 	// * The total number of pods will then be 9 and the newRS can be scaled up to 10. | ||||
| 	// | ||||
| 	// case 2: | ||||
| 	// Same example, but pushing a new pod template instead of rolling back (aka "roll over"): | ||||
| 	// * The new replica set created must start with 0 replicas because allPodsCount is already at 13. | ||||
| 	// * However, newRSPodsUnavailable would also be 0, so the 2 old replica sets could be scaled down by 5 (13 - 8 - 0), which would then | ||||
| 	// allow the new replica set to be scaled up by 5. | ||||
| 	minAvailable := deployment.Spec.Replicas - maxUnavailable | ||||
| 	newRSUnavailablePodCount := newRS.Spec.Replicas - newRSAvailablePodCount | ||||
| 	maxScaledDown := allPodsCount - minAvailable - newRSUnavailablePodCount | ||||
| 	if maxScaledDown <= 0 { | ||||
| 		return false, nil | ||||
| 	} | ||||
|  | ||||
| 	// Clean up unhealthy replicas first, otherwise unhealthy replicas will block deployment | ||||
| 	// and cause timeout. See https://github.com/kubernetes/kubernetes/issues/16737 | ||||
| 	oldRSs, cleanupCount, err := dc.cleanupUnhealthyReplicas(oldRSs, deployment, deployment.Spec.MinReadySeconds, maxScaledDown) | ||||
| 	if err != nil { | ||||
| 		return false, nil | ||||
| 	} | ||||
| 	glog.V(4).Infof("Cleaned up unhealthy replicas from old RSes by %d", cleanupCount) | ||||
|  | ||||
| 	// Scale down old replica sets, need check maxUnavailable to ensure we can scale down | ||||
| 	allRSs = append(oldRSs, newRS) | ||||
| 	scaledDownCount, err := dc.scaleDownOldReplicaSetsForRollingUpdate(allRSs, oldRSs, deployment) | ||||
| 	if err != nil { | ||||
| 		return false, nil | ||||
| 	} | ||||
| 	glog.V(4).Infof("Scaled down old RSes of deployment %s by %d", deployment.Name, scaledDownCount) | ||||
|  | ||||
| 	totalScaledDown := cleanupCount + scaledDownCount | ||||
| 	return totalScaledDown > 0, nil | ||||
| } | ||||
|  | ||||
| // cleanupUnhealthyReplicas will scale down old replica sets with unhealthy replicas, so that all unhealthy replicas will be deleted. | ||||
| func (dc *DeploymentController) cleanupUnhealthyReplicas(oldRSs []*extensions.ReplicaSet, deployment *extensions.Deployment, minReadySeconds, maxCleanupCount int32) ([]*extensions.ReplicaSet, int32, error) { | ||||
| 	sort.Sort(controller.ReplicaSetsByCreationTimestamp(oldRSs)) | ||||
| 	// Safely scale down all old replica sets with unhealthy replicas. Replica set will sort the pods in the order | ||||
| 	// such that not-ready < ready, unscheduled < scheduled, and pending < running. This ensures that unhealthy replicas will | ||||
| 	// been deleted first and won't increase unavailability. | ||||
| 	totalScaledDown := int32(0) | ||||
| 	for i, targetRS := range oldRSs { | ||||
| 		if totalScaledDown >= maxCleanupCount { | ||||
| 			break | ||||
| 		} | ||||
| 		if targetRS.Spec.Replicas == 0 { | ||||
| 			// cannot scale down this replica set. | ||||
| 			continue | ||||
| 		} | ||||
| 		// TODO: use dc.getAvailablePodsForReplicaSets instead | ||||
| 		availablePodCount, err := deploymentutil.GetAvailablePodsForReplicaSets(dc.client, deployment, []*extensions.ReplicaSet{targetRS}, minReadySeconds) | ||||
| 		if err != nil { | ||||
| 			return nil, totalScaledDown, fmt.Errorf("could not find available pods: %v", err) | ||||
| 		} | ||||
| 		if targetRS.Spec.Replicas == availablePodCount { | ||||
| 			// no unhealthy replicas found, no scaling required. | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		scaledDownCount := int32(integer.IntMin(int(maxCleanupCount-totalScaledDown), int(targetRS.Spec.Replicas-availablePodCount))) | ||||
| 		newReplicasCount := targetRS.Spec.Replicas - scaledDownCount | ||||
| 		if newReplicasCount > targetRS.Spec.Replicas { | ||||
| 			return nil, 0, fmt.Errorf("when cleaning up unhealthy replicas, got invalid request to scale down %s/%s %d -> %d", targetRS.Namespace, targetRS.Name, targetRS.Spec.Replicas, newReplicasCount) | ||||
| 		} | ||||
| 		_, updatedOldRS, err := dc.scaleReplicaSetAndRecordEvent(targetRS, newReplicasCount, deployment) | ||||
| 		if err != nil { | ||||
| 			return nil, totalScaledDown, err | ||||
| 		} | ||||
| 		totalScaledDown += scaledDownCount | ||||
| 		oldRSs[i] = updatedOldRS | ||||
| 	} | ||||
| 	return oldRSs, totalScaledDown, nil | ||||
| } | ||||
|  | ||||
| // scaleDownOldReplicaSetsForRollingUpdate scales down old replica sets when deployment strategy is "RollingUpdate". | ||||
| // Need check maxUnavailable to ensure availability | ||||
| func (dc *DeploymentController) scaleDownOldReplicaSetsForRollingUpdate(allRSs []*extensions.ReplicaSet, oldRSs []*extensions.ReplicaSet, deployment *extensions.Deployment) (int32, error) { | ||||
| 	maxUnavailable := maxUnavailable(*deployment) | ||||
|  | ||||
| 	// Check if we can scale down. | ||||
| 	minAvailable := deployment.Spec.Replicas - maxUnavailable | ||||
| 	minReadySeconds := deployment.Spec.MinReadySeconds | ||||
| 	// Find the number of ready pods. | ||||
| 	// TODO: use dc.getAvailablePodsForReplicaSets instead | ||||
| 	availablePodCount, err := deploymentutil.GetAvailablePodsForReplicaSets(dc.client, deployment, allRSs, minReadySeconds) | ||||
| 	if err != nil { | ||||
| 		return 0, fmt.Errorf("could not find available pods: %v", err) | ||||
| 	} | ||||
| 	if availablePodCount <= minAvailable { | ||||
| 		// Cannot scale down. | ||||
| 		return 0, nil | ||||
| 	} | ||||
| 	glog.V(4).Infof("Found %d available pods in deployment %s, scaling down old RSes", availablePodCount, deployment.Name) | ||||
|  | ||||
| 	sort.Sort(controller.ReplicaSetsByCreationTimestamp(oldRSs)) | ||||
|  | ||||
| 	totalScaledDown := int32(0) | ||||
| 	totalScaleDownCount := availablePodCount - minAvailable | ||||
| 	for _, targetRS := range oldRSs { | ||||
| 		if totalScaledDown >= totalScaleDownCount { | ||||
| 			// No further scaling required. | ||||
| 			break | ||||
| 		} | ||||
| 		if targetRS.Spec.Replicas == 0 { | ||||
| 			// cannot scale down this ReplicaSet. | ||||
| 			continue | ||||
| 		} | ||||
| 		// Scale down. | ||||
| 		scaleDownCount := int32(integer.IntMin(int(targetRS.Spec.Replicas), int(totalScaleDownCount-totalScaledDown))) | ||||
| 		newReplicasCount := targetRS.Spec.Replicas - scaleDownCount | ||||
| 		if newReplicasCount > targetRS.Spec.Replicas { | ||||
| 			return 0, fmt.Errorf("when scaling down old RS, got invalid request to scale down %s/%s %d -> %d", targetRS.Namespace, targetRS.Name, targetRS.Spec.Replicas, newReplicasCount) | ||||
| 		} | ||||
| 		_, _, err = dc.scaleReplicaSetAndRecordEvent(targetRS, newReplicasCount, deployment) | ||||
| 		if err != nil { | ||||
| 			return totalScaledDown, err | ||||
| 		} | ||||
|  | ||||
| 		totalScaledDown += scaleDownCount | ||||
| 	} | ||||
|  | ||||
| 	return totalScaledDown, nil | ||||
| } | ||||
							
								
								
									
										505
									
								
								pkg/controller/deployment/rolling_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										505
									
								
								pkg/controller/deployment/rolling_test.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,505 @@ | ||||
| /* | ||||
| Copyright 2016 The Kubernetes Authors. | ||||
|  | ||||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| you may not use this file except in compliance with the License. | ||||
| You may obtain a copy of the License at | ||||
|  | ||||
|     http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  | ||||
| Unless required by applicable law or agreed to in writing, software | ||||
| distributed under the License is distributed on an "AS IS" BASIS, | ||||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| See the License for the specific language governing permissions and | ||||
| limitations under the License. | ||||
| */ | ||||
|  | ||||
| package deployment | ||||
|  | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"testing" | ||||
|  | ||||
| 	"k8s.io/kubernetes/pkg/api" | ||||
| 	exp "k8s.io/kubernetes/pkg/apis/extensions" | ||||
| 	"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" | ||||
| 	"k8s.io/kubernetes/pkg/client/record" | ||||
| 	"k8s.io/kubernetes/pkg/client/testing/core" | ||||
| 	"k8s.io/kubernetes/pkg/runtime" | ||||
| 	"k8s.io/kubernetes/pkg/util/intstr" | ||||
| ) | ||||
|  | ||||
| func TestDeploymentController_reconcileNewReplicaSet(t *testing.T) { | ||||
| 	tests := []struct { | ||||
| 		deploymentReplicas  int | ||||
| 		maxSurge            intstr.IntOrString | ||||
| 		oldReplicas         int | ||||
| 		newReplicas         int | ||||
| 		scaleExpected       bool | ||||
| 		expectedNewReplicas int | ||||
| 	}{ | ||||
| 		{ | ||||
| 			// Should not scale up. | ||||
| 			deploymentReplicas: 10, | ||||
| 			maxSurge:           intstr.FromInt(0), | ||||
| 			oldReplicas:        10, | ||||
| 			newReplicas:        0, | ||||
| 			scaleExpected:      false, | ||||
| 		}, | ||||
| 		{ | ||||
| 			deploymentReplicas:  10, | ||||
| 			maxSurge:            intstr.FromInt(2), | ||||
| 			oldReplicas:         10, | ||||
| 			newReplicas:         0, | ||||
| 			scaleExpected:       true, | ||||
| 			expectedNewReplicas: 2, | ||||
| 		}, | ||||
| 		{ | ||||
| 			deploymentReplicas:  10, | ||||
| 			maxSurge:            intstr.FromInt(2), | ||||
| 			oldReplicas:         5, | ||||
| 			newReplicas:         0, | ||||
| 			scaleExpected:       true, | ||||
| 			expectedNewReplicas: 7, | ||||
| 		}, | ||||
| 		{ | ||||
| 			deploymentReplicas: 10, | ||||
| 			maxSurge:           intstr.FromInt(2), | ||||
| 			oldReplicas:        10, | ||||
| 			newReplicas:        2, | ||||
| 			scaleExpected:      false, | ||||
| 		}, | ||||
| 		{ | ||||
| 			// Should scale down. | ||||
| 			deploymentReplicas:  10, | ||||
| 			maxSurge:            intstr.FromInt(2), | ||||
| 			oldReplicas:         2, | ||||
| 			newReplicas:         11, | ||||
| 			scaleExpected:       true, | ||||
| 			expectedNewReplicas: 10, | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
| 	for i, test := range tests { | ||||
| 		t.Logf("executing scenario %d", i) | ||||
| 		newRS := rs("foo-v2", test.newReplicas, nil, noTimestamp) | ||||
| 		oldRS := rs("foo-v2", test.oldReplicas, nil, noTimestamp) | ||||
| 		allRSs := []*exp.ReplicaSet{newRS, oldRS} | ||||
| 		deployment := deployment("foo", test.deploymentReplicas, test.maxSurge, intstr.FromInt(0), nil) | ||||
| 		fake := fake.Clientset{} | ||||
| 		controller := &DeploymentController{ | ||||
| 			client:        &fake, | ||||
| 			eventRecorder: &record.FakeRecorder{}, | ||||
| 		} | ||||
| 		scaled, err := controller.reconcileNewReplicaSet(allRSs, newRS, &deployment) | ||||
| 		if err != nil { | ||||
| 			t.Errorf("unexpected error: %v", err) | ||||
| 			continue | ||||
| 		} | ||||
| 		if !test.scaleExpected { | ||||
| 			if scaled || len(fake.Actions()) > 0 { | ||||
| 				t.Errorf("unexpected scaling: %v", fake.Actions()) | ||||
| 			} | ||||
| 			continue | ||||
| 		} | ||||
| 		if test.scaleExpected && !scaled { | ||||
| 			t.Errorf("expected scaling to occur") | ||||
| 			continue | ||||
| 		} | ||||
| 		if len(fake.Actions()) != 1 { | ||||
| 			t.Errorf("expected 1 action during scale, got: %v", fake.Actions()) | ||||
| 			continue | ||||
| 		} | ||||
| 		updated := fake.Actions()[0].(core.UpdateAction).GetObject().(*exp.ReplicaSet) | ||||
| 		if e, a := test.expectedNewReplicas, int(updated.Spec.Replicas); e != a { | ||||
| 			t.Errorf("expected update to %d replicas, got %d", e, a) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestDeploymentController_reconcileOldReplicaSets(t *testing.T) { | ||||
| 	tests := []struct { | ||||
| 		deploymentReplicas  int | ||||
| 		maxUnavailable      intstr.IntOrString | ||||
| 		oldReplicas         int | ||||
| 		newReplicas         int | ||||
| 		readyPodsFromOldRS  int | ||||
| 		readyPodsFromNewRS  int | ||||
| 		scaleExpected       bool | ||||
| 		expectedOldReplicas int | ||||
| 	}{ | ||||
| 		{ | ||||
| 			deploymentReplicas:  10, | ||||
| 			maxUnavailable:      intstr.FromInt(0), | ||||
| 			oldReplicas:         10, | ||||
| 			newReplicas:         0, | ||||
| 			readyPodsFromOldRS:  10, | ||||
| 			readyPodsFromNewRS:  0, | ||||
| 			scaleExpected:       true, | ||||
| 			expectedOldReplicas: 9, | ||||
| 		}, | ||||
| 		{ | ||||
| 			deploymentReplicas:  10, | ||||
| 			maxUnavailable:      intstr.FromInt(2), | ||||
| 			oldReplicas:         10, | ||||
| 			newReplicas:         0, | ||||
| 			readyPodsFromOldRS:  10, | ||||
| 			readyPodsFromNewRS:  0, | ||||
| 			scaleExpected:       true, | ||||
| 			expectedOldReplicas: 8, | ||||
| 		}, | ||||
| 		{ // expect unhealthy replicas from old replica sets been cleaned up | ||||
| 			deploymentReplicas:  10, | ||||
| 			maxUnavailable:      intstr.FromInt(2), | ||||
| 			oldReplicas:         10, | ||||
| 			newReplicas:         0, | ||||
| 			readyPodsFromOldRS:  8, | ||||
| 			readyPodsFromNewRS:  0, | ||||
| 			scaleExpected:       true, | ||||
| 			expectedOldReplicas: 8, | ||||
| 		}, | ||||
| 		{ // expect 1 unhealthy replica from old replica sets been cleaned up, and 1 ready pod been scaled down | ||||
| 			deploymentReplicas:  10, | ||||
| 			maxUnavailable:      intstr.FromInt(2), | ||||
| 			oldReplicas:         10, | ||||
| 			newReplicas:         0, | ||||
| 			readyPodsFromOldRS:  9, | ||||
| 			readyPodsFromNewRS:  0, | ||||
| 			scaleExpected:       true, | ||||
| 			expectedOldReplicas: 8, | ||||
| 		}, | ||||
| 		{ // the unavailable pods from the newRS would not make us scale down old RSs in a further step | ||||
| 			deploymentReplicas: 10, | ||||
| 			maxUnavailable:     intstr.FromInt(2), | ||||
| 			oldReplicas:        8, | ||||
| 			newReplicas:        2, | ||||
| 			readyPodsFromOldRS: 8, | ||||
| 			readyPodsFromNewRS: 0, | ||||
| 			scaleExpected:      false, | ||||
| 		}, | ||||
| 	} | ||||
| 	for i, test := range tests { | ||||
| 		t.Logf("executing scenario %d", i) | ||||
|  | ||||
| 		newSelector := map[string]string{"foo": "new"} | ||||
| 		oldSelector := map[string]string{"foo": "old"} | ||||
| 		newRS := rs("foo-new", test.newReplicas, newSelector, noTimestamp) | ||||
| 		oldRS := rs("foo-old", test.oldReplicas, oldSelector, noTimestamp) | ||||
| 		oldRSs := []*exp.ReplicaSet{oldRS} | ||||
| 		allRSs := []*exp.ReplicaSet{oldRS, newRS} | ||||
|  | ||||
| 		deployment := deployment("foo", test.deploymentReplicas, intstr.FromInt(0), test.maxUnavailable, newSelector) | ||||
| 		fakeClientset := fake.Clientset{} | ||||
| 		fakeClientset.AddReactor("list", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) { | ||||
| 			switch action.(type) { | ||||
| 			case core.ListAction: | ||||
| 				podList := &api.PodList{} | ||||
| 				for podIndex := 0; podIndex < test.readyPodsFromOldRS; podIndex++ { | ||||
| 					podList.Items = append(podList.Items, api.Pod{ | ||||
| 						ObjectMeta: api.ObjectMeta{ | ||||
| 							Name:   fmt.Sprintf("%s-oldReadyPod-%d", oldRS.Name, podIndex), | ||||
| 							Labels: oldSelector, | ||||
| 						}, | ||||
| 						Status: api.PodStatus{ | ||||
| 							Conditions: []api.PodCondition{ | ||||
| 								{ | ||||
| 									Type:   api.PodReady, | ||||
| 									Status: api.ConditionTrue, | ||||
| 								}, | ||||
| 							}, | ||||
| 						}, | ||||
| 					}) | ||||
| 				} | ||||
| 				for podIndex := 0; podIndex < test.oldReplicas-test.readyPodsFromOldRS; podIndex++ { | ||||
| 					podList.Items = append(podList.Items, api.Pod{ | ||||
| 						ObjectMeta: api.ObjectMeta{ | ||||
| 							Name:   fmt.Sprintf("%s-oldUnhealthyPod-%d", oldRS.Name, podIndex), | ||||
| 							Labels: oldSelector, | ||||
| 						}, | ||||
| 						Status: api.PodStatus{ | ||||
| 							Conditions: []api.PodCondition{ | ||||
| 								{ | ||||
| 									Type:   api.PodReady, | ||||
| 									Status: api.ConditionFalse, | ||||
| 								}, | ||||
| 							}, | ||||
| 						}, | ||||
| 					}) | ||||
| 				} | ||||
| 				for podIndex := 0; podIndex < test.readyPodsFromNewRS; podIndex++ { | ||||
| 					podList.Items = append(podList.Items, api.Pod{ | ||||
| 						ObjectMeta: api.ObjectMeta{ | ||||
| 							Name:   fmt.Sprintf("%s-newReadyPod-%d", oldRS.Name, podIndex), | ||||
| 							Labels: newSelector, | ||||
| 						}, | ||||
| 						Status: api.PodStatus{ | ||||
| 							Conditions: []api.PodCondition{ | ||||
| 								{ | ||||
| 									Type:   api.PodReady, | ||||
| 									Status: api.ConditionTrue, | ||||
| 								}, | ||||
| 							}, | ||||
| 						}, | ||||
| 					}) | ||||
| 				} | ||||
| 				for podIndex := 0; podIndex < test.oldReplicas-test.readyPodsFromOldRS; podIndex++ { | ||||
| 					podList.Items = append(podList.Items, api.Pod{ | ||||
| 						ObjectMeta: api.ObjectMeta{ | ||||
| 							Name:   fmt.Sprintf("%s-newUnhealthyPod-%d", oldRS.Name, podIndex), | ||||
| 							Labels: newSelector, | ||||
| 						}, | ||||
| 						Status: api.PodStatus{ | ||||
| 							Conditions: []api.PodCondition{ | ||||
| 								{ | ||||
| 									Type:   api.PodReady, | ||||
| 									Status: api.ConditionFalse, | ||||
| 								}, | ||||
| 							}, | ||||
| 						}, | ||||
| 					}) | ||||
| 				} | ||||
| 				return true, podList, nil | ||||
| 			} | ||||
| 			return false, nil, nil | ||||
| 		}) | ||||
| 		controller := &DeploymentController{ | ||||
| 			client:        &fakeClientset, | ||||
| 			eventRecorder: &record.FakeRecorder{}, | ||||
| 		} | ||||
|  | ||||
| 		scaled, err := controller.reconcileOldReplicaSets(allRSs, oldRSs, newRS, &deployment) | ||||
| 		if err != nil { | ||||
| 			t.Errorf("unexpected error: %v", err) | ||||
| 			continue | ||||
| 		} | ||||
| 		if !test.scaleExpected && scaled { | ||||
| 			t.Errorf("unexpected scaling: %v", fakeClientset.Actions()) | ||||
| 		} | ||||
| 		if test.scaleExpected && !scaled { | ||||
| 			t.Errorf("expected scaling to occur") | ||||
| 			continue | ||||
| 		} | ||||
| 		continue | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestDeploymentController_cleanupUnhealthyReplicas(t *testing.T) { | ||||
| 	tests := []struct { | ||||
| 		oldReplicas          int | ||||
| 		readyPods            int | ||||
| 		unHealthyPods        int | ||||
| 		maxCleanupCount      int | ||||
| 		cleanupCountExpected int | ||||
| 	}{ | ||||
| 		{ | ||||
| 			oldReplicas:          10, | ||||
| 			readyPods:            8, | ||||
| 			unHealthyPods:        2, | ||||
| 			maxCleanupCount:      1, | ||||
| 			cleanupCountExpected: 1, | ||||
| 		}, | ||||
| 		{ | ||||
| 			oldReplicas:          10, | ||||
| 			readyPods:            8, | ||||
| 			unHealthyPods:        2, | ||||
| 			maxCleanupCount:      3, | ||||
| 			cleanupCountExpected: 2, | ||||
| 		}, | ||||
| 		{ | ||||
| 			oldReplicas:          10, | ||||
| 			readyPods:            8, | ||||
| 			unHealthyPods:        2, | ||||
| 			maxCleanupCount:      0, | ||||
| 			cleanupCountExpected: 0, | ||||
| 		}, | ||||
| 		{ | ||||
| 			oldReplicas:          10, | ||||
| 			readyPods:            10, | ||||
| 			unHealthyPods:        0, | ||||
| 			maxCleanupCount:      3, | ||||
| 			cleanupCountExpected: 0, | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
| 	for i, test := range tests { | ||||
| 		t.Logf("executing scenario %d", i) | ||||
| 		oldRS := rs("foo-v2", test.oldReplicas, nil, noTimestamp) | ||||
| 		oldRSs := []*exp.ReplicaSet{oldRS} | ||||
| 		deployment := deployment("foo", 10, intstr.FromInt(2), intstr.FromInt(2), nil) | ||||
| 		fakeClientset := fake.Clientset{} | ||||
| 		fakeClientset.AddReactor("list", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) { | ||||
| 			switch action.(type) { | ||||
| 			case core.ListAction: | ||||
| 				podList := &api.PodList{} | ||||
| 				for podIndex := 0; podIndex < test.readyPods; podIndex++ { | ||||
| 					podList.Items = append(podList.Items, api.Pod{ | ||||
| 						ObjectMeta: api.ObjectMeta{ | ||||
| 							Name: fmt.Sprintf("%s-readyPod-%d", oldRS.Name, podIndex), | ||||
| 						}, | ||||
| 						Status: api.PodStatus{ | ||||
| 							Conditions: []api.PodCondition{ | ||||
| 								{ | ||||
| 									Type:   api.PodReady, | ||||
| 									Status: api.ConditionTrue, | ||||
| 								}, | ||||
| 							}, | ||||
| 						}, | ||||
| 					}) | ||||
| 				} | ||||
| 				for podIndex := 0; podIndex < test.unHealthyPods; podIndex++ { | ||||
| 					podList.Items = append(podList.Items, api.Pod{ | ||||
| 						ObjectMeta: api.ObjectMeta{ | ||||
| 							Name: fmt.Sprintf("%s-unHealthyPod-%d", oldRS.Name, podIndex), | ||||
| 						}, | ||||
| 						Status: api.PodStatus{ | ||||
| 							Conditions: []api.PodCondition{ | ||||
| 								{ | ||||
| 									Type:   api.PodReady, | ||||
| 									Status: api.ConditionFalse, | ||||
| 								}, | ||||
| 							}, | ||||
| 						}, | ||||
| 					}) | ||||
| 				} | ||||
| 				return true, podList, nil | ||||
| 			} | ||||
| 			return false, nil, nil | ||||
| 		}) | ||||
|  | ||||
| 		controller := &DeploymentController{ | ||||
| 			client:        &fakeClientset, | ||||
| 			eventRecorder: &record.FakeRecorder{}, | ||||
| 		} | ||||
| 		_, cleanupCount, err := controller.cleanupUnhealthyReplicas(oldRSs, &deployment, 0, int32(test.maxCleanupCount)) | ||||
| 		if err != nil { | ||||
| 			t.Errorf("unexpected error: %v", err) | ||||
| 			continue | ||||
| 		} | ||||
| 		if int(cleanupCount) != test.cleanupCountExpected { | ||||
| 			t.Errorf("expected %v unhealthy replicas been cleaned up, got %v", test.cleanupCountExpected, cleanupCount) | ||||
| 			continue | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestDeploymentController_scaleDownOldReplicaSetsForRollingUpdate(t *testing.T) { | ||||
| 	tests := []struct { | ||||
| 		deploymentReplicas  int | ||||
| 		maxUnavailable      intstr.IntOrString | ||||
| 		readyPods           int | ||||
| 		oldReplicas         int | ||||
| 		scaleExpected       bool | ||||
| 		expectedOldReplicas int | ||||
| 	}{ | ||||
| 		{ | ||||
| 			deploymentReplicas:  10, | ||||
| 			maxUnavailable:      intstr.FromInt(0), | ||||
| 			readyPods:           10, | ||||
| 			oldReplicas:         10, | ||||
| 			scaleExpected:       true, | ||||
| 			expectedOldReplicas: 9, | ||||
| 		}, | ||||
| 		{ | ||||
| 			deploymentReplicas:  10, | ||||
| 			maxUnavailable:      intstr.FromInt(2), | ||||
| 			readyPods:           10, | ||||
| 			oldReplicas:         10, | ||||
| 			scaleExpected:       true, | ||||
| 			expectedOldReplicas: 8, | ||||
| 		}, | ||||
| 		{ | ||||
| 			deploymentReplicas: 10, | ||||
| 			maxUnavailable:     intstr.FromInt(2), | ||||
| 			readyPods:          8, | ||||
| 			oldReplicas:        10, | ||||
| 			scaleExpected:      false, | ||||
| 		}, | ||||
| 		{ | ||||
| 			deploymentReplicas: 10, | ||||
| 			maxUnavailable:     intstr.FromInt(2), | ||||
| 			readyPods:          10, | ||||
| 			oldReplicas:        0, | ||||
| 			scaleExpected:      false, | ||||
| 		}, | ||||
| 		{ | ||||
| 			deploymentReplicas: 10, | ||||
| 			maxUnavailable:     intstr.FromInt(2), | ||||
| 			readyPods:          1, | ||||
| 			oldReplicas:        10, | ||||
| 			scaleExpected:      false, | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
| 	for i, test := range tests { | ||||
| 		t.Logf("executing scenario %d", i) | ||||
| 		oldRS := rs("foo-v2", test.oldReplicas, nil, noTimestamp) | ||||
| 		allRSs := []*exp.ReplicaSet{oldRS} | ||||
| 		oldRSs := []*exp.ReplicaSet{oldRS} | ||||
| 		deployment := deployment("foo", test.deploymentReplicas, intstr.FromInt(0), test.maxUnavailable, map[string]string{"foo": "bar"}) | ||||
| 		fakeClientset := fake.Clientset{} | ||||
| 		fakeClientset.AddReactor("list", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) { | ||||
| 			switch action.(type) { | ||||
| 			case core.ListAction: | ||||
| 				podList := &api.PodList{} | ||||
| 				for podIndex := 0; podIndex < test.readyPods; podIndex++ { | ||||
| 					podList.Items = append(podList.Items, api.Pod{ | ||||
| 						ObjectMeta: api.ObjectMeta{ | ||||
| 							Name:   fmt.Sprintf("%s-pod-%d", oldRS.Name, podIndex), | ||||
| 							Labels: map[string]string{"foo": "bar"}, | ||||
| 						}, | ||||
| 						Status: api.PodStatus{ | ||||
| 							Conditions: []api.PodCondition{ | ||||
| 								{ | ||||
| 									Type:   api.PodReady, | ||||
| 									Status: api.ConditionTrue, | ||||
| 								}, | ||||
| 							}, | ||||
| 						}, | ||||
| 					}) | ||||
| 				} | ||||
| 				return true, podList, nil | ||||
| 			} | ||||
| 			return false, nil, nil | ||||
| 		}) | ||||
| 		controller := &DeploymentController{ | ||||
| 			client:        &fakeClientset, | ||||
| 			eventRecorder: &record.FakeRecorder{}, | ||||
| 		} | ||||
| 		scaled, err := controller.scaleDownOldReplicaSetsForRollingUpdate(allRSs, oldRSs, &deployment) | ||||
| 		if err != nil { | ||||
| 			t.Errorf("unexpected error: %v", err) | ||||
| 			continue | ||||
| 		} | ||||
| 		if !test.scaleExpected { | ||||
| 			if scaled != 0 { | ||||
| 				t.Errorf("unexpected scaling: %v", fakeClientset.Actions()) | ||||
| 			} | ||||
| 			continue | ||||
| 		} | ||||
| 		if test.scaleExpected && scaled == 0 { | ||||
| 			t.Errorf("expected scaling to occur; actions: %v", fakeClientset.Actions()) | ||||
| 			continue | ||||
| 		} | ||||
| 		// There are both list and update actions logged, so extract the update | ||||
| 		// action for verification. | ||||
| 		var updateAction core.UpdateAction | ||||
| 		for _, action := range fakeClientset.Actions() { | ||||
| 			switch a := action.(type) { | ||||
| 			case core.UpdateAction: | ||||
| 				if updateAction != nil { | ||||
| 					t.Errorf("expected only 1 update action; had %v and found %v", updateAction, a) | ||||
| 				} else { | ||||
| 					updateAction = a | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| 		if updateAction == nil { | ||||
| 			t.Errorf("expected an update action") | ||||
| 			continue | ||||
| 		} | ||||
| 		updated := updateAction.GetObject().(*exp.ReplicaSet) | ||||
| 		if e, a := test.expectedOldReplicas, int(updated.Spec.Replicas); e != a { | ||||
| 			t.Errorf("expected update to %d replicas, got %d", e, a) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
							
								
								
									
										527
									
								
								pkg/controller/deployment/sync.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										527
									
								
								pkg/controller/deployment/sync.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,527 @@ | ||||
| /* | ||||
| Copyright 2016 The Kubernetes Authors. | ||||
|  | ||||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| you may not use this file except in compliance with the License. | ||||
| You may obtain a copy of the License at | ||||
|  | ||||
|     http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  | ||||
| Unless required by applicable law or agreed to in writing, software | ||||
| distributed under the License is distributed on an "AS IS" BASIS, | ||||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| See the License for the specific language governing permissions and | ||||
| limitations under the License. | ||||
| */ | ||||
|  | ||||
| package deployment | ||||
|  | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"reflect" | ||||
| 	"sort" | ||||
| 	"strconv" | ||||
|  | ||||
| 	"github.com/golang/glog" | ||||
| 	"k8s.io/kubernetes/pkg/api" | ||||
| 	"k8s.io/kubernetes/pkg/api/errors" | ||||
| 	"k8s.io/kubernetes/pkg/api/unversioned" | ||||
| 	"k8s.io/kubernetes/pkg/apis/extensions" | ||||
| 	"k8s.io/kubernetes/pkg/controller" | ||||
| 	deploymentutil "k8s.io/kubernetes/pkg/util/deployment" | ||||
| 	utilerrors "k8s.io/kubernetes/pkg/util/errors" | ||||
| 	labelsutil "k8s.io/kubernetes/pkg/util/labels" | ||||
| 	podutil "k8s.io/kubernetes/pkg/util/pod" | ||||
| 	rsutil "k8s.io/kubernetes/pkg/util/replicaset" | ||||
| ) | ||||
|  | ||||
| // sync is responsible for reconciling deployments on scaling events or when they | ||||
| // are paused. | ||||
| func (dc *DeploymentController) sync(deployment *extensions.Deployment) error { | ||||
| 	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(deployment, false) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	if err := dc.scale(deployment, newRS, oldRSs); err != nil { | ||||
| 		// If we get an error while trying to scale, the deployment will be requeued | ||||
| 		// so we can abort this resync | ||||
| 		return err | ||||
| 	} | ||||
| 	dc.cleanupDeployment(oldRSs, deployment) | ||||
|  | ||||
| 	allRSs := append(oldRSs, newRS) | ||||
| 	return dc.syncDeploymentStatus(allRSs, newRS, deployment) | ||||
| } | ||||
|  | ||||
| // getAllReplicaSetsAndSyncRevision returns all the replica sets for the provided deployment (new and all old), with new RS's and deployment's revision updated. | ||||
| // 1. Get all old RSes this deployment targets, and calculate the max revision number among them (maxOldV). | ||||
| // 2. Get new RS this deployment targets (whose pod template matches deployment's), and update new RS's revision number to (maxOldV + 1), | ||||
| //    only if its revision number is smaller than (maxOldV + 1). If this step failed, we'll update it in the next deployment sync loop. | ||||
| // 3. Copy new RS's revision number to deployment (update deployment's revision). If this step failed, we'll update it in the next deployment sync loop. | ||||
| // Note that currently the deployment controller is using caches to avoid querying the server for reads. | ||||
| // This may lead to stale reads of replica sets, thus incorrect deployment status. | ||||
| func (dc *DeploymentController) getAllReplicaSetsAndSyncRevision(deployment *extensions.Deployment, createIfNotExisted bool) (*extensions.ReplicaSet, []*extensions.ReplicaSet, error) { | ||||
| 	// List the deployment's RSes & Pods and apply pod-template-hash info to deployment's adopted RSes/Pods | ||||
| 	rsList, podList, err := dc.rsAndPodsWithHashKeySynced(deployment) | ||||
| 	if err != nil { | ||||
| 		return nil, nil, fmt.Errorf("error labeling replica sets and pods with pod-template-hash: %v", err) | ||||
| 	} | ||||
| 	_, allOldRSs, err := deploymentutil.FindOldReplicaSets(deployment, rsList, podList) | ||||
| 	if err != nil { | ||||
| 		return nil, nil, err | ||||
| 	} | ||||
|  | ||||
| 	// Calculate the max revision number among all old RSes | ||||
| 	maxOldV := maxRevision(allOldRSs) | ||||
|  | ||||
| 	// Get new replica set with the updated revision number | ||||
| 	newRS, err := dc.getNewReplicaSet(deployment, rsList, maxOldV, allOldRSs, createIfNotExisted) | ||||
| 	if err != nil { | ||||
| 		return nil, nil, err | ||||
| 	} | ||||
|  | ||||
| 	// Sync deployment's revision number with new replica set | ||||
| 	if newRS != nil && newRS.Annotations != nil && len(newRS.Annotations[deploymentutil.RevisionAnnotation]) > 0 && | ||||
| 		(deployment.Annotations == nil || deployment.Annotations[deploymentutil.RevisionAnnotation] != newRS.Annotations[deploymentutil.RevisionAnnotation]) { | ||||
| 		if err = dc.updateDeploymentRevision(deployment, newRS.Annotations[deploymentutil.RevisionAnnotation]); err != nil { | ||||
| 			glog.V(4).Infof("Error: %v. Unable to update deployment revision, will retry later.", err) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return newRS, allOldRSs, nil | ||||
| } | ||||
|  | ||||
| // rsAndPodsWithHashKeySynced returns the RSes and pods the given deployment targets, with pod-template-hash information synced. | ||||
| func (dc *DeploymentController) rsAndPodsWithHashKeySynced(deployment *extensions.Deployment) ([]extensions.ReplicaSet, *api.PodList, error) { | ||||
| 	rsList, err := deploymentutil.ListReplicaSets(deployment, | ||||
| 		func(namespace string, options api.ListOptions) ([]extensions.ReplicaSet, error) { | ||||
| 			return dc.rsStore.ReplicaSets(namespace).List(options.LabelSelector) | ||||
| 		}) | ||||
| 	if err != nil { | ||||
| 		return nil, nil, fmt.Errorf("error listing ReplicaSets: %v", err) | ||||
| 	} | ||||
| 	syncedRSList := []extensions.ReplicaSet{} | ||||
| 	for _, rs := range rsList { | ||||
| 		// Add pod-template-hash information if it's not in the RS. | ||||
| 		// Otherwise, new RS produced by Deployment will overlap with pre-existing ones | ||||
| 		// that aren't constrained by the pod-template-hash. | ||||
| 		syncedRS, err := dc.addHashKeyToRSAndPods(rs) | ||||
| 		if err != nil { | ||||
| 			return nil, nil, err | ||||
| 		} | ||||
| 		syncedRSList = append(syncedRSList, *syncedRS) | ||||
| 	} | ||||
| 	syncedPodList, err := dc.listPods(deployment) | ||||
| 	if err != nil { | ||||
| 		return nil, nil, err | ||||
| 	} | ||||
| 	return syncedRSList, syncedPodList, nil | ||||
| } | ||||
|  | ||||
| // addHashKeyToRSAndPods adds pod-template-hash information to the given rs, if it's not already there, with the following steps: | ||||
| // 1. Add hash label to the rs's pod template, and make sure the controller sees this update so that no orphaned pods will be created | ||||
| // 2. Add hash label to all pods this rs owns, wait until replicaset controller reports rs.Status.FullyLabeledReplicas equal to the desired number of replicas | ||||
| // 3. Add hash label to the rs's label and selector | ||||
| func (dc *DeploymentController) addHashKeyToRSAndPods(rs extensions.ReplicaSet) (updatedRS *extensions.ReplicaSet, err error) { | ||||
| 	updatedRS = &rs | ||||
| 	// If the rs already has the new hash label in its selector, it's done syncing | ||||
| 	if labelsutil.SelectorHasLabel(rs.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey) { | ||||
| 		return | ||||
| 	} | ||||
| 	namespace := rs.Namespace | ||||
| 	hash := rsutil.GetPodTemplateSpecHash(rs) | ||||
| 	rsUpdated := false | ||||
| 	// 1. Add hash template label to the rs. This ensures that any newly created pods will have the new label. | ||||
| 	updatedRS, rsUpdated, err = rsutil.UpdateRSWithRetries(dc.client.Extensions().ReplicaSets(namespace), updatedRS, | ||||
| 		func(updated *extensions.ReplicaSet) error { | ||||
| 			// Precondition: the RS doesn't contain the new hash in its pod template label. | ||||
| 			if updated.Spec.Template.Labels[extensions.DefaultDeploymentUniqueLabelKey] == hash { | ||||
| 				return utilerrors.ErrPreconditionViolated | ||||
| 			} | ||||
| 			updated.Spec.Template.Labels = labelsutil.AddLabel(updated.Spec.Template.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash) | ||||
| 			return nil | ||||
| 		}) | ||||
| 	if err != nil { | ||||
| 		return nil, fmt.Errorf("error updating %s %s/%s pod template label with template hash: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, err) | ||||
| 	} | ||||
| 	if !rsUpdated { | ||||
| 		// If RS wasn't updated but didn't return error in step 1, we've hit a RS not found error. | ||||
| 		// Return here and retry in the next sync loop. | ||||
| 		return &rs, nil | ||||
| 	} | ||||
| 	// Make sure rs pod template is updated so that it won't create pods without the new label (orphaned pods). | ||||
| 	if updatedRS.Generation > updatedRS.Status.ObservedGeneration { | ||||
| 		if err = deploymentutil.WaitForReplicaSetUpdated(dc.client, updatedRS.Generation, namespace, updatedRS.Name); err != nil { | ||||
| 			return nil, fmt.Errorf("error waiting for %s %s/%s generation %d observed by controller: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, updatedRS.Generation, err) | ||||
| 		} | ||||
| 	} | ||||
| 	glog.V(4).Infof("Observed the update of %s %s/%s's pod template with hash %s.", rs.Kind, rs.Namespace, rs.Name, hash) | ||||
|  | ||||
| 	// 2. Update all pods managed by the rs to have the new hash label, so they will be correctly adopted. | ||||
| 	selector, err := unversioned.LabelSelectorAsSelector(updatedRS.Spec.Selector) | ||||
| 	if err != nil { | ||||
| 		return nil, fmt.Errorf("error in converting selector to label selector for replica set %s: %s", updatedRS.Name, err) | ||||
| 	} | ||||
| 	options := api.ListOptions{LabelSelector: selector} | ||||
| 	podList, err := dc.podStore.Pods(namespace).List(options.LabelSelector) | ||||
| 	if err != nil { | ||||
| 		return nil, fmt.Errorf("error in getting pod list for namespace %s and list options %+v: %s", namespace, options, err) | ||||
| 	} | ||||
| 	allPodsLabeled := false | ||||
| 	if allPodsLabeled, err = deploymentutil.LabelPodsWithHash(&podList, updatedRS, dc.client, namespace, hash); err != nil { | ||||
| 		return nil, fmt.Errorf("error in adding template hash label %s to pods %+v: %s", hash, podList, err) | ||||
| 	} | ||||
| 	// If not all pods are labeled but didn't return error in step 2, we've hit at least one pod not found error. | ||||
| 	// Return here and retry in the next sync loop. | ||||
| 	if !allPodsLabeled { | ||||
| 		return updatedRS, nil | ||||
| 	} | ||||
|  | ||||
| 	// We need to wait for the replicaset controller to observe the pods being | ||||
| 	// labeled with pod template hash. Because previously we've called | ||||
| 	// WaitForReplicaSetUpdated, the replicaset controller should have dropped | ||||
| 	// FullyLabeledReplicas to 0 already, we only need to wait it to increase | ||||
| 	// back to the number of replicas in the spec. | ||||
| 	if err = deploymentutil.WaitForPodsHashPopulated(dc.client, updatedRS.Generation, namespace, updatedRS.Name); err != nil { | ||||
| 		return nil, fmt.Errorf("%s %s/%s: error waiting for replicaset controller to observe pods being labeled with template hash: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, err) | ||||
| 	} | ||||
|  | ||||
| 	// 3. Update rs label and selector to include the new hash label | ||||
| 	// Copy the old selector, so that we can scrub out any orphaned pods | ||||
| 	if updatedRS, rsUpdated, err = rsutil.UpdateRSWithRetries(dc.client.Extensions().ReplicaSets(namespace), updatedRS, | ||||
| 		func(updated *extensions.ReplicaSet) error { | ||||
| 			// Precondition: the RS doesn't contain the new hash in its label or selector. | ||||
| 			if updated.Labels[extensions.DefaultDeploymentUniqueLabelKey] == hash && updated.Spec.Selector.MatchLabels[extensions.DefaultDeploymentUniqueLabelKey] == hash { | ||||
| 				return utilerrors.ErrPreconditionViolated | ||||
| 			} | ||||
| 			updated.Labels = labelsutil.AddLabel(updated.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash) | ||||
| 			updated.Spec.Selector = labelsutil.AddLabelToSelector(updated.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey, hash) | ||||
| 			return nil | ||||
| 		}); err != nil { | ||||
| 		return nil, fmt.Errorf("error updating %s %s/%s label and selector with template hash: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, err) | ||||
| 	} | ||||
| 	if rsUpdated { | ||||
| 		glog.V(4).Infof("Updated %s %s/%s's selector and label with hash %s.", rs.Kind, rs.Namespace, rs.Name, hash) | ||||
| 	} | ||||
| 	// If the RS isn't actually updated in step 3, that's okay, we'll retry in the next sync loop since its selector isn't updated yet. | ||||
|  | ||||
| 	// TODO: look for orphaned pods and label them in the background somewhere else periodically | ||||
|  | ||||
| 	return updatedRS, nil | ||||
| } | ||||
|  | ||||
| func (dc *DeploymentController) listPods(deployment *extensions.Deployment) (*api.PodList, error) { | ||||
| 	return deploymentutil.ListPods(deployment, | ||||
| 		func(namespace string, options api.ListOptions) (*api.PodList, error) { | ||||
| 			podList, err := dc.podStore.Pods(namespace).List(options.LabelSelector) | ||||
| 			return &podList, err | ||||
| 		}) | ||||
| } | ||||
|  | ||||
| // Returns a replica set that matches the intent of the given deployment. Returns nil if the new replica set doesn't exist yet. | ||||
| // 1. Get existing new RS (the RS that the given deployment targets, whose pod template is the same as deployment's). | ||||
| // 2. If there's existing new RS, update its revision number if it's smaller than (maxOldRevision + 1), where maxOldRevision is the max revision number among all old RSes. | ||||
| // 3. If there's no existing new RS and createIfNotExisted is true, create one with appropriate revision number (maxOldRevision + 1) and replicas. | ||||
| // Note that the pod-template-hash will be added to adopted RSes and pods. | ||||
| func (dc *DeploymentController) getNewReplicaSet(deployment *extensions.Deployment, rsList []extensions.ReplicaSet, maxOldRevision int64, oldRSs []*extensions.ReplicaSet, createIfNotExisted bool) (*extensions.ReplicaSet, error) { | ||||
| 	// Calculate revision number for this new replica set | ||||
| 	newRevision := strconv.FormatInt(maxOldRevision+1, 10) | ||||
|  | ||||
| 	existingNewRS, err := deploymentutil.FindNewReplicaSet(deployment, rsList) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} else if existingNewRS != nil { | ||||
| 		// Set existing new replica set's annotation | ||||
| 		if setNewReplicaSetAnnotations(deployment, existingNewRS, newRevision, true) { | ||||
| 			return dc.client.Extensions().ReplicaSets(deployment.ObjectMeta.Namespace).Update(existingNewRS) | ||||
| 		} | ||||
| 		return existingNewRS, nil | ||||
| 	} | ||||
|  | ||||
| 	if !createIfNotExisted { | ||||
| 		return nil, nil | ||||
| 	} | ||||
|  | ||||
| 	// new ReplicaSet does not exist, create one. | ||||
| 	namespace := deployment.ObjectMeta.Namespace | ||||
| 	podTemplateSpecHash := podutil.GetPodTemplateSpecHash(deployment.Spec.Template) | ||||
| 	newRSTemplate := deploymentutil.GetNewReplicaSetTemplate(deployment) | ||||
| 	// Add podTemplateHash label to selector. | ||||
| 	newRSSelector := labelsutil.CloneSelectorAndAddLabel(deployment.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey, podTemplateSpecHash) | ||||
|  | ||||
| 	// Create new ReplicaSet | ||||
| 	newRS := extensions.ReplicaSet{ | ||||
| 		ObjectMeta: api.ObjectMeta{ | ||||
| 			// Make the name deterministic, to ensure idempotence | ||||
| 			Name:      deployment.Name + "-" + fmt.Sprintf("%d", podTemplateSpecHash), | ||||
| 			Namespace: namespace, | ||||
| 		}, | ||||
| 		Spec: extensions.ReplicaSetSpec{ | ||||
| 			Replicas: 0, | ||||
| 			Selector: newRSSelector, | ||||
| 			Template: newRSTemplate, | ||||
| 		}, | ||||
| 	} | ||||
| 	allRSs := append(oldRSs, &newRS) | ||||
| 	newReplicasCount, err := deploymentutil.NewRSNewReplicas(deployment, allRSs, &newRS) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	newRS.Spec.Replicas = newReplicasCount | ||||
| 	// Set new replica set's annotation | ||||
| 	setNewReplicaSetAnnotations(deployment, &newRS, newRevision, false) | ||||
| 	createdRS, err := dc.client.Extensions().ReplicaSets(namespace).Create(&newRS) | ||||
| 	if err != nil { | ||||
| 		dc.enqueueDeployment(deployment) | ||||
| 		return nil, fmt.Errorf("error creating replica set %v: %v", deployment.Name, err) | ||||
| 	} | ||||
| 	if newReplicasCount > 0 { | ||||
| 		dc.eventRecorder.Eventf(deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", "up", createdRS.Name, newReplicasCount) | ||||
| 	} | ||||
|  | ||||
| 	return createdRS, dc.updateDeploymentRevision(deployment, newRevision) | ||||
| } | ||||
|  | ||||
| func (dc *DeploymentController) updateDeploymentRevision(deployment *extensions.Deployment, revision string) error { | ||||
| 	if deployment.Annotations == nil { | ||||
| 		deployment.Annotations = make(map[string]string) | ||||
| 	} | ||||
| 	if deployment.Annotations[deploymentutil.RevisionAnnotation] != revision { | ||||
| 		deployment.Annotations[deploymentutil.RevisionAnnotation] = revision | ||||
| 		_, err := dc.client.Extensions().Deployments(deployment.ObjectMeta.Namespace).Update(deployment) | ||||
| 		return err | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // scale scales proportionally in order to mitigate risk. Otherwise, scaling up can increase the size | ||||
| // of the new replica set and scaling down can decrease the sizes of the old ones, both of which would | ||||
| // have the effect of hastening the rollout progress, which could produce a higher proportion of unavailable | ||||
| // replicas in the event of a problem with the rolled out template. Should run only on scaling events or | ||||
| // when a deployment is paused and not during the normal rollout process. | ||||
| func (dc *DeploymentController) scale(deployment *extensions.Deployment, newRS *extensions.ReplicaSet, oldRSs []*extensions.ReplicaSet) error { | ||||
| 	// If there is only one active replica set then we should scale that up to the full count of the | ||||
| 	// deployment. If there is no active replica set, then we should scale up the newest replica set. | ||||
| 	if activeOrLatest := findActiveOrLatest(newRS, oldRSs); activeOrLatest != nil { | ||||
| 		if activeOrLatest.Spec.Replicas == deployment.Spec.Replicas { | ||||
| 			return nil | ||||
| 		} | ||||
| 		_, _, err := dc.scaleReplicaSetAndRecordEvent(activeOrLatest, deployment.Spec.Replicas, deployment) | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	// If the new replica set is saturated, old replica sets should be fully scaled down. | ||||
| 	// This case handles replica set adoption during a saturated new replica set. | ||||
| 	if deploymentutil.IsSaturated(deployment, newRS) { | ||||
| 		for _, old := range controller.FilterActiveReplicaSets(oldRSs) { | ||||
| 			if _, _, err := dc.scaleReplicaSetAndRecordEvent(old, 0, deployment); err != nil { | ||||
| 				return err | ||||
| 			} | ||||
| 		} | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	// There are old replica sets with pods and the new replica set is not saturated. | ||||
| 	// We need to proportionally scale all replica sets (new and old) in case of a | ||||
| 	// rolling deployment. | ||||
| 	if deploymentutil.IsRollingUpdate(deployment) { | ||||
| 		allRSs := controller.FilterActiveReplicaSets(append(oldRSs, newRS)) | ||||
| 		allRSsReplicas := deploymentutil.GetReplicaCountForReplicaSets(allRSs) | ||||
|  | ||||
| 		allowedSize := int32(0) | ||||
| 		if deployment.Spec.Replicas > 0 { | ||||
| 			allowedSize = deployment.Spec.Replicas + maxSurge(*deployment) | ||||
| 		} | ||||
|  | ||||
| 		// Number of additional replicas that can be either added or removed from the total | ||||
| 		// replicas count. These replicas should be distributed proportionally to the active | ||||
| 		// replica sets. | ||||
| 		deploymentReplicasToAdd := allowedSize - allRSsReplicas | ||||
|  | ||||
| 		// The additional replicas should be distributed proportionally amongst the active | ||||
| 		// replica sets from the larger to the smaller in size replica set. Scaling direction | ||||
| 		// drives what happens in case we are trying to scale replica sets of the same size. | ||||
| 		// In such a case when scaling up, we should scale up newer replica sets first, and | ||||
| 		// when scaling down, we should scale down older replica sets first. | ||||
| 		scalingOperation := "up" | ||||
| 		switch { | ||||
| 		case deploymentReplicasToAdd > 0: | ||||
| 			sort.Sort(controller.ReplicaSetsBySizeNewer(allRSs)) | ||||
|  | ||||
| 		case deploymentReplicasToAdd < 0: | ||||
| 			sort.Sort(controller.ReplicaSetsBySizeOlder(allRSs)) | ||||
| 			scalingOperation = "down" | ||||
|  | ||||
| 		default: /* deploymentReplicasToAdd == 0 */ | ||||
| 			// Nothing to add. | ||||
| 			return nil | ||||
| 		} | ||||
|  | ||||
| 		// Iterate over all active replica sets and estimate proportions for each of them. | ||||
| 		// The absolute value of deploymentReplicasAdded should never exceed the absolute | ||||
| 		// value of deploymentReplicasToAdd. | ||||
| 		deploymentReplicasAdded := int32(0) | ||||
| 		for i := range allRSs { | ||||
| 			rs := allRSs[i] | ||||
|  | ||||
| 			proportion := getProportion(rs, *deployment, deploymentReplicasToAdd, deploymentReplicasAdded) | ||||
|  | ||||
| 			rs.Spec.Replicas += proportion | ||||
| 			deploymentReplicasAdded += proportion | ||||
| 		} | ||||
|  | ||||
| 		// Update all replica sets | ||||
| 		for i := range allRSs { | ||||
| 			rs := allRSs[i] | ||||
|  | ||||
| 			// Add/remove any leftovers to the largest replica set. | ||||
| 			if i == 0 { | ||||
| 				leftover := deploymentReplicasToAdd - deploymentReplicasAdded | ||||
| 				rs.Spec.Replicas += leftover | ||||
| 				if rs.Spec.Replicas < 0 { | ||||
| 					rs.Spec.Replicas = 0 | ||||
| 				} | ||||
| 			} | ||||
|  | ||||
| 			if _, err := dc.scaleReplicaSet(rs, rs.Spec.Replicas, deployment, scalingOperation); err != nil { | ||||
| 				// Return as soon as we fail, the deployment is requeued | ||||
| 				return err | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (dc *DeploymentController) scaleReplicaSetAndRecordEvent(rs *extensions.ReplicaSet, newScale int32, deployment *extensions.Deployment) (bool, *extensions.ReplicaSet, error) { | ||||
| 	// No need to scale | ||||
| 	if rs.Spec.Replicas == newScale { | ||||
| 		return false, rs, nil | ||||
| 	} | ||||
| 	var scalingOperation string | ||||
| 	if rs.Spec.Replicas < newScale { | ||||
| 		scalingOperation = "up" | ||||
| 	} else { | ||||
| 		scalingOperation = "down" | ||||
| 	} | ||||
| 	newRS, err := dc.scaleReplicaSet(rs, newScale, deployment, scalingOperation) | ||||
| 	return true, newRS, err | ||||
| } | ||||
|  | ||||
| func (dc *DeploymentController) scaleReplicaSet(rs *extensions.ReplicaSet, newScale int32, deployment *extensions.Deployment, scalingOperation string) (*extensions.ReplicaSet, error) { | ||||
| 	// NOTE: This mutates the ReplicaSet passed in. Not sure if that's a good idea. | ||||
| 	rs.Spec.Replicas = newScale | ||||
| 	setReplicasAnnotations(rs, deployment.Spec.Replicas, deployment.Spec.Replicas+maxSurge(*deployment)) | ||||
| 	rs, err := dc.client.Extensions().ReplicaSets(rs.ObjectMeta.Namespace).Update(rs) | ||||
| 	if err == nil { | ||||
| 		dc.eventRecorder.Eventf(deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", scalingOperation, rs.Name, newScale) | ||||
| 	} else { | ||||
| 		glog.Warningf("Cannot update replica set %q: %v", rs.Name, err) | ||||
| 		dc.enqueueDeployment(deployment) | ||||
| 	} | ||||
| 	return rs, err | ||||
| } | ||||
|  | ||||
| // cleanupDeployment is responsible for cleaning up a deployment ie. retains all but the latest N old replica sets | ||||
| // where N=d.Spec.RevisionHistoryLimit. Old replica sets are older versions of the podtemplate of a deployment kept | ||||
| // around by default 1) for historical reasons and 2) for the ability to rollback a deployment. | ||||
| func (dc *DeploymentController) cleanupDeployment(oldRSs []*extensions.ReplicaSet, deployment *extensions.Deployment) error { | ||||
| 	if deployment.Spec.RevisionHistoryLimit == nil { | ||||
| 		return nil | ||||
| 	} | ||||
| 	diff := int32(len(oldRSs)) - *deployment.Spec.RevisionHistoryLimit | ||||
| 	if diff <= 0 { | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	sort.Sort(controller.ReplicaSetsByCreationTimestamp(oldRSs)) | ||||
|  | ||||
| 	var errList []error | ||||
| 	// TODO: This should be parallelized. | ||||
| 	for i := int32(0); i < diff; i++ { | ||||
| 		rs := oldRSs[i] | ||||
| 		// Avoid delete replica set with non-zero replica counts | ||||
| 		if rs.Status.Replicas != 0 || rs.Spec.Replicas != 0 || rs.Generation > rs.Status.ObservedGeneration { | ||||
| 			continue | ||||
| 		} | ||||
| 		if err := dc.client.Extensions().ReplicaSets(rs.Namespace).Delete(rs.Name, nil); err != nil && !errors.IsNotFound(err) { | ||||
| 			glog.V(2).Infof("Failed deleting old replica set %v for deployment %v: %v", rs.Name, deployment.Name, err) | ||||
| 			errList = append(errList, err) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return utilerrors.NewAggregate(errList) | ||||
| } | ||||
|  | ||||
| // syncDeploymentStatus checks if the status is up-to-date and sync it if necessary | ||||
| func (dc *DeploymentController) syncDeploymentStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, d *extensions.Deployment) error { | ||||
| 	newStatus, err := dc.calculateStatus(allRSs, newRS, d) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	if !reflect.DeepEqual(d.Status, newStatus) { | ||||
| 		return dc.updateDeploymentStatus(allRSs, newRS, d) | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (dc *DeploymentController) calculateStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment *extensions.Deployment) (extensions.DeploymentStatus, error) { | ||||
| 	availableReplicas, err := dc.getAvailablePodsForReplicaSets(deployment, allRSs) | ||||
| 	if err != nil { | ||||
| 		return deployment.Status, fmt.Errorf("failed to count available pods: %v", err) | ||||
| 	} | ||||
| 	totalReplicas := deploymentutil.GetReplicaCountForReplicaSets(allRSs) | ||||
|  | ||||
| 	return extensions.DeploymentStatus{ | ||||
| 		// TODO: Ensure that if we start retrying status updates, we won't pick up a new Generation value. | ||||
| 		ObservedGeneration:  deployment.Generation, | ||||
| 		Replicas:            deploymentutil.GetActualReplicaCountForReplicaSets(allRSs), | ||||
| 		UpdatedReplicas:     deploymentutil.GetActualReplicaCountForReplicaSets([]*extensions.ReplicaSet{newRS}), | ||||
| 		AvailableReplicas:   availableReplicas, | ||||
| 		UnavailableReplicas: totalReplicas - availableReplicas, | ||||
| 	}, nil | ||||
| } | ||||
|  | ||||
| func (dc *DeploymentController) getAvailablePodsForReplicaSets(deployment *extensions.Deployment, rss []*extensions.ReplicaSet) (int32, error) { | ||||
| 	podList, err := dc.listPods(deployment) | ||||
| 	if err != nil { | ||||
| 		return 0, err | ||||
| 	} | ||||
| 	return deploymentutil.CountAvailablePodsForReplicaSets(podList, rss, deployment.Spec.MinReadySeconds) | ||||
| } | ||||
|  | ||||
| func (dc *DeploymentController) updateDeploymentStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment *extensions.Deployment) error { | ||||
| 	newStatus, err := dc.calculateStatus(allRSs, newRS, deployment) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	newDeployment := deployment | ||||
| 	newDeployment.Status = newStatus | ||||
| 	_, err = dc.client.Extensions().Deployments(deployment.Namespace).UpdateStatus(newDeployment) | ||||
| 	return err | ||||
| } | ||||
|  | ||||
| // isScalingEvent checks whether the provided deployment has been updated with a scaling event | ||||
| // by looking at the desired-replicas annotation in the active replica sets of the deployment. | ||||
| func (dc *DeploymentController) isScalingEvent(d *extensions.Deployment) bool { | ||||
| 	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, false) | ||||
| 	if err != nil { | ||||
| 		return false | ||||
| 	} | ||||
| 	// If there is no new replica set matching this deployment and the deployment isn't paused | ||||
| 	// then there is a new rollout that waits to happen | ||||
| 	if newRS == nil && !d.Spec.Paused { | ||||
| 		return false | ||||
| 	} | ||||
| 	allRSs := append(oldRSs, newRS) | ||||
| 	for _, rs := range controller.FilterActiveReplicaSets(allRSs) { | ||||
| 		desired, ok := getDesiredReplicasAnnotation(rs) | ||||
| 		if !ok { | ||||
| 			continue | ||||
| 		} | ||||
| 		if desired != d.Spec.Replicas { | ||||
| 			return true | ||||
| 		} | ||||
| 	} | ||||
| 	return false | ||||
| } | ||||
							
								
								
									
										348
									
								
								pkg/controller/deployment/sync_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										348
									
								
								pkg/controller/deployment/sync_test.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,348 @@ | ||||
| /* | ||||
| Copyright 2016 The Kubernetes Authors. | ||||
|  | ||||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| you may not use this file except in compliance with the License. | ||||
| You may obtain a copy of the License at | ||||
|  | ||||
|     http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  | ||||
| Unless required by applicable law or agreed to in writing, software | ||||
| distributed under the License is distributed on an "AS IS" BASIS, | ||||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| See the License for the specific language governing permissions and | ||||
| limitations under the License. | ||||
| */ | ||||
|  | ||||
| package deployment | ||||
|  | ||||
| import ( | ||||
| 	"testing" | ||||
| 	"time" | ||||
|  | ||||
| 	"k8s.io/kubernetes/pkg/api/unversioned" | ||||
| 	exp "k8s.io/kubernetes/pkg/apis/extensions" | ||||
| 	"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" | ||||
| 	"k8s.io/kubernetes/pkg/client/record" | ||||
| 	"k8s.io/kubernetes/pkg/controller" | ||||
| 	"k8s.io/kubernetes/pkg/util/intstr" | ||||
| ) | ||||
|  | ||||
| func TestScale(t *testing.T) { | ||||
| 	newTimestamp := unversioned.Date(2016, 5, 20, 2, 0, 0, 0, time.UTC) | ||||
| 	oldTimestamp := unversioned.Date(2016, 5, 20, 1, 0, 0, 0, time.UTC) | ||||
| 	olderTimestamp := unversioned.Date(2016, 5, 20, 0, 0, 0, 0, time.UTC) | ||||
|  | ||||
| 	tests := []struct { | ||||
| 		name          string | ||||
| 		deployment    *exp.Deployment | ||||
| 		oldDeployment *exp.Deployment | ||||
|  | ||||
| 		newRS  *exp.ReplicaSet | ||||
| 		oldRSs []*exp.ReplicaSet | ||||
|  | ||||
| 		expectedNew *exp.ReplicaSet | ||||
| 		expectedOld []*exp.ReplicaSet | ||||
|  | ||||
| 		desiredReplicasAnnotations map[string]int32 | ||||
| 	}{ | ||||
| 		{ | ||||
| 			name:          "normal scaling event: 10 -> 12", | ||||
| 			deployment:    newDeployment(12, nil), | ||||
| 			oldDeployment: newDeployment(10, nil), | ||||
|  | ||||
| 			newRS:  rs("foo-v1", 10, nil, newTimestamp), | ||||
| 			oldRSs: []*exp.ReplicaSet{}, | ||||
|  | ||||
| 			expectedNew: rs("foo-v1", 12, nil, newTimestamp), | ||||
| 			expectedOld: []*exp.ReplicaSet{}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:          "normal scaling event: 10 -> 5", | ||||
| 			deployment:    newDeployment(5, nil), | ||||
| 			oldDeployment: newDeployment(10, nil), | ||||
|  | ||||
| 			newRS:  rs("foo-v1", 10, nil, newTimestamp), | ||||
| 			oldRSs: []*exp.ReplicaSet{}, | ||||
|  | ||||
| 			expectedNew: rs("foo-v1", 5, nil, newTimestamp), | ||||
| 			expectedOld: []*exp.ReplicaSet{}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:          "proportional scaling: 5 -> 10", | ||||
| 			deployment:    newDeployment(10, nil), | ||||
| 			oldDeployment: newDeployment(5, nil), | ||||
|  | ||||
| 			newRS:  rs("foo-v2", 2, nil, newTimestamp), | ||||
| 			oldRSs: []*exp.ReplicaSet{rs("foo-v1", 3, nil, oldTimestamp)}, | ||||
|  | ||||
| 			expectedNew: rs("foo-v2", 4, nil, newTimestamp), | ||||
| 			expectedOld: []*exp.ReplicaSet{rs("foo-v1", 6, nil, oldTimestamp)}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:          "proportional scaling: 5 -> 3", | ||||
| 			deployment:    newDeployment(3, nil), | ||||
| 			oldDeployment: newDeployment(5, nil), | ||||
|  | ||||
| 			newRS:  rs("foo-v2", 2, nil, newTimestamp), | ||||
| 			oldRSs: []*exp.ReplicaSet{rs("foo-v1", 3, nil, oldTimestamp)}, | ||||
|  | ||||
| 			expectedNew: rs("foo-v2", 1, nil, newTimestamp), | ||||
| 			expectedOld: []*exp.ReplicaSet{rs("foo-v1", 2, nil, oldTimestamp)}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:          "proportional scaling: 9 -> 4", | ||||
| 			deployment:    newDeployment(4, nil), | ||||
| 			oldDeployment: newDeployment(9, nil), | ||||
|  | ||||
| 			newRS:  rs("foo-v2", 8, nil, newTimestamp), | ||||
| 			oldRSs: []*exp.ReplicaSet{rs("foo-v1", 1, nil, oldTimestamp)}, | ||||
|  | ||||
| 			expectedNew: rs("foo-v2", 4, nil, newTimestamp), | ||||
| 			expectedOld: []*exp.ReplicaSet{rs("foo-v1", 0, nil, oldTimestamp)}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:          "proportional scaling: 7 -> 10", | ||||
| 			deployment:    newDeployment(10, nil), | ||||
| 			oldDeployment: newDeployment(7, nil), | ||||
|  | ||||
| 			newRS:  rs("foo-v3", 2, nil, newTimestamp), | ||||
| 			oldRSs: []*exp.ReplicaSet{rs("foo-v2", 3, nil, oldTimestamp), rs("foo-v1", 2, nil, olderTimestamp)}, | ||||
|  | ||||
| 			expectedNew: rs("foo-v3", 3, nil, newTimestamp), | ||||
| 			expectedOld: []*exp.ReplicaSet{rs("foo-v2", 4, nil, oldTimestamp), rs("foo-v1", 3, nil, olderTimestamp)}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:          "proportional scaling: 13 -> 8", | ||||
| 			deployment:    newDeployment(8, nil), | ||||
| 			oldDeployment: newDeployment(13, nil), | ||||
|  | ||||
| 			newRS:  rs("foo-v3", 2, nil, newTimestamp), | ||||
| 			oldRSs: []*exp.ReplicaSet{rs("foo-v2", 8, nil, oldTimestamp), rs("foo-v1", 3, nil, olderTimestamp)}, | ||||
|  | ||||
| 			expectedNew: rs("foo-v3", 1, nil, newTimestamp), | ||||
| 			expectedOld: []*exp.ReplicaSet{rs("foo-v2", 5, nil, oldTimestamp), rs("foo-v1", 2, nil, olderTimestamp)}, | ||||
| 		}, | ||||
| 		// Scales up the new replica set. | ||||
| 		{ | ||||
| 			name:          "leftover distribution: 3 -> 4", | ||||
| 			deployment:    newDeployment(4, nil), | ||||
| 			oldDeployment: newDeployment(3, nil), | ||||
|  | ||||
| 			newRS:  rs("foo-v3", 1, nil, newTimestamp), | ||||
| 			oldRSs: []*exp.ReplicaSet{rs("foo-v2", 1, nil, oldTimestamp), rs("foo-v1", 1, nil, olderTimestamp)}, | ||||
|  | ||||
| 			expectedNew: rs("foo-v3", 2, nil, newTimestamp), | ||||
| 			expectedOld: []*exp.ReplicaSet{rs("foo-v2", 1, nil, oldTimestamp), rs("foo-v1", 1, nil, olderTimestamp)}, | ||||
| 		}, | ||||
| 		// Scales down the older replica set. | ||||
| 		{ | ||||
| 			name:          "leftover distribution: 3 -> 2", | ||||
| 			deployment:    newDeployment(2, nil), | ||||
| 			oldDeployment: newDeployment(3, nil), | ||||
|  | ||||
| 			newRS:  rs("foo-v3", 1, nil, newTimestamp), | ||||
| 			oldRSs: []*exp.ReplicaSet{rs("foo-v2", 1, nil, oldTimestamp), rs("foo-v1", 1, nil, olderTimestamp)}, | ||||
|  | ||||
| 			expectedNew: rs("foo-v3", 1, nil, newTimestamp), | ||||
| 			expectedOld: []*exp.ReplicaSet{rs("foo-v2", 1, nil, oldTimestamp), rs("foo-v1", 0, nil, olderTimestamp)}, | ||||
| 		}, | ||||
| 		// Scales up the latest replica set first. | ||||
| 		{ | ||||
| 			name:          "proportional scaling (no new rs): 4 -> 5", | ||||
| 			deployment:    newDeployment(5, nil), | ||||
| 			oldDeployment: newDeployment(4, nil), | ||||
|  | ||||
| 			newRS:  nil, | ||||
| 			oldRSs: []*exp.ReplicaSet{rs("foo-v2", 2, nil, oldTimestamp), rs("foo-v1", 2, nil, olderTimestamp)}, | ||||
|  | ||||
| 			expectedNew: nil, | ||||
| 			expectedOld: []*exp.ReplicaSet{rs("foo-v2", 3, nil, oldTimestamp), rs("foo-v1", 2, nil, olderTimestamp)}, | ||||
| 		}, | ||||
| 		// Scales down to zero | ||||
| 		{ | ||||
| 			name:          "proportional scaling: 6 -> 0", | ||||
| 			deployment:    newDeployment(0, nil), | ||||
| 			oldDeployment: newDeployment(6, nil), | ||||
|  | ||||
| 			newRS:  rs("foo-v3", 3, nil, newTimestamp), | ||||
| 			oldRSs: []*exp.ReplicaSet{rs("foo-v2", 2, nil, oldTimestamp), rs("foo-v1", 1, nil, olderTimestamp)}, | ||||
|  | ||||
| 			expectedNew: rs("foo-v3", 0, nil, newTimestamp), | ||||
| 			expectedOld: []*exp.ReplicaSet{rs("foo-v2", 0, nil, oldTimestamp), rs("foo-v1", 0, nil, olderTimestamp)}, | ||||
| 		}, | ||||
| 		// Scales up from zero | ||||
| 		{ | ||||
| 			name:          "proportional scaling: 0 -> 6", | ||||
| 			deployment:    newDeployment(6, nil), | ||||
| 			oldDeployment: newDeployment(0, nil), | ||||
|  | ||||
| 			newRS:  rs("foo-v3", 0, nil, newTimestamp), | ||||
| 			oldRSs: []*exp.ReplicaSet{rs("foo-v2", 0, nil, oldTimestamp), rs("foo-v1", 0, nil, olderTimestamp)}, | ||||
|  | ||||
| 			expectedNew: rs("foo-v3", 6, nil, newTimestamp), | ||||
| 			expectedOld: []*exp.ReplicaSet{rs("foo-v2", 0, nil, oldTimestamp), rs("foo-v1", 0, nil, olderTimestamp)}, | ||||
| 		}, | ||||
| 		// Scenario: deployment.spec.replicas == 3 ( foo-v1.spec.replicas == foo-v2.spec.replicas == foo-v3.spec.replicas == 1 ) | ||||
| 		// Deployment is scaled to 5. foo-v3.spec.replicas and foo-v2.spec.replicas should increment by 1 but foo-v2 fails to | ||||
| 		// update. | ||||
| 		{ | ||||
| 			name:          "failed rs update", | ||||
| 			deployment:    newDeployment(5, nil), | ||||
| 			oldDeployment: newDeployment(5, nil), | ||||
|  | ||||
| 			newRS:  rs("foo-v3", 2, nil, newTimestamp), | ||||
| 			oldRSs: []*exp.ReplicaSet{rs("foo-v2", 1, nil, oldTimestamp), rs("foo-v1", 1, nil, olderTimestamp)}, | ||||
|  | ||||
| 			expectedNew: rs("foo-v3", 2, nil, newTimestamp), | ||||
| 			expectedOld: []*exp.ReplicaSet{rs("foo-v2", 2, nil, oldTimestamp), rs("foo-v1", 1, nil, olderTimestamp)}, | ||||
|  | ||||
| 			desiredReplicasAnnotations: map[string]int32{"foo-v2": int32(3)}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:          "deployment with surge pods", | ||||
| 			deployment:    newDeploymentEnhanced(20, intstr.FromInt(2)), | ||||
| 			oldDeployment: newDeploymentEnhanced(10, intstr.FromInt(2)), | ||||
|  | ||||
| 			newRS:  rs("foo-v2", 6, nil, newTimestamp), | ||||
| 			oldRSs: []*exp.ReplicaSet{rs("foo-v1", 6, nil, oldTimestamp)}, | ||||
|  | ||||
| 			expectedNew: rs("foo-v2", 11, nil, newTimestamp), | ||||
| 			expectedOld: []*exp.ReplicaSet{rs("foo-v1", 11, nil, oldTimestamp)}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:          "change both surge and size", | ||||
| 			deployment:    newDeploymentEnhanced(50, intstr.FromInt(6)), | ||||
| 			oldDeployment: newDeploymentEnhanced(10, intstr.FromInt(3)), | ||||
|  | ||||
| 			newRS:  rs("foo-v2", 5, nil, newTimestamp), | ||||
| 			oldRSs: []*exp.ReplicaSet{rs("foo-v1", 8, nil, oldTimestamp)}, | ||||
|  | ||||
| 			expectedNew: rs("foo-v2", 22, nil, newTimestamp), | ||||
| 			expectedOld: []*exp.ReplicaSet{rs("foo-v1", 34, nil, oldTimestamp)}, | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
| 	for _, test := range tests { | ||||
| 		_ = olderTimestamp | ||||
| 		t.Log(test.name) | ||||
| 		fake := fake.Clientset{} | ||||
| 		dc := &DeploymentController{ | ||||
| 			client:        &fake, | ||||
| 			eventRecorder: &record.FakeRecorder{}, | ||||
| 		} | ||||
|  | ||||
| 		if test.newRS != nil { | ||||
| 			desiredReplicas := test.oldDeployment.Spec.Replicas | ||||
| 			if desired, ok := test.desiredReplicasAnnotations[test.newRS.Name]; ok { | ||||
| 				desiredReplicas = desired | ||||
| 			} | ||||
| 			setReplicasAnnotations(test.newRS, desiredReplicas, desiredReplicas+maxSurge(*test.oldDeployment)) | ||||
| 		} | ||||
| 		for i := range test.oldRSs { | ||||
| 			rs := test.oldRSs[i] | ||||
| 			if rs == nil { | ||||
| 				continue | ||||
| 			} | ||||
| 			desiredReplicas := test.oldDeployment.Spec.Replicas | ||||
| 			if desired, ok := test.desiredReplicasAnnotations[rs.Name]; ok { | ||||
| 				desiredReplicas = desired | ||||
| 			} | ||||
| 			setReplicasAnnotations(rs, desiredReplicas, desiredReplicas+maxSurge(*test.oldDeployment)) | ||||
| 		} | ||||
|  | ||||
| 		if err := dc.scale(test.deployment, test.newRS, test.oldRSs); err != nil { | ||||
| 			t.Errorf("%s: unexpected error: %v", test.name, err) | ||||
| 			continue | ||||
| 		} | ||||
| 		if test.expectedNew != nil && test.newRS != nil && test.expectedNew.Spec.Replicas != test.newRS.Spec.Replicas { | ||||
| 			t.Errorf("%s: expected new replicas: %d, got: %d", test.name, test.expectedNew.Spec.Replicas, test.newRS.Spec.Replicas) | ||||
| 			continue | ||||
| 		} | ||||
| 		if len(test.expectedOld) != len(test.oldRSs) { | ||||
| 			t.Errorf("%s: expected %d old replica sets, got %d", test.name, len(test.expectedOld), len(test.oldRSs)) | ||||
| 			continue | ||||
| 		} | ||||
| 		for n := range test.oldRSs { | ||||
| 			rs := test.oldRSs[n] | ||||
| 			exp := test.expectedOld[n] | ||||
| 			if exp.Spec.Replicas != rs.Spec.Replicas { | ||||
| 				t.Errorf("%s: expected old (%s) replicas: %d, got: %d", test.name, rs.Name, exp.Spec.Replicas, rs.Spec.Replicas) | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestDeploymentController_cleanupDeployment(t *testing.T) { | ||||
| 	selector := map[string]string{"foo": "bar"} | ||||
|  | ||||
| 	tests := []struct { | ||||
| 		oldRSs               []*exp.ReplicaSet | ||||
| 		revisionHistoryLimit int | ||||
| 		expectedDeletions    int | ||||
| 	}{ | ||||
| 		{ | ||||
| 			oldRSs: []*exp.ReplicaSet{ | ||||
| 				newRSWithStatus("foo-1", 0, 0, selector), | ||||
| 				newRSWithStatus("foo-2", 0, 0, selector), | ||||
| 				newRSWithStatus("foo-3", 0, 0, selector), | ||||
| 			}, | ||||
| 			revisionHistoryLimit: 1, | ||||
| 			expectedDeletions:    2, | ||||
| 		}, | ||||
| 		{ | ||||
| 			// Only delete the replica set with Spec.Replicas = Status.Replicas = 0. | ||||
| 			oldRSs: []*exp.ReplicaSet{ | ||||
| 				newRSWithStatus("foo-1", 0, 0, selector), | ||||
| 				newRSWithStatus("foo-2", 0, 1, selector), | ||||
| 				newRSWithStatus("foo-3", 1, 0, selector), | ||||
| 				newRSWithStatus("foo-4", 1, 1, selector), | ||||
| 			}, | ||||
| 			revisionHistoryLimit: 0, | ||||
| 			expectedDeletions:    1, | ||||
| 		}, | ||||
|  | ||||
| 		{ | ||||
| 			oldRSs: []*exp.ReplicaSet{ | ||||
| 				newRSWithStatus("foo-1", 0, 0, selector), | ||||
| 				newRSWithStatus("foo-2", 0, 0, selector), | ||||
| 			}, | ||||
| 			revisionHistoryLimit: 0, | ||||
| 			expectedDeletions:    2, | ||||
| 		}, | ||||
| 		{ | ||||
| 			oldRSs: []*exp.ReplicaSet{ | ||||
| 				newRSWithStatus("foo-1", 1, 1, selector), | ||||
| 				newRSWithStatus("foo-2", 1, 1, selector), | ||||
| 			}, | ||||
| 			revisionHistoryLimit: 0, | ||||
| 			expectedDeletions:    0, | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
| 	for i, test := range tests { | ||||
| 		fake := &fake.Clientset{} | ||||
| 		controller := NewDeploymentController(fake, controller.NoResyncPeriodFunc) | ||||
|  | ||||
| 		controller.eventRecorder = &record.FakeRecorder{} | ||||
| 		controller.rsStoreSynced = alwaysReady | ||||
| 		controller.podStoreSynced = alwaysReady | ||||
| 		for _, rs := range test.oldRSs { | ||||
| 			controller.rsStore.Add(rs) | ||||
| 		} | ||||
|  | ||||
| 		d := newDeployment(1, &tests[i].revisionHistoryLimit) | ||||
| 		controller.cleanupDeployment(test.oldRSs, d) | ||||
|  | ||||
| 		gotDeletions := 0 | ||||
| 		for _, action := range fake.Actions() { | ||||
| 			if "delete" == action.GetVerb() { | ||||
| 				gotDeletions++ | ||||
| 			} | ||||
| 		} | ||||
| 		if gotDeletions != test.expectedDeletions { | ||||
| 			t.Errorf("expect %v old replica sets been deleted, but got %v", test.expectedDeletions, gotDeletions) | ||||
| 			continue | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
| @@ -23,12 +23,123 @@ import ( | ||||
|  | ||||
| 	"github.com/golang/glog" | ||||
|  | ||||
| 	"k8s.io/kubernetes/pkg/api/annotations" | ||||
| 	"k8s.io/kubernetes/pkg/apis/extensions" | ||||
| 	"k8s.io/kubernetes/pkg/controller" | ||||
| 	deploymentutil "k8s.io/kubernetes/pkg/util/deployment" | ||||
| 	"k8s.io/kubernetes/pkg/util/integer" | ||||
| ) | ||||
|  | ||||
| func maxRevision(allRSs []*extensions.ReplicaSet) int64 { | ||||
| 	max := int64(0) | ||||
| 	for _, rs := range allRSs { | ||||
| 		if v, err := deploymentutil.Revision(rs); err != nil { | ||||
| 			// Skip the replica sets when it failed to parse their revision information | ||||
| 			glog.V(4).Infof("Error: %v. Couldn't parse revision for replica set %#v, deployment controller will skip it when reconciling revisions.", err, rs) | ||||
| 		} else if v > max { | ||||
| 			max = v | ||||
| 		} | ||||
| 	} | ||||
| 	return max | ||||
| } | ||||
|  | ||||
| // lastRevision finds the second max revision number in all replica sets (the last revision) | ||||
| func lastRevision(allRSs []*extensions.ReplicaSet) int64 { | ||||
| 	max, secMax := int64(0), int64(0) | ||||
| 	for _, rs := range allRSs { | ||||
| 		if v, err := deploymentutil.Revision(rs); err != nil { | ||||
| 			// Skip the replica sets when it failed to parse their revision information | ||||
| 			glog.V(4).Infof("Error: %v. Couldn't parse revision for replica set %#v, deployment controller will skip it when reconciling revisions.", err, rs) | ||||
| 		} else if v >= max { | ||||
| 			secMax = max | ||||
| 			max = v | ||||
| 		} else if v > secMax { | ||||
| 			secMax = v | ||||
| 		} | ||||
| 	} | ||||
| 	return secMax | ||||
| } | ||||
|  | ||||
| // setNewReplicaSetAnnotations sets new replica set's annotations appropriately by updating its revision and | ||||
| // copying required deployment annotations to it; it returns true if replica set's annotation is changed. | ||||
| func setNewReplicaSetAnnotations(deployment *extensions.Deployment, newRS *extensions.ReplicaSet, newRevision string, exists bool) bool { | ||||
| 	// First, copy deployment's annotations (except for apply and revision annotations) | ||||
| 	annotationChanged := copyDeploymentAnnotationsToReplicaSet(deployment, newRS) | ||||
| 	// Then, update replica set's revision annotation | ||||
| 	if newRS.Annotations == nil { | ||||
| 		newRS.Annotations = make(map[string]string) | ||||
| 	} | ||||
| 	// The newRS's revision should be the greatest among all RSes. Usually, its revision number is newRevision (the max revision number | ||||
| 	// of all old RSes + 1). However, it's possible that some of the old RSes are deleted after the newRS revision being updated, and | ||||
| 	// newRevision becomes smaller than newRS's revision. We should only update newRS revision when it's smaller than newRevision. | ||||
| 	if newRS.Annotations[deploymentutil.RevisionAnnotation] < newRevision { | ||||
| 		newRS.Annotations[deploymentutil.RevisionAnnotation] = newRevision | ||||
| 		annotationChanged = true | ||||
| 		glog.V(4).Infof("Updating replica set %q revision to %s", newRS.Name, newRevision) | ||||
| 	} | ||||
| 	if !exists && setReplicasAnnotations(newRS, deployment.Spec.Replicas, deployment.Spec.Replicas+maxSurge(*deployment)) { | ||||
| 		annotationChanged = true | ||||
| 	} | ||||
| 	return annotationChanged | ||||
| } | ||||
|  | ||||
| var annotationsToSkip = map[string]bool{ | ||||
| 	annotations.LastAppliedConfigAnnotation:  true, | ||||
| 	deploymentutil.RevisionAnnotation:        true, | ||||
| 	deploymentutil.DesiredReplicasAnnotation: true, | ||||
| 	deploymentutil.MaxReplicasAnnotation:     true, | ||||
| } | ||||
|  | ||||
| // skipCopyAnnotation returns true if we should skip copying the annotation with the given annotation key | ||||
| // TODO: How to decide which annotations should / should not be copied? | ||||
| //       See https://github.com/kubernetes/kubernetes/pull/20035#issuecomment-179558615 | ||||
| func skipCopyAnnotation(key string) bool { | ||||
| 	return annotationsToSkip[key] | ||||
| } | ||||
|  | ||||
| // copyDeploymentAnnotationsToReplicaSet copies deployment's annotations to replica set's annotations, | ||||
| // and returns true if replica set's annotation is changed. | ||||
| // Note that apply and revision annotations are not copied. | ||||
| func copyDeploymentAnnotationsToReplicaSet(deployment *extensions.Deployment, rs *extensions.ReplicaSet) bool { | ||||
| 	rsAnnotationsChanged := false | ||||
| 	if rs.Annotations == nil { | ||||
| 		rs.Annotations = make(map[string]string) | ||||
| 	} | ||||
| 	for k, v := range deployment.Annotations { | ||||
| 		// newRS revision is updated automatically in getNewReplicaSet, and the deployment's revision number is then updated | ||||
| 		// by copying its newRS revision number. We should not copy deployment's revision to its newRS, since the update of | ||||
| 		// deployment revision number may fail (revision becomes stale) and the revision number in newRS is more reliable. | ||||
| 		if skipCopyAnnotation(k) || rs.Annotations[k] == v { | ||||
| 			continue | ||||
| 		} | ||||
| 		rs.Annotations[k] = v | ||||
| 		rsAnnotationsChanged = true | ||||
| 	} | ||||
| 	return rsAnnotationsChanged | ||||
| } | ||||
|  | ||||
| // setDeploymentAnnotationsTo sets deployment's annotations as given RS's annotations. | ||||
| // This action should be done if and only if the deployment is rolling back to this rs. | ||||
| // Note that apply and revision annotations are not changed. | ||||
| func setDeploymentAnnotationsTo(deployment *extensions.Deployment, rollbackToRS *extensions.ReplicaSet) { | ||||
| 	deployment.Annotations = getSkippedAnnotations(deployment.Annotations) | ||||
| 	for k, v := range rollbackToRS.Annotations { | ||||
| 		if !skipCopyAnnotation(k) { | ||||
| 			deployment.Annotations[k] = v | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func getSkippedAnnotations(annotations map[string]string) map[string]string { | ||||
| 	skippedAnnotations := make(map[string]string) | ||||
| 	for k, v := range annotations { | ||||
| 		if skipCopyAnnotation(k) { | ||||
| 			skippedAnnotations[k] = v | ||||
| 		} | ||||
| 	} | ||||
| 	return skippedAnnotations | ||||
| } | ||||
|  | ||||
| // findActiveOrLatest returns the only active or the latest replica set in case there is at most one active | ||||
| // replica set. If there are more active replica sets, then we should proportionally scale them. | ||||
| func findActiveOrLatest(newRS *extensions.ReplicaSet, oldRSs []*extensions.ReplicaSet) *extensions.ReplicaSet { | ||||
|   | ||||
		Reference in New Issue
	
	Block a user