mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-03 19:58:17 +00:00 
			
		
		
		
	Frame decoder was checking cap(), not len()
Resulted in bytes being missing from the streaming decoder. Update both parts.
This commit is contained in:
		@@ -17,8 +17,10 @@ limitations under the License.
 | 
			
		||||
package api_test
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"bytes"
 | 
			
		||||
	"encoding/hex"
 | 
			
		||||
	"encoding/json"
 | 
			
		||||
	"io/ioutil"
 | 
			
		||||
	"math/rand"
 | 
			
		||||
	"reflect"
 | 
			
		||||
	"strings"
 | 
			
		||||
@@ -36,8 +38,11 @@ import (
 | 
			
		||||
	"k8s.io/kubernetes/pkg/api/unversioned"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/api/v1"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/runtime"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/runtime/serializer/streaming"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/util/diff"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/util/sets"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/watch"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/watch/versioned"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var fuzzIters = flag.Int("fuzz-iters", 20, "How many fuzzing iterations to do.")
 | 
			
		||||
@@ -269,6 +274,86 @@ func TestUnversionedTypes(t *testing.T) {
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestObjectWatchFraming(t *testing.T) {
 | 
			
		||||
	f := apitesting.FuzzerFor(nil, api.SchemeGroupVersion, rand.NewSource(benchmarkSeed))
 | 
			
		||||
	secret := &api.Secret{}
 | 
			
		||||
	f.Fuzz(secret)
 | 
			
		||||
	secret.Data["binary"] = []byte{0x00, 0x10, 0x30, 0x55, 0xff, 0x00}
 | 
			
		||||
	secret.Data["utf8"] = []byte("a string with \u0345 characters")
 | 
			
		||||
	secret.Data["long"] = bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x00}, 1000)
 | 
			
		||||
	converted, _ := api.Scheme.ConvertToVersion(secret, "v1")
 | 
			
		||||
	v1secret := converted.(*v1.Secret)
 | 
			
		||||
	for _, streamingMediaType := range api.Codecs.SupportedStreamingMediaTypes() {
 | 
			
		||||
		s, framer, mediaType, _ := api.Codecs.StreamingSerializerForMediaType(streamingMediaType, nil)
 | 
			
		||||
		// TODO: remove this when the runtime.SerializerInfo PR lands
 | 
			
		||||
		if mediaType == "application/vnd.kubernetes.protobuf;stream=watch" {
 | 
			
		||||
			mediaType = "application/vnd.kubernetes.protobuf"
 | 
			
		||||
		}
 | 
			
		||||
		embedded, ok := api.Codecs.SerializerForMediaType(mediaType, nil)
 | 
			
		||||
		if !ok {
 | 
			
		||||
			t.Logf("no embedded serializer for %s", mediaType)
 | 
			
		||||
			embedded = s
 | 
			
		||||
		}
 | 
			
		||||
		innerDecode := api.Codecs.DecoderToVersion(embedded, api.SchemeGroupVersion)
 | 
			
		||||
		//innerEncode := api.Codecs.EncoderForVersion(embedded, api.SchemeGroupVersion)
 | 
			
		||||
 | 
			
		||||
		// write a single object through the framer and back out
 | 
			
		||||
		obj := &bytes.Buffer{}
 | 
			
		||||
		if err := s.EncodeToStream(v1secret, obj); err != nil {
 | 
			
		||||
			t.Fatal(err)
 | 
			
		||||
		}
 | 
			
		||||
		out := &bytes.Buffer{}
 | 
			
		||||
		w := framer.NewFrameWriter(out)
 | 
			
		||||
		if n, err := w.Write(obj.Bytes()); err != nil || n != len(obj.Bytes()) {
 | 
			
		||||
			t.Fatal(err)
 | 
			
		||||
		}
 | 
			
		||||
		sr := streaming.NewDecoder(framer.NewFrameReader(ioutil.NopCloser(out)), s)
 | 
			
		||||
		resultSecret := &v1.Secret{}
 | 
			
		||||
		res, _, err := sr.Decode(nil, resultSecret)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			t.Fatalf("%v:\n%s", err, hex.Dump(obj.Bytes()))
 | 
			
		||||
		}
 | 
			
		||||
		resultSecret.Kind = "Secret"
 | 
			
		||||
		resultSecret.APIVersion = "v1"
 | 
			
		||||
		if !api.Semantic.DeepEqual(v1secret, res) {
 | 
			
		||||
			t.Fatalf("objects did not match: %s", diff.ObjectGoPrintDiff(v1secret, res))
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// write a watch event through and back out
 | 
			
		||||
		obj = &bytes.Buffer{}
 | 
			
		||||
		if err := embedded.EncodeToStream(v1secret, obj); err != nil {
 | 
			
		||||
			t.Fatal(err)
 | 
			
		||||
		}
 | 
			
		||||
		event := &versioned.Event{Type: string(watch.Added)}
 | 
			
		||||
		event.Object.Raw = obj.Bytes()
 | 
			
		||||
		obj = &bytes.Buffer{}
 | 
			
		||||
		if err := s.EncodeToStream(event, obj); err != nil {
 | 
			
		||||
			t.Fatal(err)
 | 
			
		||||
		}
 | 
			
		||||
		out = &bytes.Buffer{}
 | 
			
		||||
		w = framer.NewFrameWriter(out)
 | 
			
		||||
		if n, err := w.Write(obj.Bytes()); err != nil || n != len(obj.Bytes()) {
 | 
			
		||||
			t.Fatal(err)
 | 
			
		||||
		}
 | 
			
		||||
		sr = streaming.NewDecoder(framer.NewFrameReader(ioutil.NopCloser(out)), s)
 | 
			
		||||
		outEvent := &versioned.Event{}
 | 
			
		||||
		res, _, err = sr.Decode(nil, outEvent)
 | 
			
		||||
		if err != nil || outEvent.Type != string(watch.Added) {
 | 
			
		||||
			t.Fatalf("%v: %#v", err, outEvent)
 | 
			
		||||
		}
 | 
			
		||||
		if outEvent.Object.Object == nil && outEvent.Object.Raw != nil {
 | 
			
		||||
			outEvent.Object.Object, err = runtime.Decode(innerDecode, outEvent.Object.Raw)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				t.Fatalf("%v:\n%s", err, hex.Dump(outEvent.Object.Raw))
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if !api.Semantic.DeepEqual(secret, outEvent.Object.Object) {
 | 
			
		||||
			t.Fatalf("%s: did not match after frame decoding: %s", streamingMediaType, diff.ObjectGoPrintDiff(secret, outEvent.Object.Object))
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
const benchmarkSeed = 100
 | 
			
		||||
 | 
			
		||||
func benchmarkItems() []v1.Pod {
 | 
			
		||||
 
 | 
			
		||||
@@ -83,9 +83,9 @@ func (d *decoder) Decode(defaults *unversioned.GroupVersionKind, into runtime.Ob
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
			// double the buffer size up to maxBytes
 | 
			
		||||
			if cap(d.buf) < d.maxBytes {
 | 
			
		||||
			if len(d.buf) < d.maxBytes {
 | 
			
		||||
				base += n
 | 
			
		||||
				d.buf = append(d.buf, make([]byte, cap(d.buf))...)
 | 
			
		||||
				d.buf = append(d.buf, make([]byte, len(d.buf))...)
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
			// must read the rest of the frame (until we stop getting ErrShortBuffer)
 | 
			
		||||
 
 | 
			
		||||
@@ -131,13 +131,13 @@ func NewJSONFramedReader(r io.ReadCloser) io.ReadCloser {
 | 
			
		||||
func (r *jsonFrameReader) Read(data []byte) (int, error) {
 | 
			
		||||
	// Return whatever remaining data exists from an in progress frame
 | 
			
		||||
	if n := len(r.remaining); n > 0 {
 | 
			
		||||
		if n <= cap(data) {
 | 
			
		||||
		if n <= len(data) {
 | 
			
		||||
			data = append(data[0:0], r.remaining...)
 | 
			
		||||
			r.remaining = nil
 | 
			
		||||
			return n, nil
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		n = cap(data)
 | 
			
		||||
		n = len(data)
 | 
			
		||||
		data = append(data[0:0], r.remaining[:n]...)
 | 
			
		||||
		r.remaining = r.remaining[n:]
 | 
			
		||||
		return n, io.ErrShortBuffer
 | 
			
		||||
 
 | 
			
		||||
@@ -141,7 +141,7 @@ func TestJSONFrameReader(t *testing.T) {
 | 
			
		||||
func TestJSONFrameReaderShortBuffer(t *testing.T) {
 | 
			
		||||
	b := bytes.NewBufferString("{\"test\":true}\n1\n[\"a\"]")
 | 
			
		||||
	r := NewJSONFramedReader(ioutil.NopCloser(b))
 | 
			
		||||
	buf := make([]byte, 0, 3)
 | 
			
		||||
	buf := make([]byte, 3)
 | 
			
		||||
 | 
			
		||||
	if n, err := r.Read(buf); err != io.ErrShortBuffer || n != 3 || string(buf[:n]) != `{"t` {
 | 
			
		||||
		t.Fatalf("unexpected: %v %d %q", err, n, buf)
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user