Merge pull request #133193 from aojea/deflake_TestStreamTranslator_WebSocketServerErrors

Deflake test stream translator web socket server errors
This commit is contained in:
Kubernetes Prow Robot
2025-07-29 11:24:26 -07:00
committed by GitHub
2 changed files with 71 additions and 63 deletions

View File

@@ -73,14 +73,14 @@ func (h *StreamTranslatorHandler) ServeHTTP(w http.ResponseWriter, req *http.Req
// Creating SPDY executor, ensuring redirects are not followed.
spdyRoundTripper, err := spdy.NewRoundTripperWithConfig(spdy.RoundTripperConfig{UpgradeTransport: h.Transport, PingPeriod: 5 * time.Second})
if err != nil {
websocketStreams.writeStatus(apierrors.NewInternalError(err)) //nolint:errcheck
metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(http.StatusInternalServerError))
websocketStreams.writeStatus(apierrors.NewInternalError(err)) //nolint:errcheck
return
}
spdyExecutor, err := remotecommand.NewSPDYExecutorRejectRedirects(spdyRoundTripper, spdyRoundTripper, "POST", h.Location)
if err != nil {
websocketStreams.writeStatus(apierrors.NewInternalError(err)) //nolint:errcheck
metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(http.StatusInternalServerError))
websocketStreams.writeStatus(apierrors.NewInternalError(err)) //nolint:errcheck
return
}
@@ -121,27 +121,27 @@ func (h *StreamTranslatorHandler) ServeHTTP(w http.ResponseWriter, req *http.Req
if err != nil {
//nolint:errcheck // Ignore writeStatus returned error
if statusErr, ok := err.(*apierrors.StatusError); ok {
websocketStreams.writeStatus(statusErr)
// Increment status code returned within status error.
metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(int(statusErr.Status().Code)))
websocketStreams.writeStatus(statusErr)
} else if exitErr, ok := err.(exec.CodeExitError); ok && exitErr.Exited() {
websocketStreams.writeStatus(codeExitToStatusError(exitErr))
// Returned an exit code from the container, so not an error in
// stream translator--add StatusOK to metrics.
metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(http.StatusOK))
websocketStreams.writeStatus(codeExitToStatusError(exitErr))
} else {
websocketStreams.writeStatus(apierrors.NewInternalError(err))
metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(http.StatusInternalServerError))
websocketStreams.writeStatus(apierrors.NewInternalError(err))
}
return
}
metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(http.StatusOK))
// Write the success status back to the WebSocket client.
//nolint:errcheck
websocketStreams.writeStatus(&apierrors.StatusError{ErrStatus: metav1.Status{
Status: metav1.StatusSuccess,
}})
metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(http.StatusOK))
}
// translatorSizeQueue feeds the size events from the WebSocket

View File

