mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-03 11:48:15 +00:00 
			
		
		
		
	Merge pull request #89151 from jingyih/add_metric_etcd_db_size
apiserver: add a metric exposing etcd database size
This commit is contained in:
		@@ -159,6 +159,7 @@ func TestAddFlags(t *testing.T) {
 | 
			
		||||
				Prefix:                "/registry",
 | 
			
		||||
				CompactionInterval:    storagebackend.DefaultCompactInterval,
 | 
			
		||||
				CountMetricPollPeriod: time.Minute,
 | 
			
		||||
				DBMetricPollInterval:  storagebackend.DefaultDBMetricPollInterval,
 | 
			
		||||
			},
 | 
			
		||||
			DefaultStorageMediaType: "application/vnd.kubernetes.protobuf",
 | 
			
		||||
			DeleteCollectionWorkers: 1,
 | 
			
		||||
 
 | 
			
		||||
@@ -176,6 +176,9 @@ func (s *EtcdOptions) AddFlags(fs *pflag.FlagSet) {
 | 
			
		||||
 | 
			
		||||
	fs.DurationVar(&s.StorageConfig.CountMetricPollPeriod, "etcd-count-metric-poll-period", s.StorageConfig.CountMetricPollPeriod, ""+
 | 
			
		||||
		"Frequency of polling etcd for number of resources per type. 0 disables the metric collection.")
 | 
			
		||||
 | 
			
		||||
	fs.DurationVar(&s.StorageConfig.DBMetricPollInterval, "etcd-db-metric-poll-interval", s.StorageConfig.DBMetricPollInterval,
 | 
			
		||||
		"The interval of requests to poll etcd and update metric. 0 disables the metric collection")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *EtcdOptions) ApplyTo(c *server.Config) error {
 | 
			
		||||
 
 | 
			
		||||
@@ -49,6 +49,14 @@ var (
 | 
			
		||||
		},
 | 
			
		||||
		[]string{"resource"},
 | 
			
		||||
	)
 | 
			
		||||
	dbTotalSize = compbasemetrics.NewGaugeVec(
 | 
			
		||||
		&compbasemetrics.GaugeOpts{
 | 
			
		||||
			Name:           "etcd_db_total_size_in_bytes",
 | 
			
		||||
			Help:           "Total size of the etcd database file physically allocated in bytes.",
 | 
			
		||||
			StabilityLevel: compbasemetrics.ALPHA,
 | 
			
		||||
		},
 | 
			
		||||
		[]string{"endpoint"},
 | 
			
		||||
	)
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var registerMetrics sync.Once
 | 
			
		||||
