Files
wlan-cloud-kube-state-metrics/pkg/metricshandler/metrics_handler.go
fpetkovski 83887f1611 Replace multiListerWatcher with independent listWatchers per namespace
The multiListerWatcher is a composite object encapsulating multiple
ListerWatchers and implements the ListerWatcher interface.
With the current implementation, when an individual lister fails, the
entire List operation fails. This causes no metrics to be shown when KSM
has no permissions to a single namespace.

In addition to this, the multiListerWatcher takes advantage of internal
implementation details if the client-go library by modifiying and
relying on the ResourceVersion metadata field. This introduces a bug
where reconnecting to the API server will break the multiListerWatcher
completely.

This commit replaces the multiListerWatcher with individual
ListerWatchers per each configured namespace, resolving both issues.

Signed-off-by: fpetkovski <filip.petkovsky@gmail.com>
2021-06-29 13:27:57 +02:00

258 lines
7.7 KiB
Go

/*
Copyright 2019 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metricshandler
import (
"compress/gzip"
"context"
"io"
"net/http"
"strconv"
"strings"
"sync"
"github.com/pkg/errors"
appsv1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"k8s.io/kube-state-metrics/v2/internal/store"
metricsstore "k8s.io/kube-state-metrics/v2/pkg/metrics_store"
"k8s.io/kube-state-metrics/v2/pkg/options"
)
// MetricsHandler is a http.Handler that exposes the main kube-state-metrics
// /metrics endpoint. It allows concurrent reconfiguration at runtime.
type MetricsHandler struct {
opts *options.Options
kubeClient kubernetes.Interface
storeBuilder *store.Builder
enableGZIPEncoding bool
cancel func()
// mtx protects metricsWriters, curShard, and curTotalShards
mtx *sync.RWMutex
metricsWriters []metricsstore.MetricsWriter
curShard int32
curTotalShards int
}
// New creates and returns a new MetricsHandler with the given options.
func New(opts *options.Options, kubeClient kubernetes.Interface, storeBuilder *store.Builder, enableGZIPEncoding bool) *MetricsHandler {
return &MetricsHandler{
opts: opts,
kubeClient: kubeClient,
storeBuilder: storeBuilder,
enableGZIPEncoding: enableGZIPEncoding,
mtx: &sync.RWMutex{},
}
}
// ConfigureSharding (re-)configures sharding. Re-configuration can be done
// concurrently.
func (m *MetricsHandler) ConfigureSharding(ctx context.Context, shard int32, totalShards int) {
m.mtx.Lock()
defer m.mtx.Unlock()
if m.cancel != nil {
m.cancel()
}
if totalShards != 1 {
klog.Infof("configuring sharding of this instance to be shard index %d (zero-indexed) out of %d total shards", shard, totalShards)
}
ctx, m.cancel = context.WithCancel(ctx)
m.storeBuilder.WithSharding(shard, totalShards)
m.storeBuilder.WithContext(ctx)
m.metricsWriters = m.storeBuilder.Build()
m.curShard = shard
m.curTotalShards = totalShards
}
// Run configures the MetricsHandler's sharding and if autosharding is enabled
// re-configures sharding on re-sharding events. Run should only be called
// once.
func (m *MetricsHandler) Run(ctx context.Context) error {
autoSharding := len(m.opts.Pod) > 0 && len(m.opts.Namespace) > 0
if !autoSharding {
klog.Info("Autosharding disabled")
m.ConfigureSharding(ctx, m.opts.Shard, m.opts.TotalShards)
<-ctx.Done()
return ctx.Err()
}
klog.Infof("Autosharding enabled with pod=%v pod_namespace=%v", m.opts.Pod, m.opts.Namespace)
klog.Infof("Auto detecting sharding settings.")
ss, err := detectStatefulSet(m.kubeClient, m.opts.Pod, m.opts.Namespace)
if err != nil {
return errors.Wrap(err, "detect StatefulSet")
}
statefulSetName := ss.Name
labelSelectorOptions := func(o *metav1.ListOptions) {
o.LabelSelector = fields.SelectorFromSet(ss.Labels).String()
}
i := cache.NewSharedIndexInformer(
cache.NewFilteredListWatchFromClient(m.kubeClient.AppsV1().RESTClient(), "statefulsets", m.opts.Namespace, labelSelectorOptions),
&appsv1.StatefulSet{}, 0, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
i.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(o interface{}) {
ss := o.(*appsv1.StatefulSet)
if ss.Name != statefulSetName {
return
}
shard, totalShards, err := shardingSettingsFromStatefulSet(ss, m.opts.Pod)
if err != nil {
klog.Errorf("detect sharding settings from StatefulSet: %v", err)
return
}
m.mtx.RLock()
shardingUnchanged := m.curShard == shard && m.curTotalShards == totalShards
m.mtx.RUnlock()
if shardingUnchanged {
return
}
m.ConfigureSharding(ctx, shard, totalShards)
},
UpdateFunc: func(oldo, curo interface{}) {
old := oldo.(*appsv1.StatefulSet)
cur := curo.(*appsv1.StatefulSet)
if cur.Name != statefulSetName {
return
}
if old.ResourceVersion == cur.ResourceVersion {
return
}
shard, totalShards, err := shardingSettingsFromStatefulSet(cur, m.opts.Pod)
if err != nil {
klog.Errorf("detect sharding settings from StatefulSet: %v", err)
return
}
m.mtx.RLock()
shardingUnchanged := m.curShard == shard && m.curTotalShards == totalShards
m.mtx.RUnlock()
if shardingUnchanged {
return
}
m.ConfigureSharding(ctx, shard, totalShards)
},
})
go i.Run(ctx.Done())
if !cache.WaitForCacheSync(ctx.Done(), i.HasSynced) {
return errors.New("waiting for informer cache to sync failed")
}
<-ctx.Done()
return ctx.Err()
}
// ServeHTTP implements the http.Handler interface. It writes all generated
// metrics to the response body.
func (m *MetricsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
m.mtx.RLock()
defer m.mtx.RUnlock()
resHeader := w.Header()
var writer io.Writer = w
resHeader.Set("Content-Type", `text/plain; version=`+"0.0.4")
if m.enableGZIPEncoding {
// Gzip response if requested. Taken from
// github.com/prometheus/client_golang/prometheus/promhttp.decorateWriter.
reqHeader := r.Header.Get("Accept-Encoding")
parts := strings.Split(reqHeader, ",")
for _, part := range parts {
part = strings.TrimSpace(part)
if part == "gzip" || strings.HasPrefix(part, "gzip;") {
writer = gzip.NewWriter(writer)
resHeader.Set("Content-Encoding", "gzip")
}
}
}
for _, w := range m.metricsWriters {
w.WriteAll(writer)
}
// In case we gzipped the response, we have to close the writer.
if closer, ok := writer.(io.Closer); ok {
closer.Close()
}
}
func shardingSettingsFromStatefulSet(ss *appsv1.StatefulSet, podName string) (nominal int32, totalReplicas int, err error) {
nominal, err = detectNominalFromPod(ss.Name, podName)
if err != nil {
return 0, 0, errors.Wrap(err, "detecting Pod nominal")
}
totalReplicas = 1
replicas := ss.Spec.Replicas
if replicas != nil {
totalReplicas = int(*replicas)
}
return nominal, totalReplicas, nil
}
func detectNominalFromPod(statefulSetName, podName string) (int32, error) {
nominalString := strings.TrimPrefix(podName, statefulSetName+"-")
nominal, err := strconv.Atoi(nominalString)
if err != nil {
return 0, errors.Wrapf(err, "failed to detect shard index for Pod %s of StatefulSet %s, parsed %s", podName, statefulSetName, nominalString)
}
return int32(nominal), nil
}
func detectStatefulSet(kubeClient kubernetes.Interface, podName, namespaceName string) (*appsv1.StatefulSet, error) {
p, err := kubeClient.CoreV1().Pods(namespaceName).Get(context.TODO(), podName, metav1.GetOptions{})
if err != nil {
return nil, errors.Wrapf(err, "retrieve pod %s for sharding", podName)
}
owners := p.GetOwnerReferences()
for _, o := range owners {
if o.APIVersion != "apps/v1" || o.Kind != "StatefulSet" || o.Controller == nil || !*o.Controller {
continue
}
ss, err := kubeClient.AppsV1().StatefulSets(namespaceName).Get(context.TODO(), o.Name, metav1.GetOptions{})
if err != nil {
return nil, errors.Wrapf(err, "retrieve shard's StatefulSet: %s/%s", namespaceName, o.Name)
}
return ss, nil
}
return nil, errors.Errorf("no suitable statefulset found for auto detecting sharding for Pod %s/%s", namespaceName, podName)
}