mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-04 04:08:16 +00:00 
			
		
		
		
	HPA: only send updates when the status has changed
This commit only sends updates if the status has actually changed. Since the HPA runs at a regular interval, this should reduce the volume of writes, especially on short HPA intervals with relatively constant metrics.
This commit is contained in:
		@@ -32,6 +32,7 @@ go_library(
 | 
			
		||||
        "//pkg/controller:go_default_library",
 | 
			
		||||
        "//pkg/controller/podautoscaler/metrics:go_default_library",
 | 
			
		||||
        "//vendor/github.com/golang/glog:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
 | 
			
		||||
 
 | 
			
		||||
@@ -22,6 +22,7 @@ import (
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/golang/glog"
 | 
			
		||||
	apiequality "k8s.io/apimachinery/pkg/api/equality"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/api/errors"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/api/resource"
 | 
			
		||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
			
		||||
@@ -371,6 +372,12 @@ func (a *HorizontalController) reconcileAutoscaler(hpav1Shared *autoscalingv1.Ho
 | 
			
		||||
		return fmt.Errorf("failed to convert the given HPA to %s: %v", autoscalingv2.SchemeGroupVersion.String(), err)
 | 
			
		||||
	}
 | 
			
		||||
	hpa := hpaRaw.(*autoscalingv2.HorizontalPodAutoscaler)
 | 
			
		||||
	hpaStatusOriginalRaw, err := api.Scheme.DeepCopy(&hpa.Status)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		a.eventRecorder.Event(hpav1Shared, v1.EventTypeWarning, "FailedConvertHPA", err.Error())
 | 
			
		||||
		return fmt.Errorf("failed to deep-copy the HPA status: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	hpaStatusOriginal := hpaStatusOriginalRaw.(*autoscalingv2.HorizontalPodAutoscalerStatus)
 | 
			
		||||
 | 
			
		||||
	reference := fmt.Sprintf("%s/%s/%s", hpa.Spec.ScaleTargetRef.Kind, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name)
 | 
			
		||||
 | 
			
		||||
@@ -378,7 +385,7 @@ func (a *HorizontalController) reconcileAutoscaler(hpav1Shared *autoscalingv1.Ho
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedGetScale", err.Error())
 | 
			
		||||
		setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "FailedGetScale", "the HPA controller was unable to get the target's current scale: %v", err)
 | 
			
		||||
		a.update(hpa)
 | 
			
		||||
		a.updateStatusIfNeeded(hpaStatusOriginal, hpa)
 | 
			
		||||
		return fmt.Errorf("failed to query scale subresource for %s: %v", reference, err)
 | 
			
		||||
	}
 | 
			
		||||
	setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionTrue, "SucceededGetScale", "the HPA controller was able to get the target's current scale")
 | 
			
		||||
@@ -412,7 +419,10 @@ func (a *HorizontalController) reconcileAutoscaler(hpav1Shared *autoscalingv1.Ho
 | 
			
		||||
	} else {
 | 
			
		||||
		metricDesiredReplicas, metricName, metricStatuses, metricTimestamp, err = a.computeReplicasForMetrics(hpa, scale, hpa.Spec.Metrics)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			a.updateCurrentReplicasInStatus(hpa, currentReplicas)
 | 
			
		||||
			a.setCurrentReplicasInStatus(hpa, currentReplicas)
 | 
			
		||||
			if err := a.updateStatusIfNeeded(hpaStatusOriginal, hpa); err != nil {
 | 
			
		||||
				utilruntime.HandleError(err)
 | 
			
		||||
			}
 | 
			
		||||
			a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedComputeMetricsReplicas", err.Error())
 | 
			
		||||
			return fmt.Errorf("failed to compute desired number of replicas based on listed metrics for %s: %v", reference, err)
 | 
			
		||||
		}
 | 
			
		||||
@@ -489,7 +499,10 @@ func (a *HorizontalController) reconcileAutoscaler(hpav1Shared *autoscalingv1.Ho
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			a.eventRecorder.Eventf(hpa, v1.EventTypeWarning, "FailedRescale", "New size: %d; reason: %s; error: %v", desiredReplicas, rescaleReason, err.Error())
 | 
			
		||||
			setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "FailedUpdateScale", "the HPA controller was unable to update the target scale: %v", err)
 | 
			
		||||
			a.updateCurrentReplicasInStatus(hpa, currentReplicas)
 | 
			
		||||
			a.setCurrentReplicasInStatus(hpa, currentReplicas)
 | 
			
		||||
			if err := a.updateStatusIfNeeded(hpaStatusOriginal, hpa); err != nil {
 | 
			
		||||
				utilruntime.HandleError(err)
 | 
			
		||||
			}
 | 
			
		||||
			return fmt.Errorf("failed to rescale %s: %v", reference, err)
 | 
			
		||||
		}
 | 
			
		||||
		setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionTrue, "SucceededRescale", "the HPA controller was able to update the target scale to %d", desiredReplicas)
 | 
			
		||||
