mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-04 04:08:16 +00:00 
			
		
		
		
	Streaming JSON encoder for List
This commit is contained in:
		
				
					committed by
					
						
						Marek Siarkowicz
					
				
			
			
				
	
			
			
			
						parent
						
							a18b4a8d97
						
					
				
				
					commit
					e7c743b2eb
				
			@@ -667,6 +667,10 @@ const (
 | 
				
			|||||||
	// Enables support for the StorageVersionMigrator controller.
 | 
						// Enables support for the StorageVersionMigrator controller.
 | 
				
			||||||
	StorageVersionMigrator featuregate.Feature = "StorageVersionMigrator"
 | 
						StorageVersionMigrator featuregate.Feature = "StorageVersionMigrator"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// owner: @serathius
 | 
				
			||||||
 | 
						// Allow API server to encode collections item by item, instead of all at once.
 | 
				
			||||||
 | 
						StreamingCollectionEncodingToJSON featuregate.Feature = "StreamingCollectionEncodingToJSON"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// owner: @robscott
 | 
						// owner: @robscott
 | 
				
			||||||
	// kep: https://kep.k8s.io/2433
 | 
						// kep: https://kep.k8s.io/2433
 | 
				
			||||||
	//
 | 
						//
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -741,6 +741,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
 | 
				
			|||||||
		{Version: version.MustParse("1.30"), Default: false, PreRelease: featuregate.Alpha},
 | 
							{Version: version.MustParse("1.30"), Default: false, PreRelease: featuregate.Alpha},
 | 
				
			||||||
	},
 | 
						},
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						StreamingCollectionEncodingToJSON: {
 | 
				
			||||||
 | 
							{Version: version.MustParse("1.33"), Default: true, PreRelease: featuregate.Beta},
 | 
				
			||||||
 | 
						},
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	SupplementalGroupsPolicy: {
 | 
						SupplementalGroupsPolicy: {
 | 
				
			||||||
		{Version: version.MustParse("1.31"), Default: false, PreRelease: featuregate.Alpha},
 | 
							{Version: version.MustParse("1.31"), Default: false, PreRelease: featuregate.Alpha},
 | 
				
			||||||
	},
 | 
						},
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -73,8 +73,15 @@ func (c *GenericConfig) NewRESTStorage(apiResourceConfigSource serverstorage.API
 | 
				
			|||||||
		ParameterCodec:               legacyscheme.ParameterCodec,
 | 
							ParameterCodec:               legacyscheme.ParameterCodec,
 | 
				
			||||||
		NegotiatedSerializer:         legacyscheme.Codecs,
 | 
							NegotiatedSerializer:         legacyscheme.Codecs,
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						opts := []serializer.CodecFactoryOptionsMutator{}
 | 
				
			||||||
	if utilfeature.DefaultFeatureGate.Enabled(features.CBORServingAndStorage) {
 | 
						if utilfeature.DefaultFeatureGate.Enabled(features.CBORServingAndStorage) {
 | 
				
			||||||
		apiGroupInfo.NegotiatedSerializer = serializer.NewCodecFactory(legacyscheme.Scheme, serializer.WithSerializer(cbor.NewSerializerInfo))
 | 
							opts = append(opts, serializer.WithSerializer(cbor.NewSerializerInfo))
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToJSON) {
 | 
				
			||||||
 | 
							opts = append(opts, serializer.WithStreamingCollectionEncodingToJSON())
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if len(opts) != 0 {
 | 
				
			||||||
 | 
							apiGroupInfo.NegotiatedSerializer = serializer.NewCodecFactory(legacyscheme.Scheme, opts...)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	eventStorage, err := eventstore.NewREST(restOptionsGetter, uint64(c.EventTTL.Seconds()))
 | 
						eventStorage, err := eventstore.NewREST(restOptionsGetter, uint64(c.EventTTL.Seconds()))
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -851,6 +851,7 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd
 | 
				
			|||||||
		clusterScoped := crd.Spec.Scope == apiextensionsv1.ClusterScoped
 | 
							clusterScoped := crd.Spec.Scope == apiextensionsv1.ClusterScoped
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		// CRDs explicitly do not support protobuf, but some objects returned by the API server do
 | 
							// CRDs explicitly do not support protobuf, but some objects returned by the API server do
 | 
				
			||||||
 | 
							streamingCollections := utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToJSON)
 | 
				
			||||||
		negotiatedSerializer := unstructuredNegotiatedSerializer{
 | 
							negotiatedSerializer := unstructuredNegotiatedSerializer{
 | 
				
			||||||
			typer:                 typer,
 | 
								typer:                 typer,
 | 
				
			||||||
			creator:               creator,
 | 
								creator:               creator,
 | 
				
			||||||
@@ -864,10 +865,11 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd
 | 
				
			|||||||
					MediaTypeType:    "application",
 | 
										MediaTypeType:    "application",
 | 
				
			||||||
					MediaTypeSubType: "json",
 | 
										MediaTypeSubType: "json",
 | 
				
			||||||
					EncodesAsText:    true,
 | 
										EncodesAsText:    true,
 | 
				
			||||||
					Serializer:       json.NewSerializerWithOptions(json.DefaultMetaFactory, creator, typer, json.SerializerOptions{}),
 | 
										Serializer:       json.NewSerializerWithOptions(json.DefaultMetaFactory, creator, typer, json.SerializerOptions{StreamingCollectionsEncoding: streamingCollections}),
 | 
				
			||||||
					PrettySerializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, creator, typer, json.SerializerOptions{Pretty: true}),
 | 
										PrettySerializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, creator, typer, json.SerializerOptions{Pretty: true}),
 | 
				
			||||||
					StrictSerializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, creator, typer, json.SerializerOptions{
 | 
										StrictSerializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, creator, typer, json.SerializerOptions{
 | 
				
			||||||
						Strict: true,
 | 
											Strict:                       true,
 | 
				
			||||||
 | 
											StreamingCollectionsEncoding: streamingCollections,
 | 
				
			||||||
					}),
 | 
										}),
 | 
				
			||||||
					StreamSerializer: &runtime.StreamSerializerInfo{
 | 
										StreamSerializer: &runtime.StreamSerializerInfo{
 | 
				
			||||||
						EncodesAsText: true,
 | 
											EncodesAsText: true,
 | 
				
			||||||
@@ -970,6 +972,9 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd
 | 
				
			|||||||
		if utilfeature.DefaultFeatureGate.Enabled(features.CBORServingAndStorage) {
 | 
							if utilfeature.DefaultFeatureGate.Enabled(features.CBORServingAndStorage) {
 | 
				
			||||||
			opts = append(opts, serializer.WithSerializer(cbor.NewSerializerInfo))
 | 
								opts = append(opts, serializer.WithSerializer(cbor.NewSerializerInfo))
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
							if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToJSON) {
 | 
				
			||||||
 | 
								opts = append(opts, serializer.WithStreamingCollectionEncodingToJSON())
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
		scaleScope.Serializer = serializer.NewCodecFactory(scaleConverter.Scheme(), opts...)
 | 
							scaleScope.Serializer = serializer.NewCodecFactory(scaleConverter.Scheme(), opts...)
 | 
				
			||||||
		scaleScope.Kind = autoscalingv1.SchemeGroupVersion.WithKind("Scale")
 | 
							scaleScope.Kind = autoscalingv1.SchemeGroupVersion.WithKind("Scale")
 | 
				
			||||||
		scaleScope.Namer = handlers.ContextBasedNaming{
 | 
							scaleScope.Namer = handlers.ContextBasedNaming{
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -221,6 +221,9 @@ func extractList(obj runtime.Object, allocNew bool) ([]runtime.Object, error) {
 | 
				
			|||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return nil, err
 | 
							return nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						if items.IsNil() {
 | 
				
			||||||
 | 
							return nil, nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
	list := make([]runtime.Object, items.Len())
 | 
						list := make([]runtime.Object, items.Len())
 | 
				
			||||||
	if len(list) == 0 {
 | 
						if len(list) == 0 {
 | 
				
			||||||
		return list, nil
 | 
							return list, nil
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -28,7 +28,7 @@ import (
 | 
				
			|||||||
func newSerializersForScheme(scheme *runtime.Scheme, mf json.MetaFactory, options CodecFactoryOptions) []runtime.SerializerInfo {
 | 
					func newSerializersForScheme(scheme *runtime.Scheme, mf json.MetaFactory, options CodecFactoryOptions) []runtime.SerializerInfo {
 | 
				
			||||||
	jsonSerializer := json.NewSerializerWithOptions(
 | 
						jsonSerializer := json.NewSerializerWithOptions(
 | 
				
			||||||
		mf, scheme, scheme,
 | 
							mf, scheme, scheme,
 | 
				
			||||||
		json.SerializerOptions{Yaml: false, Pretty: false, Strict: options.Strict},
 | 
							json.SerializerOptions{Yaml: false, Pretty: false, Strict: options.Strict, StreamingCollectionsEncoding: options.StreamingCollectionsEncodingToJSON},
 | 
				
			||||||
	)
 | 
						)
 | 
				
			||||||
	jsonSerializerType := runtime.SerializerInfo{
 | 
						jsonSerializerType := runtime.SerializerInfo{
 | 
				
			||||||
		MediaType:        runtime.ContentTypeJSON,
 | 
							MediaType:        runtime.ContentTypeJSON,
 | 
				
			||||||
@@ -38,7 +38,7 @@ func newSerializersForScheme(scheme *runtime.Scheme, mf json.MetaFactory, option
 | 
				
			|||||||
		Serializer:       jsonSerializer,
 | 
							Serializer:       jsonSerializer,
 | 
				
			||||||
		StrictSerializer: json.NewSerializerWithOptions(
 | 
							StrictSerializer: json.NewSerializerWithOptions(
 | 
				
			||||||
			mf, scheme, scheme,
 | 
								mf, scheme, scheme,
 | 
				
			||||||
			json.SerializerOptions{Yaml: false, Pretty: false, Strict: true},
 | 
								json.SerializerOptions{Yaml: false, Pretty: false, Strict: true, StreamingCollectionsEncoding: options.StreamingCollectionsEncodingToJSON},
 | 
				
			||||||
		),
 | 
							),
 | 
				
			||||||
		StreamSerializer: &runtime.StreamSerializerInfo{
 | 
							StreamSerializer: &runtime.StreamSerializerInfo{
 | 
				
			||||||
			EncodesAsText: true,
 | 
								EncodesAsText: true,
 | 
				
			||||||
@@ -113,6 +113,8 @@ type CodecFactoryOptions struct {
 | 
				
			|||||||
	// Pretty includes a pretty serializer along with the non-pretty one
 | 
						// Pretty includes a pretty serializer along with the non-pretty one
 | 
				
			||||||
	Pretty bool
 | 
						Pretty bool
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						StreamingCollectionsEncodingToJSON bool
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	serializers []func(runtime.ObjectCreater, runtime.ObjectTyper) runtime.SerializerInfo
 | 
						serializers []func(runtime.ObjectCreater, runtime.ObjectTyper) runtime.SerializerInfo
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -147,6 +149,12 @@ func WithSerializer(f func(runtime.ObjectCreater, runtime.ObjectTyper) runtime.S
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func WithStreamingCollectionEncodingToJSON() CodecFactoryOptionsMutator {
 | 
				
			||||||
 | 
						return func(options *CodecFactoryOptions) {
 | 
				
			||||||
 | 
							options.StreamingCollectionsEncodingToJSON = true
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// NewCodecFactory provides methods for retrieving serializers for the supported wire formats
 | 
					// NewCodecFactory provides methods for retrieving serializers for the supported wire formats
 | 
				
			||||||
// and conversion wrappers to define preferred internal and external versions. In the future,
 | 
					// and conversion wrappers to define preferred internal and external versions. In the future,
 | 
				
			||||||
// as the internal version is used less, callers may instead use a defaulting serializer and
 | 
					// as the internal version is used less, callers may instead use a defaulting serializer and
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -0,0 +1,230 @@
 | 
				
			|||||||
 | 
					/*
 | 
				
			||||||
 | 
					Copyright 2025 The Kubernetes Authors.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Licensed under the Apache License, Version 2.0 (the "License");
 | 
				
			||||||
 | 
					you may not use this file except in compliance with the License.
 | 
				
			||||||
 | 
					You may obtain a copy of the License at
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    http://www.apache.org/licenses/LICENSE-2.0
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Unless required by applicable law or agreed to in writing, software
 | 
				
			||||||
 | 
					distributed under the License is distributed on an "AS IS" BASIS,
 | 
				
			||||||
 | 
					WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
				
			||||||
 | 
					See the License for the specific language governing permissions and
 | 
				
			||||||
 | 
					limitations under the License.
 | 
				
			||||||
 | 
					*/
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					package json
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"encoding/json"
 | 
				
			||||||
 | 
						"fmt"
 | 
				
			||||||
 | 
						"io"
 | 
				
			||||||
 | 
						"maps"
 | 
				
			||||||
 | 
						"slices"
 | 
				
			||||||
 | 
						"sort"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						"k8s.io/apimachinery/pkg/api/meta"
 | 
				
			||||||
 | 
						"k8s.io/apimachinery/pkg/conversion"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
				
			||||||
 | 
						"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 | 
				
			||||||
 | 
						"k8s.io/apimachinery/pkg/runtime"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func streamEncodeCollections(obj runtime.Object, w io.Writer) (bool, error) {
 | 
				
			||||||
 | 
						list, ok := obj.(*unstructured.UnstructuredList)
 | 
				
			||||||
 | 
						if ok {
 | 
				
			||||||
 | 
							return true, streamingEncodeUnstructuredList(w, list)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if _, ok := obj.(json.Marshaler); ok {
 | 
				
			||||||
 | 
							return false, nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						typeMeta, listMeta, items, err := getListMeta(obj)
 | 
				
			||||||
 | 
						if err == nil {
 | 
				
			||||||
 | 
							return true, streamingEncodeList(w, typeMeta, listMeta, items)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return false, nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// getListMeta implements list extraction logic for json stream serialization.
 | 
				
			||||||
 | 
					//
 | 
				
			||||||
 | 
					// Reason for a custom logic instead of reusing accessors from meta package:
 | 
				
			||||||
 | 
					// * Validate json tags to prevent incompatibility with json standard package.
 | 
				
			||||||
 | 
					// * ListMetaAccessor doesn't distinguish empty from nil value.
 | 
				
			||||||
 | 
					// * TypeAccessort reparsing "apiVersion" and serializing it with "{group}/{version}"
 | 
				
			||||||
 | 
					func getListMeta(list runtime.Object) (metav1.TypeMeta, metav1.ListMeta, []runtime.Object, error) {
 | 
				
			||||||
 | 
						listValue, err := conversion.EnforcePtr(list)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return metav1.TypeMeta{}, metav1.ListMeta{}, nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						listType := listValue.Type()
 | 
				
			||||||
 | 
						if listType.NumField() != 3 {
 | 
				
			||||||
 | 
							return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf("expected ListType to have 3 fields")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						// TypeMeta
 | 
				
			||||||
 | 
						typeMeta, ok := listValue.Field(0).Interface().(metav1.TypeMeta)
 | 
				
			||||||
 | 
						if !ok {
 | 
				
			||||||
 | 
							return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf("expected TypeMeta field to have TypeMeta type")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if listType.Field(0).Tag.Get("json") != ",inline" {
 | 
				
			||||||
 | 
							return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf(`expected TypeMeta json field tag to be ",inline"`)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						// ListMeta
 | 
				
			||||||
 | 
						listMeta, ok := listValue.Field(1).Interface().(metav1.ListMeta)
 | 
				
			||||||
 | 
						if !ok {
 | 
				
			||||||
 | 
							return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf("expected ListMeta field to have ListMeta type")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if listType.Field(1).Tag.Get("json") != "metadata,omitempty" {
 | 
				
			||||||
 | 
							return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf(`expected ListMeta json field tag to be "metadata,omitempty"`)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						// Items
 | 
				
			||||||
 | 
						items, err := meta.ExtractList(list)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return metav1.TypeMeta{}, metav1.ListMeta{}, nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if listType.Field(2).Tag.Get("json") != "items" {
 | 
				
			||||||
 | 
							return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf(`expected Items json field tag to be "items"`)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return typeMeta, listMeta, items, nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func streamingEncodeList(w io.Writer, typeMeta metav1.TypeMeta, listMeta metav1.ListMeta, items []runtime.Object) error {
 | 
				
			||||||
 | 
						// Start
 | 
				
			||||||
 | 
						if _, err := w.Write([]byte(`{`)); err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// TypeMeta
 | 
				
			||||||
 | 
						if typeMeta.Kind != "" {
 | 
				
			||||||
 | 
							if err := encodeKeyValuePair(w, "kind", typeMeta.Kind, []byte(",")); err != nil {
 | 
				
			||||||
 | 
								return err
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if typeMeta.APIVersion != "" {
 | 
				
			||||||
 | 
							if err := encodeKeyValuePair(w, "apiVersion", typeMeta.APIVersion, []byte(",")); err != nil {
 | 
				
			||||||
 | 
								return err
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// ListMeta
 | 
				
			||||||
 | 
						if err := encodeKeyValuePair(w, "metadata", listMeta, []byte(",")); err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Items
 | 
				
			||||||
 | 
						if err := encodeItemsObjectSlice(w, items); err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// End
 | 
				
			||||||
 | 
						_, err := w.Write([]byte("}\n"))
 | 
				
			||||||
 | 
						return err
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func encodeItemsObjectSlice(w io.Writer, items []runtime.Object) (err error) {
 | 
				
			||||||
 | 
						if items == nil {
 | 
				
			||||||
 | 
							err := encodeKeyValuePair(w, "items", nil, nil)
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						_, err = w.Write([]byte(`"items":[`))
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						suffix := []byte(",")
 | 
				
			||||||
 | 
						for i, item := range items {
 | 
				
			||||||
 | 
							if i == len(items)-1 {
 | 
				
			||||||
 | 
								suffix = nil
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							err := encodeValue(w, item, suffix)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								return err
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						_, err = w.Write([]byte("]"))
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return err
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func streamingEncodeUnstructuredList(w io.Writer, list *unstructured.UnstructuredList) error {
 | 
				
			||||||
 | 
						_, err := w.Write([]byte(`{`))
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						keys := slices.Collect(maps.Keys(list.Object))
 | 
				
			||||||
 | 
						if _, exists := list.Object["items"]; !exists {
 | 
				
			||||||
 | 
							keys = append(keys, "items")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						sort.Strings(keys)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						suffix := []byte(",")
 | 
				
			||||||
 | 
						for i, key := range keys {
 | 
				
			||||||
 | 
							if i == len(keys)-1 {
 | 
				
			||||||
 | 
								suffix = nil
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							if key == "items" {
 | 
				
			||||||
 | 
								err = encodeItemsUnstructuredSlice(w, list.Items, suffix)
 | 
				
			||||||
 | 
							} else {
 | 
				
			||||||
 | 
								err = encodeKeyValuePair(w, key, list.Object[key], suffix)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								return err
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						_, err = w.Write([]byte("}\n"))
 | 
				
			||||||
 | 
						return err
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func encodeItemsUnstructuredSlice(w io.Writer, items []unstructured.Unstructured, suffix []byte) (err error) {
 | 
				
			||||||
 | 
						_, err = w.Write([]byte(`"items":[`))
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						comma := []byte(",")
 | 
				
			||||||
 | 
						for i, item := range items {
 | 
				
			||||||
 | 
							if i == len(items)-1 {
 | 
				
			||||||
 | 
								comma = nil
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							err := encodeValue(w, item.Object, comma)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								return err
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						_, err = w.Write([]byte("]"))
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if len(suffix) > 0 {
 | 
				
			||||||
 | 
							_, err = w.Write(suffix)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return err
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func encodeKeyValuePair(w io.Writer, key string, value any, suffix []byte) (err error) {
 | 
				
			||||||
 | 
						err = encodeValue(w, key, []byte(":"))
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						err = encodeValue(w, value, suffix)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return err
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func encodeValue(w io.Writer, value any, suffix []byte) error {
 | 
				
			||||||
 | 
						data, err := json.Marshal(value)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						_, err = w.Write(data)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if len(suffix) > 0 {
 | 
				
			||||||
 | 
							_, err = w.Write(suffix)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return err
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@@ -18,9 +18,11 @@ package json
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"bytes"
 | 
						"bytes"
 | 
				
			||||||
 | 
						"fmt"
 | 
				
			||||||
	"testing"
 | 
						"testing"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/google/go-cmp/cmp"
 | 
						"github.com/google/go-cmp/cmp"
 | 
				
			||||||
 | 
						fuzz "github.com/google/gofuzz"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
						metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 | 
						"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 | 
				
			||||||
@@ -30,21 +32,24 @@ import (
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
func TestCollectionsEncoding(t *testing.T) {
 | 
					func TestCollectionsEncoding(t *testing.T) {
 | 
				
			||||||
	t.Run("Normal", func(t *testing.T) {
 | 
						t.Run("Normal", func(t *testing.T) {
 | 
				
			||||||
		testCollectionsEncoding(t, NewSerializerWithOptions(DefaultMetaFactory, nil, nil, SerializerOptions{}))
 | 
							testCollectionsEncoding(t, NewSerializerWithOptions(DefaultMetaFactory, nil, nil, SerializerOptions{}), false)
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
						t.Run("Streaming", func(t *testing.T) {
 | 
				
			||||||
 | 
							testCollectionsEncoding(t, NewSerializerWithOptions(DefaultMetaFactory, nil, nil, SerializerOptions{StreamingCollectionsEncoding: true}), true)
 | 
				
			||||||
	})
 | 
						})
 | 
				
			||||||
	// Leave place for testing streaming collection serializer proposed as part of KEP-5116
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// testCollectionsEncoding should provide comprehensive tests to validate streaming implementation of encoder.
 | 
					// testCollectionsEncoding should provide comprehensive tests to validate streaming implementation of encoder.
 | 
				
			||||||
func testCollectionsEncoding(t *testing.T, s *Serializer) {
 | 
					func testCollectionsEncoding(t *testing.T, s *Serializer, streamingEnabled bool) {
 | 
				
			||||||
	var buf bytes.Buffer
 | 
						var buf writeCountingBuffer
 | 
				
			||||||
	var remainingItems int64 = 1
 | 
						var remainingItems int64 = 1
 | 
				
			||||||
	// As defined in KEP-5116 we it should include the following scenarios:
 | 
						// As defined in KEP-5116 we it should include the following scenarios:
 | 
				
			||||||
	// Context: https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/5116-streaming-response-encoding#unit-tests
 | 
						// Context: https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/5116-streaming-response-encoding#unit-tests
 | 
				
			||||||
	for _, tc := range []struct {
 | 
						for _, tc := range []struct {
 | 
				
			||||||
		name   string
 | 
							name         string
 | 
				
			||||||
		in     runtime.Object
 | 
							in           runtime.Object
 | 
				
			||||||
		expect string
 | 
							cannotStream bool
 | 
				
			||||||
 | 
							expect       string
 | 
				
			||||||
	}{
 | 
						}{
 | 
				
			||||||
		// Preserving the distinction between integers and floating-point numbers
 | 
							// Preserving the distinction between integers and floating-point numbers
 | 
				
			||||||
		{
 | 
							{
 | 
				
			||||||
@@ -307,9 +312,10 @@ func testCollectionsEncoding(t *testing.T, s *Serializer) {
 | 
				
			|||||||
		},
 | 
							},
 | 
				
			||||||
		// Handling structs implementing MarshallJSON method, especially built-in collection types.
 | 
							// Handling structs implementing MarshallJSON method, especially built-in collection types.
 | 
				
			||||||
		{
 | 
							{
 | 
				
			||||||
			name:   "List with MarshallJSON",
 | 
								name:         "List with MarshallJSON cannot be streamed",
 | 
				
			||||||
			in:     &ListWithMarshalJSONList{},
 | 
								in:           &ListWithMarshalJSONList{},
 | 
				
			||||||
			expect: "\"marshallJSON\"\n",
 | 
								expect:       "\"marshallJSON\"\n",
 | 
				
			||||||
 | 
								cannotStream: true,
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
		{
 | 
							{
 | 
				
			||||||
			name: "Struct with MarshallJSON",
 | 
								name: "Struct with MarshallJSON",
 | 
				
			||||||
@@ -435,6 +441,32 @@ func testCollectionsEncoding(t *testing.T, s *Serializer) {
 | 
				
			|||||||
			expect: `{"kind":"List","apiVersion":"v1","metadata":{"resourceVersion":"2345"},"items":[{"kind":"Carp","apiVersion":"v1","metadata":{"name":"pod","namespace":"default","creationTimestamp":null},"spec":{},"status":{}},{"kind":"Carp","apiVersion":"v1","metadata":{"name":"pod2","namespace":"default2","creationTimestamp":null},"spec":{},"status":{}}]}
 | 
								expect: `{"kind":"List","apiVersion":"v1","metadata":{"resourceVersion":"2345"},"items":[{"kind":"Carp","apiVersion":"v1","metadata":{"name":"pod","namespace":"default","creationTimestamp":null},"spec":{},"status":{}},{"kind":"Carp","apiVersion":"v1","metadata":{"name":"pod2","namespace":"default2","creationTimestamp":null},"spec":{},"status":{}}]}
 | 
				
			||||||
`,
 | 
					`,
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name: "List with extra field cannot be streamed",
 | 
				
			||||||
 | 
								in: &ListWithAdditionalFields{
 | 
				
			||||||
 | 
									TypeMeta: metav1.TypeMeta{
 | 
				
			||||||
 | 
										Kind:       "List",
 | 
				
			||||||
 | 
										APIVersion: "v1",
 | 
				
			||||||
 | 
									},
 | 
				
			||||||
 | 
									ListMeta: metav1.ListMeta{
 | 
				
			||||||
 | 
										ResourceVersion: "2345",
 | 
				
			||||||
 | 
									},
 | 
				
			||||||
 | 
									Items: []testapigroupv1.Carp{},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								cannotStream: true,
 | 
				
			||||||
 | 
								expect:       "{\"kind\":\"List\",\"apiVersion\":\"v1\",\"metadata\":{\"resourceVersion\":\"2345\"},\"items\":[],\"AdditionalField\":0}\n",
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name: "Not a collection cannot be streamed",
 | 
				
			||||||
 | 
								in: &testapigroupv1.Carp{
 | 
				
			||||||
 | 
									TypeMeta: metav1.TypeMeta{
 | 
				
			||||||
 | 
										Kind:       "List",
 | 
				
			||||||
 | 
										APIVersion: "v1",
 | 
				
			||||||
 | 
									},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								cannotStream: true,
 | 
				
			||||||
 | 
								expect:       "{\"kind\":\"List\",\"apiVersion\":\"v1\",\"metadata\":{\"creationTimestamp\":null},\"spec\":{},\"status\":{}}\n",
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
		{
 | 
							{
 | 
				
			||||||
			name:   "UnstructuredList empty",
 | 
								name:   "UnstructuredList empty",
 | 
				
			||||||
			in:     &unstructured.UnstructuredList{},
 | 
								in:     &unstructured.UnstructuredList{},
 | 
				
			||||||
@@ -543,10 +575,17 @@ func testCollectionsEncoding(t *testing.T, s *Serializer) {
 | 
				
			|||||||
			if err := s.Encode(tc.in, &buf); err != nil {
 | 
								if err := s.Encode(tc.in, &buf); err != nil {
 | 
				
			||||||
				t.Fatalf("unexpected error: %v", err)
 | 
									t.Fatalf("unexpected error: %v", err)
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
			t.Logf("normal: %s", buf.String())
 | 
								t.Logf("encoded: %s", buf.String())
 | 
				
			||||||
			if diff := cmp.Diff(buf.String(), tc.expect); diff != "" {
 | 
								if diff := cmp.Diff(buf.String(), tc.expect); diff != "" {
 | 
				
			||||||
				t.Errorf("not matching:\n%s", diff)
 | 
									t.Errorf("not matching:\n%s", diff)
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
 | 
								expectStreaming := !tc.cannotStream && streamingEnabled
 | 
				
			||||||
 | 
								if expectStreaming && buf.writeCount <= 1 {
 | 
				
			||||||
 | 
									t.Errorf("expected streaming but Write was called only: %d", buf.writeCount)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								if !expectStreaming && buf.writeCount > 1 {
 | 
				
			||||||
 | 
									t.Errorf("expected non-streaming but Write was called more than once: %d", buf.writeCount)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
		})
 | 
							})
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
@@ -653,3 +692,103 @@ type StructWithRawBytes struct {
 | 
				
			|||||||
func (s *StructWithRawBytes) DeepCopyObject() runtime.Object {
 | 
					func (s *StructWithRawBytes) DeepCopyObject() runtime.Object {
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type ListWithAdditionalFields struct {
 | 
				
			||||||
 | 
						metav1.TypeMeta `json:",inline"`
 | 
				
			||||||
 | 
						metav1.ListMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`
 | 
				
			||||||
 | 
						Items           []testapigroupv1.Carp `json:"items" protobuf:"bytes,2,rep,name=items"`
 | 
				
			||||||
 | 
						AdditionalField int
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (s *ListWithAdditionalFields) DeepCopyObject() runtime.Object {
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type writeCountingBuffer struct {
 | 
				
			||||||
 | 
						writeCount int
 | 
				
			||||||
 | 
						bytes.Buffer
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (b *writeCountingBuffer) Write(data []byte) (int, error) {
 | 
				
			||||||
 | 
						b.writeCount++
 | 
				
			||||||
 | 
						return b.Buffer.Write(data)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (b *writeCountingBuffer) Reset() {
 | 
				
			||||||
 | 
						b.writeCount = 0
 | 
				
			||||||
 | 
						b.Buffer.Reset()
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func TestFuzzCollectionsEncoding(t *testing.T) {
 | 
				
			||||||
 | 
						disableFuzzFieldsV1 := func(field *metav1.FieldsV1, c fuzz.Continue) {}
 | 
				
			||||||
 | 
						fuzzUnstructuredList := func(list *unstructured.UnstructuredList, c fuzz.Continue) {
 | 
				
			||||||
 | 
							list.Object = map[string]interface{}{
 | 
				
			||||||
 | 
								"kind":         "List",
 | 
				
			||||||
 | 
								"apiVersion":   "v1",
 | 
				
			||||||
 | 
								c.RandString(): c.RandString(),
 | 
				
			||||||
 | 
								c.RandString(): c.RandUint64(),
 | 
				
			||||||
 | 
								c.RandString(): c.RandBool(),
 | 
				
			||||||
 | 
								"metadata": map[string]interface{}{
 | 
				
			||||||
 | 
									"resourceVersion":    fmt.Sprintf("%d", c.RandUint64()),
 | 
				
			||||||
 | 
									"continue":           c.RandString(),
 | 
				
			||||||
 | 
									"remainingItemCount": fmt.Sprintf("%d", c.RandUint64()),
 | 
				
			||||||
 | 
									c.RandString():       c.RandString(),
 | 
				
			||||||
 | 
								}}
 | 
				
			||||||
 | 
							c.Fuzz(&list.Items)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						fuzzMap := func(kvs map[string]interface{}, c fuzz.Continue) {
 | 
				
			||||||
 | 
							kvs[c.RandString()] = c.RandBool()
 | 
				
			||||||
 | 
							kvs[c.RandString()] = c.RandUint64()
 | 
				
			||||||
 | 
							kvs[c.RandString()] = c.RandString()
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						f := fuzz.New().Funcs(disableFuzzFieldsV1, fuzzUnstructuredList, fuzzMap)
 | 
				
			||||||
 | 
						streamingBuffer := &bytes.Buffer{}
 | 
				
			||||||
 | 
						normalSerializer := NewSerializerWithOptions(DefaultMetaFactory, nil, nil, SerializerOptions{StreamingCollectionsEncoding: false})
 | 
				
			||||||
 | 
						normalBuffer := &bytes.Buffer{}
 | 
				
			||||||
 | 
						t.Run("CarpList", func(t *testing.T) {
 | 
				
			||||||
 | 
							for i := 0; i < 1000; i++ {
 | 
				
			||||||
 | 
								list := &testapigroupv1.CarpList{}
 | 
				
			||||||
 | 
								f.Fuzz(list)
 | 
				
			||||||
 | 
								streamingBuffer.Reset()
 | 
				
			||||||
 | 
								normalBuffer.Reset()
 | 
				
			||||||
 | 
								ok, err := streamEncodeCollections(list, streamingBuffer)
 | 
				
			||||||
 | 
								if err != nil {
 | 
				
			||||||
 | 
									t.Fatalf("unexpected error: %v", err)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								if !ok {
 | 
				
			||||||
 | 
									t.Fatalf("expected streaming encoder to encode %T", list)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								if err := normalSerializer.Encode(list, normalBuffer); err != nil {
 | 
				
			||||||
 | 
									t.Fatal(err)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								if diff := cmp.Diff(normalBuffer.String(), streamingBuffer.String()); diff != "" {
 | 
				
			||||||
 | 
									t.Logf("normal: %s", normalBuffer.String())
 | 
				
			||||||
 | 
									t.Logf("streaming: %s", streamingBuffer.String())
 | 
				
			||||||
 | 
									t.Errorf("not matching:\n%s", diff)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
						t.Run("UnstructuredList", func(t *testing.T) {
 | 
				
			||||||
 | 
							for i := 0; i < 1000; i++ {
 | 
				
			||||||
 | 
								list := &unstructured.UnstructuredList{}
 | 
				
			||||||
 | 
								f.Fuzz(list)
 | 
				
			||||||
 | 
								streamingBuffer.Reset()
 | 
				
			||||||
 | 
								normalBuffer.Reset()
 | 
				
			||||||
 | 
								ok, err := streamEncodeCollections(list, streamingBuffer)
 | 
				
			||||||
 | 
								if err != nil {
 | 
				
			||||||
 | 
									t.Fatalf("unexpected error: %v", err)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								if !ok {
 | 
				
			||||||
 | 
									t.Fatalf("expected streaming encoder to encode %T", list)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								if err := normalSerializer.Encode(list, normalBuffer); err != nil {
 | 
				
			||||||
 | 
									t.Fatal(err)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								if diff := cmp.Diff(normalBuffer.String(), streamingBuffer.String()); diff != "" {
 | 
				
			||||||
 | 
									t.Logf("normal: %s", normalBuffer.String())
 | 
				
			||||||
 | 
									t.Logf("streaming: %s", streamingBuffer.String())
 | 
				
			||||||
 | 
									t.Errorf("not matching:\n%s", diff)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -36,7 +36,7 @@ import (
 | 
				
			|||||||
// is not nil, the object has the group, version, and kind fields set.
 | 
					// is not nil, the object has the group, version, and kind fields set.
 | 
				
			||||||
// Deprecated: use NewSerializerWithOptions instead.
 | 
					// Deprecated: use NewSerializerWithOptions instead.
 | 
				
			||||||
func NewSerializer(meta MetaFactory, creater runtime.ObjectCreater, typer runtime.ObjectTyper, pretty bool) *Serializer {
 | 
					func NewSerializer(meta MetaFactory, creater runtime.ObjectCreater, typer runtime.ObjectTyper, pretty bool) *Serializer {
 | 
				
			||||||
	return NewSerializerWithOptions(meta, creater, typer, SerializerOptions{false, pretty, false})
 | 
						return NewSerializerWithOptions(meta, creater, typer, SerializerOptions{false, pretty, false, false})
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// NewYAMLSerializer creates a YAML serializer that handles encoding versioned objects into the proper YAML form. If typer
 | 
					// NewYAMLSerializer creates a YAML serializer that handles encoding versioned objects into the proper YAML form. If typer
 | 
				
			||||||
@@ -44,7 +44,7 @@ func NewSerializer(meta MetaFactory, creater runtime.ObjectCreater, typer runtim
 | 
				
			|||||||
// matches JSON, and will error if constructs are used that do not serialize to JSON.
 | 
					// matches JSON, and will error if constructs are used that do not serialize to JSON.
 | 
				
			||||||
// Deprecated: use NewSerializerWithOptions instead.
 | 
					// Deprecated: use NewSerializerWithOptions instead.
 | 
				
			||||||
func NewYAMLSerializer(meta MetaFactory, creater runtime.ObjectCreater, typer runtime.ObjectTyper) *Serializer {
 | 
					func NewYAMLSerializer(meta MetaFactory, creater runtime.ObjectCreater, typer runtime.ObjectTyper) *Serializer {
 | 
				
			||||||
	return NewSerializerWithOptions(meta, creater, typer, SerializerOptions{true, false, false})
 | 
						return NewSerializerWithOptions(meta, creater, typer, SerializerOptions{true, false, false, false})
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// NewSerializerWithOptions creates a JSON/YAML serializer that handles encoding versioned objects into the proper JSON/YAML
 | 
					// NewSerializerWithOptions creates a JSON/YAML serializer that handles encoding versioned objects into the proper JSON/YAML
 | 
				
			||||||
@@ -93,6 +93,9 @@ type SerializerOptions struct {
 | 
				
			|||||||
	// Strict: configures the Serializer to return strictDecodingError's when duplicate fields are present decoding JSON or YAML.
 | 
						// Strict: configures the Serializer to return strictDecodingError's when duplicate fields are present decoding JSON or YAML.
 | 
				
			||||||
	// Note that enabling this option is not as performant as the non-strict variant, and should not be used in fast paths.
 | 
						// Note that enabling this option is not as performant as the non-strict variant, and should not be used in fast paths.
 | 
				
			||||||
	Strict bool
 | 
						Strict bool
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// StreamingCollectionsEncoding enables encoding collection, one item at the time, drastically reducing memory needed.
 | 
				
			||||||
 | 
						StreamingCollectionsEncoding bool
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Serializer handles encoding versioned objects into the proper JSON form
 | 
					// Serializer handles encoding versioned objects into the proper JSON form
 | 
				
			||||||
@@ -242,6 +245,15 @@ func (s *Serializer) doEncode(obj runtime.Object, w io.Writer) error {
 | 
				
			|||||||
		_, err = w.Write(data)
 | 
							_, err = w.Write(data)
 | 
				
			||||||
		return err
 | 
							return err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						if s.options.StreamingCollectionsEncoding {
 | 
				
			||||||
 | 
							ok, err := streamEncodeCollections(obj, w)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								return err
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							if ok {
 | 
				
			||||||
 | 
								return nil
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
	encoder := json.NewEncoder(w)
 | 
						encoder := json.NewEncoder(w)
 | 
				
			||||||
	return encoder.Encode(obj)
 | 
						return encoder.Encode(obj)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -39,8 +39,11 @@ import (
 | 
				
			|||||||
	"github.com/google/go-cmp/cmp"
 | 
						"github.com/google/go-cmp/cmp"
 | 
				
			||||||
	v1 "k8s.io/api/core/v1"
 | 
						v1 "k8s.io/api/core/v1"
 | 
				
			||||||
	kerrors "k8s.io/apimachinery/pkg/api/errors"
 | 
						kerrors "k8s.io/apimachinery/pkg/api/errors"
 | 
				
			||||||
 | 
						metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
				
			||||||
 | 
						testapigroupv1 "k8s.io/apimachinery/pkg/apis/testapigroup/v1"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/runtime"
 | 
						"k8s.io/apimachinery/pkg/runtime"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/runtime/schema"
 | 
						"k8s.io/apimachinery/pkg/runtime/schema"
 | 
				
			||||||
 | 
						jsonserializer "k8s.io/apimachinery/pkg/runtime/serializer/json"
 | 
				
			||||||
	rand2 "k8s.io/apimachinery/pkg/util/rand"
 | 
						rand2 "k8s.io/apimachinery/pkg/util/rand"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/util/uuid"
 | 
						"k8s.io/apimachinery/pkg/util/uuid"
 | 
				
			||||||
	"k8s.io/apiserver/pkg/features"
 | 
						"k8s.io/apiserver/pkg/features"
 | 
				
			||||||
@@ -804,3 +807,80 @@ func gzipContent(data []byte, level int) []byte {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
	return buf.Bytes()
 | 
						return buf.Bytes()
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func TestStreamingGzipIntegration(t *testing.T) {
 | 
				
			||||||
 | 
						largeChunk := bytes.Repeat([]byte("b"), defaultGzipThresholdBytes+1)
 | 
				
			||||||
 | 
						tcs := []struct {
 | 
				
			||||||
 | 
							name            string
 | 
				
			||||||
 | 
							serializer      runtime.Encoder
 | 
				
			||||||
 | 
							object          runtime.Object
 | 
				
			||||||
 | 
							expectGzip      bool
 | 
				
			||||||
 | 
							expectStreaming bool
 | 
				
			||||||
 | 
						}{
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name:            "JSON, small object, default -> no gzip",
 | 
				
			||||||
 | 
								serializer:      jsonserializer.NewSerializerWithOptions(jsonserializer.DefaultMetaFactory, nil, nil, jsonserializer.SerializerOptions{}),
 | 
				
			||||||
 | 
								object:          &testapigroupv1.CarpList{},
 | 
				
			||||||
 | 
								expectGzip:      false,
 | 
				
			||||||
 | 
								expectStreaming: false,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name:            "JSON, small object, streaming -> no gzip",
 | 
				
			||||||
 | 
								serializer:      jsonserializer.NewSerializerWithOptions(jsonserializer.DefaultMetaFactory, nil, nil, jsonserializer.SerializerOptions{StreamingCollectionsEncoding: true}),
 | 
				
			||||||
 | 
								object:          &testapigroupv1.CarpList{},
 | 
				
			||||||
 | 
								expectGzip:      false,
 | 
				
			||||||
 | 
								expectStreaming: true,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name:            "JSON, large object, default -> gzip",
 | 
				
			||||||
 | 
								serializer:      jsonserializer.NewSerializerWithOptions(jsonserializer.DefaultMetaFactory, nil, nil, jsonserializer.SerializerOptions{}),
 | 
				
			||||||
 | 
								object:          &testapigroupv1.CarpList{TypeMeta: metav1.TypeMeta{Kind: string(largeChunk)}},
 | 
				
			||||||
 | 
								expectGzip:      true,
 | 
				
			||||||
 | 
								expectStreaming: false,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name:            "JSON, large object, streaming -> gzip",
 | 
				
			||||||
 | 
								serializer:      jsonserializer.NewSerializerWithOptions(jsonserializer.DefaultMetaFactory, nil, nil, jsonserializer.SerializerOptions{StreamingCollectionsEncoding: true}),
 | 
				
			||||||
 | 
								object:          &testapigroupv1.CarpList{TypeMeta: metav1.TypeMeta{Kind: string(largeChunk)}},
 | 
				
			||||||
 | 
								expectGzip:      true,
 | 
				
			||||||
 | 
								expectStreaming: true,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						for _, tc := range tcs {
 | 
				
			||||||
 | 
							t.Run(tc.name, func(t *testing.T) {
 | 
				
			||||||
 | 
								mockResponseWriter := httptest.NewRecorder()
 | 
				
			||||||
 | 
								drw := &deferredResponseWriter{
 | 
				
			||||||
 | 
									mediaType:       "text/plain",
 | 
				
			||||||
 | 
									statusCode:      200,
 | 
				
			||||||
 | 
									contentEncoding: "gzip",
 | 
				
			||||||
 | 
									hw:              mockResponseWriter,
 | 
				
			||||||
 | 
									ctx:             context.Background(),
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								counter := &writeCounter{Writer: drw}
 | 
				
			||||||
 | 
								err := tc.serializer.Encode(tc.object, counter)
 | 
				
			||||||
 | 
								if err != nil {
 | 
				
			||||||
 | 
									t.Fatal(err)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								encoding := mockResponseWriter.Header().Get("Content-Encoding")
 | 
				
			||||||
 | 
								if (encoding == "gzip") != tc.expectGzip {
 | 
				
			||||||
 | 
									t.Errorf("Expect gzip: %v, got: %q", tc.expectGzip, encoding)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								if counter.writeCount < 1 {
 | 
				
			||||||
 | 
									t.Fatalf("Expect at least 1 write")
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								if (counter.writeCount > 1) != tc.expectStreaming {
 | 
				
			||||||
 | 
									t.Errorf("Expect streaming: %v, got write count: %d", tc.expectStreaming, counter.writeCount)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							})
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type writeCounter struct {
 | 
				
			||||||
 | 
						writeCount int
 | 
				
			||||||
 | 
						io.Writer
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (b *writeCounter) Write(data []byte) (int, error) {
 | 
				
			||||||
 | 
						b.writeCount++
 | 
				
			||||||
 | 
						return b.Writer.Write(data)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -217,6 +217,10 @@ const (
 | 
				
			|||||||
	// document.
 | 
						// document.
 | 
				
			||||||
	StorageVersionHash featuregate.Feature = "StorageVersionHash"
 | 
						StorageVersionHash featuregate.Feature = "StorageVersionHash"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// owner: @serathius
 | 
				
			||||||
 | 
						// Allow API server to encode collections item by item, instead of all at once.
 | 
				
			||||||
 | 
						StreamingCollectionEncodingToJSON featuregate.Feature = "StreamingCollectionEncodingToJSON"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// owner: @aramase, @enj, @nabokihms
 | 
						// owner: @aramase, @enj, @nabokihms
 | 
				
			||||||
	// kep: https://kep.k8s.io/3331
 | 
						// kep: https://kep.k8s.io/3331
 | 
				
			||||||
	//
 | 
						//
 | 
				
			||||||
@@ -387,6 +391,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
 | 
				
			|||||||
		{Version: version.MustParse("1.15"), Default: true, PreRelease: featuregate.Beta},
 | 
							{Version: version.MustParse("1.15"), Default: true, PreRelease: featuregate.Beta},
 | 
				
			||||||
	},
 | 
						},
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						StreamingCollectionEncodingToJSON: {
 | 
				
			||||||
 | 
							{Version: version.MustParse("1.33"), Default: true, PreRelease: featuregate.Beta},
 | 
				
			||||||
 | 
						},
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	StrictCostEnforcementForVAP: {
 | 
						StrictCostEnforcementForVAP: {
 | 
				
			||||||
		{Version: version.MustParse("1.30"), Default: false, PreRelease: featuregate.Beta},
 | 
							{Version: version.MustParse("1.30"), Default: false, PreRelease: featuregate.Beta},
 | 
				
			||||||
		{Version: version.MustParse("1.32"), Default: true, PreRelease: featuregate.GA, LockToDefault: true},
 | 
							{Version: version.MustParse("1.32"), Default: true, PreRelease: featuregate.GA, LockToDefault: true},
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -991,8 +991,15 @@ func (s *GenericAPIServer) newAPIGroupVersion(apiGroupInfo *APIGroupInfo, groupV
 | 
				
			|||||||
// NewDefaultAPIGroupInfo returns an APIGroupInfo stubbed with "normal" values
 | 
					// NewDefaultAPIGroupInfo returns an APIGroupInfo stubbed with "normal" values
 | 
				
			||||||
// exposed for easier composition from other packages
 | 
					// exposed for easier composition from other packages
 | 
				
			||||||
func NewDefaultAPIGroupInfo(group string, scheme *runtime.Scheme, parameterCodec runtime.ParameterCodec, codecs serializer.CodecFactory) APIGroupInfo {
 | 
					func NewDefaultAPIGroupInfo(group string, scheme *runtime.Scheme, parameterCodec runtime.ParameterCodec, codecs serializer.CodecFactory) APIGroupInfo {
 | 
				
			||||||
 | 
						opts := []serializer.CodecFactoryOptionsMutator{}
 | 
				
			||||||
	if utilfeature.DefaultFeatureGate.Enabled(features.CBORServingAndStorage) {
 | 
						if utilfeature.DefaultFeatureGate.Enabled(features.CBORServingAndStorage) {
 | 
				
			||||||
		codecs = serializer.NewCodecFactory(scheme, serializer.WithSerializer(cbor.NewSerializerInfo))
 | 
							opts = append(opts, serializer.WithSerializer(cbor.NewSerializerInfo))
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToJSON) {
 | 
				
			||||||
 | 
							opts = append(opts, serializer.WithStreamingCollectionEncodingToJSON())
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if len(opts) != 0 {
 | 
				
			||||||
 | 
							codecs = serializer.NewCodecFactory(scheme, opts...)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return APIGroupInfo{
 | 
						return APIGroupInfo{
 | 
				
			||||||
		PrioritizedVersions:          scheme.PrioritizedVersionsForGroup(group),
 | 
							PrioritizedVersions:          scheme.PrioritizedVersionsForGroup(group),
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -1320,6 +1320,12 @@
 | 
				
			|||||||
    lockToDefault: false
 | 
					    lockToDefault: false
 | 
				
			||||||
    preRelease: Alpha
 | 
					    preRelease: Alpha
 | 
				
			||||||
    version: "1.30"
 | 
					    version: "1.30"
 | 
				
			||||||
 | 
					- name: StreamingCollectionEncodingToJSON
 | 
				
			||||||
 | 
					  versionedSpecs:
 | 
				
			||||||
 | 
					  - default: true
 | 
				
			||||||
 | 
					    lockToDefault: false
 | 
				
			||||||
 | 
					    preRelease: Beta
 | 
				
			||||||
 | 
					    version: "1.33"
 | 
				
			||||||
- name: StrictCostEnforcementForVAP
 | 
					- name: StrictCostEnforcementForVAP
 | 
				
			||||||
  versionedSpecs:
 | 
					  versionedSpecs:
 | 
				
			||||||
  - default: false
 | 
					  - default: false
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user