diff --git a/staging/src/k8s.io/apiserver/pkg/util/proxy/streamtranslator.go b/staging/src/k8s.io/apiserver/pkg/util/proxy/streamtranslator.go index 6593a3ed966..33bbdc32526 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/proxy/streamtranslator.go +++ b/staging/src/k8s.io/apiserver/pkg/util/proxy/streamtranslator.go @@ -73,14 +73,14 @@ func (h *StreamTranslatorHandler) ServeHTTP(w http.ResponseWriter, req *http.Req // Creating SPDY executor, ensuring redirects are not followed. spdyRoundTripper, err := spdy.NewRoundTripperWithConfig(spdy.RoundTripperConfig{UpgradeTransport: h.Transport, PingPeriod: 5 * time.Second}) if err != nil { - websocketStreams.writeStatus(apierrors.NewInternalError(err)) //nolint:errcheck metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(http.StatusInternalServerError)) + websocketStreams.writeStatus(apierrors.NewInternalError(err)) //nolint:errcheck return } spdyExecutor, err := remotecommand.NewSPDYExecutorRejectRedirects(spdyRoundTripper, spdyRoundTripper, "POST", h.Location) if err != nil { - websocketStreams.writeStatus(apierrors.NewInternalError(err)) //nolint:errcheck metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(http.StatusInternalServerError)) + websocketStreams.writeStatus(apierrors.NewInternalError(err)) //nolint:errcheck return } @@ -121,27 +121,27 @@ func (h *StreamTranslatorHandler) ServeHTTP(w http.ResponseWriter, req *http.Req if err != nil { //nolint:errcheck // Ignore writeStatus returned error if statusErr, ok := err.(*apierrors.StatusError); ok { - websocketStreams.writeStatus(statusErr) // Increment status code returned within status error. metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(int(statusErr.Status().Code))) + websocketStreams.writeStatus(statusErr) } else if exitErr, ok := err.(exec.CodeExitError); ok && exitErr.Exited() { - websocketStreams.writeStatus(codeExitToStatusError(exitErr)) // Returned an exit code from the container, so not an error in // stream translator--add StatusOK to metrics. metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(http.StatusOK)) + websocketStreams.writeStatus(codeExitToStatusError(exitErr)) } else { - websocketStreams.writeStatus(apierrors.NewInternalError(err)) metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(http.StatusInternalServerError)) + websocketStreams.writeStatus(apierrors.NewInternalError(err)) } return } + metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(http.StatusOK)) // Write the success status back to the WebSocket client. //nolint:errcheck websocketStreams.writeStatus(&apierrors.StatusError{ErrStatus: metav1.Status{ Status: metav1.StatusSuccess, }}) - metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(http.StatusOK)) } // translatorSizeQueue feeds the size events from the WebSocket diff --git a/staging/src/k8s.io/apiserver/pkg/util/proxy/streamtranslator_test.go b/staging/src/k8s.io/apiserver/pkg/util/proxy/streamtranslator_test.go index 992eef77a18..c3452202236 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/proxy/streamtranslator_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/proxy/streamtranslator_test.go @@ -755,15 +755,21 @@ func TestStreamTranslator_WebSocketServerErrors(t *testing.T) { t.Errorf("expected websocket bad handshake error, got (%s)", err) } } - // Validate the streamtranslator metrics; should have one 500 failure. + // Validate the streamtranslator metrics; should have one 400 failure. + // Use polling to wait for the metric to be updated asynchronously. metricNames := []string{"apiserver_stream_translator_requests_total"} expected := ` # HELP apiserver_stream_translator_requests_total [ALPHA] Total number of requests that were handled by the StreamTranslatorProxy, which processes streaming RemoteCommand/V5 # TYPE apiserver_stream_translator_requests_total counter apiserver_stream_translator_requests_total{code="400"} 1 ` - if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...); err != nil { - t.Fatal(err) + if err := wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 2*time.Second, true, func(ctx context.Context) (bool, error) { + if testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...) == nil { + return true, nil + } + return false, nil + }); err != nil { + t.Fatalf("Failed to observe metric after waiting 2 seconds: %v", err) } } @@ -771,8 +777,8 @@ apiserver_stream_translator_requests_total{code="400"} 1 // redirects; it will thrown an error instead. func TestStreamTranslator_BlockRedirects(t *testing.T) { metrics.Register() - metrics.ResetForTest() t.Cleanup(metrics.ResetForTest) + for _, statusCode := range []int{ http.StatusMovedPermanently, // 301 http.StatusFound, // 302 @@ -780,65 +786,67 @@ func TestStreamTranslator_BlockRedirects(t *testing.T) { http.StatusTemporaryRedirect, // 307 http.StatusPermanentRedirect, // 308 } { - // Create upstream fake SPDY server which returns a redirect. - spdyServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - w.Header().Set("Location", "/") - w.WriteHeader(statusCode) - })) - defer spdyServer.Close() - spdyLocation, err := url.Parse(spdyServer.URL) - if err != nil { - t.Fatalf("Unable to parse spdy server URL: %s", spdyServer.URL) - } - spdyTransport, err := fakeTransport() - if err != nil { - t.Fatalf("Unexpected error creating transport: %v", err) - } - streams := Options{Stdout: true} - streamTranslator := NewStreamTranslatorHandler(spdyLocation, spdyTransport, 0, streams) - streamTranslatorServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - streamTranslator.ServeHTTP(w, req) - })) - defer streamTranslatorServer.Close() - // Now create the websocket client (executor), and point it to the "streamTranslatorServer". - streamTranslatorLocation, err := url.Parse(streamTranslatorServer.URL) - if err != nil { - t.Fatalf("Unable to parse StreamTranslator server URL: %s", streamTranslatorServer.URL) - } - exec, err := remotecommand.NewWebSocketExecutor(&rest.Config{Host: streamTranslatorLocation.Host}, "GET", streamTranslatorServer.URL) - if err != nil { - t.Errorf("unexpected error creating websocket executor: %v", err) - } - errorChan := make(chan error) - go func() { - // Start the streaming on the WebSocket "exec" client. - // Should return "redirect not allowed" error. - errorChan <- exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{}) - }() + t.Run(fmt.Sprintf("statusCode=%d", statusCode), func(t *testing.T) { + metrics.ResetForTest() + // Create upstream fake SPDY server which returns a redirect. + spdyServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + w.Header().Set("Location", "/") + w.WriteHeader(statusCode) + })) + defer spdyServer.Close() + spdyLocation, err := url.Parse(spdyServer.URL) + if err != nil { + t.Fatalf("Unable to parse spdy server URL: %s", spdyServer.URL) + } + spdyTransport, err := fakeTransport() + if err != nil { + t.Fatalf("Unexpected error creating transport: %v", err) + } + streams := Options{Stdout: true} + streamTranslator := NewStreamTranslatorHandler(spdyLocation, spdyTransport, 0, streams) + streamTranslatorServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + streamTranslator.ServeHTTP(w, req) + })) + defer streamTranslatorServer.Close() + // Now create the websocket client (executor), and point it to the "streamTranslatorServer". + streamTranslatorLocation, err := url.Parse(streamTranslatorServer.URL) + if err != nil { + t.Fatalf("Unable to parse StreamTranslator server URL: %s", streamTranslatorServer.URL) + } + exec, err := remotecommand.NewWebSocketExecutor(&rest.Config{Host: streamTranslatorLocation.Host}, "GET", streamTranslatorServer.URL) + if err != nil { + t.Errorf("unexpected error creating websocket executor: %v", err) + } + errorChan := make(chan error) + go func() { + // Start the streaming on the WebSocket "exec" client. + // Should return "redirect not allowed" error. + errorChan <- exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{}) + }() - select { - case <-time.After(wait.ForeverTestTimeout): - t.Fatalf("expect stream to be closed after connection is closed.") - case err := <-errorChan: - // Must return "redirect now allowed" error. - if err == nil { - t.Fatalf("expected error, but received none") + select { + case <-time.After(wait.ForeverTestTimeout): + t.Fatalf("expect stream to be closed after connection is closed.") + case err := <-errorChan: + // Must return "redirect now allowed" error. + if err == nil { + t.Fatalf("expected error, but received none") + } + if !strings.Contains(err.Error(), "redirect not allowed") { + t.Errorf("expected redirect not allowed error, got (%s)", err) + } } - if !strings.Contains(err.Error(), "redirect not allowed") { - t.Errorf("expected redirect not allowed error, got (%s)", err) - } - } - // Validate the streamtranslator metrics; should have one 500 failure each loop. - metricNames := []string{"apiserver_stream_translator_requests_total"} - expected := ` + // Validate the streamtranslator metrics; should have one 500 failure each loop. + metricNames := []string{"apiserver_stream_translator_requests_total"} + expected := ` # HELP apiserver_stream_translator_requests_total [ALPHA] Total number of requests that were handled by the StreamTranslatorProxy, which processes streaming RemoteCommand/V5 # TYPE apiserver_stream_translator_requests_total counter apiserver_stream_translator_requests_total{code="500"} 1 ` - if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...); err != nil { - t.Fatal(err) - } - metrics.ResetForTest() // Clear metrics each loop + if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...); err != nil { + t.Fatal(err) + } + }) } }