/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package deployment

import (
	"context"
	"fmt"
	"sort"

	apps "k8s.io/api/apps/v1"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/controller"
	deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util"
)

// rolloutRolling implements the logic for rolling a new replica set.
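// It scales up the new replica set when possible, scales down the old ones
// when possible, cleans up once the rollout is complete, and then syncs the
// deployment's rollout status.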
func (dc *DeploymentController) rolloutRolling(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) error {
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, true)
	if err != nil {
		return err
	}
	allRSs := append(oldRSs, newRS)

	// Scale up, if we can.
	scaledUp, err := dc.reconcileNewReplicaSet(ctx, allRSs, newRS, d)
	if err != nil {
		return err
	}
	if scaledUp {
		// Update DeploymentStatus
		return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
	}

	// Scale down, if we can.
	scaledDown, err := dc.reconcileOldReplicaSets(ctx, allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, d)
	if err != nil {
		return err
	}
	if scaledDown {
		// Update DeploymentStatus
		return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
	}

	if deploymentutil.DeploymentComplete(d, &d.Status) {
		if err := dc.cleanupDeployment(ctx, oldRSs, d); err != nil {
			return err
		}
	}

	// Sync deployment status
	return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
}

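// reconcileNewReplicaSet scales the new replica set up or down toward the
// deployment's desired replica count; scale-ups are bounded by maxSurge.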
func (dc *DeploymentController) reconcileNewReplicaSet(ctx context.Context, allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment) (bool, error) {
	if *(newRS.Spec.Replicas) == *(deployment.Spec.Replicas) {
		// Scaling not required.
		return false, nil
	}
	if *(newRS.Spec.Replicas) > *(deployment.Spec.Replicas) {
		// Scale down.
		scaled, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, newRS, *(deployment.Spec.Replicas), deployment)
		return scaled, err
	}
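	// Scale up. NewRSNewReplicas computes how many replicas the new replica
	// set may have, honoring the deployment's maxSurge against the total
	// number of pods across all replica sets.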
	newReplicasCount, err := deploymentutil.NewRSNewReplicas(deployment, allRSs, newRS)
	if err != nil {
		return false, err
	}
	scaled, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, newRS, newReplicasCount, deployment)
	return scaled, err
}

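// reconcileOldReplicaSets scales down the old replica sets as far as
// maxUnavailable and current pod availability allow, cleaning up unhealthy
// replicas first.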
func (dc *DeploymentController) reconcileOldReplicaSets(ctx context.Context, allRSs []*apps.ReplicaSet, oldRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment) (bool, error) {
	logger := klog.FromContext(ctx)
	oldPodsCount := deploymentutil.GetReplicaCountForReplicaSets(oldRSs)
	if oldPodsCount == 0 {
		// Can't scale down further
		return false, nil
	}
	allPodsCount := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
	logger.V(4).Info("New replica set", "replicaSet", klog.KObj(newRS), "availableReplicas", newRS.Status.AvailableReplicas)
	maxUnavailable := deploymentutil.MaxUnavailable(*deployment)

	// Check if we can scale down. We can scale down in the following 2 cases:
	// * Some old replica sets have unhealthy replicas; we can safely scale those down, since that won't further
	//   increase unavailability.
	// * The new replica set has scaled up and its replicas have become ready; then we can scale down old replica sets in a further step.
	//
	// maxScaledDown := allPodsCount - minAvailable - newReplicaSetPodsUnavailable
	// This takes into account not only maxUnavailable and any surge pods that have been created, but also unavailable pods from
	// the newRS, so that unavailable pods from the newRS don't make us scale down old replica sets in a further
	// step (which would increase unavailability).
	//
	// Concrete example:
	//
	// * 10 replicas
	// * 2 maxUnavailable (absolute number, not percent)
	// * 3 maxSurge (absolute number, not percent)
	//
	// case 1:
	// * Deployment is updated, newRS is created with 3 replicas, oldRS is scaled down to 8, and newRS is scaled up to 5.
	// * The new replica set pods crashloop and never become available.
	// * allPodsCount is 13. minAvailable is 8. newRSPodsUnavailable is 5.
	// * A node fails and causes one of the oldRS pods to become unavailable. However, 13 - 8 - 5 = 0, so the oldRS won't be scaled down.
	// * The user notices the crashloop and does kubectl rollout undo to roll back.
	// * newRSPodsUnavailable is 1, since we rolled back to the good replica set, so maxScaledDown = 13 - 8 - 1 = 4. 4 of the crashlooping pods will be scaled down.
	// * The total number of pods will then be 9 and the newRS can be scaled up to 10.
	//
	// case 2:
	// Same example, but pushing a new pod template instead of rolling back (aka "roll over"):
	// * The new replica set created must start with 0 replicas because allPodsCount is already at 13.
	// * However, newRSPodsUnavailable would also be 0, so the 2 old replica sets could be scaled down by 5 (13 - 8 - 0), which would then
	//   allow the new replica set to be scaled up by 5.
	minAvailable := *(deployment.Spec.Replicas) - maxUnavailable
	newRSUnavailablePodCount := *(newRS.Spec.Replicas) - newRS.Status.AvailableReplicas
	maxScaledDown := allPodsCount - minAvailable - newRSUnavailablePodCount
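	// Nothing can be scaled down: removing any old pod would dip below
	// minAvailable once the new RS's unavailable pods are accounted for.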
	if maxScaledDown <= 0 {
		return false, nil
	}

	// Clean up unhealthy replicas first, otherwise unhealthy replicas will block deployment
	// and cause timeout. See https://github.com/kubernetes/kubernetes/issues/16737
	oldRSs, cleanupCount, err := dc.cleanupUnhealthyReplicas(ctx, oldRSs, deployment, maxScaledDown)
	if err != nil {
		return false, err
	}
	logger.V(4).Info("Cleaned up unhealthy replicas from old RSes", "count", cleanupCount)

	// Scale down old replica sets; we need to check maxUnavailable to ensure we can scale down safely.
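	// cleanupUnhealthyReplicas may have returned updated copies of the old
	// replica sets, so rebuild allRSs before checking availability.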
	allRSs = append(oldRSs, newRS)
	scaledDownCount, err := dc.scaleDownOldReplicaSetsForRollingUpdate(ctx, allRSs, oldRSs, deployment)
	if err != nil {
		return false, err
	}
	logger.V(4).Info("Scaled down old RSes", "deployment", klog.KObj(deployment), "count", scaledDownCount)

	totalScaledDown := cleanupCount + scaledDownCount
	return totalScaledDown > 0, nil
}

// cleanupUnhealthyReplicas will scale down old replica sets with unhealthy replicas, so that all unhealthy replicas will be deleted.
func (dc *DeploymentController) cleanupUnhealthyReplicas(ctx context.Context, oldRSs []*apps.ReplicaSet, deployment *apps.Deployment, maxCleanupCount int32) ([]*apps.ReplicaSet, int32, error) {
	logger := klog.FromContext(ctx)
	sort.Sort(controller.ReplicaSetsByCreationTimestamp(oldRSs))
	// Safely scale down all old replica sets with unhealthy replicas. The replica set will sort the pods in the order
	// such that not-ready < ready, unscheduled < scheduled, and pending < running. This ensures that unhealthy replicas will
	// be deleted first and won't increase unavailability.
	totalScaledDown := int32(0)
	for i, targetRS := range oldRSs {
		if totalScaledDown >= maxCleanupCount {
			break
		}
		if *(targetRS.Spec.Replicas) == 0 {
			// cannot scale down this replica set.
			continue
		}
		logger.V(4).Info("Found available pods in old RS", "replicaSet", klog.KObj(targetRS), "availableReplicas", targetRS.Status.AvailableReplicas)
		if *(targetRS.Spec.Replicas) == targetRS.Status.AvailableReplicas {
			// no unhealthy replicas found, no scaling required.
			continue
		}

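		// Scale down by no more than the remaining cleanup budget, and never
		// below the replica set's current number of available replicas.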
		scaledDownCount := min(maxCleanupCount-totalScaledDown, *(targetRS.Spec.Replicas)-targetRS.Status.AvailableReplicas)
		newReplicasCount := *(targetRS.Spec.Replicas) - scaledDownCount
		if newReplicasCount > *(targetRS.Spec.Replicas) {
			return nil, 0, fmt.Errorf("when cleaning up unhealthy replicas, got invalid request to scale down %s/%s %d -> %d", targetRS.Namespace, targetRS.Name, *(targetRS.Spec.Replicas), newReplicasCount)
		}
		_, updatedOldRS, err := dc.scaleReplicaSetAndRecordEvent(ctx, targetRS, newReplicasCount, deployment)
		if err != nil {
			return nil, totalScaledDown, err
		}
		totalScaledDown += scaledDownCount
		oldRSs[i] = updatedOldRS
	}
	return oldRSs, totalScaledDown, nil
}

// scaleDownOldReplicaSetsForRollingUpdate scales down old replica sets when the deployment strategy is "RollingUpdate".
// It checks maxUnavailable to ensure availability is maintained.
func (dc *DeploymentController) scaleDownOldReplicaSetsForRollingUpdate(ctx context.Context, allRSs []*apps.ReplicaSet, oldRSs []*apps.ReplicaSet, deployment *apps.Deployment) (int32, error) {
	logger := klog.FromContext(ctx)
	maxUnavailable := deploymentutil.MaxUnavailable(*deployment)

	// Check if we can scale down.
	minAvailable := *(deployment.Spec.Replicas) - maxUnavailable
	// Find the number of available pods.
	availablePodCount := deploymentutil.GetAvailableReplicaCountForReplicaSets(allRSs)
	if availablePodCount <= minAvailable {
		// Cannot scale down.
		return 0, nil
	}
	logger.V(4).Info("Found available pods in deployment, scaling down old RSes", "deployment", klog.KObj(deployment), "availableReplicas", availablePodCount)

	sort.Sort(controller.ReplicaSetsByCreationTimestamp(oldRSs))

	totalScaledDown := int32(0)
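	// The scale-down budget: the number of pods that can be removed while
	// still keeping at least minAvailable pods available.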
	totalScaleDownCount := availablePodCount - minAvailable
	for _, targetRS := range oldRSs {
		if totalScaledDown >= totalScaleDownCount {
			// No further scaling required.
			break
		}
		if *(targetRS.Spec.Replicas) == 0 {
			// cannot scale down this ReplicaSet.
			continue
		}
		// Scale down.
		scaleDownCount := min(*(targetRS.Spec.Replicas), totalScaleDownCount-totalScaledDown)
		newReplicasCount := *(targetRS.Spec.Replicas) - scaleDownCount
		if newReplicasCount > *(targetRS.Spec.Replicas) {
			return 0, fmt.Errorf("when scaling down old RS, got invalid request to scale down %s/%s %d -> %d", targetRS.Namespace, targetRS.Name, *(targetRS.Spec.Replicas), newReplicasCount)
		}
		_, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, targetRS, newReplicasCount, deployment)
		if err != nil {
			return totalScaledDown, err
		}

		totalScaledDown += scaleDownCount
	}

	return totalScaledDown, nil
}