From fc198b92c0d5cece06fd4ecc24f6142532beff37 Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Mon, 16 Jun 2025 16:02:59 +0200 Subject: [PATCH] apiserver/handlers/watch: stop encoding initialEventsListBlueprint (#132326) * apiserver/handlers/get: remove constructing versionedList * endpoints/handlers/response: rm watchListTransformer * endpoints/handlers/watch: unwire watchListTransformer * storage/cacher: rm documentation about caching the serialization of bookmark events --- .../apiserver/pkg/endpoints/handlers/get.go | 11 +- .../pkg/endpoints/handlers/response.go | 118 ++--------------- .../pkg/endpoints/handlers/response_test.go | 122 ------------------ .../apiserver/pkg/endpoints/handlers/watch.go | 12 +- .../apiserver/pkg/storage/cacher/cacher.go | 3 - 5 files changed, 12 insertions(+), 254 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/get.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/get.go index 94a44c80234..423925ea982 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/get.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/get.go @@ -266,15 +266,6 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatc timeout = time.Duration(float64(minRequestTimeout) * (rand.Float64() + 1.0)) } - var emptyVersionedList runtime.Object - if isListWatchRequest(opts) { - emptyVersionedList, err = scope.Convertor.ConvertToVersion(r.NewList(), scope.Kind.GroupVersion()) - if err != nil { - scope.err(errors.NewInternalError(err), w, req) - return - } - } - klog.V(3).InfoS("Starting watch", "path", req.URL.Path, "resourceVersion", opts.ResourceVersion, "labels", opts.LabelSelector, "fields", opts.FieldSelector, "timeout", timeout) ctx, cancel := context.WithTimeout(ctx, timeout) defer func() { cancel() }() @@ -283,7 +274,7 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatc scope.err(err, w, req) return } - handler, err := serveWatchHandler(watcher, scope, outputMediaType, req, w, timeout, metrics.CleanListScope(ctx, &opts), emptyVersionedList) + handler, err := serveWatchHandler(watcher, scope, outputMediaType, req, w, timeout, metrics.CleanListScope(ctx, &opts)) if err != nil { scope.err(err, w, req) return diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/response.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/response.go index fda524cee3a..edcf5b07f6d 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/response.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/response.go @@ -18,7 +18,6 @@ package handlers import ( "context" - "encoding/base64" "encoding/json" "fmt" "io" @@ -39,7 +38,6 @@ import ( "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" "k8s.io/apiserver/pkg/endpoints/metrics" endpointsrequest "k8s.io/apiserver/pkg/endpoints/request" - "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/util/apihelpers" "k8s.io/klog/v2" ) @@ -149,8 +147,6 @@ type watchEncoder struct { encoder runtime.Encoder framer io.Writer - watchListTransformerFn watchListTransformerFunction - buffer runtime.Splice eventBuffer runtime.Splice @@ -158,16 +154,15 @@ type watchEncoder struct { identifiers map[watch.EventType]runtime.Identifier } -func newWatchEncoder(ctx context.Context, gvr schema.GroupVersionResource, embeddedEncoder runtime.Encoder, encoder runtime.Encoder, framer io.Writer, watchListTransformerFn watchListTransformerFunction) *watchEncoder { +func newWatchEncoder(ctx context.Context, gvr schema.GroupVersionResource, embeddedEncoder runtime.Encoder, encoder runtime.Encoder, framer io.Writer) *watchEncoder { return &watchEncoder{ - ctx: ctx, - groupVersionResource: gvr, - embeddedEncoder: embeddedEncoder, - encoder: encoder, - framer: framer, - watchListTransformerFn: watchListTransformerFn, - buffer: runtime.NewSpliceBuffer(), - eventBuffer: runtime.NewSpliceBuffer(), + ctx: ctx, + groupVersionResource: gvr, + embeddedEncoder: embeddedEncoder, + encoder: encoder, + framer: framer, + buffer: runtime.NewSpliceBuffer(), + eventBuffer: runtime.NewSpliceBuffer(), } } @@ -179,12 +174,6 @@ func (e *watchEncoder) Encode(event watch.Event) error { encodeFunc := func(obj runtime.Object, w io.Writer) error { return e.doEncode(obj, event, w) } - if event.Type == watch.Bookmark { - // Bookmark objects are small, and we don't yet support serialization for them. - // Additionally, we need to additionally transform them to support watch-list feature - event = e.watchListTransformerFn(event) - return encodeFunc(event.Object, e.framer) - } if co, ok := event.Object.(runtime.CacheableObject); ok { return co.CacheEncode(e.identifier(event.Type), encodeFunc, e.framer) } @@ -490,94 +479,3 @@ func asPartialObjectMetadataList(result runtime.Object, groupVersion schema.Grou return nil, newNotAcceptableError(fmt.Sprintf("no PartialObjectMetadataList exists in group version %s", groupVersion)) } } - -// watchListTransformerFunction an optional function -// applied to watchlist bookmark events that transforms -// the embedded object before sending it to a client. -type watchListTransformerFunction func(watch.Event) watch.Event - -// watchListTransformer performs transformation of -// a special watchList bookmark event. -// -// The bookmark is annotated with InitialEventsListBlueprintAnnotationKey -// and contains an empty, versioned list that we must encode in the requested format -// (e.g., protobuf, JSON, CBOR) and then store as a base64-encoded string. -type watchListTransformer struct { - initialEventsListBlueprint runtime.Object - targetGVK *schema.GroupVersionKind - negotiatedEncoder runtime.Encoder - buffer runtime.Splice -} - -// createWatchListTransformerIfRequested returns a transformer function for watchlist bookmark event. -func newWatchListTransformer(initialEventsListBlueprint runtime.Object, targetGVK *schema.GroupVersionKind, negotiatedEncoder runtime.Encoder) *watchListTransformer { - return &watchListTransformer{ - initialEventsListBlueprint: initialEventsListBlueprint, - targetGVK: targetGVK, - negotiatedEncoder: negotiatedEncoder, - buffer: runtime.NewSpliceBuffer(), - } -} - -func (e *watchListTransformer) transform(event watch.Event) watch.Event { - if e.initialEventsListBlueprint == nil { - return event - } - hasAnnotation, err := storage.HasInitialEventsEndBookmarkAnnotation(event.Object) - if err != nil { - return newWatchEventErrorFor(err) - } - if !hasAnnotation { - return event - } - - if err = e.encodeInitialEventsListBlueprint(event.Object); err != nil { - return newWatchEventErrorFor(err) - } - - return event -} - -func (e *watchListTransformer) encodeInitialEventsListBlueprint(object runtime.Object) error { - initialEventsListBlueprint, err := e.transformInitialEventsListBlueprint() - if err != nil { - return err - } - - defer e.buffer.Reset() - if err = e.negotiatedEncoder.Encode(initialEventsListBlueprint, e.buffer); err != nil { - return err - } - encodedInitialEventsListBlueprint := e.buffer.Bytes() - - // the storage layer creates a deep copy of the obj before modifying it. - // since the object has the annotation, we can modify it directly. - objectMeta, err := meta.Accessor(object) - if err != nil { - return err - } - annotations := objectMeta.GetAnnotations() - annotations[metav1.InitialEventsListBlueprintAnnotationKey] = base64.StdEncoding.EncodeToString(encodedInitialEventsListBlueprint) - objectMeta.SetAnnotations(annotations) - - return nil -} - -func (e *watchListTransformer) transformInitialEventsListBlueprint() (runtime.Object, error) { - if e.targetGVK != nil && e.targetGVK.Kind == "PartialObjectMetadata" { - return asPartialObjectMetadataList(e.initialEventsListBlueprint, e.targetGVK.GroupVersion()) - } - return e.initialEventsListBlueprint, nil -} - -func newWatchEventErrorFor(err error) watch.Event { - return watch.Event{ - Type: watch.Error, - Object: &metav1.Status{ - Status: metav1.StatusFailure, - Message: err.Error(), - Reason: metav1.StatusReasonInternalError, - Code: http.StatusInternalServerError, - }, - } -} diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/response_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/response_test.go index a8973f81ccb..03410ccd106 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/response_test.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/response_test.go @@ -17,9 +17,7 @@ limitations under the License. package handlers import ( - "bytes" "context" - "encoding/base64" "fmt" "io" "net/http" @@ -27,18 +25,13 @@ import ( "testing" "time" - v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1beta1 "k8s.io/apimachinery/pkg/apis/meta/v1beta1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" - runtimejson "k8s.io/apimachinery/pkg/runtime/serializer/json" - "k8s.io/apimachinery/pkg/watch" examplev1 "k8s.io/apiserver/pkg/apis/example/v1" "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/rest" - clientgoscheme "k8s.io/client-go/kubernetes/scheme" ) var _ runtime.CacheableObject = &mockCacheableObject{} @@ -229,118 +222,3 @@ func TestWatchEncoderIdentifier(t *testing.T) { t.Error(" - watchEncoder::typeIdentifier to capture all relevant fields in identifier") } } - -func TestWatchListEncoder(t *testing.T) { - makePartialObjectMetadataListWithoutKind := func(rv string) *metav1.PartialObjectMetadataList { - return &metav1.PartialObjectMetadataList{ - // do not set the type info to match - // newWatchListTransformer - ListMeta: metav1.ListMeta{ResourceVersion: rv}, - } - } - makePodListWithKind := func(rv string) *v1.PodList { - return &v1.PodList{ - TypeMeta: metav1.TypeMeta{ - // set the type info so - // that it differs from - // PartialObjectMetadataList - Kind: "PodList", - }, - ListMeta: metav1.ListMeta{ - ResourceVersion: rv, - }, - } - } - makeBookmarkEventFor := func(pod *v1.Pod) watch.Event { - return watch.Event{ - Type: watch.Bookmark, - Object: pod, - } - } - makePod := func(name string) *v1.Pod { - return &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: "ns", - Annotations: map[string]string{}, - }, - } - } - makePodWithInitialEventsAnnotation := func(name string) *v1.Pod { - p := makePod(name) - p.Annotations[metav1.InitialEventsAnnotationKey] = "true" - return p - } - - scenarios := []struct { - name string - negotiatedEncoder runtime.Serializer - targetGVK *schema.GroupVersionKind - - actualEvent watch.Event - listBlueprint runtime.Object - - expectedBase64ListBlueprint string - }{ - { - name: "pass through, an obj without the annotation received", - actualEvent: makeBookmarkEventFor(makePod("1")), - negotiatedEncoder: newJSONSerializer(), - }, - { - name: "encodes the initialEventsListBlueprint if an obj with the annotation is passed", - actualEvent: makeBookmarkEventFor(makePodWithInitialEventsAnnotation("1")), - listBlueprint: makePodListWithKind("100"), - expectedBase64ListBlueprint: encodeObjectToBase64String(makePodListWithKind("100"), t), - negotiatedEncoder: newJSONSerializer(), - }, - { - name: "encodes the initialEventsListBlueprint as PartialObjectMetadata when requested", - targetGVK: &schema.GroupVersionKind{Group: "meta.k8s.io", Version: "v1", Kind: "PartialObjectMetadata"}, - actualEvent: makeBookmarkEventFor(makePodWithInitialEventsAnnotation("2")), - listBlueprint: makePodListWithKind("101"), - expectedBase64ListBlueprint: encodeObjectToBase64String(makePartialObjectMetadataListWithoutKind("101"), t), - negotiatedEncoder: newJSONSerializer(), - }, - } - - for _, scenario := range scenarios { - t.Run(scenario.name, func(t *testing.T) { - target := newWatchListTransformer(scenario.listBlueprint, scenario.targetGVK, scenario.negotiatedEncoder) - transformedEvent := target.transform(scenario.actualEvent) - - actualObjectMeta, err := meta.Accessor(transformedEvent.Object) - if err != nil { - t.Fatal(err) - } - - base64ListBlueprint, ok := actualObjectMeta.GetAnnotations()[metav1.InitialEventsListBlueprintAnnotationKey] - if !ok && len(scenario.expectedBase64ListBlueprint) != 0 { - t.Fatalf("the encoded obj doesn't have %q", metav1.InitialEventsListBlueprintAnnotationKey) - } - if base64ListBlueprint != scenario.expectedBase64ListBlueprint { - t.Fatalf("unexpected base64ListBlueprint = %s, expected = %s", base64ListBlueprint, scenario.expectedBase64ListBlueprint) - } - }) - } -} - -func encodeObjectToBase64String(obj runtime.Object, t *testing.T) string { - e := newJSONSerializer() - - var buf bytes.Buffer - err := e.Encode(obj, &buf) - if err != nil { - t.Fatal(err) - } - return base64.StdEncoding.EncodeToString(buf.Bytes()) -} - -func newJSONSerializer() runtime.Serializer { - return runtimejson.NewSerializerWithOptions( - runtimejson.DefaultMetaFactory, - clientgoscheme.Scheme, - clientgoscheme.Scheme, - runtimejson.SerializerOptions{}, - ) -} diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go index b297b3c252a..7bf702c6188 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go @@ -64,7 +64,7 @@ func (w *realTimeoutFactory) TimeoutCh() (<-chan time.Time, func() bool) { // serveWatchHandler returns a handle to serve a watch response. // TODO: the functionality in this method and in WatchServer.Serve is not cleanly decoupled. -func serveWatchHandler(watcher watch.Interface, scope *RequestScope, mediaTypeOptions negotiation.MediaTypeOptions, req *http.Request, w http.ResponseWriter, timeout time.Duration, metricsScope string, initialEventsListBlueprint runtime.Object) (http.Handler, error) { +func serveWatchHandler(watcher watch.Interface, scope *RequestScope, mediaTypeOptions negotiation.MediaTypeOptions, req *http.Request, w http.ResponseWriter, timeout time.Duration, metricsScope string) (http.Handler, error) { options, err := optionsForTransform(mediaTypeOptions, req) if err != nil { return nil, err @@ -167,8 +167,6 @@ func serveWatchHandler(watcher watch.Interface, scope *RequestScope, mediaTypeOp Encoder: encoder, EmbeddedEncoder: embeddedEncoder, - watchListTransformerFn: newWatchListTransformer(initialEventsListBlueprint, mediaTypeOptions.Convert, negotiatedEncoder).transform, - MemoryAllocator: memoryAllocator, TimeoutFactory: &realTimeoutFactory{timeout}, ServerShuttingDownCh: serverShuttingDownCh, @@ -198,10 +196,6 @@ type WatchServer struct { Encoder runtime.Encoder // used to encode the nested object in the watch stream EmbeddedEncoder runtime.Encoder - // watchListTransformerFn a function applied - // to watchlist bookmark events that transforms - // the embedded object before sending it to a client. - watchListTransformerFn watchListTransformerFunction MemoryAllocator runtime.MemoryAllocator TimeoutFactory TimeoutFactory @@ -247,7 +241,7 @@ func (s *WatchServer) HandleHTTP(w http.ResponseWriter, req *http.Request) { flusher.Flush() gvr := s.Scope.Resource - watchEncoder := newWatchEncoder(req.Context(), gvr, s.EmbeddedEncoder, s.Encoder, framer, s.watchListTransformerFn) + watchEncoder := newWatchEncoder(req.Context(), gvr, s.EmbeddedEncoder, s.Encoder, framer) ch := s.Watching.ResultChan() done := req.Context().Done() @@ -316,7 +310,7 @@ func (s *WatchServer) HandleWS(ws *websocket.Conn) { framer := newWebsocketFramer(ws, s.UseTextFraming) gvr := s.Scope.Resource - watchEncoder := newWatchEncoder(context.TODO(), gvr, s.EmbeddedEncoder, s.Encoder, framer, s.watchListTransformerFn) + watchEncoder := newWatchEncoder(context.TODO(), gvr, s.EmbeddedEncoder, s.Encoder, framer) ch := s.Watching.ResultChan() for { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 6b476884940..74f9d776b4e 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -949,9 +949,6 @@ func (c *Cacher) dispatchEvent(event *watchCacheEvent) { // Since add() can block, we explicitly add when cacher is unlocked. // Dispatching event in nonblocking way first, which make faster watchers // not be blocked by slower ones. - // - // Note: if we ever decide to cache the serialization of bookmark events, - // we will also need to modify the watchEncoder encoder if event.Type == watch.Bookmark { for _, watcher := range c.watchersBuffer { watcher.nonblockingAdd(event)