diff --git a/pkg/scheduler/backend/api_dispatcher/api_dispatcher.go b/pkg/scheduler/backend/api_dispatcher/api_dispatcher.go index 8c3d6c6e545..4f003b17426 100644 --- a/pkg/scheduler/backend/api_dispatcher/api_dispatcher.go +++ b/pkg/scheduler/backend/api_dispatcher/api_dispatcher.go @@ -18,12 +18,14 @@ package apidispatcher import ( "context" + "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientset "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" fwk "k8s.io/kube-scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/metrics" ) // APIDispatcher implements the fwk.APIDispatcher interface and allows for queueing and dispatching API calls asynchronously. @@ -100,7 +102,18 @@ func (ad *APIDispatcher) Run(logger klog.Logger) { go func() { defer ad.goroutinesLimiter.release() + startTime := time.Now() + err := apiCall.Execute(ctx, ad.client) + + result := metrics.GoroutineResultSuccess + if err != nil { + result = metrics.GoroutineResultError + } + callType := string(apiCall.CallType()) + metrics.AsyncAPICallsTotal.WithLabelValues(callType, result).Inc() + metrics.AsyncAPICallDuration.WithLabelValues(callType, result).Observe(time.Since(startTime).Seconds()) + ad.callQueue.finalize(apiCall) apiCall.sendOnFinish(err) }() diff --git a/pkg/scheduler/backend/api_dispatcher/api_dispatcher_test.go b/pkg/scheduler/backend/api_dispatcher/api_dispatcher_test.go index 26eec0dd643..0134ec4c4ee 100644 --- a/pkg/scheduler/backend/api_dispatcher/api_dispatcher_test.go +++ b/pkg/scheduler/backend/api_dispatcher/api_dispatcher_test.go @@ -24,10 +24,22 @@ import ( "k8s.io/apimachinery/pkg/types" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" + "k8s.io/component-base/metrics/testutil" "k8s.io/klog/v2/ktesting" fwk "k8s.io/kube-scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/metrics" ) +func init() { + metrics.Register() +} + +func resetMetrics() { + metrics.AsyncAPICallsTotal.Reset() + metrics.AsyncAPICallDuration.Reset() + metrics.AsyncAPIPendingCalls.Reset() +} + const ( mockCallTypeLow fwk.APICallType = "low" mockCallTypeHigh fwk.APICallType = "high" @@ -87,6 +99,9 @@ func (mac *mockAPICall) IsNoOp() bool { } func TestAPIDispatcherLifecycle(t *testing.T) { + // Reset all async API metrics + resetMetrics() + logger, _ := ktesting.NewTestContext(t) uid := types.UID("uid") @@ -129,7 +144,6 @@ func TestAPIDispatcherLifecycle(t *testing.T) { if err := dispatcher.Add(call1, opts1); err != nil { t.Fatalf("Unexpected error while adding a call1: %v", err) } - if err := dispatcher.Add(call2, opts2); err != nil { t.Fatalf("Unexpected error while adding a call2: %v", err) } @@ -160,4 +174,8 @@ func TestAPIDispatcherLifecycle(t *testing.T) { if isNoOpCalls != 2 { t.Errorf("Expected call2's IsNoOp() to be called two times, but was %v times", executeCalls) } + + // Verify execution metrics + testutil.AssertVectorCount(t, "scheduler_async_api_call_execution_total", map[string]string{"call_type": "low", "result": "success"}, 1) + testutil.AssertHistogramTotalCount(t, "scheduler_async_api_call_execution_duration_seconds", map[string]string{"call_type": "low", "result": "success"}, 1) } diff --git a/pkg/scheduler/backend/api_dispatcher/call_queue.go b/pkg/scheduler/backend/api_dispatcher/call_queue.go index 8d7daf9fcfe..22e04b81e8a 100644 --- a/pkg/scheduler/backend/api_dispatcher/call_queue.go +++ b/pkg/scheduler/backend/api_dispatcher/call_queue.go @@ -25,6 +25,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" fwk "k8s.io/kube-scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/metrics" "k8s.io/utils/buffer" ) @@ -122,6 +123,11 @@ func (cq *callQueue) merge(oldAPICall, apiCall *queuedAPICall) error { } if oldAPICall.CallType() != apiCall.CallType() { // API call types don't match, so we overwrite the old one. + // Update the pending calls metric if the old call is not in-flight. + if !cq.inFlightEntities.Has(oldAPICall.UID()) { + metrics.AsyncAPIPendingCalls.WithLabelValues(string(oldAPICall.CallType())).Dec() + metrics.AsyncAPIPendingCalls.WithLabelValues(string(apiCall.CallType())).Inc() + } oldAPICall.sendOnFinish(fmt.Errorf("a more relevant call was enqueued for this object: %w", fwk.ErrCallOverwritten)) return nil } @@ -170,6 +176,7 @@ func (cq *callQueue) enqueue(apiCall *queuedAPICall, oldCallPresent bool) error return nil } cq.callsQueue.WriteOne(objectUID) + metrics.AsyncAPIPendingCalls.WithLabelValues(string(apiCall.CallType())).Inc() cq.cond.Broadcast() return nil } @@ -197,9 +204,15 @@ func removeFromQueue(queue *buffer.Ring[types.UID], objectUID types.UID) *buffer // (i.e., are not in-flight). // Must be called under cq.lock. func (cq *callQueue) removePending(objectUID types.UID) { + apiCall, ok := cq.apiCalls[objectUID] + if !ok { + return + } delete(cq.apiCalls, objectUID) if !cq.inFlightEntities.Has(objectUID) { cq.callsQueue = *removeFromQueue(&cq.callsQueue, objectUID) + callType := string(apiCall.CallType()) + metrics.AsyncAPIPendingCalls.WithLabelValues(callType).Dec() } } @@ -246,6 +259,8 @@ func (cq *callQueue) pop() (*queuedAPICall, error) { return nil, fmt.Errorf("object %s is not present in a map with API calls details", objectUID) } cq.inFlightEntities.Insert(objectUID) + callType := string(apiCall.CallType()) + metrics.AsyncAPIPendingCalls.WithLabelValues(callType).Dec() return apiCall, nil } @@ -266,6 +281,8 @@ func (cq *callQueue) finalize(apiCall *queuedAPICall) { } else { // The API call in the map has changed, so re-queue the object for the new call to be processed. cq.callsQueue.WriteOne(objectUID) + callType := string(newAPICall.CallType()) + metrics.AsyncAPIPendingCalls.WithLabelValues(callType).Inc() cq.cond.Broadcast() } cq.inFlightEntities.Delete(objectUID) diff --git a/pkg/scheduler/backend/api_dispatcher/call_queue_test.go b/pkg/scheduler/backend/api_dispatcher/call_queue_test.go index 744cf05b72e..926f3cff755 100644 --- a/pkg/scheduler/backend/api_dispatcher/call_queue_test.go +++ b/pkg/scheduler/backend/api_dispatcher/call_queue_test.go @@ -26,7 +26,9 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/component-base/metrics/testutil" fwk "k8s.io/kube-scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/metrics" ) var queuedAPICallCmpOpts = []cmp.Option{ @@ -35,12 +37,25 @@ var queuedAPICallCmpOpts = []cmp.Option{ cmpopts.IgnoreFields(mockAPICall{}, "executeFn", "mergeFn", "syncFn"), } -// verifyQueueLen is a test helper to check the length of the callsQueue. -func verifyQueueLen(t *testing.T, cq *callQueue, len int) { +// verifyQueueState is a test helper to check both the queue length and pending call metrics. +func verifyQueueState(t *testing.T, cq *callQueue, expectedPendingCalls map[string]int) { t.Helper() - if got := cq.callsQueue.Len(); got != len { - t.Errorf("Expected callsQueue to have %d item(s), but has %d", len, got) + expectedQueueLen := 0 + // Check pending call metrics + for callType, expected := range expectedPendingCalls { + expectedQueueLen += expected + + value, err := testutil.GetGaugeMetricValue(metrics.AsyncAPIPendingCalls.WithLabelValues(callType)) + if err != nil { + t.Errorf("Failed to get pending calls metric for %s: %v", callType, err) + } else if int(value) != expected { + t.Errorf("Expected pending calls metric for %s to be %d, got %d", callType, expected, int(value)) + } + } + + if got := cq.callsQueue.Len(); got != expectedQueueLen { + t.Errorf("Expected callsQueue to have %d item(s), but has %d", expectedQueueLen, got) } } @@ -91,6 +106,8 @@ func TestCallQueueAdd(t *testing.T) { uid2 := types.UID("uid2") t.Run("First call is added without collision", func(t *testing.T) { + resetMetrics() + cq := newCallQueue(mockRelevances) call := &queuedAPICall{ APICall: &mockAPICall{ @@ -102,11 +119,13 @@ func TestCallQueueAdd(t *testing.T) { if err := cq.add(call); err != nil { t.Fatalf("Unexpected error while adding call: %v", err) } - verifyQueueLen(t, cq, 1) + verifyQueueState(t, cq, map[string]int{"low": 1}) verifyCalls(t, cq, call) }) t.Run("No-op call is skipped", func(t *testing.T) { + resetMetrics() + cq := newCallQueue(mockRelevances) onFinishCh := make(chan error, 1) call := &queuedAPICall{ @@ -124,12 +143,14 @@ func TestCallQueueAdd(t *testing.T) { if !errors.Is(err, fwk.ErrCallSkipped) { t.Fatalf("Expected call to be skipped, but got %v", err) } - verifyQueueLen(t, cq, 0) + verifyQueueState(t, cq, map[string]int{"low": 0}) verifyCalls(t, cq) expectOnFinish(t, onFinishCh, fwk.ErrCallSkipped) }) t.Run("Two calls for different objects don't collide", func(t *testing.T) { + resetMetrics() + cq := newCallQueue(mockRelevances) call1 := &queuedAPICall{ APICall: &mockAPICall{ @@ -150,11 +171,13 @@ func TestCallQueueAdd(t *testing.T) { if err := cq.add(call2); err != nil { t.Fatalf("Unexpected error while adding call2: %v", err) } - verifyQueueLen(t, cq, 2) + verifyQueueState(t, cq, map[string]int{"low": 1, "high": 1}) verifyCalls(t, cq, call1, call2) }) t.Run("New call overwrites less relevant call", func(t *testing.T) { + resetMetrics() + cq := newCallQueue(mockRelevances) onFinishCh := make(chan error, 1) callLow := &queuedAPICall{ @@ -177,12 +200,14 @@ func TestCallQueueAdd(t *testing.T) { if err := cq.add(callHigh); err != nil { t.Fatalf("Unexpected error while adding callHigh: %v", err) } - verifyQueueLen(t, cq, 1) + verifyQueueState(t, cq, map[string]int{"high": 1}) verifyCalls(t, cq, callHigh) expectOnFinish(t, onFinishCh, fwk.ErrCallOverwritten) }) t.Run("New call is skipped if less relevant", func(t *testing.T) { + resetMetrics() + cq := newCallQueue(mockRelevances) onFinishCh := make(chan error, 1) callLow := &queuedAPICall{ @@ -206,12 +231,14 @@ func TestCallQueueAdd(t *testing.T) { if !errors.Is(err, fwk.ErrCallSkipped) { t.Fatalf("Expected callLow to be skipped, but got %v", err) } - verifyQueueLen(t, cq, 1) + verifyQueueState(t, cq, map[string]int{"high": 1}) verifyCalls(t, cq, callHigh) expectOnFinish(t, onFinishCh, fwk.ErrCallSkipped) }) t.Run("New call merges with old call and skips if no-op", func(t *testing.T) { + resetMetrics() + cq := newCallQueue(mockRelevances) onFinishCh1 := make(chan error, 1) call1 := &queuedAPICall{ @@ -261,7 +288,7 @@ func TestCallQueueAdd(t *testing.T) { if err := cq.add(call2); err != nil { t.Fatalf("Unexpected error while adding call2: %v", err) } - verifyQueueLen(t, cq, 1) + verifyQueueState(t, cq, map[string]int{"low": 1}) verifyCalls(t, cq, call2) expectOnFinish(t, onFinishCh1, fwk.ErrCallOverwritten) @@ -269,7 +296,7 @@ func TestCallQueueAdd(t *testing.T) { if !errors.Is(err, fwk.ErrCallSkipped) { t.Fatalf("Expected call3 to be skipped, but got %v", err) } - verifyQueueLen(t, cq, 0) + verifyQueueState(t, cq, map[string]int{"low": 0}) verifyCalls(t, cq) expectOnFinish(t, onFinishCh2, fwk.ErrCallOverwritten) expectOnFinish(t, onFinishCh3, fwk.ErrCallSkipped) @@ -281,6 +308,8 @@ func TestCallQueuePop(t *testing.T) { uid2 := types.UID("uid2") t.Run("Calls are popped from the queue in FIFO order", func(t *testing.T) { + resetMetrics() + cq := newCallQueue(mockRelevances) call1 := &queuedAPICall{ APICall: &mockAPICall{ @@ -301,6 +330,9 @@ func TestCallQueuePop(t *testing.T) { t.Fatalf("Unexpected error while adding call2: %v", err) } + // Verify pending calls after adding + verifyQueueState(t, cq, map[string]int{"low": 2}) + poppedCall, err := cq.pop() if err != nil { t.Fatalf("Unexpected error while popping call1: %v", err) @@ -308,7 +340,7 @@ func TestCallQueuePop(t *testing.T) { if diff := cmp.Diff(call1, poppedCall, queuedAPICallCmpOpts...); diff != "" { t.Errorf("First popped call does not patch call1 (-want +got):\n%s", diff) } - verifyQueueLen(t, cq, 1) + verifyQueueState(t, cq, map[string]int{"low": 1}) verifyCalls(t, cq, call1, call2) verifyInFlight(t, cq, uid1) @@ -319,7 +351,7 @@ func TestCallQueuePop(t *testing.T) { if diff := cmp.Diff(call2, poppedCall, queuedAPICallCmpOpts...); diff != "" { t.Errorf("Second popped call does not match call2 (-want +got):\n%s", diff) } - verifyQueueLen(t, cq, 0) + verifyQueueState(t, cq, map[string]int{"low": 0}) verifyCalls(t, cq, call1, call2) verifyInFlight(t, cq, uid1, uid2) }) @@ -363,6 +395,8 @@ func TestCallQueueFinalize(t *testing.T) { uid := types.UID("uid") t.Run("Call details are cleared if there is no waiting call", func(t *testing.T) { + resetMetrics() + cq := newCallQueue(mockRelevances) call := &queuedAPICall{ APICall: &mockAPICall{ @@ -380,13 +414,14 @@ func TestCallQueueFinalize(t *testing.T) { } cq.finalize(poppedCall) - - verifyQueueLen(t, cq, 0) + verifyQueueState(t, cq, map[string]int{"low": 0}) verifyCalls(t, cq) verifyInFlight(t, cq) }) t.Run("UID is re-queued if a new call arrived while one was in-flight", func(t *testing.T) { + resetMetrics() + cq := newCallQueue(mockRelevances) call1 := &queuedAPICall{ APICall: &mockAPICall{ @@ -416,7 +451,7 @@ func TestCallQueueFinalize(t *testing.T) { cq.finalize(poppedCall) - verifyQueueLen(t, cq, 1) + verifyQueueState(t, cq, map[string]int{"low": 1}) verifyCalls(t, cq, call2) verifyInFlight(t, cq) }) @@ -427,6 +462,8 @@ func TestCallQueueSyncObject(t *testing.T) { uid2 := types.UID("uid2") t.Run("Object is synced with pending call details", func(t *testing.T) { + resetMetrics() + cq := newCallQueue(mockRelevances) obj := &metav1.ObjectMeta{ UID: uid1, @@ -449,7 +486,7 @@ func TestCallQueueSyncObject(t *testing.T) { if err != nil { t.Fatalf("Unexpected error while syncing object: %v", err) } - verifyQueueLen(t, cq, 1) + verifyQueueState(t, cq, map[string]int{"low": 1}) verifyCalls(t, cq, call) verifyInFlight(t, cq) @@ -460,6 +497,8 @@ func TestCallQueueSyncObject(t *testing.T) { }) t.Run("Pending call is canceled if sync results in no-op", func(t *testing.T) { + resetMetrics() + cq := newCallQueue(mockRelevances) obj := &metav1.ObjectMeta{UID: uid1} onFinishCh := make(chan error, 1) @@ -486,7 +525,7 @@ func TestCallQueueSyncObject(t *testing.T) { if err != nil { t.Fatalf("Unexpected error while syncing object: %v", err) } - verifyQueueLen(t, cq, 0) + verifyQueueState(t, cq, map[string]int{"low": 0}) verifyCalls(t, cq) verifyInFlight(t, cq) expectOnFinish(t, onFinishCh, fwk.ErrCallSkipped) diff --git a/pkg/scheduler/metrics/metrics.go b/pkg/scheduler/metrics/metrics.go index e993fff5e9e..c383c06d6ff 100644 --- a/pkg/scheduler/metrics/metrics.go +++ b/pkg/scheduler/metrics/metrics.go @@ -120,6 +120,11 @@ var ( PreemptionGoroutinesDuration *metrics.HistogramVec PreemptionGoroutinesExecutionTotal *metrics.CounterVec + // The below are only available when the SchedulerAsyncAPICalls feature gate is enabled. + AsyncAPICallsTotal *metrics.CounterVec + AsyncAPICallDuration *metrics.HistogramVec + AsyncAPIPendingCalls *metrics.GaugeVec + // metricsList is a list of all metrics that should be registered always, regardless of any feature gate's value. metricsList []metrics.Registerable ) @@ -140,6 +145,13 @@ func Register() { if utilfeature.DefaultFeatureGate.Enabled(features.SchedulerAsyncPreemption) { RegisterMetrics(PreemptionGoroutinesDuration, PreemptionGoroutinesExecutionTotal) } + if utilfeature.DefaultFeatureGate.Enabled(features.SchedulerAsyncAPICalls) { + RegisterMetrics( + AsyncAPICallsTotal, + AsyncAPICallDuration, + AsyncAPIPendingCalls, + ) + } }) } @@ -334,6 +346,35 @@ func InitMetrics() { }, []string{"result"}) + // The below (AsyncAPICallsTotal, AsyncAPICallDuration and AsyncAPIPendingCalls) are only available when the SchedulerAsyncAPICalls feature gate is enabled. + AsyncAPICallsTotal = metrics.NewCounterVec( + &metrics.CounterOpts{ + Subsystem: SchedulerSubsystem, + Name: "async_api_call_execution_total", + Help: "Total number of API calls executed by the async dispatcher.", + StabilityLevel: metrics.ALPHA, + }, + []string{"call_type", "result"}) + + AsyncAPICallDuration = metrics.NewHistogramVec( + &metrics.HistogramOpts{ + Subsystem: SchedulerSubsystem, + Name: "async_api_call_execution_duration_seconds", + Help: "Duration in seconds for executing API call in the async dispatcher.", + Buckets: metrics.ExponentialBuckets(0.001, 2, 15), + StabilityLevel: metrics.ALPHA, + }, + []string{"call_type", "result"}) + + AsyncAPIPendingCalls = metrics.NewGaugeVec( + &metrics.GaugeOpts{ + Subsystem: SchedulerSubsystem, + Name: "pending_async_api_calls", + Help: "Number of API calls currently pending in the async queue.", + StabilityLevel: metrics.ALPHA, + }, + []string{"call_type"}) + metricsList = []metrics.Registerable{ scheduleAttempts, schedulingLatency,