mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-03 19:58:17 +00:00 
			
		
		
		
	Deflake remote command execution test
This commit is contained in:
		@@ -18,271 +18,175 @@ package remotecommand
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"bytes"
 | 
			
		||||
	"errors"
 | 
			
		||||
	"io"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"io/ioutil"
 | 
			
		||||
	"net/http"
 | 
			
		||||
	"net/http/httptest"
 | 
			
		||||
	"net/url"
 | 
			
		||||
	"strings"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"testing"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
 | 
			
		||||
	"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
 | 
			
		||||
	"github.com/GoogleCloudPlatform/kubernetes/pkg/util/httpstream"
 | 
			
		||||
	"github.com/GoogleCloudPlatform/kubernetes/pkg/util/httpstream/spdy"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type fakeUpgrader struct {
 | 
			
		||||
	conn *fakeUpgradeConnection
 | 
			
		||||
	err  error
 | 
			
		||||
func fakeExecServer(t *testing.T, i int, stdinData, stdoutData, stderrData, errorData string, tty bool) http.HandlerFunc {
 | 
			
		||||
	// error + stdin + stdout
 | 
			
		||||
	expectedStreams := 3
 | 
			
		||||
	if !tty {
 | 
			
		||||
		// stderr
 | 
			
		||||
		expectedStreams++
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
func (u *fakeUpgrader) upgrade(req *client.Request, config *client.Config) (httpstream.Connection, error) {
 | 
			
		||||
	return u.conn, u.err
 | 
			
		||||
	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
 | 
			
		||||
		streamCh := make(chan httpstream.Stream)
 | 
			
		||||
 | 
			
		||||
		upgrader := spdy.NewResponseUpgrader()
 | 
			
		||||
		conn := upgrader.UpgradeResponse(w, req, func(stream httpstream.Stream) error {
 | 
			
		||||
			streamCh <- stream
 | 
			
		||||
			return nil
 | 
			
		||||
		})
 | 
			
		||||
		// from this point on, we can no longer call methods on w
 | 
			
		||||
		if conn == nil {
 | 
			
		||||
			// The upgrader is responsible for notifying the client of any errors that
 | 
			
		||||
			// occurred during upgrading. All we can do is return here at this point
 | 
			
		||||
			// if we weren't successful in upgrading.
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
		defer conn.Close()
 | 
			
		||||
 | 
			
		||||
type fakeUpgradeConnection struct {
 | 
			
		||||
	closeCalled bool
 | 
			
		||||
	lock        sync.Mutex
 | 
			
		||||
 | 
			
		||||
	stdin                   *fakeUpgradeStream
 | 
			
		||||
	stdout                  *fakeUpgradeStream
 | 
			
		||||
	stdoutData              string
 | 
			
		||||
	stderr                  *fakeUpgradeStream
 | 
			
		||||
	stderrData              string
 | 
			
		||||
	errorStream             *fakeUpgradeStream
 | 
			
		||||
	errorData               string
 | 
			
		||||
	unexpectedStreamCreated bool
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func newFakeUpgradeConnection() *fakeUpgradeConnection {
 | 
			
		||||
	return &fakeUpgradeConnection{}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *fakeUpgradeConnection) CreateStream(headers http.Header) (httpstream.Stream, error) {
 | 
			
		||||
	c.lock.Lock()
 | 
			
		||||
	defer c.lock.Unlock()
 | 
			
		||||
 | 
			
		||||
	stream := &fakeUpgradeStream{}
 | 
			
		||||
	switch headers.Get(api.StreamType) {
 | 
			
		||||
	case api.StreamTypeStdin:
 | 
			
		||||
		c.stdin = stream
 | 
			
		||||
	case api.StreamTypeStdout:
 | 
			
		||||
		c.stdout = stream
 | 
			
		||||
		stream.data = c.stdoutData
 | 
			
		||||
	case api.StreamTypeStderr:
 | 
			
		||||
		c.stderr = stream
 | 
			
		||||
		stream.data = c.stderrData
 | 
			
		||||
		var errorStream, stdinStream, stdoutStream, stderrStream httpstream.Stream
 | 
			
		||||
		receivedStreams := 0
 | 
			
		||||
	WaitForStreams:
 | 
			
		||||
		for {
 | 
			
		||||
			select {
 | 
			
		||||
			case stream := <-streamCh:
 | 
			
		||||
				streamType := stream.Headers().Get(api.StreamType)
 | 
			
		||||
				switch streamType {
 | 
			
		||||
				case api.StreamTypeError:
 | 
			
		||||
		c.errorStream = stream
 | 
			
		||||
		stream.data = c.errorData
 | 
			
		||||
					errorStream = stream
 | 
			
		||||
					receivedStreams++
 | 
			
		||||
				case api.StreamTypeStdin:
 | 
			
		||||
					stdinStream = stream
 | 
			
		||||
					stdinStream.Close()
 | 
			
		||||
					receivedStreams++
 | 
			
		||||
				case api.StreamTypeStdout:
 | 
			
		||||
					stdoutStream = stream
 | 
			
		||||
					receivedStreams++
 | 
			
		||||
				case api.StreamTypeStderr:
 | 
			
		||||
					stderrStream = stream
 | 
			
		||||
					receivedStreams++
 | 
			
		||||
				default:
 | 
			
		||||
		c.unexpectedStreamCreated = true
 | 
			
		||||
					t.Errorf("%d: unexpected stream type: %q", i, streamType)
 | 
			
		||||
				}
 | 
			
		||||
 | 
			
		||||
	return stream, nil
 | 
			
		||||
				defer stream.Reset()
 | 
			
		||||
 | 
			
		||||
				if receivedStreams == expectedStreams {
 | 
			
		||||
					break WaitForStreams
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
func (c *fakeUpgradeConnection) Close() error {
 | 
			
		||||
	c.lock.Lock()
 | 
			
		||||
	defer c.lock.Unlock()
 | 
			
		||||
 | 
			
		||||
	c.closeCalled = true
 | 
			
		||||
	return nil
 | 
			
		||||
		if len(errorData) > 0 {
 | 
			
		||||
			fmt.Fprint(errorStream, errorData)
 | 
			
		||||
			errorStream.Close()
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
func (c *fakeUpgradeConnection) CloseChan() <-chan bool {
 | 
			
		||||
	return make(chan bool)
 | 
			
		||||
		if len(stdoutData) > 0 {
 | 
			
		||||
			fmt.Fprint(stdoutStream, stdoutData)
 | 
			
		||||
			stdoutStream.Close()
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
func (c *fakeUpgradeConnection) SetIdleTimeout(timeout time.Duration) {
 | 
			
		||||
		if len(stderrData) > 0 {
 | 
			
		||||
			fmt.Fprint(stderrStream, stderrData)
 | 
			
		||||
			stderrStream.Close()
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
type fakeUpgradeStream struct {
 | 
			
		||||
	readCalled  bool
 | 
			
		||||
	writeCalled bool
 | 
			
		||||
	dataWritten []byte
 | 
			
		||||
	closeCalled bool
 | 
			
		||||
	resetCalled bool
 | 
			
		||||
	data        string
 | 
			
		||||
	lock        sync.Mutex
 | 
			
		||||
		if len(stdinData) > 0 {
 | 
			
		||||
			data, err := ioutil.ReadAll(stdinStream)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				t.Errorf("%d: error reading stdin stream: %v", i, err)
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
func (s *fakeUpgradeStream) Read(p []byte) (int, error) {
 | 
			
		||||
	s.lock.Lock()
 | 
			
		||||
	defer s.lock.Unlock()
 | 
			
		||||
	s.readCalled = true
 | 
			
		||||
	b := []byte(s.data)
 | 
			
		||||
	n := copy(p, b)
 | 
			
		||||
	return n, io.EOF
 | 
			
		||||
			if e, a := stdinData, string(data); e != a {
 | 
			
		||||
				t.Errorf("%d: stdin: expected %q, got %q", i, e, a)
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
func (s *fakeUpgradeStream) Write(p []byte) (int, error) {
 | 
			
		||||
	s.lock.Lock()
 | 
			
		||||
	defer s.lock.Unlock()
 | 
			
		||||
	s.writeCalled = true
 | 
			
		||||
	s.dataWritten = make([]byte, len(p))
 | 
			
		||||
	copy(s.dataWritten, p)
 | 
			
		||||
	return len(p), io.EOF
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
func (s *fakeUpgradeStream) Close() error {
 | 
			
		||||
	s.lock.Lock()
 | 
			
		||||
	defer s.lock.Unlock()
 | 
			
		||||
	s.closeCalled = true
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *fakeUpgradeStream) Reset() error {
 | 
			
		||||
	s.lock.Lock()
 | 
			
		||||
	defer s.lock.Unlock()
 | 
			
		||||
	s.resetCalled = true
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *fakeUpgradeStream) Headers() http.Header {
 | 
			
		||||
	s.lock.Lock()
 | 
			
		||||
	defer s.lock.Unlock()
 | 
			
		||||
	return http.Header{}
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestRequestExecuteRemoteCommand(t *testing.T) {
 | 
			
		||||
	testCases := []struct {
 | 
			
		||||
		Upgrader    *fakeUpgrader
 | 
			
		||||
		Stdin  string
 | 
			
		||||
		Stdout string
 | 
			
		||||
		Stderr string
 | 
			
		||||
		Error  string
 | 
			
		||||
		Tty    bool
 | 
			
		||||
		ShouldError bool
 | 
			
		||||
	}{
 | 
			
		||||
		{
 | 
			
		||||
			Upgrader:    &fakeUpgrader{err: errors.New("bail")},
 | 
			
		||||
			ShouldError: true,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			Upgrader:    &fakeUpgrader{conn: newFakeUpgradeConnection()},
 | 
			
		||||
			Stdin:       "a",
 | 
			
		||||
			Stdout:      "b",
 | 
			
		||||
			Stderr:      "c",
 | 
			
		||||
			Error: "bail",
 | 
			
		||||
			ShouldError: true,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			Upgrader: &fakeUpgrader{conn: newFakeUpgradeConnection()},
 | 
			
		||||
			Stdin:  "a",
 | 
			
		||||
			Stdout: "b",
 | 
			
		||||
			Stderr: "c",
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			Upgrader: &fakeUpgrader{conn: newFakeUpgradeConnection()},
 | 
			
		||||
			Stdin:  "a",
 | 
			
		||||
			Stdout: "b",
 | 
			
		||||
			Stderr:   "c",
 | 
			
		||||
			Tty:    true,
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for i, testCase := range testCases {
 | 
			
		||||
		if testCase.Error != "" {
 | 
			
		||||
			testCase.Upgrader.conn.errorData = testCase.Error
 | 
			
		||||
		localOut := &bytes.Buffer{}
 | 
			
		||||
		localErr := &bytes.Buffer{}
 | 
			
		||||
 | 
			
		||||
		server := httptest.NewServer(fakeExecServer(t, i, testCase.Stdin, testCase.Stdout, testCase.Stderr, testCase.Error, testCase.Tty))
 | 
			
		||||
 | 
			
		||||
		url, _ := url.ParseRequestURI(server.URL)
 | 
			
		||||
		c := client.NewRESTClient(url, "x", nil, -1, -1)
 | 
			
		||||
		req := c.Post().Resource("testing")
 | 
			
		||||
 | 
			
		||||
		conf := &client.Config{
 | 
			
		||||
			Host: server.URL,
 | 
			
		||||
		}
 | 
			
		||||
		if testCase.Stdout != "" {
 | 
			
		||||
			testCase.Upgrader.conn.stdoutData = testCase.Stdout
 | 
			
		||||
		}
 | 
			
		||||
		if testCase.Stderr != "" {
 | 
			
		||||
			testCase.Upgrader.conn.stderrData = testCase.Stderr
 | 
			
		||||
		}
 | 
			
		||||
		var localOut, localErr *bytes.Buffer
 | 
			
		||||
		if testCase.Stdout != "" {
 | 
			
		||||
			localOut = &bytes.Buffer{}
 | 
			
		||||
		}
 | 
			
		||||
		if testCase.Stderr != "" {
 | 
			
		||||
			localErr = &bytes.Buffer{}
 | 
			
		||||
		}
 | 
			
		||||
		e := New(&client.Request{}, &client.Config{}, []string{"ls", "/"}, strings.NewReader(testCase.Stdin), localOut, localErr, testCase.Tty)
 | 
			
		||||
		e.upgrader = testCase.Upgrader
 | 
			
		||||
		e := New(req, conf, []string{"ls", "/"}, strings.NewReader(testCase.Stdin), localOut, localErr, testCase.Tty)
 | 
			
		||||
		//e.upgrader = testCase.Upgrader
 | 
			
		||||
		err := e.Execute()
 | 
			
		||||
		hasErr := err != nil
 | 
			
		||||
		if hasErr != testCase.ShouldError {
 | 
			
		||||
			t.Fatalf("%d: expected %t, got %t: %v", i, testCase.ShouldError, hasErr, err)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		conn := testCase.Upgrader.conn
 | 
			
		||||
		if testCase.Error != "" {
 | 
			
		||||
			if conn.errorStream == nil {
 | 
			
		||||
				t.Fatalf("%d: expected error stream creation", i)
 | 
			
		||||
			}
 | 
			
		||||
			if !conn.errorStream.readCalled {
 | 
			
		||||
				t.Fatalf("%d: expected error stream read", i)
 | 
			
		||||
			}
 | 
			
		||||
		if len(testCase.Error) > 0 {
 | 
			
		||||
			if !hasErr {
 | 
			
		||||
				t.Errorf("%d: expected an error", i)
 | 
			
		||||
			} else {
 | 
			
		||||
				if e, a := testCase.Error, err.Error(); !strings.Contains(a, e) {
 | 
			
		||||
				t.Fatalf("%d: expected error stream read '%v', got '%v'", i, e, a)
 | 
			
		||||
			}
 | 
			
		||||
			if !conn.errorStream.resetCalled {
 | 
			
		||||
				t.Fatalf("%d: expected error reset", i)
 | 
			
		||||
					t.Errorf("%d: expected error stream read '%v', got '%v'", i, e, a)
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
		if testCase.ShouldError {
 | 
			
		||||
			server.Close()
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if testCase.Stdin != "" {
 | 
			
		||||
			if conn.stdin == nil {
 | 
			
		||||
				t.Fatalf("%d: expected stdin stream creation", i)
 | 
			
		||||
			}
 | 
			
		||||
			if !conn.stdin.writeCalled {
 | 
			
		||||
				t.Fatalf("%d: expected stdin stream write", i)
 | 
			
		||||
			}
 | 
			
		||||
			if e, a := testCase.Stdin, string(conn.stdin.dataWritten); e != a {
 | 
			
		||||
				t.Fatalf("%d: expected stdin write %v, got %v", i, e, a)
 | 
			
		||||
			}
 | 
			
		||||
			if !conn.stdin.resetCalled {
 | 
			
		||||
				t.Fatalf("%d: expected stdin reset", i)
 | 
			
		||||
			}
 | 
			
		||||
		if hasErr {
 | 
			
		||||
			t.Errorf("%d: unexpected error: %v", i, err)
 | 
			
		||||
			server.Close()
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if testCase.Stdout != "" {
 | 
			
		||||
			if conn.stdout == nil {
 | 
			
		||||
				t.Fatalf("%d: expected stdout stream creation", i)
 | 
			
		||||
			}
 | 
			
		||||
			if !conn.stdout.readCalled {
 | 
			
		||||
				t.Fatalf("%d: expected stdout stream read", i)
 | 
			
		||||
			}
 | 
			
		||||
		if len(testCase.Stdout) > 0 {
 | 
			
		||||
			if e, a := testCase.Stdout, localOut; e != a.String() {
 | 
			
		||||
				t.Fatalf("%d: expected stdout data '%s', got '%s'", i, e, a)
 | 
			
		||||
			}
 | 
			
		||||
			if !conn.stdout.resetCalled {
 | 
			
		||||
				t.Fatalf("%d: expected stdout reset", i)
 | 
			
		||||
				t.Errorf("%d: expected stdout data '%s', got '%s'", i, e, a)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if testCase.Stderr != "" {
 | 
			
		||||
			if testCase.Tty {
 | 
			
		||||
				if conn.stderr != nil {
 | 
			
		||||
					t.Fatalf("%d: unexpected stderr stream creation", i)
 | 
			
		||||
				}
 | 
			
		||||
				if localErr.String() != "" {
 | 
			
		||||
					t.Fatalf("%d: unexpected stderr data '%s'", i, localErr)
 | 
			
		||||
				}
 | 
			
		||||
			} else {
 | 
			
		||||
				if conn.stderr == nil {
 | 
			
		||||
					t.Fatalf("%d: expected stderr stream creation", i)
 | 
			
		||||
				}
 | 
			
		||||
				if !conn.stderr.readCalled {
 | 
			
		||||
					t.Fatalf("%d: expected stderr stream read", i)
 | 
			
		||||
				}
 | 
			
		||||
			if e, a := testCase.Stderr, localErr; e != a.String() {
 | 
			
		||||
					t.Fatalf("%d: expected stderr data '%s', got '%s'", i, e, a)
 | 
			
		||||
				}
 | 
			
		||||
				if !conn.stderr.resetCalled {
 | 
			
		||||
					t.Fatalf("%d: expected stderr reset", i)
 | 
			
		||||
				}
 | 
			
		||||
				t.Errorf("%d: expected stderr data '%s', got '%s'", i, e, a)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if !conn.closeCalled {
 | 
			
		||||
			t.Fatalf("%d: expected upgraded connection to get closed", i)
 | 
			
		||||
		}
 | 
			
		||||
		server.Close()
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user