mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-03 19:58:17 +00:00 
			
		
		
		
	Merge pull request #23288 from smarterclayton/refactor_codec
Auto commit by PR queue bot
This commit is contained in:
		@@ -59,8 +59,6 @@ func fuzzInternalObject(t *testing.T, forVersion unversioned.GroupVersion, item
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func roundTrip(t *testing.T, codec runtime.Codec, item runtime.Object) {
 | 
			
		||||
	//t.Logf("codec: %#v", codec)
 | 
			
		||||
 | 
			
		||||
	printer := spew.ConfigState{DisableMethods: true}
 | 
			
		||||
 | 
			
		||||
	name := reflect.TypeOf(item).Elem().Name()
 | 
			
		||||
@@ -118,9 +116,6 @@ func roundTripSame(t *testing.T, group testapi.TestGroup, item runtime.Object, e
 | 
			
		||||
 | 
			
		||||
// For debugging problems
 | 
			
		||||
func TestSpecificKind(t *testing.T) {
 | 
			
		||||
	// api.Scheme.Log(t)
 | 
			
		||||
	// defer api.Scheme.Log(nil)
 | 
			
		||||
 | 
			
		||||
	kind := "DaemonSet"
 | 
			
		||||
	for i := 0; i < *fuzzIters; i++ {
 | 
			
		||||
		doRoundTripTest(testapi.Groups["extensions"], kind, t)
 | 
			
		||||
@@ -131,9 +126,6 @@ func TestSpecificKind(t *testing.T) {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestList(t *testing.T) {
 | 
			
		||||
	// api.Scheme.Log(t)
 | 
			
		||||
	// defer api.Scheme.Log(nil)
 | 
			
		||||
 | 
			
		||||
	kind := "List"
 | 
			
		||||
	item, err := api.Scheme.New(api.SchemeGroupVersion.WithKind(kind))
 | 
			
		||||
	if err != nil {
 | 
			
		||||
@@ -149,9 +141,6 @@ var nonInternalRoundTrippableTypes = sets.NewString("List", "ListOptions", "Expo
 | 
			
		||||
var nonRoundTrippableTypesByVersion = map[string][]string{}
 | 
			
		||||
 | 
			
		||||
func TestRoundTripTypes(t *testing.T) {
 | 
			
		||||
	// api.Scheme.Log(t)
 | 
			
		||||
	// defer api.Scheme.Log(nil)
 | 
			
		||||
 | 
			
		||||
	for groupKey, group := range testapi.Groups {
 | 
			
		||||
		for kind := range group.InternalTypes() {
 | 
			
		||||
			t.Logf("working on %v in %v", kind, groupKey)
 | 
			
		||||
@@ -286,6 +275,26 @@ func BenchmarkEncodeCodec(b *testing.B) {
 | 
			
		||||
	b.StopTimer()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// BenchmarkEncodeCodecFromInternal measures the cost of performing a codec encode,
 | 
			
		||||
// including conversions.
 | 
			
		||||
func BenchmarkEncodeCodecFromInternal(b *testing.B) {
 | 
			
		||||
	items := benchmarkItems()
 | 
			
		||||
	width := len(items)
 | 
			
		||||
	encodable := make([]api.Pod, width)
 | 
			
		||||
	for i := range items {
 | 
			
		||||
		if err := api.Scheme.Convert(&items[i], &encodable[i]); err != nil {
 | 
			
		||||
			b.Fatal(err)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	b.ResetTimer()
 | 
			
		||||
	for i := 0; i < b.N; i++ {
 | 
			
		||||
		if _, err := runtime.Encode(testapi.Default.Codec(), &encodable[i%width]); err != nil {
 | 
			
		||||
			b.Fatal(err)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	b.StopTimer()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// BenchmarkEncodeJSONMarshal provides a baseline for regular JSON encode performance
 | 
			
		||||
func BenchmarkEncodeJSONMarshal(b *testing.B) {
 | 
			
		||||
	items := benchmarkItems()
 | 
			
		||||
 
 | 
			
		||||
@@ -235,6 +235,16 @@ func (g TestGroup) RESTMapper() meta.RESTMapper {
 | 
			
		||||
	return registered.RESTMapper()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ExternalGroupVersions returns all external group versions allowed for the server.
 | 
			
		||||
func ExternalGroupVersions() []unversioned.GroupVersion {
 | 
			
		||||
	versions := []unversioned.GroupVersion{}
 | 
			
		||||
	for _, g := range Groups {
 | 
			
		||||
		gv := g.GroupVersion()
 | 
			
		||||
		versions = append(versions, *gv)
 | 
			
		||||
	}
 | 
			
		||||
	return versions
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Get codec based on runtime.Object
 | 
			
		||||
func GetCodecForObject(obj runtime.Object) (runtime.Codec, error) {
 | 
			
		||||
	kind, err := api.Scheme.ObjectKind(obj)
 | 
			
		||||
 
 | 
			
		||||
@@ -40,9 +40,15 @@ const OldestVersion = "v1"
 | 
			
		||||
// with a set of versions to choose.
 | 
			
		||||
var Versions = []string{"v1"}
 | 
			
		||||
 | 
			
		||||
var Codec = versioning.NewCodecForScheme(
 | 
			
		||||
	api.Scheme,
 | 
			
		||||
	json.NewYAMLSerializer(json.DefaultMetaFactory, api.Scheme, runtime.ObjectTyperToTyper(api.Scheme)),
 | 
			
		||||
	[]unversioned.GroupVersion{{Version: Version}},
 | 
			
		||||
	[]unversioned.GroupVersion{{Version: runtime.APIVersionInternal}},
 | 
			
		||||
)
 | 
			
		||||
var Codec runtime.Codec
 | 
			
		||||
 | 
			
		||||
func init() {
 | 
			
		||||
	yamlSerializer := json.NewYAMLSerializer(json.DefaultMetaFactory, api.Scheme, runtime.ObjectTyperToTyper(api.Scheme))
 | 
			
		||||
	Codec = versioning.NewCodecForScheme(
 | 
			
		||||
		api.Scheme,
 | 
			
		||||
		yamlSerializer,
 | 
			
		||||
		yamlSerializer,
 | 
			
		||||
		[]unversioned.GroupVersion{{Version: Version}},
 | 
			
		||||
		[]unversioned.GroupVersion{{Version: runtime.APIVersionInternal}},
 | 
			
		||||
	)
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -17,36 +17,44 @@ limitations under the License.
 | 
			
		||||
package serializer
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"io/ioutil"
 | 
			
		||||
 | 
			
		||||
	"k8s.io/kubernetes/pkg/api/unversioned"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/runtime"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/runtime/serializer/json"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/runtime/serializer/recognizer"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/runtime/serializer/streaming"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/runtime/serializer/versioning"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// serializerExtensions are for serializers that are conditionally compiled in
 | 
			
		||||
var serializerExtensions = []func(*runtime.Scheme) (serializerType, bool){}
 | 
			
		||||
 | 
			
		||||
type serializerType struct {
 | 
			
		||||
	AcceptContentTypes []string
 | 
			
		||||
	ContentType        string
 | 
			
		||||
	FileExtensions     []string
 | 
			
		||||
	Serializer         runtime.Serializer
 | 
			
		||||
	PrettySerializer   runtime.Serializer
 | 
			
		||||
 | 
			
		||||
	Serializer       runtime.Serializer
 | 
			
		||||
	PrettySerializer runtime.Serializer
 | 
			
		||||
	// RawSerializer serializes an object without adding a type wrapper. Some serializers, like JSON
 | 
			
		||||
	// automatically include identifying type information with the JSON. Others, like Protobuf, need
 | 
			
		||||
	// a wrapper object that includes type information. This serializer should be set if the serializer
 | 
			
		||||
	// can serialize / deserialize objects without type info. Note that this serializer will always
 | 
			
		||||
	// be expected to pass into or a gvk to Decode, since no type information will be available on
 | 
			
		||||
	// the object itself.
 | 
			
		||||
	RawSerializer runtime.Serializer
 | 
			
		||||
 | 
			
		||||
	// Specialize gives the type the opportunity to return a different serializer implementation if
 | 
			
		||||
	// the content type contains alternate operations. Here it is used to implement "pretty" as an
 | 
			
		||||
	// option to application/json, but could also be used to allow serializers to perform type
 | 
			
		||||
	// defaulting or alter output.
 | 
			
		||||
	Specialize func(map[string]string) (runtime.Serializer, bool)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewCodecFactory provides methods for retrieving serializers for the supported wire formats
 | 
			
		||||
// and conversion wrappers to define preferred internal and external versions. In the future,
 | 
			
		||||
// as the internal version is used less, callers may instead use a defaulting serializer and
 | 
			
		||||
// only convert objects which are shared internally (Status, common API machinery).
 | 
			
		||||
// TODO: allow other codecs to be compiled in?
 | 
			
		||||
// TODO: accept a scheme interface
 | 
			
		||||
func NewCodecFactory(scheme *runtime.Scheme) CodecFactory {
 | 
			
		||||
	return newCodecFactory(scheme, json.DefaultMetaFactory)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// newCodecFactory is a helper for testing that allows a different metafactory to be specified.
 | 
			
		||||
func newCodecFactory(scheme *runtime.Scheme, mf json.MetaFactory) CodecFactory {
 | 
			
		||||
func newSerializersForScheme(scheme *runtime.Scheme, mf json.MetaFactory) []serializerType {
 | 
			
		||||
	jsonSerializer := json.NewSerializer(mf, scheme, runtime.ObjectTyperToTyper(scheme), false)
 | 
			
		||||
	jsonPrettySerializer := json.NewSerializer(mf, scheme, runtime.ObjectTyperToTyper(scheme), true)
 | 
			
		||||
	yamlSerializer := json.NewYAMLSerializer(mf, scheme, runtime.ObjectTyperToTyper(scheme))
 | 
			
		||||
	serializers := []serializerType{
 | 
			
		||||
		{
 | 
			
		||||
			AcceptContentTypes: []string{"application/json"},
 | 
			
		||||
@@ -55,34 +63,21 @@ func newCodecFactory(scheme *runtime.Scheme, mf json.MetaFactory) CodecFactory {
 | 
			
		||||
			Serializer:         jsonSerializer,
 | 
			
		||||
			PrettySerializer:   jsonPrettySerializer,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			AcceptContentTypes: []string{"application/yaml"},
 | 
			
		||||
			ContentType:        "application/yaml",
 | 
			
		||||
			FileExtensions:     []string{"yaml"},
 | 
			
		||||
			Serializer:         yamlSerializer,
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	decoders := make([]runtime.Decoder, 0, len(serializers))
 | 
			
		||||
	accepts := []string{}
 | 
			
		||||
	alreadyAccepted := make(map[string]struct{})
 | 
			
		||||
	for _, d := range serializers {
 | 
			
		||||
		decoders = append(decoders, d.Serializer)
 | 
			
		||||
		for _, mediaType := range d.AcceptContentTypes {
 | 
			
		||||
			if _, ok := alreadyAccepted[mediaType]; ok {
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
			alreadyAccepted[mediaType] = struct{}{}
 | 
			
		||||
			accepts = append(accepts, mediaType)
 | 
			
		||||
	yamlSerializer := json.NewYAMLSerializer(mf, scheme, runtime.ObjectTyperToTyper(scheme))
 | 
			
		||||
	serializers = append(serializers, serializerType{
 | 
			
		||||
		AcceptContentTypes: []string{"application/yaml"},
 | 
			
		||||
		ContentType:        "application/yaml",
 | 
			
		||||
		FileExtensions:     []string{"yaml"},
 | 
			
		||||
		Serializer:         yamlSerializer,
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	for _, fn := range serializerExtensions {
 | 
			
		||||
		if serializer, ok := fn(scheme); ok {
 | 
			
		||||
			serializers = append(serializers, serializer)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return CodecFactory{
 | 
			
		||||
		scheme:      scheme,
 | 
			
		||||
		serializers: serializers,
 | 
			
		||||
		universal:   recognizer.NewDecoder(decoders...),
 | 
			
		||||
		accepts:     accepts,
 | 
			
		||||
 | 
			
		||||
		legacySerializer: jsonSerializer,
 | 
			
		||||
	}
 | 
			
		||||
	return serializers
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// CodecFactory provides methods for retrieving codecs and serializers for specific
 | 
			
		||||
@@ -96,6 +91,78 @@ type CodecFactory struct {
 | 
			
		||||
	legacySerializer runtime.Serializer
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewCodecFactory provides methods for retrieving serializers for the supported wire formats
 | 
			
		||||
// and conversion wrappers to define preferred internal and external versions. In the future,
 | 
			
		||||
// as the internal version is used less, callers may instead use a defaulting serializer and
 | 
			
		||||
// only convert objects which are shared internally (Status, common API machinery).
 | 
			
		||||
// TODO: allow other codecs to be compiled in?
 | 
			
		||||
// TODO: accept a scheme interface
 | 
			
		||||
func NewCodecFactory(scheme *runtime.Scheme) CodecFactory {
 | 
			
		||||
	serializers := newSerializersForScheme(scheme, json.DefaultMetaFactory)
 | 
			
		||||
	return newCodecFactory(scheme, serializers)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewStreamingCodecFactory returns serializers that support the streaming.Serializer interface.
 | 
			
		||||
// TODO: determine whether this returns a streaming.Serializer AND runtime.Serializer, or whether
 | 
			
		||||
// streaming should be added to the CodecFactory interface.
 | 
			
		||||
func NewStreamingCodecFactory(scheme *runtime.Scheme) CodecFactory {
 | 
			
		||||
	return newStreamingCodecFactory(scheme, json.DefaultMetaFactory)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// newStreamingCodecFactory handles providing streaming codecs
 | 
			
		||||
func newStreamingCodecFactory(scheme *runtime.Scheme, mf json.MetaFactory) CodecFactory {
 | 
			
		||||
	serializers := newSerializersForScheme(scheme, mf)
 | 
			
		||||
	streamers := []serializerType{}
 | 
			
		||||
	for i := range serializers {
 | 
			
		||||
		if serializers[i].RawSerializer != nil {
 | 
			
		||||
			serializers[i].Serializer = serializers[i].RawSerializer
 | 
			
		||||
		}
 | 
			
		||||
		if s, ok := serializers[i].Serializer.(streaming.Framer); ok {
 | 
			
		||||
			// TODO: more elegant option?
 | 
			
		||||
			// TODO: add tests and assertions for which serializers should
 | 
			
		||||
			//   have framers. We need to answer whether all Serializers
 | 
			
		||||
			//   are streaming serializers or not.
 | 
			
		||||
			if s.NewFrameWriter(ioutil.Discard) == nil {
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
			streamers = append(streamers, serializers[i])
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return newCodecFactory(scheme, streamers)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// newCodecFactory is a helper for testing that allows a different metafactory to be specified.
 | 
			
		||||
func newCodecFactory(scheme *runtime.Scheme, serializers []serializerType) CodecFactory {
 | 
			
		||||
	decoders := make([]runtime.Decoder, 0, len(serializers))
 | 
			
		||||
	accepts := []string{}
 | 
			
		||||
	alreadyAccepted := make(map[string]struct{})
 | 
			
		||||
	var legacySerializer runtime.Serializer
 | 
			
		||||
	for _, d := range serializers {
 | 
			
		||||
		decoders = append(decoders, d.Serializer)
 | 
			
		||||
		for _, mediaType := range d.AcceptContentTypes {
 | 
			
		||||
			if _, ok := alreadyAccepted[mediaType]; ok {
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
			alreadyAccepted[mediaType] = struct{}{}
 | 
			
		||||
			accepts = append(accepts, mediaType)
 | 
			
		||||
			if mediaType == "application/json" {
 | 
			
		||||
				legacySerializer = d.Serializer
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	if legacySerializer == nil {
 | 
			
		||||
		legacySerializer = serializers[0].Serializer
 | 
			
		||||
	}
 | 
			
		||||
	return CodecFactory{
 | 
			
		||||
		scheme:      scheme,
 | 
			
		||||
		serializers: serializers,
 | 
			
		||||
		universal:   recognizer.NewDecoder(decoders...),
 | 
			
		||||
		accepts:     accepts,
 | 
			
		||||
 | 
			
		||||
		legacySerializer: legacySerializer,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var _ runtime.NegotiatedSerializer = &CodecFactory{}
 | 
			
		||||
 | 
			
		||||
// SupportedMediaTypes returns the RFC2046 media types that this factory has serializers for.
 | 
			
		||||
@@ -109,7 +176,7 @@ func (f CodecFactory) SupportedMediaTypes() []string {
 | 
			
		||||
// This method is deprecated - clients and servers should negotiate a serializer by mime-type and
 | 
			
		||||
// invoke CodecForVersions. Callers that need only to read data should use UniversalDecoder().
 | 
			
		||||
func (f CodecFactory) LegacyCodec(version ...unversioned.GroupVersion) runtime.Codec {
 | 
			
		||||
	return f.CodecForVersions(runtime.NewCodec(f.legacySerializer, f.universal), version, nil)
 | 
			
		||||
	return versioning.NewCodecForScheme(f.scheme, f.legacySerializer, f.universal, version, nil)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// UniversalDeserializer can convert any stored data recognized by this factory into a Go object that satisfies
 | 
			
		||||
@@ -134,7 +201,7 @@ func (f CodecFactory) UniversalDecoder(versions ...unversioned.GroupVersion) run
 | 
			
		||||
// it will default to runtime.APIVersionInternal. If encode is not specified for an object's group, the object is not
 | 
			
		||||
// converted. If encode or decode are nil, no conversion is performed.
 | 
			
		||||
func (f CodecFactory) CodecForVersions(serializer runtime.Serializer, encode []unversioned.GroupVersion, decode []unversioned.GroupVersion) runtime.Codec {
 | 
			
		||||
	return versioning.NewCodecForScheme(f.scheme, serializer, encode, decode)
 | 
			
		||||
	return versioning.NewCodecForScheme(f.scheme, serializer, serializer, encode, decode)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// DecoderToVersion returns a decoder that targets the provided group version.
 | 
			
		||||
@@ -153,6 +220,10 @@ func (f CodecFactory) SerializerForMediaType(mediaType string, options map[strin
 | 
			
		||||
	for _, s := range f.serializers {
 | 
			
		||||
		for _, accepted := range s.AcceptContentTypes {
 | 
			
		||||
			if accepted == mediaType {
 | 
			
		||||
				if s.Specialize != nil && len(options) > 0 {
 | 
			
		||||
					serializer, ok := s.Specialize(options)
 | 
			
		||||
					return serializer, ok
 | 
			
		||||
				}
 | 
			
		||||
				if v, ok := options["pretty"]; ok && v == "1" && s.PrettySerializer != nil {
 | 
			
		||||
					return s.PrettySerializer, true
 | 
			
		||||
				}
 | 
			
		||||
 
 | 
			
		||||
@@ -173,7 +173,7 @@ func GetTestScheme() (*runtime.Scheme, runtime.Codec) {
 | 
			
		||||
 | 
			
		||||
	s.AddUnversionedTypes(externalGV, &unversioned.Status{})
 | 
			
		||||
 | 
			
		||||
	cf := newCodecFactory(s, testMetaFactory{})
 | 
			
		||||
	cf := newCodecFactory(s, newSerializersForScheme(s, testMetaFactory{}))
 | 
			
		||||
	codec := cf.LegacyCodec(unversioned.GroupVersion{Version: "v1"})
 | 
			
		||||
	return s, codec
 | 
			
		||||
}
 | 
			
		||||
@@ -263,7 +263,7 @@ func TestVersionedEncoding(t *testing.T) {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	cf := newCodecFactory(s, testMetaFactory{})
 | 
			
		||||
	cf := newCodecFactory(s, newSerializersForScheme(s, testMetaFactory{}))
 | 
			
		||||
	encoder, _ := cf.SerializerForFileExtension("json")
 | 
			
		||||
 | 
			
		||||
	// codec that is unversioned uses the target version
 | 
			
		||||
@@ -326,7 +326,7 @@ func TestConvertTypesWhenDefaultNamesMatch(t *testing.T) {
 | 
			
		||||
	}
 | 
			
		||||
	expect := &TestType1{A: "test"}
 | 
			
		||||
 | 
			
		||||
	codec := newCodecFactory(s, testMetaFactory{}).LegacyCodec(unversioned.GroupVersion{Version: "v1"})
 | 
			
		||||
	codec := newCodecFactory(s, newSerializersForScheme(s, testMetaFactory{})).LegacyCodec(unversioned.GroupVersion{Version: "v1"})
 | 
			
		||||
 | 
			
		||||
	obj, err := runtime.Decode(codec, data)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										106
									
								
								pkg/runtime/serializer/streaming/streaming.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										106
									
								
								pkg/runtime/serializer/streaming/streaming.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,106 @@
 | 
			
		||||
/*
 | 
			
		||||
Copyright 2015 The Kubernetes Authors All rights reserved.
 | 
			
		||||
 | 
			
		||||
Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
you may not use this file except in compliance with the License.
 | 
			
		||||
You may obtain a copy of the License at
 | 
			
		||||
 | 
			
		||||
    http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 | 
			
		||||
Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
See the License for the specific language governing permissions and
 | 
			
		||||
limitations under the License.
 | 
			
		||||
*/
 | 
			
		||||
 | 
			
		||||
// Package streaming implements encoder and decoder for streams
 | 
			
		||||
// of runtime.Objects over io.Writer/Readers.
 | 
			
		||||
package streaming
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"bytes"
 | 
			
		||||
	"io"
 | 
			
		||||
 | 
			
		||||
	"k8s.io/kubernetes/pkg/api/unversioned"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/runtime"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// Framer is a factory for creating readers and writers that obey a particular framing pattern.
 | 
			
		||||
type Framer interface {
 | 
			
		||||
	NewFrameReader(r io.Reader) io.Reader
 | 
			
		||||
	NewFrameWriter(w io.Writer) io.Writer
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Encoder is a runtime.Encoder on a stream.
 | 
			
		||||
type Encoder interface {
 | 
			
		||||
	// Encode will write the provided object to the stream or return an error. It obeys the same
 | 
			
		||||
	// contract as runtime.Encoder.
 | 
			
		||||
	Encode(obj runtime.Object, overrides ...unversioned.GroupVersion) error
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Decoder is a runtime.Decoder from a stream.
 | 
			
		||||
type Decoder interface {
 | 
			
		||||
	// Decode will return io.EOF when no more objects are available.
 | 
			
		||||
	Decode(defaults *unversioned.GroupVersionKind, into runtime.Object) (runtime.Object, *unversioned.GroupVersionKind, error)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Serializer is a factory for creating encoders and decoders that work over streams.
 | 
			
		||||
type Serializer interface {
 | 
			
		||||
	NewEncoder(w io.Writer) Encoder
 | 
			
		||||
	NewDecoder(r io.Reader) Decoder
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type decoder struct {
 | 
			
		||||
	reader  io.Reader
 | 
			
		||||
	decoder runtime.Decoder
 | 
			
		||||
	buf     []byte
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewDecoder creates a streaming decoder that reads object chunks from r and decodes them with d.
 | 
			
		||||
// The reader is expected to return ErrShortRead if the provided buffer is not large enough to read
 | 
			
		||||
// an entire object.
 | 
			
		||||
func NewDecoder(r io.Reader, d runtime.Decoder) Decoder {
 | 
			
		||||
	return &decoder{
 | 
			
		||||
		reader:  r,
 | 
			
		||||
		decoder: d,
 | 
			
		||||
		buf:     make([]byte, 1024*1024),
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Decode reads the next object from the stream and decodes it.
 | 
			
		||||
func (d *decoder) Decode(defaults *unversioned.GroupVersionKind, into runtime.Object) (runtime.Object, *unversioned.GroupVersionKind, error) {
 | 
			
		||||
	// TODO: instead of depending on a fixed sized buffer, we should handle ErrShortRead specially and
 | 
			
		||||
	// grow the buffer capacity up to a maximum amount. Requires the framer to allow repeated reads to
 | 
			
		||||
	// the stream until the frame is finished.
 | 
			
		||||
	n, err := d.reader.Read(d.buf)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, nil, err
 | 
			
		||||
	}
 | 
			
		||||
	return d.decoder.Decode(d.buf[:n], defaults, into)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type encoder struct {
 | 
			
		||||
	writer  io.Writer
 | 
			
		||||
	encoder runtime.Encoder
 | 
			
		||||
	buf     *bytes.Buffer
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewEncoder returns a new streaming encoder.
 | 
			
		||||
func NewEncoder(w io.Writer, e runtime.Encoder) Encoder {
 | 
			
		||||
	return &encoder{
 | 
			
		||||
		writer:  w,
 | 
			
		||||
		encoder: e,
 | 
			
		||||
		buf:     &bytes.Buffer{},
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Encode writes the provided object to the nested writer.
 | 
			
		||||
func (e *encoder) Encode(obj runtime.Object, overrides ...unversioned.GroupVersion) error {
 | 
			
		||||
	if err := e.encoder.EncodeToStream(obj, e.buf, overrides...); err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	_, err := e.writer.Write(e.buf.Bytes())
 | 
			
		||||
	e.buf.Reset()
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
@@ -64,18 +64,20 @@ func EnableCrossGroupEncoding(e runtime.Encoder, sourceGroup, destGroup string)
 | 
			
		||||
func NewCodecForScheme(
 | 
			
		||||
	// TODO: I should be a scheme interface?
 | 
			
		||||
	scheme *runtime.Scheme,
 | 
			
		||||
	serializer runtime.Serializer,
 | 
			
		||||
	encoder runtime.Encoder,
 | 
			
		||||
	decoder runtime.Decoder,
 | 
			
		||||
	encodeVersion []unversioned.GroupVersion,
 | 
			
		||||
	decodeVersion []unversioned.GroupVersion,
 | 
			
		||||
) runtime.Codec {
 | 
			
		||||
	return NewCodec(serializer, scheme, scheme, scheme, runtime.ObjectTyperToTyper(scheme), encodeVersion, decodeVersion)
 | 
			
		||||
	return NewCodec(encoder, decoder, scheme, scheme, scheme, runtime.ObjectTyperToTyper(scheme), encodeVersion, decodeVersion)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewCodec takes objects in their internal versions and converts them to external versions before
 | 
			
		||||
// serializing them. It assumes the serializer provided to it only deals with external versions.
 | 
			
		||||
// This class is also a serializer, but is generally used with a specific version.
 | 
			
		||||
func NewCodec(
 | 
			
		||||
	serializer runtime.Serializer,
 | 
			
		||||
	encoder runtime.Encoder,
 | 
			
		||||
	decoder runtime.Decoder,
 | 
			
		||||
	convertor runtime.ObjectConvertor,
 | 
			
		||||
	creater runtime.ObjectCreater,
 | 
			
		||||
	copier runtime.ObjectCopier,
 | 
			
		||||
@@ -84,11 +86,12 @@ func NewCodec(
 | 
			
		||||
	decodeVersion []unversioned.GroupVersion,
 | 
			
		||||
) runtime.Codec {
 | 
			
		||||
	internal := &codec{
 | 
			
		||||
		serializer: serializer,
 | 
			
		||||
		convertor:  convertor,
 | 
			
		||||
		creater:    creater,
 | 
			
		||||
		copier:     copier,
 | 
			
		||||
		typer:      typer,
 | 
			
		||||
		encoder:   encoder,
 | 
			
		||||
		decoder:   decoder,
 | 
			
		||||
		convertor: convertor,
 | 
			
		||||
		creater:   creater,
 | 
			
		||||
		copier:    copier,
 | 
			
		||||
		typer:     typer,
 | 
			
		||||
	}
 | 
			
		||||
	if encodeVersion != nil {
 | 
			
		||||
		internal.encodeVersion = make(map[string]unversioned.GroupVersion)
 | 
			
		||||
@@ -115,11 +118,12 @@ func NewCodec(
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type codec struct {
 | 
			
		||||
	serializer runtime.Serializer
 | 
			
		||||
	convertor  runtime.ObjectConvertor
 | 
			
		||||
	creater    runtime.ObjectCreater
 | 
			
		||||
	copier     runtime.ObjectCopier
 | 
			
		||||
	typer      runtime.Typer
 | 
			
		||||
	encoder   runtime.Encoder
 | 
			
		||||
	decoder   runtime.Decoder
 | 
			
		||||
	convertor runtime.ObjectConvertor
 | 
			
		||||
	creater   runtime.ObjectCreater
 | 
			
		||||
	copier    runtime.ObjectCopier
 | 
			
		||||
	typer     runtime.Typer
 | 
			
		||||
 | 
			
		||||
	encodeVersion map[string]unversioned.GroupVersion
 | 
			
		||||
	decodeVersion map[string]unversioned.GroupVersion
 | 
			
		||||
@@ -134,7 +138,7 @@ func (c *codec) Decode(data []byte, defaultGVK *unversioned.GroupVersionKind, in
 | 
			
		||||
		into = versioned.Last()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	obj, gvk, err := c.serializer.Decode(data, defaultGVK, into)
 | 
			
		||||
	obj, gvk, err := c.decoder.Decode(data, defaultGVK, into)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, gvk, err
 | 
			
		||||
	}
 | 
			
		||||
@@ -213,7 +217,7 @@ func (c *codec) Decode(data []byte, defaultGVK *unversioned.GroupVersionKind, in
 | 
			
		||||
// encoding the object the first override that matches the object's group is used. Other overrides are ignored.
 | 
			
		||||
func (c *codec) EncodeToStream(obj runtime.Object, w io.Writer, overrides ...unversioned.GroupVersion) error {
 | 
			
		||||
	if _, ok := obj.(*runtime.Unknown); ok {
 | 
			
		||||
		return c.serializer.EncodeToStream(obj, w, overrides...)
 | 
			
		||||
		return c.encoder.EncodeToStream(obj, w, overrides...)
 | 
			
		||||
	}
 | 
			
		||||
	gvk, isUnversioned, err := c.typer.ObjectKind(obj)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
@@ -224,7 +228,7 @@ func (c *codec) EncodeToStream(obj runtime.Object, w io.Writer, overrides ...unv
 | 
			
		||||
		old := obj.GetObjectKind().GroupVersionKind()
 | 
			
		||||
		obj.GetObjectKind().SetGroupVersionKind(gvk)
 | 
			
		||||
		defer obj.GetObjectKind().SetGroupVersionKind(old)
 | 
			
		||||
		return c.serializer.EncodeToStream(obj, w, overrides...)
 | 
			
		||||
		return c.encoder.EncodeToStream(obj, w, overrides...)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	targetGV, ok := c.encodeVersion[gvk.Group]
 | 
			
		||||
@@ -270,7 +274,7 @@ func (c *codec) EncodeToStream(obj runtime.Object, w io.Writer, overrides ...unv
 | 
			
		||||
		obj.GetObjectKind().SetGroupVersionKind(&unversioned.GroupVersionKind{Group: targetGV.Group, Version: targetGV.Version, Kind: gvk.Kind})
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return c.serializer.EncodeToStream(obj, w, overrides...)
 | 
			
		||||
	return c.encoder.EncodeToStream(obj, w, overrides...)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// promoteOrPrependGroupVersion finds the group version in the provided group versions that has the same group as target.
 | 
			
		||||
 
 | 
			
		||||
@@ -169,7 +169,7 @@ func TestDecode(t *testing.T) {
 | 
			
		||||
 | 
			
		||||
	for i, test := range testCases {
 | 
			
		||||
		t.Logf("%d", i)
 | 
			
		||||
		s := NewCodec(test.serializer, test.convertor, test.creater, test.copier, test.typer, test.encodes, test.decodes)
 | 
			
		||||
		s := NewCodec(test.serializer, test.serializer, test.convertor, test.creater, test.copier, test.typer, test.encodes, test.decodes)
 | 
			
		||||
		obj, gvk, err := s.Decode([]byte(`{}`), test.defaultGVK, test.into)
 | 
			
		||||
 | 
			
		||||
		if !reflect.DeepEqual(test.expectedGVK, gvk) {
 | 
			
		||||
 
 | 
			
		||||
@@ -86,7 +86,7 @@ func New(kubeConfigFile string) (*WebhookAuthorizer, error) {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	serializer := json.NewSerializer(json.DefaultMetaFactory, api.Scheme, runtime.ObjectTyperToTyper(api.Scheme), false)
 | 
			
		||||
	clientConfig.ContentConfig.Codec = versioning.NewCodecForScheme(api.Scheme, serializer, encodeVersions, decodeVersions)
 | 
			
		||||
	clientConfig.ContentConfig.Codec = versioning.NewCodecForScheme(api.Scheme, serializer, serializer, encodeVersions, decodeVersions)
 | 
			
		||||
 | 
			
		||||
	restClient, err := restclient.UnversionedRESTClientFor(clientConfig)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
 
 | 
			
		||||
@@ -39,9 +39,15 @@ var Versions = []string{"v1"}
 | 
			
		||||
 | 
			
		||||
// Codec is the default codec for serializing input that should use
 | 
			
		||||
// the latest supported version. It supports JSON by default.
 | 
			
		||||
var Codec = versioning.NewCodecForScheme(
 | 
			
		||||
	api.Scheme,
 | 
			
		||||
	json.NewSerializer(json.DefaultMetaFactory, api.Scheme, runtime.ObjectTyperToTyper(api.Scheme), true),
 | 
			
		||||
	[]unversioned.GroupVersion{{Version: Version}},
 | 
			
		||||
	[]unversioned.GroupVersion{{Version: runtime.APIVersionInternal}},
 | 
			
		||||
)
 | 
			
		||||
var Codec runtime.Codec
 | 
			
		||||
 | 
			
		||||
func init() {
 | 
			
		||||
	jsonSerializer := json.NewSerializer(json.DefaultMetaFactory, api.Scheme, runtime.ObjectTyperToTyper(api.Scheme), true)
 | 
			
		||||
	Codec = versioning.NewCodecForScheme(
 | 
			
		||||
		api.Scheme,
 | 
			
		||||
		jsonSerializer,
 | 
			
		||||
		jsonSerializer,
 | 
			
		||||
		[]unversioned.GroupVersion{{Version: Version}},
 | 
			
		||||
		[]unversioned.GroupVersion{{Version: runtime.APIVersionInternal}},
 | 
			
		||||
	)
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user