From 0aea6a1e45d178371b44eabcaf0d318a01f9ac3d Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Thu, 24 Jul 2025 21:38:10 +0000 Subject: [PATCH 1/3] TestStreamTranslator_BlockRedirects use subtests Reset metrics per status code; this also allows logging the particular status code that flakes, in case the problem comes from a specific one. Change-Id: I29f67d4c5d76449350f45049f45ce8325a2d0ddd --- .../pkg/util/proxy/streamtranslator_test.go | 110 +++++++++--------- 1 file changed, 56 insertions(+), 54 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/util/proxy/streamtranslator_test.go b/staging/src/k8s.io/apiserver/pkg/util/proxy/streamtranslator_test.go index 992eef77a18..89f82a7f0d9 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/proxy/streamtranslator_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/proxy/streamtranslator_test.go @@ -771,8 +771,8 @@ apiserver_stream_translator_requests_total{code="400"} 1 // redirects; it will thrown an error instead. func TestStreamTranslator_BlockRedirects(t *testing.T) { metrics.Register() - metrics.ResetForTest() t.Cleanup(metrics.ResetForTest) + for _, statusCode := range []int{ http.StatusMovedPermanently, // 301 http.StatusFound, // 302 @@ -780,65 +780,67 @@ func TestStreamTranslator_BlockRedirects(t *testing.T) { http.StatusTemporaryRedirect, // 307 http.StatusPermanentRedirect, // 308 } { - // Create upstream fake SPDY server which returns a redirect. 
- spdyServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - w.Header().Set("Location", "/") - w.WriteHeader(statusCode) - })) - defer spdyServer.Close() - spdyLocation, err := url.Parse(spdyServer.URL) - if err != nil { - t.Fatalf("Unable to parse spdy server URL: %s", spdyServer.URL) - } - spdyTransport, err := fakeTransport() - if err != nil { - t.Fatalf("Unexpected error creating transport: %v", err) - } - streams := Options{Stdout: true} - streamTranslator := NewStreamTranslatorHandler(spdyLocation, spdyTransport, 0, streams) - streamTranslatorServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - streamTranslator.ServeHTTP(w, req) - })) - defer streamTranslatorServer.Close() - // Now create the websocket client (executor), and point it to the "streamTranslatorServer". - streamTranslatorLocation, err := url.Parse(streamTranslatorServer.URL) - if err != nil { - t.Fatalf("Unable to parse StreamTranslator server URL: %s", streamTranslatorServer.URL) - } - exec, err := remotecommand.NewWebSocketExecutor(&rest.Config{Host: streamTranslatorLocation.Host}, "GET", streamTranslatorServer.URL) - if err != nil { - t.Errorf("unexpected error creating websocket executor: %v", err) - } - errorChan := make(chan error) - go func() { - // Start the streaming on the WebSocket "exec" client. - // Should return "redirect not allowed" error. - errorChan <- exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{}) - }() + t.Run(fmt.Sprintf("statusCode=%d", statusCode), func(t *testing.T) { + metrics.ResetForTest() + // Create upstream fake SPDY server which returns a redirect. 
+ spdyServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + w.Header().Set("Location", "/") + w.WriteHeader(statusCode) + })) + defer spdyServer.Close() + spdyLocation, err := url.Parse(spdyServer.URL) + if err != nil { + t.Fatalf("Unable to parse spdy server URL: %s", spdyServer.URL) + } + spdyTransport, err := fakeTransport() + if err != nil { + t.Fatalf("Unexpected error creating transport: %v", err) + } + streams := Options{Stdout: true} + streamTranslator := NewStreamTranslatorHandler(spdyLocation, spdyTransport, 0, streams) + streamTranslatorServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + streamTranslator.ServeHTTP(w, req) + })) + defer streamTranslatorServer.Close() + // Now create the websocket client (executor), and point it to the "streamTranslatorServer". + streamTranslatorLocation, err := url.Parse(streamTranslatorServer.URL) + if err != nil { + t.Fatalf("Unable to parse StreamTranslator server URL: %s", streamTranslatorServer.URL) + } + exec, err := remotecommand.NewWebSocketExecutor(&rest.Config{Host: streamTranslatorLocation.Host}, "GET", streamTranslatorServer.URL) + if err != nil { + t.Errorf("unexpected error creating websocket executor: %v", err) + } + errorChan := make(chan error) + go func() { + // Start the streaming on the WebSocket "exec" client. + // Should return "redirect not allowed" error. + errorChan <- exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{}) + }() - select { - case <-time.After(wait.ForeverTestTimeout): - t.Fatalf("expect stream to be closed after connection is closed.") - case err := <-errorChan: - // Must return "redirect now allowed" error. - if err == nil { - t.Fatalf("expected error, but received none") + select { + case <-time.After(wait.ForeverTestTimeout): + t.Fatalf("expect stream to be closed after connection is closed.") + case err := <-errorChan: + // Must return "redirect now allowed" error. 
+ if err == nil { + t.Fatalf("expected error, but received none") + } + if !strings.Contains(err.Error(), "redirect not allowed") { + t.Errorf("expected redirect not allowed error, got (%s)", err) + } } - if !strings.Contains(err.Error(), "redirect not allowed") { - t.Errorf("expected redirect not allowed error, got (%s)", err) - } - } - // Validate the streamtranslator metrics; should have one 500 failure each loop. - metricNames := []string{"apiserver_stream_translator_requests_total"} - expected := ` + // Validate the streamtranslator metrics; should have one 500 failure each loop. + metricNames := []string{"apiserver_stream_translator_requests_total"} + expected := ` # HELP apiserver_stream_translator_requests_total [ALPHA] Total number of requests that were handled by the StreamTranslatorProxy, which processes streaming RemoteCommand/V5 # TYPE apiserver_stream_translator_requests_total counter apiserver_stream_translator_requests_total{code="500"} 1 ` - if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...); err != nil { - t.Fatal(err) - } - metrics.ResetForTest() // Clear metrics each loop + if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...); err != nil { + t.Fatal(err) + } + }) } } From 7ba22700bfd12cd32ff72db022e4b53ccda1deca Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Mon, 28 Jul 2025 21:59:08 +0000 Subject: [PATCH 2/3] websocket streamtranslator increment metrics before writing status Tests that assert on metrics use the returned status to sync the test; if the metric is updated AFTER the status is returned, the test will flake because the assertion on the metric will race. 
Change-Id: I317708a22cb47256c37dac3cab0463a2f925ad6b --- .../apiserver/pkg/util/proxy/streamtranslator.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/util/proxy/streamtranslator.go b/staging/src/k8s.io/apiserver/pkg/util/proxy/streamtranslator.go index 6593a3ed966..33bbdc32526 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/proxy/streamtranslator.go +++ b/staging/src/k8s.io/apiserver/pkg/util/proxy/streamtranslator.go @@ -73,14 +73,14 @@ func (h *StreamTranslatorHandler) ServeHTTP(w http.ResponseWriter, req *http.Req // Creating SPDY executor, ensuring redirects are not followed. spdyRoundTripper, err := spdy.NewRoundTripperWithConfig(spdy.RoundTripperConfig{UpgradeTransport: h.Transport, PingPeriod: 5 * time.Second}) if err != nil { - websocketStreams.writeStatus(apierrors.NewInternalError(err)) //nolint:errcheck metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(http.StatusInternalServerError)) + websocketStreams.writeStatus(apierrors.NewInternalError(err)) //nolint:errcheck return } spdyExecutor, err := remotecommand.NewSPDYExecutorRejectRedirects(spdyRoundTripper, spdyRoundTripper, "POST", h.Location) if err != nil { - websocketStreams.writeStatus(apierrors.NewInternalError(err)) //nolint:errcheck metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(http.StatusInternalServerError)) + websocketStreams.writeStatus(apierrors.NewInternalError(err)) //nolint:errcheck return } @@ -121,27 +121,27 @@ func (h *StreamTranslatorHandler) ServeHTTP(w http.ResponseWriter, req *http.Req if err != nil { //nolint:errcheck // Ignore writeStatus returned error if statusErr, ok := err.(*apierrors.StatusError); ok { - websocketStreams.writeStatus(statusErr) // Increment status code returned within status error. 
+ metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(int(statusErr.Status().Code))) + websocketStreams.writeStatus(statusErr) } else if exitErr, ok := err.(exec.CodeExitError); ok && exitErr.Exited() { - websocketStreams.writeStatus(codeExitToStatusError(exitErr)) // Returned an exit code from the container, so not an error in // stream translator--add StatusOK to metrics. metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(http.StatusOK)) + websocketStreams.writeStatus(codeExitToStatusError(exitErr)) } else { - websocketStreams.writeStatus(apierrors.NewInternalError(err)) metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(http.StatusInternalServerError)) + websocketStreams.writeStatus(apierrors.NewInternalError(err)) } return } + metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(http.StatusOK)) // Write the success status back to the WebSocket client. //nolint:errcheck websocketStreams.writeStatus(&apierrors.StatusError{ErrStatus: metav1.Status{ Status: metav1.StatusSuccess, }}) - metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(http.StatusOK)) } // translatorSizeQueue feeds the size events from the WebSocket From f07dcd443d7335d09dc0de7a47485e2e6c87d725 Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Thu, 24 Jul 2025 21:56:42 +0000 Subject: [PATCH 3/3] fix flake on TestStreamTranslator_WebSocketServerErrors The metrics assertion races with the metric update, and since this happens on the server side, we use an active loop to check the metrics instead of expecting them to be updated immediately. 
Change-Id: I9a64b66301d5f4ac3df0c0a01de10602a20f89ea --- .../pkg/util/proxy/streamtranslator_test.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/util/proxy/streamtranslator_test.go b/staging/src/k8s.io/apiserver/pkg/util/proxy/streamtranslator_test.go index 89f82a7f0d9..c3452202236 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/proxy/streamtranslator_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/proxy/streamtranslator_test.go @@ -755,15 +755,21 @@ func TestStreamTranslator_WebSocketServerErrors(t *testing.T) { t.Errorf("expected websocket bad handshake error, got (%s)", err) } } - // Validate the streamtranslator metrics; should have one 500 failure. + // Validate the streamtranslator metrics; should have one 400 failure. + // Use polling to wait for the metric to be updated asynchronously. metricNames := []string{"apiserver_stream_translator_requests_total"} expected := ` # HELP apiserver_stream_translator_requests_total [ALPHA] Total number of requests that were handled by the StreamTranslatorProxy, which processes streaming RemoteCommand/V5 # TYPE apiserver_stream_translator_requests_total counter apiserver_stream_translator_requests_total{code="400"} 1 ` - if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...); err != nil { - t.Fatal(err) + if err := wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 2*time.Second, true, func(ctx context.Context) (bool, error) { + if testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...) == nil { + return true, nil + } + return false, nil + }); err != nil { + t.Fatalf("Failed to observe metric after waiting 2 seconds: %v", err) } }