mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-04 04:08:16 +00:00 
			
		
		
		
	CustomMetrics in HPA controller
This commit is contained in:
		@@ -17,12 +17,14 @@ limitations under the License.
 | 
				
			|||||||
package podautoscaler
 | 
					package podautoscaler
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
 | 
						"encoding/json"
 | 
				
			||||||
	"fmt"
 | 
						"fmt"
 | 
				
			||||||
	"math"
 | 
						"math"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/golang/glog"
 | 
						"github.com/golang/glog"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/api"
 | 
						"k8s.io/kubernetes/pkg/api"
 | 
				
			||||||
 | 
						"k8s.io/kubernetes/pkg/api/resource"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/api/unversioned"
 | 
						"k8s.io/kubernetes/pkg/api/unversioned"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/apis/extensions"
 | 
						"k8s.io/kubernetes/pkg/apis/extensions"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/client/record"
 | 
						"k8s.io/kubernetes/pkg/client/record"
 | 
				
			||||||
@@ -35,6 +37,9 @@ const (
 | 
				
			|||||||
	// Usage shoud exceed the tolerance before we start downscale or upscale the pods.
 | 
						// Usage shoud exceed the tolerance before we start downscale or upscale the pods.
 | 
				
			||||||
	// TODO: make it a flag or HPA spec element.
 | 
						// TODO: make it a flag or HPA spec element.
 | 
				
			||||||
	tolerance = 0.1
 | 
						tolerance = 0.1
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						HpaCustomMetricsDefinitionAnnotationName = "alpha/definiton.custom-metrics.podautoscaler.kubernetes.io"
 | 
				
			||||||
 | 
						HpaCustomMetricsStatusAnnotationName     = "alpha/status.custom-metrics.podautoscaler.kubernetes.io"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type HorizontalController struct {
 | 
					type HorizontalController struct {
 | 
				
			||||||
@@ -93,6 +98,71 @@ func (a *HorizontalController) computeReplicasForCPUUtilization(hpa extensions.H
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Computes the desired number of replicas based on the CustomMetrics passed in cmAnnotation as json-serialized
 | 
				
			||||||
 | 
					// extensions.CustomMetricsTargetList.
 | 
				
			||||||
 | 
					// Returns number of replicas, status string (also json-serialized extensions.CustomMetricsCurrentStatusList),
 | 
				
			||||||
 | 
					// last timestamp of the metrics involved in computations or error, if occurred.
 | 
				
			||||||
 | 
					func (a *HorizontalController) computeReplicasForCustomMetrics(hpa extensions.HorizontalPodAutoscaler, scale *extensions.Scale,
 | 
				
			||||||
 | 
						cmAnnotation string) (int, string, time.Time, error) {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						currentReplicas := scale.Status.Replicas
 | 
				
			||||||
 | 
						replicas := 0
 | 
				
			||||||
 | 
						timestamp := time.Time{}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if cmAnnotation == "" {
 | 
				
			||||||
 | 
							return 0, "", time.Time{}, nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						var targetList extensions.CustomMetricTargetList
 | 
				
			||||||
 | 
						if err := json.Unmarshal([]byte(cmAnnotation), &targetList); err != nil {
 | 
				
			||||||
 | 
							return 0, "", time.Time{}, fmt.Errorf("failed to parse custom metrics annotation: %v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if len(targetList.Items) == 0 {
 | 
				
			||||||
 | 
							return 0, "", time.Time{}, fmt.Errorf("no custom metrics in annotation")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						statusList := extensions.CustomMetricCurrentStatusList{
 | 
				
			||||||
 | 
							Items: make([]extensions.CustomMetricCurrentStatus, 0),
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						for _, customMetricTarget := range targetList.Items {
 | 
				
			||||||
 | 
							value, currentTimestamp, err := a.metricsClient.GetCustomMetric(customMetricTarget.Name, hpa.Namespace, scale.Status.Selector)
 | 
				
			||||||
 | 
							// TODO: what to do on partial errors (like metrics obtained for 75% of pods).
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								a.eventRecorder.Event(&hpa, api.EventTypeWarning, "FailedGetCustomMetrics", err.Error())
 | 
				
			||||||
 | 
								return 0, "", time.Time{}, fmt.Errorf("failed to get custom metric value: %v", err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							floatTarget := float64(customMetricTarget.TargetValue.MilliValue()) / 1000.0
 | 
				
			||||||
 | 
							usageRatio := *value / floatTarget
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							replicaCountProposal := 0
 | 
				
			||||||
 | 
							if math.Abs(1.0-usageRatio) > tolerance {
 | 
				
			||||||
 | 
								replicaCountProposal = int(math.Ceil(usageRatio * float64(currentReplicas)))
 | 
				
			||||||
 | 
							} else {
 | 
				
			||||||
 | 
								replicaCountProposal = currentReplicas
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							if replicaCountProposal > replicas {
 | 
				
			||||||
 | 
								timestamp = currentTimestamp
 | 
				
			||||||
 | 
								replicas = replicaCountProposal
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							quantity, err := resource.ParseQuantity(fmt.Sprintf("%.3f", *value))
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								return 0, "", time.Time{}, fmt.Errorf("failed to set custom metric value: %v", err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							statusList.Items = append(statusList.Items, extensions.CustomMetricCurrentStatus{
 | 
				
			||||||
 | 
								Name:         customMetricTarget.Name,
 | 
				
			||||||
 | 
								CurrentValue: *quantity,
 | 
				
			||||||
 | 
							})
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						byteStatusList, err := json.Marshal(statusList)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return 0, "", time.Time{}, fmt.Errorf("failed to serialize custom metric status: %v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return replicas, string(byteStatusList), timestamp, nil
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (a *HorizontalController) reconcileAutoscaler(hpa extensions.HorizontalPodAutoscaler) error {
 | 
					func (a *HorizontalController) reconcileAutoscaler(hpa extensions.HorizontalPodAutoscaler) error {
 | 
				
			||||||
	reference := fmt.Sprintf("%s/%s/%s", hpa.Spec.ScaleRef.Kind, hpa.Namespace, hpa.Spec.ScaleRef.Name)
 | 
						reference := fmt.Sprintf("%s/%s/%s", hpa.Spec.ScaleRef.Kind, hpa.Namespace, hpa.Spec.ScaleRef.Name)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -103,10 +173,40 @@ func (a *HorizontalController) reconcileAutoscaler(hpa extensions.HorizontalPodA
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
	currentReplicas := scale.Status.Replicas
 | 
						currentReplicas := scale.Status.Replicas
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	desiredReplicas, currentUtilization, timestamp, err := a.computeReplicasForCPUUtilization(hpa, scale)
 | 
						cpuDesiredReplicas := 0
 | 
				
			||||||
	if err != nil {
 | 
						var cpuCurrentUtilization *int = nil
 | 
				
			||||||
		a.eventRecorder.Event(&hpa, api.EventTypeWarning, "FailedComputeReplicas", err.Error())
 | 
						cpuTimestamp := time.Time{}
 | 
				
			||||||
		return fmt.Errorf("failed to compute desired number of replicas based on CPU utilization for %s: %v", reference, err)
 | 
					
 | 
				
			||||||
 | 
						cmDesiredReplicas := 0
 | 
				
			||||||
 | 
						cmStatus := ""
 | 
				
			||||||
 | 
						cmTimestamp := time.Time{}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if hpa.Spec.CPUUtilization != nil {
 | 
				
			||||||
 | 
							cpuDesiredReplicas, cpuCurrentUtilization, cpuTimestamp, err = a.computeReplicasForCPUUtilization(hpa, scale)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								a.eventRecorder.Event(&hpa, api.EventTypeWarning, "FailedComputeReplicas", err.Error())
 | 
				
			||||||
 | 
								return fmt.Errorf("failed to compute desired number of replicas based on CPU utilization for %s: %v", reference, err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if cmAnnotation, cmAnnotationFound := hpa.Annotations[HpaCustomMetricsDefinitionAnnotationName]; cmAnnotationFound {
 | 
				
			||||||
 | 
							cmDesiredReplicas, cmStatus, cmTimestamp, err = a.computeReplicasForCustomMetrics(hpa, scale, cmAnnotation)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								a.eventRecorder.Event(&hpa, api.EventTypeWarning, "FailedComputeCMReplicas", err.Error())
 | 
				
			||||||
 | 
								return fmt.Errorf("failed to compute desired number of replicas based on Custom Metrics for %s: %v", reference, err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						desiredReplicas := 0
 | 
				
			||||||
 | 
						timestamp := time.Time{}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if cpuDesiredReplicas > desiredReplicas {
 | 
				
			||||||
 | 
							desiredReplicas = cpuDesiredReplicas
 | 
				
			||||||
 | 
							timestamp = cpuTimestamp
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if cmDesiredReplicas > desiredReplicas {
 | 
				
			||||||
 | 
							desiredReplicas = cmDesiredReplicas
 | 
				
			||||||
 | 
							timestamp = cmTimestamp
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if hpa.Spec.MinReplicas != nil && desiredReplicas < *hpa.Spec.MinReplicas {
 | 
						if hpa.Spec.MinReplicas != nil && desiredReplicas < *hpa.Spec.MinReplicas {
 | 
				
			||||||
@@ -158,9 +258,13 @@ func (a *HorizontalController) reconcileAutoscaler(hpa extensions.HorizontalPodA
 | 
				
			|||||||
	hpa.Status = extensions.HorizontalPodAutoscalerStatus{
 | 
						hpa.Status = extensions.HorizontalPodAutoscalerStatus{
 | 
				
			||||||
		CurrentReplicas:                 currentReplicas,
 | 
							CurrentReplicas:                 currentReplicas,
 | 
				
			||||||
		DesiredReplicas:                 desiredReplicas,
 | 
							DesiredReplicas:                 desiredReplicas,
 | 
				
			||||||
		CurrentCPUUtilizationPercentage: currentUtilization,
 | 
							CurrentCPUUtilizationPercentage: cpuCurrentUtilization,
 | 
				
			||||||
		LastScaleTime:                   hpa.Status.LastScaleTime,
 | 
							LastScaleTime:                   hpa.Status.LastScaleTime,
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						if cmStatus != "" {
 | 
				
			||||||
 | 
							hpa.Annotations[HpaCustomMetricsStatusAnnotationName] = cmStatus
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if rescale {
 | 
						if rescale {
 | 
				
			||||||
		now := unversioned.NewTime(time.Now())
 | 
							now := unversioned.NewTime(time.Now())
 | 
				
			||||||
		hpa.Status.LastScaleTime = &now
 | 
							hpa.Status.LastScaleTime = &now
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -62,6 +62,7 @@ type testCase struct {
 | 
				
			|||||||
	CPUTarget           int
 | 
						CPUTarget           int
 | 
				
			||||||
	reportedLevels      []uint64
 | 
						reportedLevels      []uint64
 | 
				
			||||||
	reportedCPURequests []resource.Quantity
 | 
						reportedCPURequests []resource.Quantity
 | 
				
			||||||
 | 
						cmTarget            *extensions.CustomMetricTargetList
 | 
				
			||||||
	scaleUpdated        bool
 | 
						scaleUpdated        bool
 | 
				
			||||||
	eventCreated        bool
 | 
						eventCreated        bool
 | 
				
			||||||
	verifyEvents        bool
 | 
						verifyEvents        bool
 | 
				
			||||||
@@ -101,6 +102,14 @@ func (tc *testCase) prepareTestClient(t *testing.T) *testclient.Fake {
 | 
				
			|||||||
		if tc.CPUTarget > 0.0 {
 | 
							if tc.CPUTarget > 0.0 {
 | 
				
			||||||
			obj.Items[0].Spec.CPUUtilization = &extensions.CPUTargetUtilization{TargetPercentage: tc.CPUTarget}
 | 
								obj.Items[0].Spec.CPUUtilization = &extensions.CPUTargetUtilization{TargetPercentage: tc.CPUTarget}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
							if tc.cmTarget != nil {
 | 
				
			||||||
 | 
								b, err := json.Marshal(tc.cmTarget)
 | 
				
			||||||
 | 
								if err != nil {
 | 
				
			||||||
 | 
									t.Fatalf("Failed to marshal cm: %v", err)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								obj.Items[0].Annotations = make(map[string]string)
 | 
				
			||||||
 | 
								obj.Items[0].Annotations[HpaCustomMetricsDefinitionAnnotationName] = string(b)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
		return true, obj, nil
 | 
							return true, obj, nil
 | 
				
			||||||
	})
 | 
						})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -229,6 +238,25 @@ func TestScaleUp(t *testing.T) {
 | 
				
			|||||||
	tc.runTest(t)
 | 
						tc.runTest(t)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func TestScaleUpCM(t *testing.T) {
 | 
				
			||||||
 | 
						tc := testCase{
 | 
				
			||||||
 | 
							minReplicas:     2,
 | 
				
			||||||
 | 
							maxReplicas:     6,
 | 
				
			||||||
 | 
							initialReplicas: 3,
 | 
				
			||||||
 | 
							desiredReplicas: 4,
 | 
				
			||||||
 | 
							CPUTarget:       0,
 | 
				
			||||||
 | 
							cmTarget: &extensions.CustomMetricTargetList{
 | 
				
			||||||
 | 
								Items: []extensions.CustomMetricTarget{{
 | 
				
			||||||
 | 
									Name:        "qps",
 | 
				
			||||||
 | 
									TargetValue: resource.MustParse("15.0"),
 | 
				
			||||||
 | 
								}},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							reportedLevels:      []uint64{20, 10, 30},
 | 
				
			||||||
 | 
							reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						tc.runTest(t)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestScaleDown(t *testing.T) {
 | 
					func TestScaleDown(t *testing.T) {
 | 
				
			||||||
	tc := testCase{
 | 
						tc := testCase{
 | 
				
			||||||
		minReplicas:         2,
 | 
							minReplicas:         2,
 | 
				
			||||||
@@ -242,6 +270,24 @@ func TestScaleDown(t *testing.T) {
 | 
				
			|||||||
	tc.runTest(t)
 | 
						tc.runTest(t)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func TestScaleDownCM(t *testing.T) {
 | 
				
			||||||
 | 
						tc := testCase{
 | 
				
			||||||
 | 
							minReplicas:     2,
 | 
				
			||||||
 | 
							maxReplicas:     6,
 | 
				
			||||||
 | 
							initialReplicas: 5,
 | 
				
			||||||
 | 
							desiredReplicas: 3,
 | 
				
			||||||
 | 
							CPUTarget:       0,
 | 
				
			||||||
 | 
							cmTarget: &extensions.CustomMetricTargetList{
 | 
				
			||||||
 | 
								Items: []extensions.CustomMetricTarget{{
 | 
				
			||||||
 | 
									Name:        "qps",
 | 
				
			||||||
 | 
									TargetValue: resource.MustParse("20"),
 | 
				
			||||||
 | 
								}}},
 | 
				
			||||||
 | 
							reportedLevels:      []uint64{12, 12, 12, 12, 12},
 | 
				
			||||||
 | 
							reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						tc.runTest(t)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestTolerance(t *testing.T) {
 | 
					func TestTolerance(t *testing.T) {
 | 
				
			||||||
	tc := testCase{
 | 
						tc := testCase{
 | 
				
			||||||
		minReplicas:         1,
 | 
							minReplicas:         1,
 | 
				
			||||||
@@ -255,6 +301,23 @@ func TestTolerance(t *testing.T) {
 | 
				
			|||||||
	tc.runTest(t)
 | 
						tc.runTest(t)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func TestToleranceCM(t *testing.T) {
 | 
				
			||||||
 | 
						tc := testCase{
 | 
				
			||||||
 | 
							minReplicas:     1,
 | 
				
			||||||
 | 
							maxReplicas:     5,
 | 
				
			||||||
 | 
							initialReplicas: 3,
 | 
				
			||||||
 | 
							desiredReplicas: 3,
 | 
				
			||||||
 | 
							cmTarget: &extensions.CustomMetricTargetList{
 | 
				
			||||||
 | 
								Items: []extensions.CustomMetricTarget{{
 | 
				
			||||||
 | 
									Name:        "qps",
 | 
				
			||||||
 | 
									TargetValue: resource.MustParse("20"),
 | 
				
			||||||
 | 
								}}},
 | 
				
			||||||
 | 
							reportedLevels:      []uint64{20, 21, 21},
 | 
				
			||||||
 | 
							reportedCPURequests: []resource.Quantity{resource.MustParse("0.9"), resource.MustParse("1.0"), resource.MustParse("1.1")},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						tc.runTest(t)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestMinReplicas(t *testing.T) {
 | 
					func TestMinReplicas(t *testing.T) {
 | 
				
			||||||
	tc := testCase{
 | 
						tc := testCase{
 | 
				
			||||||
		minReplicas:         2,
 | 
							minReplicas:         2,
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user