Merge pull request #30983 from mwielgus/planer2

Automatic merge from submit-queue

Support for preexisting replicas and estimated capacity in federated replicaset controller

With this PR the planner will be able to:

* Keep already existing replicas in their current clusters if rebalance = false and the min/max boundaries are met.
* Limit the number of replicas in a cluster to the level that was measured by the count of running and unschedulable pods, and provide an estimate of how many more pods it would be nice to put in a cluster so that, if they are scheduled, we get closer to the desired layout, or manage to schedule the desired number of replicas at all.

cc: @quinton-hoole @jianhuiz @wojtek-t @kubernetes/sig-cluster-federation
@@ -17,6 +17,7 @@ limitations under the License.
 package planer
 
 import (
+	"math"
 	"sort"
 
 	fed_api "k8s.io/kubernetes/federation/apis/federation"
@@ -54,9 +55,18 @@ func NewPlanner(preferences *fed_api.FederatedReplicaSetPreferences) *Planner {
 // sum of MinReplicas for all clusters is bigger than replicasToDistribute then some clusters will not
 // have all of the replicas assigned. In such a case a cluster with a higher weight has priority over
 // a cluster with a lower weight (or with a lexicographically smaller name in case of a draw).
-func (p *Planner) Plan(replicasToDistribute int64, availableClusters []string) map[string]int64 {
+// It can also use the current replica count and estimated capacity to provide better planning and
+// adhere to the rebalance policy.
+// Two maps are returned:
+// * a map that contains information on how many replicas it will be possible to run in a cluster.
+// * a map that contains information on how many extra replicas it would be nice to schedule in a cluster so,
+//   if by chance they are scheduled, we will be closer to the desired replica layout.
+func (p *Planner) Plan(replicasToDistribute int64, availableClusters []string, currentReplicaCount map[string]int64,
+	estimatedCapacity map[string]int64) (map[string]int64, map[string]int64) {
 
 	preferences := make([]*namedClusterReplicaSetPreferences, 0, len(availableClusters))
 	plan := make(map[string]int64, len(preferences))
+	overflow := make(map[string]int64, len(preferences))
 
 	named := func(name string, pref fed_api.ClusterReplicaSetPreferences) *namedClusterReplicaSetPreferences {
 		return &namedClusterReplicaSetPreferences{
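For orientation, here is a minimal sketch (not part of the diff) of how the extended Plan signature is consumed. It assumes the planer package context, with the fed_api alias from the imports above and an fmt import, and it mirrors the first TestEqualWithExisting case from the test changes below; the example function name is invented:

func ExamplePlanner_Plan() { // hypothetical example function, not in the PR
	planner := NewPlanner(&fed_api.FederatedReplicaSetPreferences{
		// Rebalance defaults to false, so existing replicas stay where they are.
		Clusters: map[string]fed_api.ClusterReplicaSetPreferences{"*": {Weight: 1}},
	})
	plan, overflow := planner.Plan(
		50,                        // replicasToDistribute
		[]string{"A", "B", "C"},   // availableClusters
		map[string]int64{"C": 30}, // currentReplicaCount: C already runs 30 replicas
		map[string]int64{},        // estimatedCapacity: no limits known
	)
	fmt.Println(plan, overflow) // plan: A=10 B=10 C=30; overflow: empty
}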
@@ -83,19 +93,59 @@ func (p *Planner) Plan(replicasToDistribute int64, availableClusters []string) map[string]int64 {
 	// Assign each cluster the minimum number of replicas it requested.
 	for _, preference := range preferences {
 		min := minInt64(preference.MinReplicas, remainingReplicas)
+		if capacity, hasCapacity := estimatedCapacity[preference.clusterName]; hasCapacity {
+			min = minInt64(min, capacity)
+		}
 		remainingReplicas -= min
 		plan[preference.clusterName] = min
 	}
 
+	// This map contains information on how many replicas were assigned to
+	// the cluster based only on the current replica count and the
+	// rebalance=false preference. It will be used later in the remaining
+	// replica distribution code.
+	preallocated := make(map[string]int64)
+
+	if p.preferences.Rebalance == false {
+		for _, preference := range preferences {
+			planned := plan[preference.clusterName]
+			count, hasSome := currentReplicaCount[preference.clusterName]
+			if hasSome && count > planned {
+				target := count
+				if preference.MaxReplicas != nil {
+					target = minInt64(*preference.MaxReplicas, target)
+				}
+				if capacity, hasCapacity := estimatedCapacity[preference.clusterName]; hasCapacity {
+					target = minInt64(capacity, target)
+				}
+				extra := minInt64(target-planned, remainingReplicas)
+				if extra < 0 {
+					extra = 0
+				}
+				remainingReplicas -= extra
+				preallocated[preference.clusterName] = extra
+				plan[preference.clusterName] = extra + planned
+			}
+		}
+	}
+
 	modified := true
 
 	// It is possible that a single pass of the loop is not enough to distribute all replicas among clusters due
 	// to weight, max and rounding corner cases. In such a case we iterate until either
 	// there are no replicas left, or no cluster gets any more replicas, or the number
-	// of attempts is less than available cluster count. Every loop either distributes all remainingReplicas
-	// or maxes out at least one cluster.
-	// TODO: This algorithm is O(clusterCount^2). When needed use sweep-like algorithm for O(n log n).
-	for trial := 0; trial < len(availableClusters) && modified && remainingReplicas > 0; trial++ {
+	// of attempts is less than the available cluster count. If there are no preallocated pods,
+	// every loop either distributes all remainingReplicas or maxes out at least one cluster.
+	// If there are preallocated pods then the replica spreading may take longer.
+	// We reduce the number of pending preallocated replicas by at least half with each iteration so
+	// we may need log(replicasAtStart) iterations.
+	// TODO: Prove that clusterCount * log(replicas) iterations solves the problem or adjust the number.
+	// TODO: This algorithm is O(clusterCount^2 * log(replicas)) which is good for up to 100 clusters.
+	// Find something faster.
+	replicasAtStart := remainingReplicas
+	for trial := 0; trial < int(2*(1+math.Log(float64(replicasAtStart)))*float64(len(availableClusters))) &&
+		modified && remainingReplicas > 0; trial++ {
 
 		modified = false
 		weightSum := int64(0)
 		for _, preference := range preferences {
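The preallocation step in this hunk is just a chain of minInt64 clamps. A standalone sketch with invented numbers (not taken from the PR) walks the same arithmetic through a single cluster:

package main

import "fmt"

func minInt64(a int64, b int64) int64 {
	if a < b {
		return a
	}
	return b
}

func main() {
	// Invented numbers: the cluster already runs 30 replicas, the min-replica pass
	// planned 10, MaxReplicas is 25, estimated capacity is 20, and 15 replicas remain.
	var planned, count, max, capacity, remaining int64 = 10, 30, 25, 20, 15

	target := minInt64(minInt64(count, max), capacity) // 20: keep existing, clamped by max and capacity
	extra := minInt64(target-planned, remaining)       // 10: limited by what is still undistributed
	if extra < 0 {
		extra = 0
	}
	fmt.Println(planned+extra, remaining-extra) // plan entry becomes 20, 5 replicas left to spread
}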
@@ -104,21 +154,41 @@ func (p *Planner) Plan(replicasToDistribute int64, availableClusters []string) map[string]int64 {
 		newPreferences := make([]*namedClusterReplicaSetPreferences, 0, len(preferences))
 
 		distributeInThisLoop := remainingReplicas
 
 		for _, preference := range preferences {
 			if weightSum > 0 {
 				start := plan[preference.clusterName]
 				// Distribute the remaining replicas, rounding fractions always up.
 				extra := (distributeInThisLoop*preference.Weight + weightSum - 1) / weightSum
 				extra = minInt64(extra, remainingReplicas)
 
+				// Account for preallocated replicas.
+				prealloc := preallocated[preference.clusterName]
+				usedPrealloc := minInt64(extra, prealloc)
+				preallocated[preference.clusterName] = prealloc - usedPrealloc
+				extra = extra - usedPrealloc
+				if usedPrealloc > 0 {
+					modified = true
+				}
+
 				// In total there should be the amount that was there at the start plus whatever is due
 				// in this iteration.
 				total := start + extra
 
 				// Check whether we would overflow the cluster; if so, don't consider this cluster
 				// in any of the following iterations.
+				full := false
 				if preference.MaxReplicas != nil && total > *preference.MaxReplicas {
 					total = *preference.MaxReplicas
-				} else {
-					newPreferences = append(newPreferences, preference)
+					full = true
 				}
+				if capacity, hasCapacity := estimatedCapacity[preference.clusterName]; hasCapacity && total > capacity {
+					overflow[preference.clusterName] = total - capacity
+					total = capacity
+					full = true
+				}
+
+				if !full {
+					newPreferences = append(newPreferences, preference)
+				}
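The expression (distributeInThisLoop*preference.Weight + weightSum - 1) / weightSum is integer ceil division: it rounds each weighted share up, which can briefly over-assign. A standalone sketch with invented weights (not from the PR) shows why the minInt64 clamp that follows it is needed:

package main

import "fmt"

func main() {
	// Invented weights: A=1, B=2; 10 replicas to hand out in this pass.
	names := []string{"A", "B"}
	weights := map[string]int64{"A": 1, "B": 2}
	var weightSum, distributeInThisLoop int64 = 3, 10

	for _, name := range names {
		// Same ceil division as the planner: ceil(distribute * weight / weightSum).
		extra := (distributeInThisLoop*weights[name] + weightSum - 1) / weightSum
		fmt.Println(name, extra) // A 4, B 7
	}
	// 4 + 7 = 11 > 10: rounding up can over-assign, which is why the planner
	// clamps each share with minInt64(extra, remainingReplicas) as it goes.
}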
@@ -137,7 +207,20 @@ func (p *Planner) Plan(replicasToDistribute int64, availableClusters []string) map[string]int64 {
 		preferences = newPreferences
 	}
 
-	return plan
+	if p.preferences.Rebalance {
+		return plan, overflow
+	} else {
+		// If rebalance = false then overflow is trimmed at the level
+		// of replicas that it failed to place somewhere.
+		newOverflow := make(map[string]int64)
+		for key, value := range overflow {
+			value = minInt64(value, remainingReplicas)
+			if value > 0 {
+				newOverflow[key] = value
+			}
+		}
+		return plan, newOverflow
+	}
 }
 
 func minInt64(a int64, b int64) int64 {
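Before the test changes, a standalone sketch (invented numbers, not from the PR) of the rebalance = false trimming above: each overflow entry is capped at the number of replicas that actually remained unplaced, and zero entries are dropped:

package main

import "fmt"

func minInt64(a int64, b int64) int64 {
	if a < b {
		return a
	}
	return b
}

func main() {
	// Invented state after the distribution loop: C could not take 11 of its
	// share, but only 5 replicas overall actually remained unplaced.
	overflow := map[string]int64{"C": 11}
	var remainingReplicas int64 = 5

	newOverflow := make(map[string]int64)
	for key, value := range overflow {
		value = minInt64(value, remainingReplicas)
		if value > 0 {
			newOverflow[key] = value
		}
	}
	fmt.Println(newOverflow) // map[C:5]: only ask for what we truly failed to place
}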
@@ -28,8 +28,33 @@ func doCheck(t *testing.T, pref map[string]fed_api.ClusterReplicaSetPreferences,
 	planer := NewPlanner(&fed_api.FederatedReplicaSetPreferences{
 		Clusters: pref,
 	})
-	plan := planer.Plan(replicas, clusters)
+	plan, overflow := planer.Plan(replicas, clusters, map[string]int64{}, map[string]int64{})
 	assert.EqualValues(t, expected, plan)
+	assert.Equal(t, 0, len(overflow))
 }
 
+func doCheckWithExisting(t *testing.T, pref map[string]fed_api.ClusterReplicaSetPreferences, replicas int64, clusters []string,
+	existing map[string]int64, expected map[string]int64) {
+	planer := NewPlanner(&fed_api.FederatedReplicaSetPreferences{
+		Clusters: pref,
+	})
+	plan, overflow := planer.Plan(replicas, clusters, existing, map[string]int64{})
+	assert.Equal(t, 0, len(overflow))
+	assert.EqualValues(t, expected, plan)
+}
+
+func doCheckWithExistingAndCapacity(t *testing.T, rebalance bool, pref map[string]fed_api.ClusterReplicaSetPreferences, replicas int64, clusters []string,
+	existing map[string]int64,
+	capacity map[string]int64,
+	expected map[string]int64,
+	expectedOverflow map[string]int64) {
+	planer := NewPlanner(&fed_api.FederatedReplicaSetPreferences{
+		Rebalance: rebalance,
+		Clusters:  pref,
+	})
+	plan, overflow := planer.Plan(replicas, clusters, existing, capacity)
+	assert.EqualValues(t, expected, plan)
+	assert.Equal(t, expectedOverflow, overflow)
+}
+
 func pint(val int64) *int64 {
@@ -68,6 +93,132 @@ func TestEqual(t *testing.T) {
 		map[string]int64{})
 }
 
+func TestEqualWithExisting(t *testing.T) {
+	doCheckWithExisting(t, map[string]fed_api.ClusterReplicaSetPreferences{
+		"*": {Weight: 1}},
+		50, []string{"A", "B", "C"},
+		map[string]int64{"C": 30},
+		map[string]int64{"A": 10, "B": 10, "C": 30})
+
+	doCheckWithExisting(t, map[string]fed_api.ClusterReplicaSetPreferences{
+		"*": {Weight: 1}},
+		50, []string{"A", "B"},
+		map[string]int64{"A": 30},
+		map[string]int64{"A": 30, "B": 20})
+
+	doCheckWithExisting(t, map[string]fed_api.ClusterReplicaSetPreferences{
+		"*": {Weight: 1}},
+		500000, []string{"A", "B"},
+		map[string]int64{"A": 300000},
+		map[string]int64{"A": 300000, "B": 200000})
+
+	doCheckWithExisting(t, map[string]fed_api.ClusterReplicaSetPreferences{
+		"*": {Weight: 1}},
+		50, []string{"A", "B"},
+		map[string]int64{"A": 10},
+		map[string]int64{"A": 25, "B": 25})
+
+	doCheckWithExisting(t, map[string]fed_api.ClusterReplicaSetPreferences{
+		"*": {Weight: 1}},
+		50, []string{"A", "B"},
+		map[string]int64{"A": 10, "B": 70},
+		map[string]int64{"A": 10, "B": 40})
+
+	doCheckWithExisting(t, map[string]fed_api.ClusterReplicaSetPreferences{
+		"*": {Weight: 1}},
+		1, []string{"A", "B"},
+		map[string]int64{"A": 30},
+		map[string]int64{"A": 1, "B": 0})
+
+	doCheckWithExisting(t, map[string]fed_api.ClusterReplicaSetPreferences{
+		"*": {Weight: 1}},
+		50, []string{"A", "B"},
+		map[string]int64{"A": 10, "B": 20},
+		map[string]int64{"A": 25, "B": 25})
+}
+
+func TestWithExistingAndCapacity(t *testing.T) {
+	// desired without capacity: map[string]int64{"A": 17, "B": 17, "C": 16}
+	doCheckWithExistingAndCapacity(t, true, map[string]fed_api.ClusterReplicaSetPreferences{
+		"*": {Weight: 1}},
+		50, []string{"A", "B", "C"},
+		map[string]int64{},
+		map[string]int64{"C": 10},
+		map[string]int64{"A": 20, "B": 20, "C": 10},
+		map[string]int64{"C": 6})
+
+	// desired B:50 C:0
+	doCheckWithExistingAndCapacity(t, true, map[string]fed_api.ClusterReplicaSetPreferences{
+		"A": {Weight: 10000},
+		"B": {Weight: 1}},
+		50, []string{"B", "C"},
+		map[string]int64{},
+		map[string]int64{"B": 10},
+		map[string]int64{"B": 10, "C": 0},
+		map[string]int64{"B": 40},
+	)
+
+	// desired A:20 B:40
+	doCheckWithExistingAndCapacity(t, true, map[string]fed_api.ClusterReplicaSetPreferences{
+		"A": {Weight: 1},
+		"B": {Weight: 2}},
+		60, []string{"A", "B", "C"},
+		map[string]int64{},
+		map[string]int64{"B": 10},
+		map[string]int64{"A": 50, "B": 10, "C": 0},
+		map[string]int64{"B": 30})
+
+	// desired without capacity: map[string]int64{"A": 10, "B": 30, "C": 21, "D": 10}
+	doCheckWithExistingAndCapacity(t, true, map[string]fed_api.ClusterReplicaSetPreferences{
+		"A": {Weight: 10000, MaxReplicas: pint(10)},
+		"B": {Weight: 1},
+		"C": {Weight: 1, MaxReplicas: pint(21)},
+		"D": {Weight: 1, MaxReplicas: pint(10)}},
+		71, []string{"A", "B", "C", "D"},
+		map[string]int64{},
+		map[string]int64{"C": 10},
+		map[string]int64{"A": 10, "B": 41, "C": 10, "D": 10},
+		map[string]int64{"C": 11},
+	)
+
+	// desired A:20 B:20
+	doCheckWithExistingAndCapacity(t, false, map[string]fed_api.ClusterReplicaSetPreferences{
+		"A": {Weight: 1},
+		"B": {Weight: 1}},
+		60, []string{"A", "B", "C"},
+		map[string]int64{},
+		map[string]int64{"A": 10, "B": 10},
+		map[string]int64{"A": 10, "B": 10, "C": 0},
+		map[string]int64{"A": 20, "B": 20})
+
+	// desired A:10 B:50 although A:50 B:10 is fully acceptable because rebalance = false
+	doCheckWithExistingAndCapacity(t, false, map[string]fed_api.ClusterReplicaSetPreferences{
+		"A": {Weight: 1},
+		"B": {Weight: 5}},
+		60, []string{"A", "B", "C"},
+		map[string]int64{},
+		map[string]int64{"B": 10},
+		map[string]int64{"A": 50, "B": 10, "C": 0},
+		map[string]int64{})
+
+	doCheckWithExistingAndCapacity(t, false, map[string]fed_api.ClusterReplicaSetPreferences{
+		"*": {MinReplicas: 20, Weight: 0}},
+		50, []string{"A", "B", "C"},
+		map[string]int64{},
+		map[string]int64{"B": 10},
+		map[string]int64{"A": 20, "B": 10, "C": 20},
+		map[string]int64{})
+
+	// Actually we would like to have 20 extra in B but 15 is also good.
+	doCheckWithExistingAndCapacity(t, true, map[string]fed_api.ClusterReplicaSetPreferences{
+		"*": {MinReplicas: 20, Weight: 1}},
+		60, []string{"A", "B"},
+		map[string]int64{},
+		map[string]int64{"B": 10},
+		map[string]int64{"A": 50, "B": 10},
+		map[string]int64{"B": 15})
+}
+
 func TestMin(t *testing.T) {
 	doCheck(t, map[string]fed_api.ClusterReplicaSetPreferences{
 		"*": {MinReplicas: 2, Weight: 0}},
@@ -104,7 +255,6 @@ func TestMax(t *testing.T) {
 }
 
 func TestWeight(t *testing.T) {
-
 	doCheck(t, map[string]fed_api.ClusterReplicaSetPreferences{
 		"A": {Weight: 1},
 		"B": {Weight: 2}},