@@ -59,6 +67,7 @@ func Register() {
 | 
			
		||||
	registerMetrics.Do(func() {
 | 
			
		||||
		legacyregistry.MustRegister(etcdRequestLatency)
 | 
			
		||||
		legacyregistry.MustRegister(objectCounts)
 | 
			
		||||
		legacyregistry.MustRegister(dbTotalSize)
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -81,3 +90,8 @@ func Reset() {
 | 
			
		||||
func sinceInSeconds(start time.Time) float64 {
 | 
			
		||||
	return time.Since(start).Seconds()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// UpdateEtcdDbSize sets the etcd_db_total_size_in_bytes metric.
 | 
			
		||||
func UpdateEtcdDbSize(ep string, size int64) {
 | 
			
		||||
	dbTotalSize.WithLabelValues(ep).Set(float64(size))
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -29,6 +29,7 @@ const (
 | 
			
		||||
	StorageTypeETCD3 = "etcd3"
 | 
			
		||||
 | 
			
		||||
	DefaultCompactInterval      = 5 * time.Minute
 | 
			
		||||
	DefaultDBMetricPollInterval = 30 * time.Second
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// TransportConfig holds all connection related info,  i.e. equal TransportConfig means equal servers we talk to.
 | 
			
		||||
@@ -71,6 +72,8 @@ type Config struct {
 | 
			
		||||
	CompactionInterval time.Duration
 | 
			
		||||
	// CountMetricPollPeriod specifies how often should count metric be updated
 | 
			
		||||
	CountMetricPollPeriod time.Duration
 | 
			
		||||
	// DBMetricPollInterval specifies how often should storage backend metric be updated.
 | 
			
		||||
	DBMetricPollInterval time.Duration
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewDefaultConfig(prefix string, codec runtime.Codec) *Config {
 | 
			
		||||
@@ -79,5 +82,6 @@ func NewDefaultConfig(prefix string, codec runtime.Codec) *Config {
 | 
			
		||||
		Prefix:               prefix,
 | 
			
		||||
		Codec:                codec,
 | 
			
		||||
		CompactionInterval:   DefaultCompactInterval,
 | 
			
		||||
		DBMetricPollInterval: DefaultDBMetricPollInterval,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -39,6 +39,7 @@ go_library(
 | 
			
		||||
        "//staging/src/k8s.io/apiserver/pkg/server/egressselector:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apiserver/pkg/storage:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apiserver/pkg/storage/etcd3:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apiserver/pkg/storage/etcd3/metrics:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apiserver/pkg/storage/value:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/component-base/metrics/legacyregistry:go_default_library",
 | 
			
		||||
@@ -46,6 +47,7 @@ go_library(
 | 
			
		||||
        "//vendor/go.etcd.io/etcd/clientv3:go_default_library",
 | 
			
		||||
        "//vendor/go.etcd.io/etcd/pkg/transport:go_default_library",
 | 
			
		||||
        "//vendor/google.golang.org/grpc:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/klog:go_default_library",
 | 
			
		||||
    ],
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -36,20 +36,26 @@ import (
 | 
			
		||||
	"k8s.io/apiserver/pkg/server/egressselector"
 | 
			
		||||
	"k8s.io/apiserver/pkg/storage"
 | 
			
		||||
	"k8s.io/apiserver/pkg/storage/etcd3"
 | 
			
		||||
	"k8s.io/apiserver/pkg/storage/etcd3/metrics"
 | 
			
		||||
	"k8s.io/apiserver/pkg/storage/storagebackend"
 | 
			
		||||
	"k8s.io/apiserver/pkg/storage/value"
 | 
			
		||||
	"k8s.io/component-base/metrics/legacyregistry"
 | 
			
		||||
	"k8s.io/klog"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// The short keepalive timeout and interval have been chosen to aggressively
 | 
			
		||||
// detect a failed etcd server without introducing much overhead.
 | 
			
		||||
const keepaliveTime = 30 * time.Second
 | 
			
		||||
const keepaliveTimeout = 10 * time.Second
 | 
			
		||||
const (
 | 
			
		||||
	// The short keepalive timeout and interval have been chosen to aggressively
 | 
			
		||||
	// detect a failed etcd server without introducing much overhead.
 | 
			
		||||
	keepaliveTime    = 30 * time.Second
 | 
			
		||||
	keepaliveTimeout = 10 * time.Second
 | 
			
		||||
 | 
			
		||||
// dialTimeout is the timeout for failing to establish a connection.
 | 
			
		||||
// It is set to 20 seconds as times shorter than that will cause TLS connections to fail
 | 
			
		||||
// on heavily loaded arm64 CPUs (issue #64649)
 | 
			
		||||
const dialTimeout = 20 * time.Second
 | 
			
		||||
	// dialTimeout is the timeout for failing to establish a connection.
 | 
			
		||||
	// It is set to 20 seconds as times shorter than that will cause TLS connections to fail
 | 
			
		||||
	// on heavily loaded arm64 CPUs (issue #64649)
 | 
			
		||||
	dialTimeout = 20 * time.Second
 | 
			
		||||
 | 
			
		||||
	dbMetricsMonitorJitter = 0.5
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func init() {
 | 
			
		||||
	// grpcprom auto-registers (via an init function) their client metrics, since we are opting out of
 | 
			
		||||
@@ -57,6 +63,7 @@ func init() {
 | 
			
		||||
	// we need to explicitly register these metrics to our global registry here.
 | 
			
		||||
	// For reference: https://github.com/kubernetes/kubernetes/pull/81387
 | 
			
		||||
	legacyregistry.RawMustRegister(grpcprom.DefaultClientMetrics)
 | 
			
		||||
	dbMetricsMonitors = make(map[string]struct{})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func newETCD3HealthCheck(c storagebackend.Config) (func() error, error) {
 | 
			
		||||
@@ -153,16 +160,20 @@ type runningCompactor struct {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var (
 | 
			
		||||
	lock       sync.Mutex
 | 
			
		||||
	// compactorsMu guards access to compactors map
 | 
			
		||||
	compactorsMu sync.Mutex
 | 
			
		||||
	compactors   = map[string]*runningCompactor{}
 | 
			
		||||
	// dbMetricsMonitorsMu guards access to dbMetricsMonitors map
 | 
			
		||||
	dbMetricsMonitorsMu sync.Mutex
 | 
			
		||||
	dbMetricsMonitors   map[string]struct{}
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// startCompactorOnce start one compactor per transport. If the interval get smaller on repeated calls, the
 | 
			
		||||
// compactor is replaced. A destroy func is returned. If all destroy funcs with the same transport are called,
 | 
			
		||||
// the compactor is stopped.
 | 
			
		||||
func startCompactorOnce(c storagebackend.TransportConfig, interval time.Duration) (func(), error) {
 | 
			
		||||
	lock.Lock()
 | 
			
		||||
	defer lock.Unlock()
 | 
			
		||||
	compactorsMu.Lock()
 | 
			
		||||
	defer compactorsMu.Unlock()
 | 
			
		||||
 | 
			
		||||
	key := fmt.Sprintf("%v", c) // gives: {[server1 server2] keyFile certFile caFile}
 | 
			
		||||
	if compactor, foundBefore := compactors[key]; !foundBefore || compactor.interval > interval {
 | 
			
		||||
@@ -193,8 +204,8 @@ func startCompactorOnce(c storagebackend.TransportConfig, interval time.Duration
 | 
			
		||||
	compactors[key].refs++
 | 
			
		||||
 | 
			
		||||
	return func() {
 | 
			
		||||
		lock.Lock()
 | 
			
		||||
		defer lock.Unlock()
 | 
			
		||||
		compactorsMu.Lock()
 | 
			
		||||
		defer compactorsMu.Unlock()
 | 
			
		||||
 | 
			
		||||
		compactor := compactors[key]
 | 
			
		||||
		compactor.refs--
 | 
			
		||||
@@ -218,6 +229,11 @@ func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, e
 | 
			
		||||
		return nil, nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	stopDBSizeMonitor, err := startDBSizeMonitorPerEndpoint(client, c.DBMetricPollInterval)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	var once sync.Once
 | 
			
		||||
	destroyFunc := func() {
 | 
			
		||||
		// we know that storage destroy funcs are called multiple times (due to reuse in subresources).
 | 
			
		||||
@@ -225,6 +241,7 @@ func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, e
 | 
			
		||||
		// TODO: fix duplicated storage destroy calls higher level
 | 
			
		||||
		once.Do(func() {
 | 
			
		||||
			stopCompactor()
 | 
			
		||||
			stopDBSizeMonitor()
 | 
			
		||||
			client.Close()
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
@@ -234,3 +251,36 @@ func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, e
 | 
			
		||||
	}
 | 
			
		||||
	return etcd3.New(client, c.Codec, c.Prefix, transformer, c.Paging), destroyFunc, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// startDBSizeMonitorPerEndpoint starts a loop to monitor etcd database size and update the
 | 
			
		||||
// corresponding metric etcd_db_total_size_in_bytes for each etcd server endpoint.
 | 
			
		||||
func startDBSizeMonitorPerEndpoint(client *clientv3.Client, interval time.Duration) (func(), error) {
 | 
			
		||||
	if interval == 0 {
 | 
			
		||||
		return func() {}, nil
 | 
			
		||||
	}
 | 
			
		||||
	dbMetricsMonitorsMu.Lock()
 | 
			
		||||
	defer dbMetricsMonitorsMu.Unlock()
 | 
			
		||||
 | 
			
		||||
	ctx, cancel := context.WithCancel(context.Background())
 | 
			
		||||
	for _, ep := range client.Endpoints() {
 | 
			
		||||
		if _, found := dbMetricsMonitors[ep]; found {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		dbMetricsMonitors[ep] = struct{}{}
 | 
			
		||||
		endpoint := ep
 | 
			
		||||
		klog.V(4).Infof("Start monitoring storage db size metric for endpoint %s with polling interval %v", endpoint, interval)
 | 
			
		||||
		go wait.JitterUntilWithContext(ctx, func(context.Context) {
 | 
			
		||||
			epStatus, err := client.Maintenance.Status(ctx, endpoint)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				klog.V(4).Infof("Failed to get storage db size for ep %s: %v", endpoint, err)
 | 
			
		||||
				metrics.UpdateEtcdDbSize(endpoint, -1)
 | 
			
		||||
			} else {
 | 
			
		||||
				metrics.UpdateEtcdDbSize(endpoint, epStatus.DbSize)
 | 
			
		||||
			}
 | 
			
		||||
		}, interval, dbMetricsMonitorJitter, true)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return func() {
 | 
			
		||||
		cancel()
 | 
			
		||||
	}, nil
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user