apiserver/handlers/watch: stop encoding initialEventsListBlueprint (#132326)

* apiserver/handlers/get: remove constructing versionedList

* endpoints/handlers/response: rm watchListTransformer

* endpoints/handlers/watch: unwire watchListTransformer

* storage/cacher: rm documentation about caching the serialization of bookmark events
This commit is contained in:
Lukasz Szaszkiewicz
2025-06-16 16:02:59 +02:00
committed by GitHub
parent 0154f8a222
commit fc198b92c0
5 changed files with 12 additions and 254 deletions

View File

@@ -266,15 +266,6 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatc
timeout = time.Duration(float64(minRequestTimeout) * (rand.Float64() + 1.0))
}
var emptyVersionedList runtime.Object
if isListWatchRequest(opts) {
emptyVersionedList, err = scope.Convertor.ConvertToVersion(r.NewList(), scope.Kind.GroupVersion())
if err != nil {
scope.err(errors.NewInternalError(err), w, req)
return
}
}
klog.V(3).InfoS("Starting watch", "path", req.URL.Path, "resourceVersion", opts.ResourceVersion, "labels", opts.LabelSelector, "fields", opts.FieldSelector, "timeout", timeout)
ctx, cancel := context.WithTimeout(ctx, timeout)
defer func() { cancel() }()
@@ -283,7 +274,7 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatc
scope.err(err, w, req)
return
}
handler, err := serveWatchHandler(watcher, scope, outputMediaType, req, w, timeout, metrics.CleanListScope(ctx, &opts), emptyVersionedList)
handler, err := serveWatchHandler(watcher, scope, outputMediaType, req, w, timeout, metrics.CleanListScope(ctx, &opts))
if err != nil {
scope.err(err, w, req)
return

View File

@@ -18,7 +18,6 @@ package handlers
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io"
@@ -39,7 +38,6 @@ import (
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/apiserver/pkg/endpoints/metrics"
endpointsrequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/util/apihelpers"
"k8s.io/klog/v2"
)
@@ -149,8 +147,6 @@ type watchEncoder struct {
encoder runtime.Encoder
framer io.Writer
watchListTransformerFn watchListTransformerFunction
buffer runtime.Splice
eventBuffer runtime.Splice
@@ -158,16 +154,15 @@ type watchEncoder struct {
identifiers map[watch.EventType]runtime.Identifier
}
func newWatchEncoder(ctx context.Context, gvr schema.GroupVersionResource, embeddedEncoder runtime.Encoder, encoder runtime.Encoder, framer io.Writer, watchListTransformerFn watchListTransformerFunction) *watchEncoder {
func newWatchEncoder(ctx context.Context, gvr schema.GroupVersionResource, embeddedEncoder runtime.Encoder, encoder runtime.Encoder, framer io.Writer) *watchEncoder {
return &watchEncoder{
ctx: ctx,
groupVersionResource: gvr,
embeddedEncoder: embeddedEncoder,
encoder: encoder,
framer: framer,
watchListTransformerFn: watchListTransformerFn,
buffer: runtime.NewSpliceBuffer(),
eventBuffer: runtime.NewSpliceBuffer(),
ctx: ctx,
groupVersionResource: gvr,
embeddedEncoder: embeddedEncoder,
encoder: encoder,
framer: framer,
buffer: runtime.NewSpliceBuffer(),
eventBuffer: runtime.NewSpliceBuffer(),
}
}
@@ -179,12 +174,6 @@ func (e *watchEncoder) Encode(event watch.Event) error {
encodeFunc := func(obj runtime.Object, w io.Writer) error {
return e.doEncode(obj, event, w)
}
if event.Type == watch.Bookmark {
// Bookmark objects are small, and we don't yet support serialization for them.
// Additionally, we need to additionally transform them to support watch-list feature
event = e.watchListTransformerFn(event)
return encodeFunc(event.Object, e.framer)
}
if co, ok := event.Object.(runtime.CacheableObject); ok {
return co.CacheEncode(e.identifier(event.Type), encodeFunc, e.framer)
}
@@ -490,94 +479,3 @@ func asPartialObjectMetadataList(result runtime.Object, groupVersion schema.Grou
return nil, newNotAcceptableError(fmt.Sprintf("no PartialObjectMetadataList exists in group version %s", groupVersion))
}
}
// watchListTransformerFunction an optional function
// applied to watchlist bookmark events that transforms
// the embedded object before sending it to a client.
type watchListTransformerFunction func(watch.Event) watch.Event
// watchListTransformer performs transformation of
// a special watchList bookmark event.
//
// The bookmark is annotated with InitialEventsListBlueprintAnnotationKey
// and contains an empty, versioned list that we must encode in the requested format
// (e.g., protobuf, JSON, CBOR) and then store as a base64-encoded string.
type watchListTransformer struct {
initialEventsListBlueprint runtime.Object
targetGVK *schema.GroupVersionKind
negotiatedEncoder runtime.Encoder
buffer runtime.Splice
}
// createWatchListTransformerIfRequested returns a transformer function for watchlist bookmark event.
func newWatchListTransformer(initialEventsListBlueprint runtime.Object, targetGVK *schema.GroupVersionKind, negotiatedEncoder runtime.Encoder) *watchListTransformer {
return &watchListTransformer{
initialEventsListBlueprint: initialEventsListBlueprint,
targetGVK: targetGVK,
negotiatedEncoder: negotiatedEncoder,
buffer: runtime.NewSpliceBuffer(),
}
}
func (e *watchListTransformer) transform(event watch.Event) watch.Event {
if e.initialEventsListBlueprint == nil {
return event
}
hasAnnotation, err := storage.HasInitialEventsEndBookmarkAnnotation(event.Object)
if err != nil {
return newWatchEventErrorFor(err)
}
if !hasAnnotation {
return event
}
if err = e.encodeInitialEventsListBlueprint(event.Object); err != nil {
return newWatchEventErrorFor(err)
}
return event
}
func (e *watchListTransformer) encodeInitialEventsListBlueprint(object runtime.Object) error {
initialEventsListBlueprint, err := e.transformInitialEventsListBlueprint()
if err != nil {
return err
}
defer e.buffer.Reset()
if err = e.negotiatedEncoder.Encode(initialEventsListBlueprint, e.buffer); err != nil {
return err
}
encodedInitialEventsListBlueprint := e.buffer.Bytes()
// the storage layer creates a deep copy of the obj before modifying it.
// since the object has the annotation, we can modify it directly.
objectMeta, err := meta.Accessor(object)
if err != nil {
return err
}
annotations := objectMeta.GetAnnotations()
annotations[metav1.InitialEventsListBlueprintAnnotationKey] = base64.StdEncoding.EncodeToString(encodedInitialEventsListBlueprint)
objectMeta.SetAnnotations(annotations)
return nil
}
func (e *watchListTransformer) transformInitialEventsListBlueprint() (runtime.Object, error) {
if e.targetGVK != nil && e.targetGVK.Kind == "PartialObjectMetadata" {
return asPartialObjectMetadataList(e.initialEventsListBlueprint, e.targetGVK.GroupVersion())
}
return e.initialEventsListBlueprint, nil
}
func newWatchEventErrorFor(err error) watch.Event {
return watch.Event{
Type: watch.Error,
Object: &metav1.Status{
Status: metav1.StatusFailure,
Message: err.Error(),
Reason: metav1.StatusReasonInternalError,
Code: http.StatusInternalServerError,
},
}
}

View File

@@ -17,9 +17,7 @@ limitations under the License.
package handlers
import (
"bytes"
"context"
"encoding/base64"
"fmt"
"io"
"net/http"
@@ -27,18 +25,13 @@ import (
"testing"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
metav1beta1 "k8s.io/apimachinery/pkg/apis/meta/v1beta1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
runtimejson "k8s.io/apimachinery/pkg/runtime/serializer/json"
"k8s.io/apimachinery/pkg/watch"
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
)
var _ runtime.CacheableObject = &mockCacheableObject{}
@@ -229,118 +222,3 @@ func TestWatchEncoderIdentifier(t *testing.T) {
t.Error(" - watchEncoder::typeIdentifier to capture all relevant fields in identifier")
}
}
func TestWatchListEncoder(t *testing.T) {
makePartialObjectMetadataListWithoutKind := func(rv string) *metav1.PartialObjectMetadataList {
return &metav1.PartialObjectMetadataList{
// do not set the type info to match
// newWatchListTransformer
ListMeta: metav1.ListMeta{ResourceVersion: rv},
}
}
makePodListWithKind := func(rv string) *v1.PodList {
return &v1.PodList{
TypeMeta: metav1.TypeMeta{
// set the type info so
// that it differs from
// PartialObjectMetadataList
Kind: "PodList",
},
ListMeta: metav1.ListMeta{
ResourceVersion: rv,
},
}
}
makeBookmarkEventFor := func(pod *v1.Pod) watch.Event {
return watch.Event{
Type: watch.Bookmark,
Object: pod,
}
}
makePod := func(name string) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: "ns",
Annotations: map[string]string{},
},
}
}
makePodWithInitialEventsAnnotation := func(name string) *v1.Pod {
p := makePod(name)
p.Annotations[metav1.InitialEventsAnnotationKey] = "true"
return p
}
scenarios := []struct {
name string
negotiatedEncoder runtime.Serializer
targetGVK *schema.GroupVersionKind
actualEvent watch.Event
listBlueprint runtime.Object
expectedBase64ListBlueprint string
}{
{
name: "pass through, an obj without the annotation received",
actualEvent: makeBookmarkEventFor(makePod("1")),
negotiatedEncoder: newJSONSerializer(),
},
{
name: "encodes the initialEventsListBlueprint if an obj with the annotation is passed",
actualEvent: makeBookmarkEventFor(makePodWithInitialEventsAnnotation("1")),
listBlueprint: makePodListWithKind("100"),
expectedBase64ListBlueprint: encodeObjectToBase64String(makePodListWithKind("100"), t),
negotiatedEncoder: newJSONSerializer(),
},
{
name: "encodes the initialEventsListBlueprint as PartialObjectMetadata when requested",
targetGVK: &schema.GroupVersionKind{Group: "meta.k8s.io", Version: "v1", Kind: "PartialObjectMetadata"},
actualEvent: makeBookmarkEventFor(makePodWithInitialEventsAnnotation("2")),
listBlueprint: makePodListWithKind("101"),
expectedBase64ListBlueprint: encodeObjectToBase64String(makePartialObjectMetadataListWithoutKind("101"), t),
negotiatedEncoder: newJSONSerializer(),
},
}
for _, scenario := range scenarios {
t.Run(scenario.name, func(t *testing.T) {
target := newWatchListTransformer(scenario.listBlueprint, scenario.targetGVK, scenario.negotiatedEncoder)
transformedEvent := target.transform(scenario.actualEvent)
actualObjectMeta, err := meta.Accessor(transformedEvent.Object)
if err != nil {
t.Fatal(err)
}
base64ListBlueprint, ok := actualObjectMeta.GetAnnotations()[metav1.InitialEventsListBlueprintAnnotationKey]
if !ok && len(scenario.expectedBase64ListBlueprint) != 0 {
t.Fatalf("the encoded obj doesn't have %q", metav1.InitialEventsListBlueprintAnnotationKey)
}
if base64ListBlueprint != scenario.expectedBase64ListBlueprint {
t.Fatalf("unexpected base64ListBlueprint = %s, expected = %s", base64ListBlueprint, scenario.expectedBase64ListBlueprint)
}
})
}
}
func encodeObjectToBase64String(obj runtime.Object, t *testing.T) string {
e := newJSONSerializer()
var buf bytes.Buffer
err := e.Encode(obj, &buf)
if err != nil {
t.Fatal(err)
}
return base64.StdEncoding.EncodeToString(buf.Bytes())
}
func newJSONSerializer() runtime.Serializer {
return runtimejson.NewSerializerWithOptions(
runtimejson.DefaultMetaFactory,
clientgoscheme.Scheme,
clientgoscheme.Scheme,
runtimejson.SerializerOptions{},
)
}