@@ -501,7 +514,8 @@ func (a *HorizontalController) reconcileAutoscaler(hpav1Shared *autoscalingv1.Ho
 | 
			
		||||
		desiredReplicas = currentReplicas
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return a.updateStatusWithReplicas(hpa, currentReplicas, desiredReplicas, metricStatuses, rescale)
 | 
			
		||||
	a.setStatus(hpa, currentReplicas, desiredReplicas, metricStatuses, rescale)
 | 
			
		||||
	return a.updateStatusIfNeeded(hpaStatusOriginal, hpa)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (a *HorizontalController) shouldScale(hpa *autoscalingv2.HorizontalPodAutoscaler, currentReplicas, desiredReplicas int32, timestamp time.Time) bool {
 | 
			
		||||
@@ -528,14 +542,14 @@ func (a *HorizontalController) shouldScale(hpa *autoscalingv2.HorizontalPodAutos
 | 
			
		||||
	return false
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (a *HorizontalController) updateCurrentReplicasInStatus(hpa *autoscalingv2.HorizontalPodAutoscaler, currentReplicas int32) {
 | 
			
		||||
	err := a.updateStatusWithReplicas(hpa, currentReplicas, hpa.Status.DesiredReplicas, hpa.Status.CurrentMetrics, false)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		utilruntime.HandleError(err)
 | 
			
		||||
	}
 | 
			
		||||
// setCurrentReplicasInStatus sets the current replica count in the status of the HPA.
 | 
			
		||||
func (a *HorizontalController) setCurrentReplicasInStatus(hpa *autoscalingv2.HorizontalPodAutoscaler, currentReplicas int32) {
 | 
			
		||||
	a.setStatus(hpa, currentReplicas, hpa.Status.DesiredReplicas, hpa.Status.CurrentMetrics, false)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (a *HorizontalController) updateStatusWithReplicas(hpa *autoscalingv2.HorizontalPodAutoscaler, currentReplicas, desiredReplicas int32, metricStatuses []autoscalingv2.MetricStatus, rescale bool) error {
 | 
			
		||||
// setStatus recreates the status of the given HPA, updating the current and
 | 
			
		||||
// desired replicas, as well as the metric statuses
 | 
			
		||||
func (a *HorizontalController) setStatus(hpa *autoscalingv2.HorizontalPodAutoscaler, currentReplicas, desiredReplicas int32, metricStatuses []autoscalingv2.MetricStatus, rescale bool) {
 | 
			
		||||
	hpa.Status = autoscalingv2.HorizontalPodAutoscalerStatus{
 | 
			
		||||
		CurrentReplicas: currentReplicas,
 | 
			
		||||
		DesiredReplicas: desiredReplicas,
 | 
			
		||||
@@ -548,11 +562,19 @@ func (a *HorizontalController) updateStatusWithReplicas(hpa *autoscalingv2.Horiz
 | 
			
		||||
		now := metav1.NewTime(time.Now())
 | 
			
		||||
		hpa.Status.LastScaleTime = &now
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return a.update(hpa)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (a *HorizontalController) update(hpa *autoscalingv2.HorizontalPodAutoscaler) error {
 | 
			
		||||
// updateStatusIfNeeded calls updateStatus only if the status of the new HPA is not the same as the old status
 | 
			
		||||
func (a *HorizontalController) updateStatusIfNeeded(oldStatus *autoscalingv2.HorizontalPodAutoscalerStatus, newHPA *autoscalingv2.HorizontalPodAutoscaler) error {
 | 
			
		||||
	// skip a write if we wouldn't need to update
 | 
			
		||||
	if apiequality.Semantic.DeepEqual(oldStatus, &newHPA.Status) {
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
	return a.updateStatus(newHPA)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// updateStatus actually does the update request for the status of the given HPA
 | 
			
		||||
func (a *HorizontalController) updateStatus(hpa *autoscalingv2.HorizontalPodAutoscaler) error {
 | 
			
		||||
	// convert back to autoscalingv1
 | 
			
		||||
	hpaRaw, err := UnsafeConvertToVersionVia(hpa, autoscalingv1.SchemeGroupVersion)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
 
 | 
			
		||||
@@ -538,7 +538,7 @@ func (tc *testCase) verifyResults(t *testing.T) {
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (tc *testCase) runTest(t *testing.T) {
 | 
			
		||||
func (tc *testCase) setupController(t *testing.T) (*HorizontalController, informers.SharedInformerFactory) {
 | 
			
		||||
	testClient, testMetricsClient, testCMClient := tc.prepareTestClient(t)
 | 
			
		||||
	if tc.testClient != nil {
 | 
			
		||||
		testClient = tc.testClient
 | 
			
		||||
@@ -598,6 +598,10 @@ func (tc *testCase) runTest(t *testing.T) {
 | 
			
		||||
	)
 | 
			
		||||
	hpaController.hpaListerSynced = alwaysReady
 | 
			
		||||
 | 
			
		||||
	return hpaController, informerFactory
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (tc *testCase) runTestWithController(t *testing.T, hpaController *HorizontalController, informerFactory informers.SharedInformerFactory) {
 | 
			
		||||
	stop := make(chan struct{})
 | 
			
		||||
	defer close(stop)
 | 
			
		||||
	informerFactory.Start(stop)
 | 
			
		||||
@@ -616,6 +620,11 @@ func (tc *testCase) runTest(t *testing.T) {
 | 
			
		||||
	tc.verifyResults(t)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (tc *testCase) runTest(t *testing.T) {
 | 
			
		||||
	hpaController, informerFactory := tc.setupController(t)
 | 
			
		||||
	tc.runTestWithController(t, hpaController, informerFactory)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestScaleUp(t *testing.T) {
 | 
			
		||||
	tc := testCase{
 | 
			
		||||
		minReplicas:         2,
 | 
			
		||||
@@ -1594,4 +1603,73 @@ func TestScaleDownRCImmediately(t *testing.T) {
 | 
			
		||||
	tc.runTest(t)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestAvoidUncessaryUpdates(t *testing.T) {
 | 
			
		||||
	tc := testCase{
 | 
			
		||||
		minReplicas:          2,
 | 
			
		||||
		maxReplicas:          6,
 | 
			
		||||
		initialReplicas:      3,
 | 
			
		||||
		desiredReplicas:      3,
 | 
			
		||||
		CPUTarget:            30,
 | 
			
		||||
		CPUCurrent:           40,
 | 
			
		||||
		verifyCPUCurrent:     true,
 | 
			
		||||
		reportedLevels:       []uint64{400, 500, 700},
 | 
			
		||||
		reportedCPURequests:  []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
 | 
			
		||||
		reportedPodReadiness: []v1.ConditionStatus{v1.ConditionTrue, v1.ConditionFalse, v1.ConditionFalse},
 | 
			
		||||
		useMetricsApi:        true,
 | 
			
		||||
	}
 | 
			
		||||
	testClient, _, _ := tc.prepareTestClient(t)
 | 
			
		||||
	tc.testClient = testClient
 | 
			
		||||
	var savedHPA *autoscalingv1.HorizontalPodAutoscaler
 | 
			
		||||
	testClient.PrependReactor("list", "horizontalpodautoscalers", func(action core.Action) (handled bool, ret runtime.Object, err error) {
 | 
			
		||||
		tc.Lock()
 | 
			
		||||
		defer tc.Unlock()
 | 
			
		||||
 | 
			
		||||
		if savedHPA != nil {
 | 
			
		||||
			// fake out the verification logic and mark that we're done processing
 | 
			
		||||
			go func() {
 | 
			
		||||
				// wait a tick and then mark that we're finished (otherwise, we have no
 | 
			
		||||
				// way to indicate that we're finished, because the function decides not to do anything)
 | 
			
		||||
				time.Sleep(1 * time.Second)
 | 
			
		||||
				tc.statusUpdated = true
 | 
			
		||||
				tc.processed <- "test-hpa"
 | 
			
		||||
			}()
 | 
			
		||||
			return true, &autoscalingv1.HorizontalPodAutoscalerList{
 | 
			
		||||
				Items: []autoscalingv1.HorizontalPodAutoscaler{*savedHPA},
 | 
			
		||||
			}, nil
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// fallthrough
 | 
			
		||||
		return false, nil, nil
 | 
			
		||||
	})
 | 
			
		||||
	testClient.PrependReactor("update", "horizontalpodautoscalers", func(action core.Action) (handled bool, ret runtime.Object, err error) {
 | 
			
		||||
		tc.Lock()
 | 
			
		||||
		defer tc.Unlock()
 | 
			
		||||
 | 
			
		||||
		if savedHPA == nil {
 | 
			
		||||
			// save the HPA and return it
 | 
			
		||||
			savedHPA = action.(core.UpdateAction).GetObject().(*autoscalingv1.HorizontalPodAutoscaler)
 | 
			
		||||
			return true, savedHPA, nil
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		assert.Fail(t, "should not have attempted to update the HPA when nothing changed")
 | 
			
		||||
		// mark that we've processed this HPA
 | 
			
		||||
		tc.processed <- ""
 | 
			
		||||
		return true, nil, fmt.Errorf("unexpected call")
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	controller, informerFactory := tc.setupController(t)
 | 
			
		||||
 | 
			
		||||
	// fake an initial processing loop to populate savedHPA
 | 
			
		||||
	initialHPAs, err := testClient.Autoscaling().HorizontalPodAutoscalers("test-namespace").List(metav1.ListOptions{})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("unexpected error: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	if err := controller.reconcileAutoscaler(&initialHPAs.Items[0]); err != nil {
 | 
			
		||||
		t.Fatalf("unexpected error: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// actually run the test
 | 
			
		||||
	tc.runTestWithController(t, controller, informerFactory)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// TODO: add more tests
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user