mirror of
https://github.com/optim-enterprises-bv/kubernetes.git
synced 2025-12-05 07:35:38 +00:00
Refactor watch event serialization to allow caching
This commit is contained in:
@@ -2361,6 +2361,19 @@ func TestWatchTransformCaching(t *testing.T) {
|
||||
}
|
||||
defer wTableIncludeObject.Close()
|
||||
|
||||
wTableIncludeObjectFiltered, err := clientSet.CoreV1().RESTClient().Get().
|
||||
AbsPath("/api/v1/namespaces/watch-transform/configmaps").
|
||||
SetHeader("Accept", "application/json;as=Table;g=meta.k8s.io;v=v1beta1").
|
||||
VersionedParams(listOptions, metav1.ParameterCodec).
|
||||
Param("includeObject", string(metav1.IncludeObject)).
|
||||
Param("labelSelector", "foo=bar").
|
||||
Timeout(timeout).
|
||||
Stream(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to start table object watch: %v", err)
|
||||
}
|
||||
defer wTableIncludeObjectFiltered.Close()
|
||||
|
||||
configMap, err := clientSet.CoreV1().ConfigMaps("watch-transform").Create(ctx, &v1.ConfigMap{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "test1"},
|
||||
Data: map[string]string{
|
||||
@@ -2397,6 +2410,30 @@ func TestWatchTransformCaching(t *testing.T) {
|
||||
t.Fatalf("Failed to create a second configMap: %v", err)
|
||||
}
|
||||
|
||||
// Now update both configmaps so that filtering watch can observe them.
|
||||
// This is needed to validate whether events caching done by apiserver
|
||||
// distinguished objects by type.
|
||||
|
||||
configMapUpdated, err := clientSet.CoreV1().ConfigMaps("watch-transform").Update(ctx, &v1.ConfigMap{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "test1", Labels: map[string]string{"foo": "bar"}},
|
||||
Data: map[string]string{
|
||||
"foo": "baz",
|
||||
},
|
||||
}, metav1.UpdateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to update a configMap: %v", err)
|
||||
}
|
||||
|
||||
configMap2Updated, err := clientSet.CoreV1().ConfigMaps("watch-transform").Update(ctx, &v1.ConfigMap{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "test2", Labels: map[string]string{"foo": "bar"}},
|
||||
Data: map[string]string{
|
||||
"foo": "baz",
|
||||
},
|
||||
}, metav1.UpdateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to update a second configMap: %v", err)
|
||||
}
|
||||
|
||||
metaChecks := []partialObjectMetadataCheck{
|
||||
func(res *metav1beta1.PartialObjectMetadata) {
|
||||
if !apiequality.Semantic.DeepEqual(configMap.ObjectMeta, res.ObjectMeta) {
|
||||
@@ -2408,6 +2445,16 @@ func TestWatchTransformCaching(t *testing.T) {
|
||||
t.Errorf("expected object: %#v, got: %#v", configMap2.ObjectMeta, res.ObjectMeta)
|
||||
}
|
||||
},
|
||||
func(res *metav1beta1.PartialObjectMetadata) {
|
||||
if !apiequality.Semantic.DeepEqual(configMapUpdated.ObjectMeta, res.ObjectMeta) {
|
||||
t.Errorf("expected object: %#v, got: %#v", configMapUpdated.ObjectMeta, res.ObjectMeta)
|
||||
}
|
||||
},
|
||||
func(res *metav1beta1.PartialObjectMetadata) {
|
||||
if !apiequality.Semantic.DeepEqual(configMap2Updated.ObjectMeta, res.ObjectMeta) {
|
||||
t.Errorf("expected object: %#v, got: %#v", configMap2Updated.ObjectMeta, res.ObjectMeta)
|
||||
}
|
||||
},
|
||||
}
|
||||
expectPartialObjectMetaEventsProtobufChecks(t, wMeta, metaChecks)
|
||||
|
||||
@@ -2421,39 +2468,67 @@ func TestWatchTransformCaching(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
objectMetas := expectTableWatchEvents(t, 2, 3, metav1.IncludeMetadata, json.NewDecoder(wTableIncludeMeta))
|
||||
objectMetas := expectTableWatchEvents(t, 4, 3, metav1.IncludeMetadata, json.NewDecoder(wTableIncludeMeta))
|
||||
tableMetaCheck(configMap, objectMetas[0])
|
||||
tableMetaCheck(configMap2, objectMetas[1])
|
||||
tableMetaCheck(configMapUpdated, objectMetas[2])
|
||||
tableMetaCheck(configMap2Updated, objectMetas[3])
|
||||
|
||||
tableObjectCheck := func(expected *v1.ConfigMap, got []byte) {
|
||||
tableObjectCheck := func(expectedType watch.EventType, expectedObj *v1.ConfigMap, got streamedEvent) {
|
||||
var obj *v1.ConfigMap
|
||||
if err := json.Unmarshal(got, &obj); err != nil {
|
||||
if err := json.Unmarshal(got.rawObject, &obj); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if expectedType != watch.EventType(got.eventType) {
|
||||
t.Errorf("expected type: %#v, got: %#v", expectedType, got.eventType)
|
||||
}
|
||||
obj.TypeMeta = metav1.TypeMeta{}
|
||||
if !apiequality.Semantic.DeepEqual(expected, obj) {
|
||||
t.Errorf("expected object: %#v, got: %#v", expected, obj)
|
||||
if !apiequality.Semantic.DeepEqual(expectedObj, obj) {
|
||||
t.Errorf("expected object: %#v, got: %#v", expectedObj, obj)
|
||||
}
|
||||
}
|
||||
|
||||
objects := expectTableWatchEvents(t, 2, 3, metav1.IncludeObject, json.NewDecoder(wTableIncludeObject))
|
||||
tableObjectCheck(configMap, objects[0])
|
||||
tableObjectCheck(configMap2, objects[1])
|
||||
objects := expectTableWatchEventsWithTypes(t, 4, 3, metav1.IncludeObject, json.NewDecoder(wTableIncludeObject))
|
||||
tableObjectCheck(watch.Added, configMap, objects[0])
|
||||
tableObjectCheck(watch.Added, configMap2, objects[1])
|
||||
tableObjectCheck(watch.Modified, configMapUpdated, objects[2])
|
||||
tableObjectCheck(watch.Modified, configMap2Updated, objects[3])
|
||||
|
||||
delayedObjects := expectTableWatchEvents(t, 1, 3, metav1.IncludeObject, json.NewDecoder(wTableIncludeObjectDelayed))
|
||||
tableObjectCheck(configMap2, delayedObjects[0])
|
||||
filteredObjects := expectTableWatchEventsWithTypes(t, 2, 3, metav1.IncludeObject, json.NewDecoder(wTableIncludeObjectFiltered))
|
||||
tableObjectCheck(watch.Added, configMapUpdated, filteredObjects[0])
|
||||
tableObjectCheck(watch.Added, configMap2Updated, filteredObjects[1])
|
||||
|
||||
delayedObjects := expectTableWatchEventsWithTypes(t, 3, 3, metav1.IncludeObject, json.NewDecoder(wTableIncludeObjectDelayed))
|
||||
tableObjectCheck(watch.Added, configMap2, delayedObjects[0])
|
||||
tableObjectCheck(watch.Modified, configMapUpdated, delayedObjects[1])
|
||||
tableObjectCheck(watch.Modified, configMap2Updated, delayedObjects[2])
|
||||
}
|
||||
|
||||
func expectTableWatchEvents(t *testing.T, count, columns int, policy metav1.IncludeObjectPolicy, d *json.Decoder) [][]byte {
|
||||
events := expectTableWatchEventsWithTypes(t, count, columns, policy, d)
|
||||
var objects [][]byte
|
||||
for _, event := range events {
|
||||
objects = append(objects, event.rawObject)
|
||||
}
|
||||
return objects
|
||||
}
|
||||
|
||||
type streamedEvent struct {
|
||||
eventType string
|
||||
rawObject []byte
|
||||
}
|
||||
|
||||
func expectTableWatchEventsWithTypes(t *testing.T, count, columns int, policy metav1.IncludeObjectPolicy, d *json.Decoder) []streamedEvent {
|
||||
t.Helper()
|
||||
|
||||
var objects [][]byte
|
||||
var events []streamedEvent
|
||||
|
||||
for i := 0; i < count; i++ {
|
||||
var evt metav1.WatchEvent
|
||||
if err := d.Decode(&evt); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
var table metav1beta1.Table
|
||||
if err := json.Unmarshal(evt.Object.Raw, &table); err != nil {
|
||||
t.Fatal(err)
|
||||
@@ -2484,7 +2559,7 @@ func expectTableWatchEvents(t *testing.T, count, columns int, policy metav1.Incl
|
||||
if meta.TypeMeta != partialObj {
|
||||
t.Fatalf("expected partial object: %#v", meta)
|
||||
}
|
||||
objects = append(objects, row.Object.Raw)
|
||||
events = append(events, streamedEvent{eventType: evt.Type, rawObject: row.Object.Raw})
|
||||
case metav1.IncludeNone:
|
||||
if len(row.Object.Raw) != 0 {
|
||||
t.Fatalf("Expected no object: %s", string(row.Object.Raw))
|
||||
@@ -2493,10 +2568,10 @@ func expectTableWatchEvents(t *testing.T, count, columns int, policy metav1.Incl
|
||||
if len(row.Object.Raw) == 0 {
|
||||
t.Fatalf("Expected object: %s", string(row.Object.Raw))
|
||||
}
|
||||
objects = append(objects, row.Object.Raw)
|
||||
events = append(events, streamedEvent{eventType: evt.Type, rawObject: row.Object.Raw})
|
||||
}
|
||||
}
|
||||
return objects
|
||||
return events
|
||||
}
|
||||
|
||||
func expectPartialObjectMetaEvents(t *testing.T, d *json.Decoder, values ...string) {
|
||||
|
||||
Reference in New Issue
Block a user