Merge pull request #133120 from utam0k/kep-5229-metrics

KEP-5229: Add the metrics
This commit is contained in:
Kubernetes Prow Robot
2025-07-28 03:58:36 -07:00
committed by GitHub
5 changed files with 147 additions and 19 deletions

View File

@@ -18,12 +18,14 @@ package apidispatcher
import (
"context"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
fwk "k8s.io/kube-scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/metrics"
)
// APIDispatcher implements the fwk.APIDispatcher interface and allows for queueing and dispatching API calls asynchronously.
@@ -100,7 +102,18 @@ func (ad *APIDispatcher) Run(logger klog.Logger) {
go func() {
defer ad.goroutinesLimiter.release()
startTime := time.Now()
err := apiCall.Execute(ctx, ad.client)
result := metrics.GoroutineResultSuccess
if err != nil {
result = metrics.GoroutineResultError
}
callType := string(apiCall.CallType())
metrics.AsyncAPICallsTotal.WithLabelValues(callType, result).Inc()
metrics.AsyncAPICallDuration.WithLabelValues(callType, result).Observe(time.Since(startTime).Seconds())
ad.callQueue.finalize(apiCall)
apiCall.sendOnFinish(err)
}()

View File

@@ -24,10 +24,22 @@ import (
"k8s.io/apimachinery/pkg/types"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/component-base/metrics/testutil"
"k8s.io/klog/v2/ktesting"
fwk "k8s.io/kube-scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/metrics"
)
func init() {
metrics.Register()
}
func resetMetrics() {
metrics.AsyncAPICallsTotal.Reset()
metrics.AsyncAPICallDuration.Reset()
metrics.AsyncAPIPendingCalls.Reset()
}
const (
mockCallTypeLow fwk.APICallType = "low"
mockCallTypeHigh fwk.APICallType = "high"
@@ -87,6 +99,9 @@ func (mac *mockAPICall) IsNoOp() bool {
}
func TestAPIDispatcherLifecycle(t *testing.T) {
// Reset all async API metrics
resetMetrics()
logger, _ := ktesting.NewTestContext(t)
uid := types.UID("uid")
@@ -129,7 +144,6 @@ func TestAPIDispatcherLifecycle(t *testing.T) {
if err := dispatcher.Add(call1, opts1); err != nil {
t.Fatalf("Unexpected error while adding a call1: %v", err)
}
if err := dispatcher.Add(call2, opts2); err != nil {
t.Fatalf("Unexpected error while adding a call2: %v", err)
}
@@ -160,4 +174,8 @@ func TestAPIDispatcherLifecycle(t *testing.T) {
if isNoOpCalls != 2 {
t.Errorf("Expected call2's IsNoOp() to be called two times, but was %v times", executeCalls)
}
// Verify execution metrics
testutil.AssertVectorCount(t, "scheduler_async_api_call_execution_total", map[string]string{"call_type": "low", "result": "success"}, 1)
testutil.AssertHistogramTotalCount(t, "scheduler_async_api_call_execution_duration_seconds", map[string]string{"call_type": "low", "result": "success"}, 1)
}

View File

@@ -25,6 +25,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
fwk "k8s.io/kube-scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/metrics"
"k8s.io/utils/buffer"
)
@@ -122,6 +123,11 @@ func (cq *callQueue) merge(oldAPICall, apiCall *queuedAPICall) error {
}
if oldAPICall.CallType() != apiCall.CallType() {
// API call types don't match, so we overwrite the old one.
// Update the pending calls metric if the old call is not in-flight.
if !cq.inFlightEntities.Has(oldAPICall.UID()) {
metrics.AsyncAPIPendingCalls.WithLabelValues(string(oldAPICall.CallType())).Dec()
metrics.AsyncAPIPendingCalls.WithLabelValues(string(apiCall.CallType())).Inc()
}
oldAPICall.sendOnFinish(fmt.Errorf("a more relevant call was enqueued for this object: %w", fwk.ErrCallOverwritten))
return nil
}
@@ -170,6 +176,7 @@ func (cq *callQueue) enqueue(apiCall *queuedAPICall, oldCallPresent bool) error
return nil
}
cq.callsQueue.WriteOne(objectUID)
metrics.AsyncAPIPendingCalls.WithLabelValues(string(apiCall.CallType())).Inc()
cq.cond.Broadcast()
return nil
}
@@ -197,9 +204,15 @@ func removeFromQueue(queue *buffer.Ring[types.UID], objectUID types.UID) *buffer
// (i.e., are not in-flight).
// Must be called under cq.lock.
func (cq *callQueue) removePending(objectUID types.UID) {
apiCall, ok := cq.apiCalls[objectUID]
if !ok {
return
}
delete(cq.apiCalls, objectUID)
if !cq.inFlightEntities.Has(objectUID) {
cq.callsQueue = *removeFromQueue(&cq.callsQueue, objectUID)
callType := string(apiCall.CallType())
metrics.AsyncAPIPendingCalls.WithLabelValues(callType).Dec()
}
}
@@ -246,6 +259,8 @@ func (cq *callQueue) pop() (*queuedAPICall, error) {
return nil, fmt.Errorf("object %s is not present in a map with API calls details", objectUID)
}
cq.inFlightEntities.Insert(objectUID)
callType := string(apiCall.CallType())
metrics.AsyncAPIPendingCalls.WithLabelValues(callType).Dec()
return apiCall, nil
}
@@ -266,6 +281,8 @@ func (cq *callQueue) finalize(apiCall *queuedAPICall) {
} else {
// The API call in the map has changed, so re-queue the object for the new call to be processed.
cq.callsQueue.WriteOne(objectUID)
callType := string(newAPICall.CallType())
metrics.AsyncAPIPendingCalls.WithLabelValues(callType).Inc()
cq.cond.Broadcast()
}
cq.inFlightEntities.Delete(objectUID)

