mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-10-31 18:28:13 +00:00 
			
		
		
		
	Fix original object mutation on patch retry
This commit is contained in:
		| @@ -553,6 +553,7 @@ func handleUnmarshal(j []byte) (map[string]interface{}, error) { | |||||||
| // StrategicMergePatch applies a strategic merge patch. The original and patch documents | // StrategicMergePatch applies a strategic merge patch. The original and patch documents | ||||||
| // must be JSONMap. A patch can be created from an original and modified document by | // must be JSONMap. A patch can be created from an original and modified document by | ||||||
| // calling CreateTwoWayMergeMapPatch. | // calling CreateTwoWayMergeMapPatch. | ||||||
|  | // Warning: the original and patch JSONMap objects are mutated by this function and should not be reused. | ||||||
| func StrategicMergeMapPatch(original, patch JSONMap, dataStruct interface{}) (JSONMap, error) { | func StrategicMergeMapPatch(original, patch JSONMap, dataStruct interface{}) (JSONMap, error) { | ||||||
| 	t, err := getTagStructType(dataStruct) | 	t, err := getTagStructType(dataStruct) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
|   | |||||||
| @@ -86,21 +86,21 @@ func strategicPatchObject( | |||||||
| 	patchJS []byte, | 	patchJS []byte, | ||||||
| 	objToUpdate runtime.Object, | 	objToUpdate runtime.Object, | ||||||
| 	versionedObj runtime.Object, | 	versionedObj runtime.Object, | ||||||
| ) (originalObjMap map[string]interface{}, patchMap map[string]interface{}, retErr error) { | ) error { | ||||||
| 	originalObjMap = make(map[string]interface{}) | 	originalObjMap := make(map[string]interface{}) | ||||||
| 	if err := unstructured.DefaultConverter.ToUnstructured(originalObject, &originalObjMap); err != nil { | 	if err := unstructured.DefaultConverter.ToUnstructured(originalObject, &originalObjMap); err != nil { | ||||||
| 		return nil, nil, err | 		return err | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	patchMap = make(map[string]interface{}) | 	patchMap := make(map[string]interface{}) | ||||||
| 	if err := json.Unmarshal(patchJS, &patchMap); err != nil { | 	if err := json.Unmarshal(patchJS, &patchMap); err != nil { | ||||||
| 		return nil, nil, err | 		return err | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if err := applyPatchToObject(codec, defaulter, originalObjMap, patchMap, objToUpdate, versionedObj); err != nil { | 	if err := applyPatchToObject(codec, defaulter, originalObjMap, patchMap, objToUpdate, versionedObj); err != nil { | ||||||
| 		return nil, nil, err | 		return err | ||||||
| 	} | 	} | ||||||
| 	return | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| // applyPatchToObject applies a strategic merge patch of <patchMap> to | // applyPatchToObject applies a strategic merge patch of <patchMap> to | ||||||
|   | |||||||
| @@ -570,7 +570,7 @@ func patchResource( | |||||||
| 		originalObjJS           []byte | 		originalObjJS           []byte | ||||||
| 		originalPatchedObjJS    []byte | 		originalPatchedObjJS    []byte | ||||||
| 		originalObjMap          map[string]interface{} | 		originalObjMap          map[string]interface{} | ||||||
| 		originalPatchMap        map[string]interface{} | 		getOriginalPatchMap     func() (map[string]interface{}, error) | ||||||
| 		lastConflictErr         error | 		lastConflictErr         error | ||||||
| 		originalResourceVersion string | 		originalResourceVersion string | ||||||
| 	) | 	) | ||||||
| @@ -610,6 +610,26 @@ func patchResource( | |||||||
| 					return nil, err | 					return nil, err | ||||||
| 				} | 				} | ||||||
| 				originalObjJS, originalPatchedObjJS = originalJS, patchedJS | 				originalObjJS, originalPatchedObjJS = originalJS, patchedJS | ||||||
|  |  | ||||||
|  | 				// Make a getter that can return a fresh strategic patch map if needed for conflict retries | ||||||
|  | 				// We have to rebuild it each time we need it, because the map gets mutated when being applied | ||||||
|  | 				var originalPatchBytes []byte | ||||||
|  | 				getOriginalPatchMap = func() (map[string]interface{}, error) { | ||||||
|  | 					if originalPatchBytes == nil { | ||||||
|  | 						// Compute once | ||||||
|  | 						originalPatchBytes, err = strategicpatch.CreateTwoWayMergePatch(originalObjJS, originalPatchedObjJS, versionedObj) | ||||||
|  | 						if err != nil { | ||||||
|  | 							return nil, err | ||||||
|  | 						} | ||||||
|  | 					} | ||||||
|  | 					// Return a fresh map every time | ||||||
|  | 					originalPatchMap := make(map[string]interface{}) | ||||||
|  | 					if err := json.Unmarshal(originalPatchBytes, &originalPatchMap); err != nil { | ||||||
|  | 						return nil, err | ||||||
|  | 					} | ||||||
|  | 					return originalPatchMap, nil | ||||||
|  | 				} | ||||||
|  |  | ||||||
| 			case types.StrategicMergePatchType: | 			case types.StrategicMergePatchType: | ||||||
| 				// Since the patch is applied on versioned objects, we need to convert the | 				// Since the patch is applied on versioned objects, we need to convert the | ||||||
| 				// current object to versioned representation first. | 				// current object to versioned representation first. | ||||||
| @@ -621,8 +641,12 @@ func patchResource( | |||||||
| 				if err != nil { | 				if err != nil { | ||||||
| 					return nil, err | 					return nil, err | ||||||
| 				} | 				} | ||||||
| 				originalMap, patchMap, err := strategicPatchObject(codec, defaulter, currentVersionedObject, patchJS, versionedObjToUpdate, versionedObj) | 				// Capture the original object map and patch for possible retries. | ||||||
| 				if err != nil { | 				originalMap := make(map[string]interface{}) | ||||||
|  | 				if err := unstructured.DefaultConverter.ToUnstructured(currentVersionedObject, &originalMap); err != nil { | ||||||
|  | 					return nil, err | ||||||
|  | 				} | ||||||
|  | 				if err := strategicPatchObject(codec, defaulter, currentVersionedObject, patchJS, versionedObjToUpdate, versionedObj); err != nil { | ||||||
| 					return nil, err | 					return nil, err | ||||||
| 				} | 				} | ||||||
| 				// Convert the object back to unversioned. | 				// Convert the object back to unversioned. | ||||||
| @@ -632,8 +656,17 @@ func patchResource( | |||||||
| 					return nil, err | 					return nil, err | ||||||
| 				} | 				} | ||||||
| 				objToUpdate = unversionedObjToUpdate | 				objToUpdate = unversionedObjToUpdate | ||||||
| 				// Store unstructured representations for possible retries. | 				// Store unstructured representation for possible retries. | ||||||
| 				originalObjMap, originalPatchMap = originalMap, patchMap | 				originalObjMap = originalMap | ||||||
|  | 				// Make a getter that can return a fresh strategic patch map if needed for conflict retries | ||||||
|  | 				// We have to rebuild it each time we need it, because the map gets mutated when being applied | ||||||
|  | 				getOriginalPatchMap = func() (map[string]interface{}, error) { | ||||||
|  | 					patchMap := make(map[string]interface{}) | ||||||
|  | 					if err := json.Unmarshal(patchJS, &patchMap); err != nil { | ||||||
|  | 						return nil, err | ||||||
|  | 					} | ||||||
|  | 					return patchMap, nil | ||||||
|  | 				} | ||||||
| 			} | 			} | ||||||
| 			if err := checkName(objToUpdate, name, namespace, namer); err != nil { | 			if err := checkName(objToUpdate, name, namespace, namer); err != nil { | ||||||
| 				return nil, err | 				return nil, err | ||||||
| @@ -669,17 +702,6 @@ func patchResource( | |||||||
| 					return nil, err | 					return nil, err | ||||||
| 				} | 				} | ||||||
| 			} else { | 			} else { | ||||||
| 				if originalPatchMap == nil { |  | ||||||
| 					// Compute original patch, if we already didn't do this in previous retries. |  | ||||||
| 					originalPatch, err := strategicpatch.CreateTwoWayMergePatch(originalObjJS, originalPatchedObjJS, versionedObj) |  | ||||||
| 					if err != nil { |  | ||||||
| 						return nil, err |  | ||||||
| 					} |  | ||||||
| 					originalPatchMap = make(map[string]interface{}) |  | ||||||
| 					if err := json.Unmarshal(originalPatch, &originalPatchMap); err != nil { |  | ||||||
| 						return nil, err |  | ||||||
| 					} |  | ||||||
| 				} |  | ||||||
| 				// Compute current patch. | 				// Compute current patch. | ||||||
| 				currentObjJS, err := runtime.Encode(codec, currentObject) | 				currentObjJS, err := runtime.Encode(codec, currentObject) | ||||||
| 				if err != nil { | 				if err != nil { | ||||||
| @@ -695,6 +717,12 @@ func patchResource( | |||||||
| 				} | 				} | ||||||
| 			} | 			} | ||||||
|  |  | ||||||
|  | 			// Get a fresh copy of the original strategic patch each time through, since applying it mutates the map | ||||||
|  | 			originalPatchMap, err := getOriginalPatchMap() | ||||||
|  | 			if err != nil { | ||||||
|  | 				return nil, err | ||||||
|  | 			} | ||||||
|  |  | ||||||
| 			hasConflicts, err := mergepatch.HasConflicts(originalPatchMap, currentPatchMap) | 			hasConflicts, err := mergepatch.HasConflicts(originalPatchMap, currentPatchMap) | ||||||
| 			if err != nil { | 			if err != nil { | ||||||
| 				return nil, err | 				return nil, err | ||||||
|   | |||||||
| @@ -74,7 +74,7 @@ func TestPatchAnonymousField(t *testing.T) { | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	actual := &testPatchType{} | 	actual := &testPatchType{} | ||||||
| 	_, _, err := strategicPatchObject(codec, defaulter, original, []byte(patch), actual, &testPatchType{}) | 	err := strategicPatchObject(codec, defaulter, original, []byte(patch), actual, &testPatchType{}) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		t.Fatalf("unexpected error: %v", err) | 		t.Fatalf("unexpected error: %v", err) | ||||||
| 	} | 	} | ||||||
| @@ -314,7 +314,7 @@ func TestNumberConversion(t *testing.T) { | |||||||
|  |  | ||||||
| 	patchJS := []byte(`{"spec":{"ports":[{"port":80,"nodePort":31789}]}}`) | 	patchJS := []byte(`{"spec":{"ports":[{"port":80,"nodePort":31789}]}}`) | ||||||
|  |  | ||||||
| 	_, _, err := strategicPatchObject(codec, defaulter, currentVersionedObject, patchJS, versionedObjToUpdate, versionedObj) | 	err := strategicPatchObject(codec, defaulter, currentVersionedObject, patchJS, versionedObjToUpdate, versionedObj) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		t.Fatal(err) | 		t.Fatal(err) | ||||||
| 	} | 	} | ||||||
|   | |||||||
							
								
								
									
										116
									
								
								test/integration/apiserver/patch_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										116
									
								
								test/integration/apiserver/patch_test.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,116 @@ | |||||||
|  | // +build integration,!no-etcd | ||||||
|  |  | ||||||
|  | /* | ||||||
|  | Copyright 2017 The Kubernetes Authors. | ||||||
|  |  | ||||||
|  | Licensed under the Apache License, Version 2.0 (the "License"); | ||||||
|  | you may not use this file except in compliance with the License. | ||||||
|  | You may obtain a copy of the License at | ||||||
|  |  | ||||||
|  |     http://www.apache.org/licenses/LICENSE-2.0 | ||||||
|  |  | ||||||
|  | Unless required by applicable law or agreed to in writing, software | ||||||
|  | distributed under the License is distributed on an "AS IS" BASIS, | ||||||
|  | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||
|  | See the License for the specific language governing permissions and | ||||||
|  | limitations under the License. | ||||||
|  | */ | ||||||
|  |  | ||||||
|  | package apiserver | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"fmt" | ||||||
|  | 	"sync" | ||||||
|  | 	"sync/atomic" | ||||||
|  | 	"testing" | ||||||
|  |  | ||||||
|  | 	"github.com/pborman/uuid" | ||||||
|  |  | ||||||
|  | 	"reflect" | ||||||
|  |  | ||||||
|  | 	"k8s.io/apimachinery/pkg/api/errors" | ||||||
|  | 	"k8s.io/apimachinery/pkg/api/meta" | ||||||
|  | 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||||
|  | 	"k8s.io/apimachinery/pkg/types" | ||||||
|  | 	"k8s.io/apiserver/pkg/endpoints/handlers" | ||||||
|  | 	"k8s.io/kubernetes/pkg/api/v1" | ||||||
|  | 	"k8s.io/kubernetes/test/integration/framework" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | // Tests that the apiserver retries non-overlapping conflicts on patches | ||||||
|  | func TestPatchConflicts(t *testing.T) { | ||||||
|  | 	s, clientSet := setup(t) | ||||||
|  | 	defer s.Close() | ||||||
|  |  | ||||||
|  | 	ns := framework.CreateTestingNamespace("status-code", s, t) | ||||||
|  | 	defer framework.DeleteTestingNamespace(ns, s, t) | ||||||
|  |  | ||||||
|  | 	// Create the object we're going to conflict on | ||||||
|  | 	clientSet.Core().Secrets(ns.Name).Create(&v1.Secret{ | ||||||
|  | 		ObjectMeta: metav1.ObjectMeta{ | ||||||
|  | 			Name: "test", | ||||||
|  | 			// Populate annotations so the strategic patch descends, compares, and notices the $patch directive | ||||||
|  | 			Annotations: map[string]string{"initial": "value"}, | ||||||
|  | 		}, | ||||||
|  | 	}) | ||||||
|  | 	client := clientSet.Core().RESTClient() | ||||||
|  |  | ||||||
|  | 	successes := int32(0) | ||||||
|  |  | ||||||
|  | 	// Run a lot of simultaneous patch operations to exercise internal API server retry of patch application. | ||||||
|  | 	// Internally, a patch API call retries up to MaxRetryWhenPatchConflicts times if the resource version of the object has changed. | ||||||
|  | 	// If the resource version of the object changed between attempts, that means another one of our patch requests succeeded. | ||||||
|  | 	// That means if we run 2*MaxRetryWhenPatchConflicts patch attempts, we should see at least MaxRetryWhenPatchConflicts succeed. | ||||||
|  | 	wg := sync.WaitGroup{} | ||||||
|  | 	for i := 0; i < (2 * handlers.MaxRetryWhenPatchConflicts); i++ { | ||||||
|  | 		wg.Add(1) | ||||||
|  | 		go func(i int) { | ||||||
|  | 			defer wg.Done() | ||||||
|  | 			annotationName := fmt.Sprintf("annotation-%d", i) | ||||||
|  | 			labelName := fmt.Sprintf("label-%d", i) | ||||||
|  | 			value := uuid.NewRandom().String() | ||||||
|  |  | ||||||
|  | 			obj, err := client.Patch(types.StrategicMergePatchType). | ||||||
|  | 				Namespace(ns.Name). | ||||||
|  | 				Resource("secrets"). | ||||||
|  | 				Name("test"). | ||||||
|  | 				Body([]byte(fmt.Sprintf(`{"metadata":{"labels":{"%s":"%s"}, "annotations":{"$patch":"replace","%s":"%s"}}}`, labelName, value, annotationName, value))). | ||||||
|  | 				Do(). | ||||||
|  | 				Get() | ||||||
|  |  | ||||||
|  | 			if errors.IsConflict(err) { | ||||||
|  | 				t.Logf("tolerated conflict error patching %s: %v", "secrets", err) | ||||||
|  | 				return | ||||||
|  | 			} | ||||||
|  | 			if err != nil { | ||||||
|  | 				t.Errorf("error patching %s: %v", "secrets", err) | ||||||
|  | 				return | ||||||
|  | 			} | ||||||
|  |  | ||||||
|  | 			accessor, err := meta.Accessor(obj) | ||||||
|  | 			if err != nil { | ||||||
|  | 				t.Errorf("error getting object from %s: %v", "secrets", err) | ||||||
|  | 				return | ||||||
|  | 			} | ||||||
|  | 			// make sure the label we wanted was effective | ||||||
|  | 			if accessor.GetLabels()[labelName] != value { | ||||||
|  | 				t.Errorf("patch of %s was ineffective, expected %s=%s, got labels %#v", "secrets", labelName, value, accessor.GetLabels()) | ||||||
|  | 				return | ||||||
|  | 			} | ||||||
|  | 			// make sure the patch directive didn't get lost, and that the entire annotation map was replaced | ||||||
|  | 			if !reflect.DeepEqual(accessor.GetAnnotations(), map[string]string{annotationName: value}) { | ||||||
|  | 				t.Errorf("patch of %s with $patch directive was ineffective, didn't replace entire annotations map: %#v", "secrets", accessor.GetAnnotations()) | ||||||
|  | 			} | ||||||
|  |  | ||||||
|  | 			atomic.AddInt32(&successes, 1) | ||||||
|  | 		}(i) | ||||||
|  | 	} | ||||||
|  | 	wg.Wait() | ||||||
|  |  | ||||||
|  | 	if successes < handlers.MaxRetryWhenPatchConflicts { | ||||||
|  | 		t.Errorf("Expected at least %d successful patches for %s, got %d", handlers.MaxRetryWhenPatchConflicts, "secrets", successes) | ||||||
|  | 	} else { | ||||||
|  | 		t.Logf("Got %d successful patches for %s", successes, "secrets") | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | } | ||||||
		Reference in New Issue
	
	Block a user
	 Jordan Liggitt
					Jordan Liggitt