Merge pull request #130372 from aojea/e2e_websocket

E2e websocket
This commit is contained in:
Kubernetes Prow Robot
2025-02-24 18:48:29 -08:00
committed by GitHub
2 changed files with 38 additions and 4 deletions

View File

@@ -36,6 +36,7 @@ import (
"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/transport/spdy"
"k8s.io/klog/v2"
"k8s.io/kubernetes/test/e2e/framework"
)
// NewTransport creates a transport which uses the port forward dialer.
@@ -91,6 +92,20 @@ func (d *Dialer) DialContainerPort(ctx context.Context, addr Addr) (conn net.Con
}
dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", req.URL())
tunnelingDialer, err := portforward.NewSPDYOverWebsocketDialer(req.URL(), restConfig)
if err != nil {
return nil, err
}
// First attempt tunneling (websocket) dialer, then fallback to spdy dialer.
dialer = portforward.NewFallbackDialer(tunnelingDialer, dialer, func(err error) bool {
if httpstream.IsUpgradeFailure(err) || httpstream.IsHTTPSProxyError(err) {
framework.Logf("fallback to secondary dialer from primary dialer err: %v", err)
return true
}
framework.Logf("unexpected error trying to use websockets for portforward: %v", err)
return false
})
streamConn, _, err := dialer.Dial(portforward.PortForwardProtocolV1Name)
if err != nil {
return nil, fmt.Errorf("dialer failed: %w", err)

View File

@@ -27,6 +27,7 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/client-go/kubernetes/scheme"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/remotecommand"
@@ -80,8 +81,8 @@ func ExecWithOptionsContext(ctx context.Context, f *framework.Framework, options
}, scheme.ParameterCodec)
var stdout, stderr bytes.Buffer
framework.Logf("ExecWithOptions: execute(POST %s)", req.URL())
err := execute(ctx, "POST", req.URL(), f.ClientConfig(), options.Stdin, &stdout, &stderr, tty)
framework.Logf("ExecWithOptions: execute(%s)", req.URL())
err := execute(ctx, req.URL(), f.ClientConfig(), options.Stdin, &stdout, &stderr, tty)
if options.PreserveWhitespace {
return stdout.String(), stderr.String(), err
@@ -181,11 +182,29 @@ func VerifyExecInPodFail(ctx context.Context, f *framework.Framework, pod *v1.Po
return fmt.Errorf("%q should fail with exit code %d, but exit without error", shExec, exitCode)
}
func execute(ctx context.Context, method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error {
exec, err := remotecommand.NewSPDYExecutor(config, method, url)
func execute(ctx context.Context, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error {
// WebSocketExecutor executor is default
// WebSocketExecutor must be "GET" method as described in RFC 6455 Sec. 4.1 (page 17).
websocketExec, err := remotecommand.NewWebSocketExecutor(config, "GET", url.String())
if err != nil {
return err
}
spdyExec, err := remotecommand.NewSPDYExecutor(config, "POST", url)
if err != nil {
return err
}
exec, err := remotecommand.NewFallbackExecutor(websocketExec, spdyExec, func(err error) bool {
if httpstream.IsUpgradeFailure(err) || httpstream.IsHTTPSProxyError(err) {
framework.Logf("fallback to secondary dialer from primary dialer err: %v", err)
return true
}
framework.Logf("unexpected error trying to use websockets for pod exec: %v", err)
return false
})
if err != nil {
return err
}
return exec.StreamWithContext(ctx, remotecommand.StreamOptions{
Stdin: stdin,
Stdout: stdout,