mirror of
https://github.com/Telecominfraproject/wlan-cloud-kube-state-metrics.git
synced 2025-11-02 11:37:55 +00:00
use clientgofix library; add context parameter in verticalpodautoscaler; rename PodDisruptionsAllowed to DisruptionsAllowed because of PR 85863 in the k8s repo; lint fixes and rename PodDisruptionsAllowed to DisruptionsAllowed in test file; run go mod tidy
259 lines
7.6 KiB
Go
/*
Copyright 2019 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metricshandler

import (
    "compress/gzip"
    "context"
    "io"
    "net/http"
    "strconv"
    "strings"
    "sync"

    "github.com/pkg/errors"
    appsv1 "k8s.io/api/apps/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/fields"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/klog"

    "k8s.io/kube-state-metrics/internal/store"
    metricsstore "k8s.io/kube-state-metrics/pkg/metrics_store"
    "k8s.io/kube-state-metrics/pkg/options"
)

// MetricsHandler is a http.Handler that exposes the main kube-state-metrics
// /metrics endpoint. It allows concurrent reconfiguration at runtime.
type MetricsHandler struct {
    opts               *options.Options
    kubeClient         kubernetes.Interface
    storeBuilder       *store.Builder
    enableGZIPEncoding bool

    // cancel stops the context of the previously configured sharding, if any.
    cancel func()

    // mtx protects stores, curShard, and curTotalShards.
    mtx            *sync.RWMutex
    stores         []cache.Store
    curShard       int32
    curTotalShards int
}

// New creates and returns a new MetricsHandler with the given options.
func New(opts *options.Options, kubeClient kubernetes.Interface, storeBuilder *store.Builder, enableGZIPEncoding bool) *MetricsHandler {
    return &MetricsHandler{
        opts:               opts,
        kubeClient:         kubeClient,
        storeBuilder:       storeBuilder,
        enableGZIPEncoding: enableGZIPEncoding,
        mtx:                &sync.RWMutex{},
    }
}
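
// A minimal wiring sketch (not part of this file; the mux path, listen
// address, and variable names are assumptions):
//
//     handler := New(opts, kubeClient, storeBuilder, true)
//     go func() { _ = handler.Run(ctx) }()
//     mux := http.NewServeMux()
//     mux.Handle("/metrics", handler)
//     _ = http.ListenAndServe(":8080", mux)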

// ConfigureSharding (re-)configures sharding. Re-configuration can be done
// concurrently.
func (m *MetricsHandler) ConfigureSharding(ctx context.Context, shard int32, totalShards int) {
    m.mtx.Lock()
    defer m.mtx.Unlock()

    // Stop the reflectors of any previous configuration before rebuilding
    // the stores with the new sharding settings.
    if m.cancel != nil {
        m.cancel()
    }
    if totalShards != 1 {
        klog.Infof("configuring sharding of this instance to be shard index %d (zero-indexed) out of %d total shards", shard, totalShards)
    }
    ctx, m.cancel = context.WithCancel(ctx)
    m.storeBuilder.WithSharding(shard, totalShards)
    m.storeBuilder.WithContext(ctx)
    m.stores = m.storeBuilder.Build()
    m.curShard = shard
    m.curTotalShards = totalShards
}
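
// A sketch of a static, one-off configuration (normally Run drives this from
// the configured shard options; the values here are illustrative):
//
//     m.ConfigureSharding(ctx, 0, 2) // this instance serves shard 0 of 2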

// Run configures the MetricsHandler's sharding and, if autosharding is
// enabled, re-configures sharding on re-sharding events. Run should only be
// called once.
func (m *MetricsHandler) Run(ctx context.Context) error {
    autoSharding := len(m.opts.Pod) > 0 && len(m.opts.Namespace) > 0

    if !autoSharding {
        klog.Info("Autosharding disabled")
        m.ConfigureSharding(ctx, m.opts.Shard, m.opts.TotalShards)
        <-ctx.Done()
        return ctx.Err()
    }

    klog.Infof("Autosharding enabled with pod=%v pod_namespace=%v", m.opts.Pod, m.opts.Namespace)
    klog.Info("Auto-detecting sharding settings.")
    ss, err := detectStatefulSet(m.kubeClient, m.opts.Pod, m.opts.Namespace)
    if err != nil {
        return errors.Wrap(err, "detect StatefulSet")
    }
    statefulSetName := ss.Name

    labelSelectorOptions := func(o *metav1.ListOptions) {
        o.LabelSelector = fields.SelectorFromSet(ss.Labels).String()
    }

    i := cache.NewSharedIndexInformer(
        cache.NewFilteredListWatchFromClient(m.kubeClient.AppsV1().RESTClient(), "statefulsets", m.opts.Namespace, labelSelectorOptions),
        &appsv1.StatefulSet{}, 0, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
    )
    i.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(o interface{}) {
            ss := o.(*appsv1.StatefulSet)
            if ss.Name != statefulSetName {
                return
            }

            shard, totalShards, err := shardingSettingsFromStatefulSet(ss, m.opts.Pod)
            if err != nil {
                klog.Errorf("detect sharding settings from StatefulSet: %v", err)
                return
            }

            m.mtx.RLock()
            shardingUnchanged := m.curShard == shard && m.curTotalShards == totalShards
            m.mtx.RUnlock()

            if shardingUnchanged {
                return
            }

            m.ConfigureSharding(ctx, shard, totalShards)
        },
        UpdateFunc: func(oldo, curo interface{}) {
            old := oldo.(*appsv1.StatefulSet)
            cur := curo.(*appsv1.StatefulSet)
            if cur.Name != statefulSetName {
                return
            }

            // Periodic resyncs deliver updates with an unchanged
            // ResourceVersion; nothing to do in that case.
            if old.ResourceVersion == cur.ResourceVersion {
                return
            }

            shard, totalShards, err := shardingSettingsFromStatefulSet(cur, m.opts.Pod)
            if err != nil {
                klog.Errorf("detect sharding settings from StatefulSet: %v", err)
                return
            }

            m.mtx.RLock()
            shardingUnchanged := m.curShard == shard && m.curTotalShards == totalShards
            m.mtx.RUnlock()

            if shardingUnchanged {
                return
            }

            m.ConfigureSharding(ctx, shard, totalShards)
        },
    })
    go i.Run(ctx.Done())
    if !cache.WaitForCacheSync(ctx.Done(), i.HasSynced) {
        return errors.New("waiting for informer cache to sync failed")
    }
    <-ctx.Done()
    return ctx.Err()
}
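
// A sketch of how autosharding is typically enabled: populate the pod
// identity before calling Run, e.g. from Downward-API environment variables
// (the variable names POD_NAME and POD_NAMESPACE are assumptions):
//
//     opts.Pod = os.Getenv("POD_NAME")
//     opts.Namespace = os.Getenv("POD_NAMESPACE")
//     if err := handler.Run(ctx); err != nil {
//         klog.Fatal(err)
//     }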

// ServeHTTP implements the http.Handler interface. It writes the metrics in
// its stores to the response body.
func (m *MetricsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    m.mtx.RLock()
    defer m.mtx.RUnlock()
    resHeader := w.Header()
    var writer io.Writer = w

    resHeader.Set("Content-Type", `text/plain; version=0.0.4`)

    if m.enableGZIPEncoding {
        // Gzip response if requested. Taken from
        // github.com/prometheus/client_golang/prometheus/promhttp.decorateWriter.
        reqHeader := r.Header.Get("Accept-Encoding")
        parts := strings.Split(reqHeader, ",")
        for _, part := range parts {
            part = strings.TrimSpace(part)
            if part == "gzip" || strings.HasPrefix(part, "gzip;") {
                writer = gzip.NewWriter(writer)
                resHeader.Set("Content-Encoding", "gzip")
            }
        }
    }

    for _, s := range m.stores {
        ms := s.(*metricsstore.MetricsStore)
        // Write through writer, not w, so the gzip wrapper (if any) is
        // actually applied.
        ms.WriteAll(writer)
    }

    // In case we gzipped the response, we have to close the writer.
    if closer, ok := writer.(io.Closer); ok {
        closer.Close()
    }
}
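
// A test-style sketch of the gzip negotiation above, assuming a handler m
// built with enableGZIPEncoding=true (httptest is net/http/httptest):
//
//     req := httptest.NewRequest(http.MethodGet, "/metrics", nil)
//     req.Header.Set("Accept-Encoding", "gzip")
//     rec := httptest.NewRecorder()
//     m.ServeHTTP(rec, req)
//     // rec.Header().Get("Content-Encoding") == "gzip"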

// shardingSettingsFromStatefulSet derives this instance's shard index from
// the Pod's ordinal and the total number of shards from the StatefulSet's
// replica count.
func shardingSettingsFromStatefulSet(ss *appsv1.StatefulSet, podName string) (nominal int32, totalReplicas int, err error) {
    nominal, err = detectNominalFromPod(ss.Name, podName)
    if err != nil {
        return 0, 0, errors.Wrap(err, "detecting Pod nominal")
    }

    // Spec.Replicas is optional and defaults to 1 when unset.
    totalReplicas = 1
    replicas := ss.Spec.Replicas
    if replicas != nil {
        totalReplicas = int(*replicas)
    }

    return nominal, totalReplicas, nil
}
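
// For instance (illustrative values): a StatefulSet named "ksm" with
// Spec.Replicas = 3 and a Pod named "ksm-1" yield nominal=1, totalReplicas=3.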

// detectNominalFromPod extracts the shard index (the StatefulSet ordinal)
// from a Pod name of the form <statefulSetName>-<ordinal>.
func detectNominalFromPod(statefulSetName, podName string) (int32, error) {
    nominalString := strings.TrimPrefix(podName, statefulSetName+"-")
    nominal, err := strconv.Atoi(nominalString)
    if err != nil {
        return 0, errors.Wrapf(err, "failed to detect shard index for Pod %s of StatefulSet %s, parsed %s", podName, statefulSetName, nominalString)
    }

    return int32(nominal), nil
}
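
// For example, Pod "kube-state-metrics-2" of StatefulSet "kube-state-metrics"
// yields shard index 2; a name without a numeric ordinal suffix is an error.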

// detectStatefulSet looks up the given Pod and returns the StatefulSet that
// controls it via an apps/v1 owner reference.
func detectStatefulSet(kubeClient kubernetes.Interface, podName, namespaceName string) (*appsv1.StatefulSet, error) {
    p, err := kubeClient.CoreV1().Pods(namespaceName).Get(context.TODO(), podName, metav1.GetOptions{})
    if err != nil {
        return nil, errors.Wrapf(err, "retrieve pod %s for sharding", podName)
    }

    owners := p.GetOwnerReferences()
    for _, o := range owners {
        if o.APIVersion != "apps/v1" || o.Kind != "StatefulSet" || o.Controller == nil || !*o.Controller {
            continue
        }

        ss, err := kubeClient.AppsV1().StatefulSets(namespaceName).Get(context.TODO(), o.Name, metav1.GetOptions{})
        if err != nil {
            return nil, errors.Wrapf(err, "retrieve shard's StatefulSet: %s/%s", namespaceName, o.Name)
        }

        return ss, nil
    }

    return nil, errors.Errorf("no suitable statefulset found for auto detecting sharding for Pod %s/%s", namespaceName, podName)
}
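
// A test-style sketch of detectStatefulSet using the client-go fake clientset
// (k8s.io/client-go/kubernetes/fake; corev1 is k8s.io/api/core/v1; all object
// names are illustrative):
//
//     ctrl := true
//     pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
//         Name: "ksm-0", Namespace: "monitoring",
//         OwnerReferences: []metav1.OwnerReference{{
//             APIVersion: "apps/v1", Kind: "StatefulSet", Name: "ksm", Controller: &ctrl,
//         }},
//     }}
//     ss := &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Name: "ksm", Namespace: "monitoring"}}
//     client := fake.NewSimpleClientset(pod, ss)
//     got, err := detectStatefulSet(client, "ksm-0", "monitoring")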