mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-04 04:08:16 +00:00 
			
		
		
		
	Merge pull request #7705 from bprashanth/rollingupdate_2
Reliable updates in rollingupdate
This commit is contained in:
		@@ -546,8 +546,6 @@ __EOF__
 | 
				
			|||||||
  kubectl delete service frontend{,-2,-3} "${kube_flags[@]}"
 | 
					  kubectl delete service frontend{,-2,-3} "${kube_flags[@]}"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  ### Perform a rolling update with --image
 | 
					  ### Perform a rolling update with --image
 | 
				
			||||||
  # Pre-condition status.Replicas is 3, otherwise the rcmanager could update it and interfere with the rolling update
 | 
					 | 
				
			||||||
  kube::test::get_object_assert 'rc frontend' "{{$rc_status_replicas_field}}" '3'
 | 
					 | 
				
			||||||
  # Command
 | 
					  # Command
 | 
				
			||||||
  kubectl rolling-update frontend --image=kubernetes/pause --update-period=10ns --poll-interval=10ms "${kube_flags[@]}"
 | 
					  kubectl rolling-update frontend --image=kubernetes/pause --update-period=10ns --poll-interval=10ms "${kube_flags[@]}"
 | 
				
			||||||
  # Post-condition: current image IS kubernetes/pause
 | 
					  # Post-condition: current image IS kubernetes/pause
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -34,6 +34,7 @@ import (
 | 
				
			|||||||
	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubectl/resource"
 | 
						"github.com/GoogleCloudPlatform/kubernetes/pkg/kubectl/resource"
 | 
				
			||||||
	"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
 | 
						"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
 | 
				
			||||||
	"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
 | 
						"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
 | 
				
			||||||
 | 
						"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
 | 
				
			||||||
	"github.com/spf13/cobra"
 | 
						"github.com/spf13/cobra"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -287,6 +288,34 @@ func hashObject(obj runtime.Object, codec runtime.Codec) (string, error) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
const MaxRetries = 3
 | 
					const MaxRetries = 3
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type updateFunc func(controller *api.ReplicationController)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// updateWithRetries updates applies the given rc as an update.
 | 
				
			||||||
 | 
					func updateWithRetries(rcClient client.ReplicationControllerInterface, rc *api.ReplicationController, applyUpdate updateFunc) (*api.ReplicationController, error) {
 | 
				
			||||||
 | 
						// Each update could take ~100ms, so give it 0.5 second
 | 
				
			||||||
 | 
						var err error
 | 
				
			||||||
 | 
						oldRc := rc
 | 
				
			||||||
 | 
						err = wait.Poll(10*time.Millisecond, 500*time.Millisecond, func() (bool, error) {
 | 
				
			||||||
 | 
							// Apply the update, then attempt to push it to the apiserver.
 | 
				
			||||||
 | 
							applyUpdate(rc)
 | 
				
			||||||
 | 
							if rc, err = rcClient.Update(rc); err == nil {
 | 
				
			||||||
 | 
								// rc contains the latest controller post update
 | 
				
			||||||
 | 
								return true, nil
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							// Update the controller with the latest resource version, if the update failed we
 | 
				
			||||||
 | 
							// can't trust rc so use oldRc.Name.
 | 
				
			||||||
 | 
							if rc, err = rcClient.Get(oldRc.Name); err != nil {
 | 
				
			||||||
 | 
								// The Get failed: Value in rc cannot be trusted.
 | 
				
			||||||
 | 
								rc = oldRc
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							// The Get passed: rc contains the latest controller, expect a poll for the update.
 | 
				
			||||||
 | 
							return false, nil
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
						// If the error is non-nil the returned controller cannot be trusted, if it is nil, the returned
 | 
				
			||||||
 | 
						// controller contains the applied update.
 | 
				
			||||||
 | 
						return rc, err
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func addDeploymentKeyToReplicationController(oldRc *api.ReplicationController, client *client.Client, deploymentKey, namespace string, out io.Writer) (*api.ReplicationController, error) {
 | 
					func addDeploymentKeyToReplicationController(oldRc *api.ReplicationController, client *client.Client, deploymentKey, namespace string, out io.Writer) (*api.ReplicationController, error) {
 | 
				
			||||||
	oldHash, err := hashObject(oldRc, client.Codec)
 | 
						oldHash, err := hashObject(oldRc, client.Codec)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
@@ -296,8 +325,9 @@ func addDeploymentKeyToReplicationController(oldRc *api.ReplicationController, c
 | 
				
			|||||||
	if oldRc.Spec.Template.Labels == nil {
 | 
						if oldRc.Spec.Template.Labels == nil {
 | 
				
			||||||
		oldRc.Spec.Template.Labels = map[string]string{}
 | 
							oldRc.Spec.Template.Labels = map[string]string{}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	oldRc.Spec.Template.Labels[deploymentKey] = oldHash
 | 
						if oldRc, err = updateWithRetries(client.ReplicationControllers(namespace), oldRc, func(rc *api.ReplicationController) {
 | 
				
			||||||
	if oldRc, err = client.ReplicationControllers(namespace).Update(oldRc); err != nil {
 | 
							rc.Spec.Template.Labels[deploymentKey] = oldHash
 | 
				
			||||||
 | 
						}); err != nil {
 | 
				
			||||||
		return nil, err
 | 
							return nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -341,10 +371,11 @@ func addDeploymentKeyToReplicationController(oldRc *api.ReplicationController, c
 | 
				
			|||||||
	for k, v := range oldRc.Spec.Selector {
 | 
						for k, v := range oldRc.Spec.Selector {
 | 
				
			||||||
		selectorCopy[k] = v
 | 
							selectorCopy[k] = v
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	oldRc.Spec.Selector[deploymentKey] = oldHash
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Update the selector of the rc so it manages all the pods we updated above
 | 
						// Update the selector of the rc so it manages all the pods we updated above
 | 
				
			||||||
	if oldRc, err = client.ReplicationControllers(namespace).Update(oldRc); err != nil {
 | 
						if oldRc, err = updateWithRetries(client.ReplicationControllers(namespace), oldRc, func(rc *api.ReplicationController) {
 | 
				
			||||||
 | 
							rc.Spec.Selector[deploymentKey] = oldHash
 | 
				
			||||||
 | 
						}); err != nil {
 | 
				
			||||||
		return nil, err
 | 
							return nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -20,6 +20,7 @@ import (
 | 
				
			|||||||
	"bytes"
 | 
						"bytes"
 | 
				
			||||||
	"io/ioutil"
 | 
						"io/ioutil"
 | 
				
			||||||
	"net/http"
 | 
						"net/http"
 | 
				
			||||||
 | 
						"reflect"
 | 
				
			||||||
	"testing"
 | 
						"testing"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
 | 
						"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
 | 
				
			||||||
@@ -183,6 +184,96 @@ func TestAddDeploymentHash(t *testing.T) {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func TestUpdateWithRetries(t *testing.T) {
 | 
				
			||||||
 | 
						f, tf, codec := NewAPIFactory()
 | 
				
			||||||
 | 
						rc := &api.ReplicationController{
 | 
				
			||||||
 | 
							ObjectMeta: api.ObjectMeta{Name: "rc",
 | 
				
			||||||
 | 
								Labels: map[string]string{
 | 
				
			||||||
 | 
									"foo": "bar",
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							Spec: api.ReplicationControllerSpec{
 | 
				
			||||||
 | 
								Selector: map[string]string{
 | 
				
			||||||
 | 
									"foo": "bar",
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								Template: &api.PodTemplateSpec{
 | 
				
			||||||
 | 
									ObjectMeta: api.ObjectMeta{
 | 
				
			||||||
 | 
										Labels: map[string]string{
 | 
				
			||||||
 | 
											"foo": "bar",
 | 
				
			||||||
 | 
										},
 | 
				
			||||||
 | 
									},
 | 
				
			||||||
 | 
									Spec: api.PodSpec{
 | 
				
			||||||
 | 
										RestartPolicy: api.RestartPolicyAlways,
 | 
				
			||||||
 | 
										DNSPolicy:     api.DNSClusterFirst,
 | 
				
			||||||
 | 
									},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Test end to end updating of the rc with retries. Essentially make sure the update handler
 | 
				
			||||||
 | 
						// sees the right updates, failures in update/get are handled properly, and that the updated
 | 
				
			||||||
 | 
						// rc with new resource version is returned to the caller. Without any of these rollingupdate
 | 
				
			||||||
 | 
						// will fail cryptically.
 | 
				
			||||||
 | 
						newRc := *rc
 | 
				
			||||||
 | 
						newRc.ResourceVersion = "2"
 | 
				
			||||||
 | 
						newRc.Spec.Selector["baz"] = "foobar"
 | 
				
			||||||
 | 
						updates := []*http.Response{
 | 
				
			||||||
 | 
							{StatusCode: 500, Body: objBody(codec, &api.ReplicationController{})},
 | 
				
			||||||
 | 
							{StatusCode: 500, Body: objBody(codec, &api.ReplicationController{})},
 | 
				
			||||||
 | 
							{StatusCode: 200, Body: objBody(codec, &newRc)},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						gets := []*http.Response{
 | 
				
			||||||
 | 
							{StatusCode: 500, Body: objBody(codec, &api.ReplicationController{})},
 | 
				
			||||||
 | 
							{StatusCode: 200, Body: objBody(codec, rc)},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						tf.Client = &client.FakeRESTClient{
 | 
				
			||||||
 | 
							Codec: codec,
 | 
				
			||||||
 | 
							Client: client.HTTPClientFunc(func(req *http.Request) (*http.Response, error) {
 | 
				
			||||||
 | 
								switch p, m := req.URL.Path, req.Method; {
 | 
				
			||||||
 | 
								case p == "/api/v1beta3/namespaces/default/replicationcontrollers/rc" && m == "PUT":
 | 
				
			||||||
 | 
									update := updates[0]
 | 
				
			||||||
 | 
									updates = updates[1:]
 | 
				
			||||||
 | 
									// We should always get an update with a valid rc even when the get fails. The rc should always
 | 
				
			||||||
 | 
									// contain the update.
 | 
				
			||||||
 | 
									if c, ok := readOrDie(t, req, codec).(*api.ReplicationController); !ok || !reflect.DeepEqual(rc, c) {
 | 
				
			||||||
 | 
										t.Errorf("Unexpected update body, got %+v expected %+v", c, rc)
 | 
				
			||||||
 | 
									} else if sel, ok := c.Spec.Selector["baz"]; !ok || sel != "foobar" {
 | 
				
			||||||
 | 
										t.Errorf("Expected selector label update, got %+v", c.Spec.Selector)
 | 
				
			||||||
 | 
									} else {
 | 
				
			||||||
 | 
										delete(c.Spec.Selector, "baz")
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
									return update, nil
 | 
				
			||||||
 | 
								case p == "/api/v1beta3/namespaces/default/replicationcontrollers/rc" && m == "GET":
 | 
				
			||||||
 | 
									get := gets[0]
 | 
				
			||||||
 | 
									gets = gets[1:]
 | 
				
			||||||
 | 
									return get, nil
 | 
				
			||||||
 | 
								default:
 | 
				
			||||||
 | 
									t.Fatalf("unexpected request: %#v\n%#v", req.URL, req)
 | 
				
			||||||
 | 
									return nil, nil
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}),
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						tf.ClientConfig = &client.Config{Version: latest.Version}
 | 
				
			||||||
 | 
						client, err := f.Client()
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Errorf("unexpected error: %v", err)
 | 
				
			||||||
 | 
							t.Fail()
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if rc, err := updateWithRetries(
 | 
				
			||||||
 | 
							client.ReplicationControllers("default"), rc, func(c *api.ReplicationController) {
 | 
				
			||||||
 | 
								c.Spec.Selector["baz"] = "foobar"
 | 
				
			||||||
 | 
							}); err != nil {
 | 
				
			||||||
 | 
							t.Errorf("unexpected error: %v", err)
 | 
				
			||||||
 | 
						} else if sel, ok := rc.Spec.Selector["baz"]; !ok || sel != "foobar" || rc.ResourceVersion != "2" {
 | 
				
			||||||
 | 
							t.Errorf("Expected updated rc, got %+v", rc)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if len(updates) != 0 || len(gets) != 0 {
 | 
				
			||||||
 | 
							t.Errorf("Remaining updates %+v gets %+v", updates, gets)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func readOrDie(t *testing.T, req *http.Request, codec runtime.Codec) runtime.Object {
 | 
					func readOrDie(t *testing.T, req *http.Request, codec runtime.Codec) runtime.Object {
 | 
				
			||||||
	data, err := ioutil.ReadAll(req.Body)
 | 
						data, err := ioutil.ReadAll(req.Body)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user