mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-03 19:58:17 +00:00 
			
		
		
		
	Fix sending oversided data frames to spdy stream
This commit is contained in:
		@@ -108,7 +108,7 @@ func (ex *fakeExecutor) run(name string, uid types.UID, container string, cmd []
 | 
				
			|||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func fakeServer(t *testing.T, testName string, exec bool, stdinData, stdoutData, stderrData, errorData string, tty bool, messageCount int, serverProtocols []string) http.HandlerFunc {
 | 
					func fakeServer(t *testing.T, requestReceived chan struct{}, testName string, exec bool, stdinData, stdoutData, stderrData, errorData string, tty bool, messageCount int, serverProtocols []string) http.HandlerFunc {
 | 
				
			||||||
	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
 | 
						return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
 | 
				
			||||||
		executor := &fakeExecutor{
 | 
							executor := &fakeExecutor{
 | 
				
			||||||
			t:            t,
 | 
								t:            t,
 | 
				
			||||||
@@ -134,6 +134,7 @@ func fakeServer(t *testing.T, testName string, exec bool, stdinData, stdoutData,
 | 
				
			|||||||
		if e, a := strings.Repeat(stdinData, messageCount), executor.stdinReceived.String(); e != a {
 | 
							if e, a := strings.Repeat(stdinData, messageCount), executor.stdinReceived.String(); e != a {
 | 
				
			||||||
			t.Errorf("%s: stdin: expected %q, got %q", testName, e, a)
 | 
								t.Errorf("%s: stdin: expected %q, got %q", testName, e, a)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
							close(requestReceived)
 | 
				
			||||||
	})
 | 
						})
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -165,6 +166,15 @@ func TestStream(t *testing.T) {
 | 
				
			|||||||
			ClientProtocols: []string{remotecommandconsts.StreamProtocolV2Name},
 | 
								ClientProtocols: []string{remotecommandconsts.StreamProtocolV2Name},
 | 
				
			||||||
			ServerProtocols: []string{remotecommandconsts.StreamProtocolV2Name},
 | 
								ServerProtocols: []string{remotecommandconsts.StreamProtocolV2Name},
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								TestName:        "oversized stdin",
 | 
				
			||||||
 | 
								Stdin:           strings.Repeat("a", 20*1024*1024),
 | 
				
			||||||
 | 
								Stdout:          "b",
 | 
				
			||||||
 | 
								Stderr:          "",
 | 
				
			||||||
 | 
								MessageCount:    1,
 | 
				
			||||||
 | 
								ClientProtocols: []string{remotecommandconsts.StreamProtocolV2Name},
 | 
				
			||||||
 | 
								ServerProtocols: []string{remotecommandconsts.StreamProtocolV2Name},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
		{
 | 
							{
 | 
				
			||||||
			TestName:        "in/out/tty",
 | 
								TestName:        "in/out/tty",
 | 
				
			||||||
			Stdin:           "a",
 | 
								Stdin:           "a",
 | 
				
			||||||
@@ -218,7 +228,8 @@ func TestStream(t *testing.T) {
 | 
				
			|||||||
			localOut := &bytes.Buffer{}
 | 
								localOut := &bytes.Buffer{}
 | 
				
			||||||
			localErr := &bytes.Buffer{}
 | 
								localErr := &bytes.Buffer{}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			server := httptest.NewServer(fakeServer(t, name, exec, testCase.Stdin, testCase.Stdout, testCase.Stderr, testCase.Error, testCase.Tty, testCase.MessageCount, testCase.ServerProtocols))
 | 
								requestReceived := make(chan struct{})
 | 
				
			||||||
 | 
								server := httptest.NewServer(fakeServer(t, requestReceived, name, exec, testCase.Stdin, testCase.Stdout, testCase.Stderr, testCase.Error, testCase.Tty, testCase.MessageCount, testCase.ServerProtocols))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			url, _ := url.ParseRequestURI(server.URL)
 | 
								url, _ := url.ParseRequestURI(server.URL)
 | 
				
			||||||
			config := restclient.ContentConfig{
 | 
								config := restclient.ContentConfig{
 | 
				
			||||||
@@ -305,6 +316,12 @@ func TestStream(t *testing.T) {
 | 
				
			|||||||
				}
 | 
									}
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								select {
 | 
				
			||||||
 | 
								case <-requestReceived:
 | 
				
			||||||
 | 
								case <-time.After(time.Minute):
 | 
				
			||||||
 | 
									t.Errorf("%s: expected fakeServerInstance to receive request", name)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			server.Close()
 | 
								server.Close()
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -25,6 +25,7 @@ go_library(
 | 
				
			|||||||
    srcs = [
 | 
					    srcs = [
 | 
				
			||||||
        "doc.go",
 | 
					        "doc.go",
 | 
				
			||||||
        "errorstream.go",
 | 
					        "errorstream.go",
 | 
				
			||||||
 | 
					        "reader.go",
 | 
				
			||||||
        "remotecommand.go",
 | 
					        "remotecommand.go",
 | 
				
			||||||
        "resize.go",
 | 
					        "resize.go",
 | 
				
			||||||
        "v1.go",
 | 
					        "v1.go",
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										41
									
								
								staging/src/k8s.io/client-go/tools/remotecommand/reader.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										41
									
								
								staging/src/k8s.io/client-go/tools/remotecommand/reader.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,41 @@
 | 
				
			|||||||
 | 
					/*
 | 
				
			||||||
 | 
					Copyright 2018 The Kubernetes Authors.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Licensed under the Apache License, Version 2.0 (the "License");
 | 
				
			||||||
 | 
					you may not use this file except in compliance with the License.
 | 
				
			||||||
 | 
					You may obtain a copy of the License at
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    http://www.apache.org/licenses/LICENSE-2.0
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Unless required by applicable law or agreed to in writing, software
 | 
				
			||||||
 | 
					distributed under the License is distributed on an "AS IS" BASIS,
 | 
				
			||||||
 | 
					WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
				
			||||||
 | 
					See the License for the specific language governing permissions and
 | 
				
			||||||
 | 
					limitations under the License.
 | 
				
			||||||
 | 
					*/
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					package remotecommand
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"io"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// readerWrapper delegates to an io.Reader so that only the io.Reader interface is implemented,
 | 
				
			||||||
 | 
					// to keep io.Copy from doing things we don't want when copying from the reader to the data stream.
 | 
				
			||||||
 | 
					//
 | 
				
			||||||
 | 
					// If the Stdin io.Reader provided to remotecommand implements a WriteTo function (like bytes.Buffer does[1]),
 | 
				
			||||||
 | 
					// io.Copy calls that method[2] to attempt to write the entire buffer to the stream in one call.
 | 
				
			||||||
 | 
					// That results in an oversized call to spdystream.Stream#Write [3],
 | 
				
			||||||
 | 
					// which results in a single oversized data frame[4] that is too large.
 | 
				
			||||||
 | 
					//
 | 
				
			||||||
 | 
					// [1] https://golang.org/pkg/bytes/#Buffer.WriteTo
 | 
				
			||||||
 | 
					// [2] https://golang.org/pkg/io/#Copy
 | 
				
			||||||
 | 
					// [3] https://github.com/kubernetes/kubernetes/blob/90295640ef87db9daa0144c5617afe889e7992b2/vendor/github.com/docker/spdystream/stream.go#L66-L73
 | 
				
			||||||
 | 
					// [4] https://github.com/kubernetes/kubernetes/blob/90295640ef87db9daa0144c5617afe889e7992b2/vendor/github.com/docker/spdystream/spdy/write.go#L302-L304
 | 
				
			||||||
 | 
					type readerWrapper struct {
 | 
				
			||||||
 | 
						reader io.Reader
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (r readerWrapper) Read(p []byte) (int, error) {
 | 
				
			||||||
 | 
						return r.reader.Read(p)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@@ -127,7 +127,7 @@ func (p *streamProtocolV1) stream(conn streamCreator) error {
 | 
				
			|||||||
		// because stdin is not closed until the process exits. If we try to call
 | 
							// because stdin is not closed until the process exits. If we try to call
 | 
				
			||||||
		// stdin.Close(), it returns no error but doesn't unblock the copy. It will
 | 
							// stdin.Close(), it returns no error but doesn't unblock the copy. It will
 | 
				
			||||||
		// exit when the process exits, instead.
 | 
							// exit when the process exits, instead.
 | 
				
			||||||
		go cp(v1.StreamTypeStdin, p.remoteStdin, p.Stdin)
 | 
							go cp(v1.StreamTypeStdin, p.remoteStdin, readerWrapper{p.Stdin})
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	waitCount := 0
 | 
						waitCount := 0
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -101,7 +101,7 @@ func (p *streamProtocolV2) copyStdin() {
 | 
				
			|||||||
			// the executed command will remain running.
 | 
								// the executed command will remain running.
 | 
				
			||||||
			defer once.Do(func() { p.remoteStdin.Close() })
 | 
								defer once.Do(func() { p.remoteStdin.Close() })
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			if _, err := io.Copy(p.remoteStdin, p.Stdin); err != nil {
 | 
								if _, err := io.Copy(p.remoteStdin, readerWrapper{p.Stdin}); err != nil {
 | 
				
			||||||
				runtime.HandleError(err)
 | 
									runtime.HandleError(err)
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		}()
 | 
							}()
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user