mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-04 04:08:16 +00:00 
			
		
		
		
	Merge pull request #13303 from mwielgus/hpa_heapster_interface
MetricsClient for HorizontalPodAutoscaler
This commit is contained in:
		@@ -36,6 +36,7 @@ import (
 | 
			
		||||
	clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/cloudprovider"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/controller/autoscaler"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/controller/autoscaler/metrics"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/controller/endpoint"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/controller/namespace"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/controller/node"
 | 
			
		||||
@@ -291,7 +292,8 @@ func (s *CMServer) Run(_ []string) error {
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			glog.Fatalf("Invalid API configuration: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
		horizontalPodAutoscalerController := autoscalercontroller.New(kubeClient, expClient)
 | 
			
		||||
		horizontalPodAutoscalerController := autoscalercontroller.New(kubeClient, expClient,
 | 
			
		||||
			metrics.NewHeapsterMetricsClient(kubeClient))
 | 
			
		||||
		horizontalPodAutoscalerController.Run(s.HorizontalPodAutoscalerSyncPeriod)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -17,64 +17,33 @@ limitations under the License.
 | 
			
		||||
package autoscalercontroller
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"encoding/json"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"strings"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/golang/glog"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/api"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/api/resource"
 | 
			
		||||
	client "k8s.io/kubernetes/pkg/client/unversioned"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/controller/autoscaler/metrics"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/expapi"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/fields"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/labels"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/util"
 | 
			
		||||
 | 
			
		||||
	heapster "k8s.io/heapster/api/v1/types"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	heapsterNamespace = "kube-system"
 | 
			
		||||
	heapsterService   = "monitoring-heapster"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type HorizontalPodAutoscalerController struct {
 | 
			
		||||
	client        client.Interface
 | 
			
		||||
	expClient     client.ExperimentalInterface
 | 
			
		||||
	metricsClient metrics.MetricsClient
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Aggregates results into ResourceConsumption. Also returns number of
 | 
			
		||||
// pods included in the aggregation.
 | 
			
		||||
type metricAggregator func(heapster.MetricResultList) (expapi.ResourceConsumption, int)
 | 
			
		||||
 | 
			
		||||
type metricDefinition struct {
 | 
			
		||||
	name       string
 | 
			
		||||
	aggregator metricAggregator
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var resourceDefinitions = map[api.ResourceName]metricDefinition{
 | 
			
		||||
	//TODO: add memory
 | 
			
		||||
	api.ResourceCPU: {"cpu-usage",
 | 
			
		||||
		func(metrics heapster.MetricResultList) (expapi.ResourceConsumption, int) {
 | 
			
		||||
			sum, count := calculateSumFromLatestSample(metrics)
 | 
			
		||||
			value := "0"
 | 
			
		||||
			if count > 0 {
 | 
			
		||||
				// assumes that cpu usage is in millis
 | 
			
		||||
				value = fmt.Sprintf("%dm", sum/uint64(count))
 | 
			
		||||
			}
 | 
			
		||||
			return expapi.ResourceConsumption{Resource: api.ResourceCPU, Quantity: resource.MustParse(value)}, count
 | 
			
		||||
		}},
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var heapsterQueryStart, _ = time.ParseDuration("-5m")
 | 
			
		||||
var downscaleForbiddenWindow, _ = time.ParseDuration("20m")
 | 
			
		||||
var upscaleForbiddenWindow, _ = time.ParseDuration("3m")
 | 
			
		||||
 | 
			
		||||
func New(client client.Interface, expClient client.ExperimentalInterface) *HorizontalPodAutoscalerController {
 | 
			
		||||
func New(client client.Interface, expClient client.ExperimentalInterface, metricsClient metrics.MetricsClient) *HorizontalPodAutoscalerController {
 | 
			
		||||
	return &HorizontalPodAutoscalerController{
 | 
			
		||||
		client:        client,
 | 
			
		||||
		expClient:     expClient,
 | 
			
		||||
		metricsClient: metricsClient,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -100,57 +69,18 @@ func (a *HorizontalPodAutoscalerController) reconcileAutoscalers() error {
 | 
			
		||||
			glog.Warningf("Failed to query scale subresource for %s: %v", reference, err)
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		podList, err := a.client.Pods(hpa.Spec.ScaleRef.Namespace).
 | 
			
		||||
			List(labels.SelectorFromSet(labels.Set(scale.Status.Selector)), fields.Everything())
 | 
			
		||||
		currentReplicas := scale.Status.Replicas
 | 
			
		||||
		currentConsumption, err := a.metricsClient.ResourceConsumption(hpa.Spec.ScaleRef.Namespace).Get(hpa.Spec.Target.Resource,
 | 
			
		||||
			scale.Status.Selector)
 | 
			
		||||
 | 
			
		||||
		// TODO: what to do on partial errors (like metrics obtained for 75% of pods).
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			glog.Warningf("Failed to get pod list for %s: %v", reference, err)
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		podNames := []string{}
 | 
			
		||||
		for _, pod := range podList.Items {
 | 
			
		||||
			podNames = append(podNames, pod.Name)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		metricSpec, metricDefined := resourceDefinitions[hpa.Spec.Target.Resource]
 | 
			
		||||
		if !metricDefined {
 | 
			
		||||
			glog.Warningf("Heapster metric not defined for %s %v", reference, hpa.Spec.Target.Resource)
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		now := time.Now()
 | 
			
		||||
 | 
			
		||||
		startTime := now.Add(heapsterQueryStart)
 | 
			
		||||
		metricPath := fmt.Sprintf("/api/v1/model/namespaces/%s/pod-list/%s/metrics/%s",
 | 
			
		||||
			hpa.Spec.ScaleRef.Namespace,
 | 
			
		||||
			strings.Join(podNames, ","),
 | 
			
		||||
			metricSpec.name)
 | 
			
		||||
 | 
			
		||||
		resultRaw, err := a.client.Services(heapsterNamespace).
 | 
			
		||||
			ProxyGet(heapsterService, metricPath, map[string]string{"start": startTime.Format(time.RFC3339)}).
 | 
			
		||||
			DoRaw()
 | 
			
		||||
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			glog.Warningf("Failed to get pods metrics for %s: %v", reference, err)
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		var metrics heapster.MetricResultList
 | 
			
		||||
		err = json.Unmarshal(resultRaw, &metrics)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			glog.Warningf("Failed to unmarshall heapster response: %v", err)
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		glog.Infof("Metrics available for %s: %s", reference, string(resultRaw))
 | 
			
		||||
 | 
			
		||||
		currentConsumption, count := metricSpec.aggregator(metrics)
 | 
			
		||||
		if count != len(podList.Items) {
 | 
			
		||||
			glog.Warningf("Metrics obtained for %d/%d of pods", count, len(podList.Items))
 | 
			
		||||
			glog.Warningf("Error while getting metrics for %s: %v", reference, err)
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// if the ratio is 1.2 we want to have 2 replicas
 | 
			
		||||
		desiredReplicas := 1 + int((currentConsumption.Quantity.MilliValue()*int64(count))/hpa.Spec.Target.Quantity.MilliValue())
 | 
			
		||||
		desiredReplicas := 1 + int((currentConsumption.Quantity.MilliValue()*int64(currentReplicas))/hpa.Spec.Target.Quantity.MilliValue())
 | 
			
		||||
 | 
			
		||||
		if desiredReplicas < hpa.Spec.MinCount {
 | 
			
		||||
			desiredReplicas = hpa.Spec.MinCount
 | 
			
		||||
@@ -158,18 +88,17 @@ func (a *HorizontalPodAutoscalerController) reconcileAutoscalers() error {
 | 
			
		||||
		if desiredReplicas > hpa.Spec.MaxCount {
 | 
			
		||||
			desiredReplicas = hpa.Spec.MaxCount
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		now := time.Now()
 | 
			
		||||
		rescale := false
 | 
			
		||||
 | 
			
		||||
		if desiredReplicas != count {
 | 
			
		||||
		if desiredReplicas != currentReplicas {
 | 
			
		||||
			// Going down
 | 
			
		||||
			if desiredReplicas < count && (hpa.Status == nil || hpa.Status.LastScaleTimestamp == nil ||
 | 
			
		||||
			if desiredReplicas < currentReplicas && (hpa.Status == nil || hpa.Status.LastScaleTimestamp == nil ||
 | 
			
		||||
				hpa.Status.LastScaleTimestamp.Add(downscaleForbiddenWindow).Before(now)) {
 | 
			
		||||
				rescale = true
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			// Going up
 | 
			
		||||
			if desiredReplicas > count && (hpa.Status == nil || hpa.Status.LastScaleTimestamp == nil ||
 | 
			
		||||
			if desiredReplicas > currentReplicas && (hpa.Status == nil || hpa.Status.LastScaleTimestamp == nil ||
 | 
			
		||||
				hpa.Status.LastScaleTimestamp.Add(upscaleForbiddenWindow).Before(now)) {
 | 
			
		||||
				rescale = true
 | 
			
		||||
			}
 | 
			
		||||
@@ -185,9 +114,9 @@ func (a *HorizontalPodAutoscalerController) reconcileAutoscalers() error {
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		status := expapi.HorizontalPodAutoscalerStatus{
 | 
			
		||||
			CurrentReplicas:    count,
 | 
			
		||||
			CurrentReplicas:    currentReplicas,
 | 
			
		||||
			DesiredReplicas:    desiredReplicas,
 | 
			
		||||
			CurrentConsumption: ¤tConsumption,
 | 
			
		||||
			CurrentConsumption: currentConsumption,
 | 
			
		||||
		}
 | 
			
		||||
		hpa.Status = &status
 | 
			
		||||
		if rescale {
 | 
			
		||||
@@ -203,22 +132,3 @@ func (a *HorizontalPodAutoscalerController) reconcileAutoscalers() error {
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func calculateSumFromLatestSample(metrics heapster.MetricResultList) (uint64, int) {
 | 
			
		||||
	sum := uint64(0)
 | 
			
		||||
	count := 0
 | 
			
		||||
	for _, metrics := range metrics.Items {
 | 
			
		||||
		var newest *heapster.MetricPoint
 | 
			
		||||
		newest = nil
 | 
			
		||||
		for _, metricPoint := range metrics.Metrics {
 | 
			
		||||
			if newest == nil || newest.Timestamp.Before(metricPoint.Timestamp) {
 | 
			
		||||
				newest = &metricPoint
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		if newest != nil {
 | 
			
		||||
			sum += newest.Value
 | 
			
		||||
			count++
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return sum, count
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -17,39 +17,33 @@ limitations under the License.
 | 
			
		||||
package autoscalercontroller
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"encoding/json"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"net/http"
 | 
			
		||||
	"net/http/httptest"
 | 
			
		||||
	"testing"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"k8s.io/kubernetes/pkg/api"
 | 
			
		||||
	_ "k8s.io/kubernetes/pkg/api/latest"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/api/resource"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/api/testapi"
 | 
			
		||||
	client "k8s.io/kubernetes/pkg/client/unversioned"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/controller/autoscaler/metrics"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/expapi"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/runtime"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/util"
 | 
			
		||||
 | 
			
		||||
	"github.com/golang/glog"
 | 
			
		||||
	"github.com/stretchr/testify/assert"
 | 
			
		||||
 | 
			
		||||
	heapster "k8s.io/heapster/api/v1/types"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	namespace    = api.NamespaceDefault
 | 
			
		||||
	rcName       = "app-rc"
 | 
			
		||||
	podNameLabel = "app"
 | 
			
		||||
	podName      = "p1"
 | 
			
		||||
	hpaName      = "foo"
 | 
			
		||||
 | 
			
		||||
	hpaListHandler   = "HpaList"
 | 
			
		||||
	scaleHandler     = "Scale"
 | 
			
		||||
	podListHandler   = "PodList"
 | 
			
		||||
	heapsterHandler  = "Heapster"
 | 
			
		||||
	updateHpaHandler = "HpaUpdate"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
@@ -58,6 +52,26 @@ type serverResponse struct {
 | 
			
		||||
	obj        interface{}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type fakeMetricsClient struct {
 | 
			
		||||
	consumption metrics.ResourceConsumptionClient
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type fakeResourceConsumptionClient struct {
 | 
			
		||||
	metrics map[api.ResourceName]expapi.ResourceConsumption
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (f *fakeMetricsClient) ResourceConsumption(namespace string) metrics.ResourceConsumptionClient {
 | 
			
		||||
	return f.consumption
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (f *fakeResourceConsumptionClient) Get(resource api.ResourceName, selector map[string]string) (*expapi.ResourceConsumption, error) {
 | 
			
		||||
	consumption, found := f.metrics[resource]
 | 
			
		||||
	if !found {
 | 
			
		||||
		return nil, fmt.Errorf("resource not found: %v", resource)
 | 
			
		||||
	}
 | 
			
		||||
	return &consumption, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func makeTestServer(t *testing.T, responses map[string]*serverResponse) (*httptest.Server, map[string]*util.FakeHandler) {
 | 
			
		||||
 | 
			
		||||
	handlers := map[string]*util.FakeHandler{}
 | 
			
		||||
@@ -73,16 +87,6 @@ func makeTestServer(t *testing.T, responses map[string]*serverResponse) (*httpte
 | 
			
		||||
		return &handler
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	mkRawHandler := func(url string, response serverResponse) *util.FakeHandler {
 | 
			
		||||
		handler := util.FakeHandler{
 | 
			
		||||
			StatusCode:   response.statusCode,
 | 
			
		||||
			ResponseBody: *response.obj.(*string),
 | 
			
		||||
		}
 | 
			
		||||
		mux.Handle(url, &handler)
 | 
			
		||||
		glog.Infof("Will handle %s", url)
 | 
			
		||||
		return &handler
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if responses[hpaListHandler] != nil {
 | 
			
		||||
		handlers[hpaListHandler] = mkHandler("/experimental/v1/horizontalpodautoscalers", *responses[hpaListHandler])
 | 
			
		||||
	}
 | 
			
		||||
@@ -92,16 +96,6 @@ func makeTestServer(t *testing.T, responses map[string]*serverResponse) (*httpte
 | 
			
		||||
			fmt.Sprintf("/experimental/v1/namespaces/%s/replicationcontrollers/%s/scale", namespace, rcName), *responses[scaleHandler])
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if responses[podListHandler] != nil {
 | 
			
		||||
		handlers[podListHandler] = mkHandler(fmt.Sprintf("/api/v1/namespaces/%s/pods", namespace), *responses[podListHandler])
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if responses[heapsterHandler] != nil {
 | 
			
		||||
		handlers[heapsterHandler] = mkRawHandler(
 | 
			
		||||
			fmt.Sprintf("/api/v1/proxy/namespaces/kube-system/services/monitoring-heapster/api/v1/model/namespaces/%s/pod-list/%s/metrics/cpu-usage",
 | 
			
		||||
				namespace, podName), *responses[heapsterHandler])
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if responses[updateHpaHandler] != nil {
 | 
			
		||||
		handlers[updateHpaHandler] = mkHandler(fmt.Sprintf("/experimental/v1/namespaces/%s/horizontalpodautoscalers/%s", namespace, hpaName),
 | 
			
		||||
			*responses[updateHpaHandler])
 | 
			
		||||
@@ -150,21 +144,6 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) {
 | 
			
		||||
		},
 | 
			
		||||
	}}
 | 
			
		||||
 | 
			
		||||
	podListResponse := serverResponse{http.StatusOK, &api.PodList{
 | 
			
		||||
		Items: []api.Pod{
 | 
			
		||||
			{
 | 
			
		||||
				ObjectMeta: api.ObjectMeta{
 | 
			
		||||
					Name:      podName,
 | 
			
		||||
					Namespace: namespace,
 | 
			
		||||
				},
 | 
			
		||||
			}}}}
 | 
			
		||||
	timestamp := time.Now()
 | 
			
		||||
	metrics := heapster.MetricResultList{
 | 
			
		||||
		Items: []heapster.MetricResult{{
 | 
			
		||||
			Metrics:         []heapster.MetricPoint{{timestamp, 650}},
 | 
			
		||||
			LatestTimestamp: timestamp,
 | 
			
		||||
		}}}
 | 
			
		||||
 | 
			
		||||
	status := expapi.HorizontalPodAutoscalerStatus{
 | 
			
		||||
		CurrentReplicas: 1,
 | 
			
		||||
		DesiredReplicas: 3,
 | 
			
		||||
@@ -189,16 +168,10 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) {
 | 
			
		||||
		Status: &status,
 | 
			
		||||
	}}
 | 
			
		||||
 | 
			
		||||
	heapsterRawResponse, _ := json.Marshal(&metrics)
 | 
			
		||||
	heapsterStrResponse := string(heapsterRawResponse)
 | 
			
		||||
	heapsterResponse := serverResponse{http.StatusOK, &heapsterStrResponse}
 | 
			
		||||
 | 
			
		||||
	testServer, handlers := makeTestServer(t,
 | 
			
		||||
		map[string]*serverResponse{
 | 
			
		||||
			hpaListHandler:   &hpaResponse,
 | 
			
		||||
			scaleHandler:     &scaleResponse,
 | 
			
		||||
			podListHandler:   &podListResponse,
 | 
			
		||||
			heapsterHandler:  &heapsterResponse,
 | 
			
		||||
			updateHpaHandler: &updateHpaResponse,
 | 
			
		||||
		})
 | 
			
		||||
 | 
			
		||||
@@ -206,7 +179,13 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) {
 | 
			
		||||
	kubeClient := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
 | 
			
		||||
	expClient := client.NewExperimentalOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
 | 
			
		||||
 | 
			
		||||
	hpaController := New(kubeClient, expClient)
 | 
			
		||||
	fakeRC := fakeResourceConsumptionClient{metrics: map[api.ResourceName]expapi.ResourceConsumption{
 | 
			
		||||
		api.ResourceCPU: {Resource: api.ResourceCPU, Quantity: resource.MustParse("650m")},
 | 
			
		||||
	}}
 | 
			
		||||
	fake := fakeMetricsClient{consumption: &fakeRC}
 | 
			
		||||
 | 
			
		||||
	hpaController := New(kubeClient, expClient, &fake)
 | 
			
		||||
 | 
			
		||||
	err := hpaController.reconcileAutoscalers()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatal("Failed to reconcile: %v", err)
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										168
									
								
								pkg/controller/autoscaler/metrics/metrics_client.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										168
									
								
								pkg/controller/autoscaler/metrics/metrics_client.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,168 @@
 | 
			
		||||
/*
 | 
			
		||||
Copyright 2015 The Kubernetes Authors All rights reserved.
 | 
			
		||||
 | 
			
		||||
Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
you may not use this file except in compliance with the License.
 | 
			
		||||
You may obtain a copy of the License at
 | 
			
		||||
 | 
			
		||||
    http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 | 
			
		||||
Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
See the License for the specific language governing permissions and
 | 
			
		||||
limitations under the License.
 | 
			
		||||
*/
 | 
			
		||||
 | 
			
		||||
package metrics
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"encoding/json"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"strings"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/golang/glog"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/api"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/api/resource"
 | 
			
		||||
	client "k8s.io/kubernetes/pkg/client/unversioned"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/expapi"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/fields"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/labels"
 | 
			
		||||
 | 
			
		||||
	heapster "k8s.io/heapster/api/v1/types"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	heapsterNamespace = "kube-system"
 | 
			
		||||
	heapsterService   = "monitoring-heapster"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var heapsterQueryStart, _ = time.ParseDuration("-5m")
 | 
			
		||||
 | 
			
		||||
// An interface for getting metrics for pods.
 | 
			
		||||
type MetricsClient interface {
 | 
			
		||||
	ResourceConsumption(namespace string) ResourceConsumptionClient
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type ResourceConsumptionClient interface {
 | 
			
		||||
	// Gets average resource consumption for pods under the given selector.
 | 
			
		||||
	Get(resourceName api.ResourceName, selector map[string]string) (*expapi.ResourceConsumption, error)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Aggregates results into ResourceConsumption. Also returns number of
 | 
			
		||||
// pods included in the aggregation.
 | 
			
		||||
type metricAggregator func(heapster.MetricResultList) (expapi.ResourceConsumption, int)
 | 
			
		||||
 | 
			
		||||
type metricDefinition struct {
 | 
			
		||||
	name       string
 | 
			
		||||
	aggregator metricAggregator
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Heapster-based implementation of MetricsClient
 | 
			
		||||
type HeapsterMetricsClient struct {
 | 
			
		||||
	client client.Interface
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type HeapsterResourceConsumptionClient struct {
 | 
			
		||||
	namespace           string
 | 
			
		||||
	client              client.Interface
 | 
			
		||||
	resourceDefinitions map[api.ResourceName]metricDefinition
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewHeapsterMetricsClient(client client.Interface) *HeapsterMetricsClient {
 | 
			
		||||
	return &HeapsterMetricsClient{client: client}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var heapsterMetricDefinitions = map[api.ResourceName]metricDefinition{
 | 
			
		||||
	//TODO: add memory
 | 
			
		||||
	api.ResourceCPU: {"cpu-usage",
 | 
			
		||||
		func(metrics heapster.MetricResultList) (expapi.ResourceConsumption, int) {
 | 
			
		||||
			sum, count := calculateSumFromLatestSample(metrics)
 | 
			
		||||
			value := "0"
 | 
			
		||||
			if count > 0 {
 | 
			
		||||
				// assumes that cpu usage is in millis
 | 
			
		||||
				value = fmt.Sprintf("%dm", sum/uint64(count))
 | 
			
		||||
			}
 | 
			
		||||
			return expapi.ResourceConsumption{Resource: api.ResourceCPU, Quantity: resource.MustParse(value)}, count
 | 
			
		||||
		}},
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (h *HeapsterMetricsClient) ResourceConsumption(namespace string) ResourceConsumptionClient {
 | 
			
		||||
	return &HeapsterResourceConsumptionClient{
 | 
			
		||||
		namespace:           namespace,
 | 
			
		||||
		client:              h.client,
 | 
			
		||||
		resourceDefinitions: heapsterMetricDefinitions,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (h *HeapsterResourceConsumptionClient) Get(resourceName api.ResourceName, selector map[string]string) (*expapi.ResourceConsumption, error) {
 | 
			
		||||
	podList, err := h.client.Pods(h.namespace).
 | 
			
		||||
		List(labels.SelectorFromSet(labels.Set(selector)), fields.Everything())
 | 
			
		||||
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, fmt.Errorf("failed to get pod list: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	podNames := []string{}
 | 
			
		||||
	for _, pod := range podList.Items {
 | 
			
		||||
		podNames = append(podNames, pod.Name)
 | 
			
		||||
	}
 | 
			
		||||
	return h.getForPods(resourceName, podNames)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (h *HeapsterResourceConsumptionClient) getForPods(resourceName api.ResourceName, podNames []string) (*expapi.ResourceConsumption, error) {
 | 
			
		||||
 | 
			
		||||
	metricSpec, metricDefined := h.resourceDefinitions[resourceName]
 | 
			
		||||
	if !metricDefined {
 | 
			
		||||
		return nil, fmt.Errorf("heapster metric not defined for %v", resourceName)
 | 
			
		||||
	}
 | 
			
		||||
	now := time.Now()
 | 
			
		||||
 | 
			
		||||
	startTime := now.Add(heapsterQueryStart)
 | 
			
		||||
	metricPath := fmt.Sprintf("/api/v1/model/namespaces/%s/pod-list/%s/metrics/%s",
 | 
			
		||||
		h.namespace,
 | 
			
		||||
		strings.Join(podNames, ","),
 | 
			
		||||
		metricSpec.name)
 | 
			
		||||
 | 
			
		||||
	resultRaw, err := h.client.Services(heapsterNamespace).
 | 
			
		||||
		ProxyGet(heapsterService, metricPath, map[string]string{"start": startTime.Format(time.RFC3339)}).
 | 
			
		||||
		DoRaw()
 | 
			
		||||
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, fmt.Errorf("failed to get pods metrics: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	var metrics heapster.MetricResultList
 | 
			
		||||
	err = json.Unmarshal(resultRaw, &metrics)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, fmt.Errorf("failed to unmarshall heapster response: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	glog.Infof("Metrics available: %s", string(resultRaw))
 | 
			
		||||
 | 
			
		||||
	currentConsumption, count := metricSpec.aggregator(metrics)
 | 
			
		||||
	if count != len(podNames) {
 | 
			
		||||
		return nil, fmt.Errorf("metrics obtained for %d/%d of pods", count, len(podNames))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return ¤tConsumption, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func calculateSumFromLatestSample(metrics heapster.MetricResultList) (uint64, int) {
 | 
			
		||||
	sum := uint64(0)
 | 
			
		||||
	count := 0
 | 
			
		||||
	for _, metrics := range metrics.Items {
 | 
			
		||||
		var newest *heapster.MetricPoint
 | 
			
		||||
		newest = nil
 | 
			
		||||
		for _, metricPoint := range metrics.Metrics {
 | 
			
		||||
			if newest == nil || newest.Timestamp.Before(metricPoint.Timestamp) {
 | 
			
		||||
				newest = &metricPoint
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		if newest != nil {
 | 
			
		||||
			sum += newest.Value
 | 
			
		||||
			count++
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return sum, count
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										131
									
								
								pkg/controller/autoscaler/metrics/metrics_client_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										131
									
								
								pkg/controller/autoscaler/metrics/metrics_client_test.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,131 @@
 | 
			
		||||
/*
 | 
			
		||||
Copyright 2015 The Kubernetes Authors All rights reserved.
 | 
			
		||||
 | 
			
		||||
Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
you may not use this file except in compliance with the License.
 | 
			
		||||
You may obtain a copy of the License at
 | 
			
		||||
 | 
			
		||||
    http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 | 
			
		||||
Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
See the License for the specific language governing permissions and
 | 
			
		||||
limitations under the License.
 | 
			
		||||
*/
 | 
			
		||||
 | 
			
		||||
package metrics
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"encoding/json"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"net/http"
 | 
			
		||||
	"net/http/httptest"
 | 
			
		||||
	"testing"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"k8s.io/kubernetes/pkg/api"
 | 
			
		||||
	_ "k8s.io/kubernetes/pkg/api/latest"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/api/testapi"
 | 
			
		||||
	client "k8s.io/kubernetes/pkg/client/unversioned"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/runtime"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/util"
 | 
			
		||||
 | 
			
		||||
	heapster "k8s.io/heapster/api/v1/types"
 | 
			
		||||
 | 
			
		||||
	"github.com/golang/glog"
 | 
			
		||||
	"github.com/stretchr/testify/assert"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	namespace       = "test-namespace"
 | 
			
		||||
	podName         = "pod1"
 | 
			
		||||
	podListHandler  = "podlisthandler"
 | 
			
		||||
	heapsterHandler = "heapsterhandler"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type serverResponse struct {
 | 
			
		||||
	statusCode int
 | 
			
		||||
	obj        interface{}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func makeTestServer(t *testing.T, responses map[string]*serverResponse) (*httptest.Server, map[string]*util.FakeHandler) {
 | 
			
		||||
 | 
			
		||||
	handlers := map[string]*util.FakeHandler{}
 | 
			
		||||
	mux := http.NewServeMux()
 | 
			
		||||
 | 
			
		||||
	mkHandler := func(url string, response serverResponse) *util.FakeHandler {
 | 
			
		||||
		handler := util.FakeHandler{
 | 
			
		||||
			StatusCode:   response.statusCode,
 | 
			
		||||
			ResponseBody: runtime.EncodeOrDie(testapi.Codec(), response.obj.(runtime.Object)),
 | 
			
		||||
		}
 | 
			
		||||
		mux.Handle(url, &handler)
 | 
			
		||||
		glog.Infof("Will handle %s", url)
 | 
			
		||||
		return &handler
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	mkRawHandler := func(url string, response serverResponse) *util.FakeHandler {
 | 
			
		||||
		handler := util.FakeHandler{
 | 
			
		||||
			StatusCode:   response.statusCode,
 | 
			
		||||
			ResponseBody: *response.obj.(*string),
 | 
			
		||||
		}
 | 
			
		||||
		mux.Handle(url, &handler)
 | 
			
		||||
		glog.Infof("Will handle %s", url)
 | 
			
		||||
		return &handler
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if responses[podListHandler] != nil {
 | 
			
		||||
		handlers[podListHandler] = mkHandler(fmt.Sprintf("/api/v1/namespaces/%s/pods", namespace), *responses[podListHandler])
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if responses[heapsterHandler] != nil {
 | 
			
		||||
		handlers[heapsterHandler] = mkRawHandler(
 | 
			
		||||
			fmt.Sprintf("/api/v1/proxy/namespaces/kube-system/services/monitoring-heapster/api/v1/model/namespaces/%s/pod-list/%s/metrics/cpu-usage",
 | 
			
		||||
				namespace, podName), *responses[heapsterHandler])
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) {
 | 
			
		||||
		t.Errorf("unexpected request: %v", req.RequestURI)
 | 
			
		||||
		res.WriteHeader(http.StatusNotFound)
 | 
			
		||||
	})
 | 
			
		||||
	return httptest.NewServer(mux), handlers
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestHeapsterResourceConsumptionGet(t *testing.T) {
 | 
			
		||||
 | 
			
		||||
	podListResponse := serverResponse{http.StatusOK, &api.PodList{
 | 
			
		||||
		Items: []api.Pod{
 | 
			
		||||
			{
 | 
			
		||||
				ObjectMeta: api.ObjectMeta{
 | 
			
		||||
					Name:      podName,
 | 
			
		||||
					Namespace: namespace,
 | 
			
		||||
				},
 | 
			
		||||
			}}}}
 | 
			
		||||
 | 
			
		||||
	timestamp := time.Now()
 | 
			
		||||
	metrics := heapster.MetricResultList{
 | 
			
		||||
		Items: []heapster.MetricResult{{
 | 
			
		||||
			Metrics:         []heapster.MetricPoint{{timestamp, 650}},
 | 
			
		||||
			LatestTimestamp: timestamp,
 | 
			
		||||
		}}}
 | 
			
		||||
	heapsterRawResponse, _ := json.Marshal(&metrics)
 | 
			
		||||
	heapsterStrResponse := string(heapsterRawResponse)
 | 
			
		||||
	heapsterResponse := serverResponse{http.StatusOK, &heapsterStrResponse}
 | 
			
		||||
 | 
			
		||||
	testServer, _ := makeTestServer(t,
 | 
			
		||||
		map[string]*serverResponse{
 | 
			
		||||
			heapsterHandler: &heapsterResponse,
 | 
			
		||||
			podListHandler:  &podListResponse,
 | 
			
		||||
		})
 | 
			
		||||
 | 
			
		||||
	defer testServer.Close()
 | 
			
		||||
	kubeClient := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
 | 
			
		||||
 | 
			
		||||
	metricsClient := NewHeapsterMetricsClient(kubeClient)
 | 
			
		||||
 | 
			
		||||
	val, err := metricsClient.ResourceConsumption(namespace).Get(api.ResourceCPU, map[string]string{"app": "test"})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("Error while getting consumption: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	assert.Equal(t, int64(650), val.Quantity.MilliValue())
 | 
			
		||||
}
 | 
			
		||||
		Reference in New Issue
	
	Block a user