View File

@@ -64,7 +64,7 @@ func (w *realTimeoutFactory) TimeoutCh() (<-chan time.Time, func() bool) {
// serveWatchHandler returns a handle to serve a watch response.
// TODO: the functionality in this method and in WatchServer.Serve is not cleanly decoupled.
func serveWatchHandler(watcher watch.Interface, scope *RequestScope, mediaTypeOptions negotiation.MediaTypeOptions, req *http.Request, w http.ResponseWriter, timeout time.Duration, metricsScope string, initialEventsListBlueprint runtime.Object) (http.Handler, error) {
func serveWatchHandler(watcher watch.Interface, scope *RequestScope, mediaTypeOptions negotiation.MediaTypeOptions, req *http.Request, w http.ResponseWriter, timeout time.Duration, metricsScope string) (http.Handler, error) {
options, err := optionsForTransform(mediaTypeOptions, req)
if err != nil {
return nil, err
@@ -167,8 +167,6 @@ func serveWatchHandler(watcher watch.Interface, scope *RequestScope, mediaTypeOp
Encoder: encoder,
EmbeddedEncoder: embeddedEncoder,
watchListTransformerFn: newWatchListTransformer(initialEventsListBlueprint, mediaTypeOptions.Convert, negotiatedEncoder).transform,
MemoryAllocator: memoryAllocator,
TimeoutFactory: &realTimeoutFactory{timeout},
ServerShuttingDownCh: serverShuttingDownCh,
@@ -198,10 +196,6 @@ type WatchServer struct {
Encoder runtime.Encoder
// used to encode the nested object in the watch stream
EmbeddedEncoder runtime.Encoder
// watchListTransformerFn a function applied
// to watchlist bookmark events that transforms
// the embedded object before sending it to a client.
watchListTransformerFn watchListTransformerFunction
MemoryAllocator runtime.MemoryAllocator
TimeoutFactory TimeoutFactory
@@ -247,7 +241,7 @@ func (s *WatchServer) HandleHTTP(w http.ResponseWriter, req *http.Request) {
flusher.Flush()
gvr := s.Scope.Resource
watchEncoder := newWatchEncoder(req.Context(), gvr, s.EmbeddedEncoder, s.Encoder, framer, s.watchListTransformerFn)
watchEncoder := newWatchEncoder(req.Context(), gvr, s.EmbeddedEncoder, s.Encoder, framer)
ch := s.Watching.ResultChan()
done := req.Context().Done()
@@ -316,7 +310,7 @@ func (s *WatchServer) HandleWS(ws *websocket.Conn) {
framer := newWebsocketFramer(ws, s.UseTextFraming)
gvr := s.Scope.Resource
watchEncoder := newWatchEncoder(context.TODO(), gvr, s.EmbeddedEncoder, s.Encoder, framer, s.watchListTransformerFn)
watchEncoder := newWatchEncoder(context.TODO(), gvr, s.EmbeddedEncoder, s.Encoder, framer)
ch := s.Watching.ResultChan()
for {

View File

@@ -949,9 +949,6 @@ func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
// Since add() can block, we explicitly add when cacher is unlocked.
// Dispatching event in nonblocking way first, which make faster watchers
// not be blocked by slower ones.
//
// Note: if we ever decide to cache the serialization of bookmark events,
// we will also need to modify the watchEncoder encoder
if event.Type == watch.Bookmark {
for _, watcher := range c.watchersBuffer {
watcher.nonblockingAdd(event)