View File

@@ -26,7 +26,9 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/component-base/metrics/testutil"
fwk "k8s.io/kube-scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/metrics"
)
var queuedAPICallCmpOpts = []cmp.Option{
@@ -35,12 +37,25 @@ var queuedAPICallCmpOpts = []cmp.Option{
cmpopts.IgnoreFields(mockAPICall{}, "executeFn", "mergeFn", "syncFn"),
}
// verifyQueueLen is a test helper to check the length of the callsQueue.
func verifyQueueLen(t *testing.T, cq *callQueue, len int) {
// verifyQueueState is a test helper to check both the queue length and pending call metrics.
func verifyQueueState(t *testing.T, cq *callQueue, expectedPendingCalls map[string]int) {
t.Helper()
if got := cq.callsQueue.Len(); got != len {
t.Errorf("Expected callsQueue to have %d item(s), but has %d", len, got)
expectedQueueLen := 0
// Check pending call metrics
for callType, expected := range expectedPendingCalls {
expectedQueueLen += expected
value, err := testutil.GetGaugeMetricValue(metrics.AsyncAPIPendingCalls.WithLabelValues(callType))
if err != nil {
t.Errorf("Failed to get pending calls metric for %s: %v", callType, err)
} else if int(value) != expected {
t.Errorf("Expected pending calls metric for %s to be %d, got %d", callType, expected, int(value))
}
}
if got := cq.callsQueue.Len(); got != expectedQueueLen {
t.Errorf("Expected callsQueue to have %d item(s), but has %d", expectedQueueLen, got)
}
}
@@ -91,6 +106,8 @@ func TestCallQueueAdd(t *testing.T) {
uid2 := types.UID("uid2")
t.Run("First call is added without collision", func(t *testing.T) {
resetMetrics()
cq := newCallQueue(mockRelevances)
call := &queuedAPICall{
APICall: &mockAPICall{
@@ -102,11 +119,13 @@ func TestCallQueueAdd(t *testing.T) {
if err := cq.add(call); err != nil {
t.Fatalf("Unexpected error while adding call: %v", err)
}
verifyQueueLen(t, cq, 1)
verifyQueueState(t, cq, map[string]int{"low": 1})
verifyCalls(t, cq, call)
})
t.Run("No-op call is skipped", func(t *testing.T) {
resetMetrics()
cq := newCallQueue(mockRelevances)
onFinishCh := make(chan error, 1)
call := &queuedAPICall{
@@ -124,12 +143,14 @@ func TestCallQueueAdd(t *testing.T) {
if !errors.Is(err, fwk.ErrCallSkipped) {
t.Fatalf("Expected call to be skipped, but got %v", err)
}
verifyQueueLen(t, cq, 0)
verifyQueueState(t, cq, map[string]int{"low": 0})
verifyCalls(t, cq)
expectOnFinish(t, onFinishCh, fwk.ErrCallSkipped)
})
t.Run("Two calls for different objects don't collide", func(t *testing.T) {
resetMetrics()
cq := newCallQueue(mockRelevances)
call1 := &queuedAPICall{
APICall: &mockAPICall{
@@ -150,11 +171,13 @@ func TestCallQueueAdd(t *testing.T) {
if err := cq.add(call2); err != nil {
t.Fatalf("Unexpected error while adding call2: %v", err)
}
verifyQueueLen(t, cq, 2)
verifyQueueState(t, cq, map[string]int{"low": 1, "high": 1})
verifyCalls(t, cq, call1, call2)
})
t.Run("New call overwrites less relevant call", func(t *testing.T) {
resetMetrics()
cq := newCallQueue(mockRelevances)
onFinishCh := make(chan error, 1)
callLow := &queuedAPICall{
@@ -177,12 +200,14 @@ func TestCallQueueAdd(t *testing.T) {
if err := cq.add(callHigh); err != nil {
t.Fatalf("Unexpected error while adding callHigh: %v", err)
}
verifyQueueLen(t, cq, 1)
verifyQueueState(t, cq, map[string]int{"high": 1})
verifyCalls(t, cq, callHigh)
expectOnFinish(t, onFinishCh, fwk.ErrCallOverwritten)
})
t.Run("New call is skipped if less relevant", func(t *testing.T) {
resetMetrics()
cq := newCallQueue(mockRelevances)
onFinishCh := make(chan error, 1)
callLow := &queuedAPICall{
@@ -206,12 +231,14 @@ func TestCallQueueAdd(t *testing.T) {
if !errors.Is(err, fwk.ErrCallSkipped) {
t.Fatalf("Expected callLow to be skipped, but got %v", err)
}
verifyQueueLen(t, cq, 1)
verifyQueueState(t, cq, map[string]int{"high": 1})
verifyCalls(t, cq, callHigh)
expectOnFinish(t, onFinishCh, fwk.ErrCallSkipped)
})
t.Run("New call merges with old call and skips if no-op", func(t *testing.T) {
resetMetrics()
cq := newCallQueue(mockRelevances)
onFinishCh1 := make(chan error, 1)
call1 := &queuedAPICall{
@@ -261,7 +288,7 @@ func TestCallQueueAdd(t *testing.T) {
if err := cq.add(call2); err != nil {
t.Fatalf("Unexpected error while adding call2: %v", err)
}
verifyQueueLen(t, cq, 1)
verifyQueueState(t, cq, map[string]int{"low": 1})
verifyCalls(t, cq, call2)
expectOnFinish(t, onFinishCh1, fwk.ErrCallOverwritten)
@@ -269,7 +296,7 @@ func TestCallQueueAdd(t *testing.T) {
if !errors.Is(err, fwk.ErrCallSkipped) {
t.Fatalf("Expected call3 to be skipped, but got %v", err)
}
verifyQueueLen(t, cq, 0)
verifyQueueState(t, cq, map[string]int{"low": 0})
verifyCalls(t, cq)
expectOnFinish(t, onFinishCh2, fwk.ErrCallOverwritten)
expectOnFinish(t, onFinishCh3, fwk.ErrCallSkipped)
@@ -281,6 +308,8 @@ func TestCallQueuePop(t *testing.T) {
uid2 := types.UID("uid2")
t.Run("Calls are popped from the queue in FIFO order", func(t *testing.T) {
resetMetrics()
cq := newCallQueue(mockRelevances)
call1 := &queuedAPICall{
APICall: &mockAPICall{
@@ -301,6 +330,9 @@ func TestCallQueuePop(t *testing.T) {
t.Fatalf("Unexpected error while adding call2: %v", err)
}
// Verify pending calls after adding
verifyQueueState(t, cq, map[string]int{"low": 2})
poppedCall, err := cq.pop()
if err != nil {
t.Fatalf("Unexpected error while popping call1: %v", err)
@@ -308,7 +340,7 @@ func TestCallQueuePop(t *testing.T) {
if diff := cmp.Diff(call1, poppedCall, queuedAPICallCmpOpts...); diff != "" {
t.Errorf("First popped call does not patch call1 (-want +got):\n%s", diff)
}
verifyQueueLen(t, cq, 1)
verifyQueueState(t, cq, map[string]int{"low": 1})
verifyCalls(t, cq, call1, call2)
verifyInFlight(t, cq, uid1)
@@ -319,7 +351,7 @@ func TestCallQueuePop(t *testing.T) {
if diff := cmp.Diff(call2, poppedCall, queuedAPICallCmpOpts...); diff != "" {
t.Errorf("Second popped call does not match call2 (-want +got):\n%s", diff)
}
verifyQueueLen(t, cq, 0)
verifyQueueState(t, cq, map[string]int{"low": 0})
verifyCalls(t, cq, call1, call2)
verifyInFlight(t, cq, uid1, uid2)
})
@@ -363,6 +395,8 @@ func TestCallQueueFinalize(t *testing.T) {
uid := types.UID("uid")
t.Run("Call details are cleared if there is no waiting call", func(t *testing.T) {
resetMetrics()
cq := newCallQueue(mockRelevances)
call := &queuedAPICall{
APICall: &mockAPICall{
@@ -380,13 +414,14 @@ func TestCallQueueFinalize(t *testing.T) {
}
cq.finalize(poppedCall)
verifyQueueLen(t, cq, 0)
verifyQueueState(t, cq, map[string]int{"low": 0})
verifyCalls(t, cq)
verifyInFlight(t, cq)
})
t.Run("UID is re-queued if a new call arrived while one was in-flight", func(t *testing.T) {
resetMetrics()
cq := newCallQueue(mockRelevances)
call1 := &queuedAPICall{
APICall: &mockAPICall{
@@ -416,7 +451,7 @@ func TestCallQueueFinalize(t *testing.T) {
cq.finalize(poppedCall)
verifyQueueLen(t, cq, 1)
verifyQueueState(t, cq, map[string]int{"low": 1})
verifyCalls(t, cq, call2)
verifyInFlight(t, cq)
})
@@ -427,6 +462,8 @@ func TestCallQueueSyncObject(t *testing.T) {
uid2 := types.UID("uid2")
t.Run("Object is synced with pending call details", func(t *testing.T) {
resetMetrics()
cq := newCallQueue(mockRelevances)
obj := &metav1.ObjectMeta{
UID: uid1,
@@ -449,7 +486,7 @@ func TestCallQueueSyncObject(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error while syncing object: %v", err)
}
verifyQueueLen(t, cq, 1)
verifyQueueState(t, cq, map[string]int{"low": 1})
verifyCalls(t, cq, call)
verifyInFlight(t, cq)
@@ -460,6 +497,8 @@ func TestCallQueueSyncObject(t *testing.T) {
})
t.Run("Pending call is canceled if sync results in no-op", func(t *testing.T) {
resetMetrics()
cq := newCallQueue(mockRelevances)
obj := &metav1.ObjectMeta{UID: uid1}
onFinishCh := make(chan error, 1)
@@ -486,7 +525,7 @@ func TestCallQueueSyncObject(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error while syncing object: %v", err)
}
verifyQueueLen(t, cq, 0)
verifyQueueState(t, cq, map[string]int{"low": 0})
verifyCalls(t, cq)
verifyInFlight(t, cq)
expectOnFinish(t, onFinishCh, fwk.ErrCallSkipped)

View File

@@ -120,6 +120,11 @@ var (
PreemptionGoroutinesDuration *metrics.HistogramVec
PreemptionGoroutinesExecutionTotal *metrics.CounterVec
// The below are only available when the SchedulerAsyncAPICalls feature gate is enabled.
AsyncAPICallsTotal *metrics.CounterVec
AsyncAPICallDuration *metrics.HistogramVec
AsyncAPIPendingCalls *metrics.GaugeVec
// metricsList is a list of all metrics that should be registered always, regardless of any feature gate's value.
metricsList []metrics.Registerable
)
@@ -140,6 +145,13 @@ func Register() {
if utilfeature.DefaultFeatureGate.Enabled(features.SchedulerAsyncPreemption) {
RegisterMetrics(PreemptionGoroutinesDuration, PreemptionGoroutinesExecutionTotal)
}
if utilfeature.DefaultFeatureGate.Enabled(features.SchedulerAsyncAPICalls) {
RegisterMetrics(
AsyncAPICallsTotal,
AsyncAPICallDuration,
AsyncAPIPendingCalls,
)
}
})
}
@@ -334,6 +346,35 @@ func InitMetrics() {
},
[]string{"result"})
// The below (AsyncAPICallsTotal, AsyncAPICallDuration and AsyncAPIPendingCalls) are only available when the SchedulerAsyncAPICalls feature gate is enabled.
AsyncAPICallsTotal = metrics.NewCounterVec(
&metrics.CounterOpts{
Subsystem: SchedulerSubsystem,
Name: "async_api_call_execution_total",
Help: "Total number of API calls executed by the async dispatcher.",
StabilityLevel: metrics.ALPHA,
},
[]string{"call_type", "result"})
AsyncAPICallDuration = metrics.NewHistogramVec(
&metrics.HistogramOpts{
Subsystem: SchedulerSubsystem,
Name: "async_api_call_execution_duration_seconds",
Help: "Duration in seconds for executing API call in the async dispatcher.",
Buckets: metrics.ExponentialBuckets(0.001, 2, 15),
StabilityLevel: metrics.ALPHA,
},
[]string{"call_type", "result"})
AsyncAPIPendingCalls = metrics.NewGaugeVec(
&metrics.GaugeOpts{
Subsystem: SchedulerSubsystem,
Name: "pending_async_api_calls",
Help: "Number of API calls currently pending in the async queue.",
StabilityLevel: metrics.ALPHA,
},
[]string{"call_type"})
metricsList = []metrics.Registerable{
scheduleAttempts,
schedulingLatency,