@@ -755,15 +755,21 @@ func TestStreamTranslator_WebSocketServerErrors(t *testing.T) {
t.Errorf("expected websocket bad handshake error, got (%s)", err)
}
}
// Validate the streamtranslator metrics; should have one 500 failure.
// Validate the streamtranslator metrics; should have one 400 failure.
// Use polling to wait for the metric to be updated asynchronously.
metricNames := []string{"apiserver_stream_translator_requests_total"}
expected := `
# HELP apiserver_stream_translator_requests_total [ALPHA] Total number of requests that were handled by the StreamTranslatorProxy, which processes streaming RemoteCommand/V5
# TYPE apiserver_stream_translator_requests_total counter
apiserver_stream_translator_requests_total{code="400"} 1
`
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...); err != nil {
t.Fatal(err)
if err := wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 2*time.Second, true, func(ctx context.Context) (bool, error) {
if testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...) == nil {
return true, nil
}
return false, nil
}); err != nil {
t.Fatalf("Failed to observe metric after waiting 2 seconds: %v", err)
}
}
@@ -771,8 +777,8 @@ apiserver_stream_translator_requests_total{code="400"} 1
// redirects; it will thrown an error instead.
func TestStreamTranslator_BlockRedirects(t *testing.T) {
metrics.Register()
metrics.ResetForTest()
t.Cleanup(metrics.ResetForTest)
for _, statusCode := range []int{
http.StatusMovedPermanently, // 301
http.StatusFound, // 302
@@ -780,65 +786,67 @@ func TestStreamTranslator_BlockRedirects(t *testing.T) {
http.StatusTemporaryRedirect, // 307
http.StatusPermanentRedirect, // 308
} {
// Create upstream fake SPDY server which returns a redirect.
spdyServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
w.Header().Set("Location", "/")
w.WriteHeader(statusCode)
}))
defer spdyServer.Close()
spdyLocation, err := url.Parse(spdyServer.URL)
if err != nil {
t.Fatalf("Unable to parse spdy server URL: %s", spdyServer.URL)
}
spdyTransport, err := fakeTransport()
if err != nil {
t.Fatalf("Unexpected error creating transport: %v", err)
}
streams := Options{Stdout: true}
streamTranslator := NewStreamTranslatorHandler(spdyLocation, spdyTransport, 0, streams)
streamTranslatorServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
streamTranslator.ServeHTTP(w, req)
}))
defer streamTranslatorServer.Close()
// Now create the websocket client (executor), and point it to the "streamTranslatorServer".
streamTranslatorLocation, err := url.Parse(streamTranslatorServer.URL)
if err != nil {
t.Fatalf("Unable to parse StreamTranslator server URL: %s", streamTranslatorServer.URL)
}
exec, err := remotecommand.NewWebSocketExecutor(&rest.Config{Host: streamTranslatorLocation.Host}, "GET", streamTranslatorServer.URL)
if err != nil {
t.Errorf("unexpected error creating websocket executor: %v", err)
}
errorChan := make(chan error)
go func() {
// Start the streaming on the WebSocket "exec" client.
// Should return "redirect not allowed" error.
errorChan <- exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{})
}()
t.Run(fmt.Sprintf("statusCode=%d", statusCode), func(t *testing.T) {
metrics.ResetForTest()
// Create upstream fake SPDY server which returns a redirect.
spdyServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
w.Header().Set("Location", "/")
w.WriteHeader(statusCode)
}))
defer spdyServer.Close()
spdyLocation, err := url.Parse(spdyServer.URL)
if err != nil {
t.Fatalf("Unable to parse spdy server URL: %s", spdyServer.URL)
}
spdyTransport, err := fakeTransport()
if err != nil {
t.Fatalf("Unexpected error creating transport: %v", err)
}
streams := Options{Stdout: true}
streamTranslator := NewStreamTranslatorHandler(spdyLocation, spdyTransport, 0, streams)
streamTranslatorServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
streamTranslator.ServeHTTP(w, req)
}))
defer streamTranslatorServer.Close()
// Now create the websocket client (executor), and point it to the "streamTranslatorServer".
streamTranslatorLocation, err := url.Parse(streamTranslatorServer.URL)
if err != nil {
t.Fatalf("Unable to parse StreamTranslator server URL: %s", streamTranslatorServer.URL)
}
exec, err := remotecommand.NewWebSocketExecutor(&rest.Config{Host: streamTranslatorLocation.Host}, "GET", streamTranslatorServer.URL)
if err != nil {
t.Errorf("unexpected error creating websocket executor: %v", err)
}
errorChan := make(chan error)
go func() {
// Start the streaming on the WebSocket "exec" client.
// Should return "redirect not allowed" error.
errorChan <- exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{})
}()
select {
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("expect stream to be closed after connection is closed.")
case err := <-errorChan:
// Must return "redirect now allowed" error.
if err == nil {
t.Fatalf("expected error, but received none")
select {
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("expect stream to be closed after connection is closed.")
case err := <-errorChan:
// Must return "redirect now allowed" error.
if err == nil {
t.Fatalf("expected error, but received none")
}
if !strings.Contains(err.Error(), "redirect not allowed") {
t.Errorf("expected redirect not allowed error, got (%s)", err)
}
}
if !strings.Contains(err.Error(), "redirect not allowed") {
t.Errorf("expected redirect not allowed error, got (%s)", err)
}
}
// Validate the streamtranslator metrics; should have one 500 failure each loop.
metricNames := []string{"apiserver_stream_translator_requests_total"}
expected := `
// Validate the streamtranslator metrics; should have one 500 failure each loop.
metricNames := []string{"apiserver_stream_translator_requests_total"}
expected := `
# HELP apiserver_stream_translator_requests_total [ALPHA] Total number of requests that were handled by the StreamTranslatorProxy, which processes streaming RemoteCommand/V5
# TYPE apiserver_stream_translator_requests_total counter
apiserver_stream_translator_requests_total{code="500"} 1
`
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...); err != nil {
t.Fatal(err)
}
metrics.ResetForTest() // Clear metrics each loop
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...); err != nil {
t.Fatal(err)
}
})
}
}