Merge pull request #46669 from kow3ns/statefulset-update

Automatic merge from submit-queue (batch tested with PRs 46235, 44786, 46833, 46756, 46669)

implements StatefulSet update

**What this PR does / why we need it**:
1. Implements rolling update for StatefulSets
2. Implements controller history for StatefulSets.
3. Makes StatefulSet status reporting consistent with DaemonSet and ReplicaSet.

https://github.com/kubernetes/features/issues/188

**Special notes for your reviewer**:

**Release note**:
```release-note
Implements rolling update for StatefulSets. Updates can be performed using the RollingUpdate, Paritioned, or OnDelete strategies. OnDelete implements the manual behavior from 1.6. status now tracks 
replicas, readyReplicas, currentReplicas, and updatedReplicas. The semantics of replicas is now consistent with DaemonSet and ReplicaSet, and readyReplicas has the semantics that replicas did prior to this release.
```
This commit is contained in:
Kubernetes Submit Queue
2017-06-07 00:27:53 -07:00
committed by GitHub
56 changed files with 9132 additions and 864 deletions

View File

@@ -111,6 +111,7 @@ filegroup(
"//pkg/controller/disruption:all-srcs",
"//pkg/controller/endpoint:all-srcs",
"//pkg/controller/garbagecollector:all-srcs",
"//pkg/controller/history:all-srcs",
"//pkg/controller/job:all-srcs",
"//pkg/controller/namespace:all-srcs",
"//pkg/controller/node:all-srcs",

View File

@@ -0,0 +1,69 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_test(
name = "go_default_test",
srcs = ["controller_history_test.go"],
library = ":go_default_library",
tags = ["automanaged"],
deps = [
"//pkg/api/testapi:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/apis/apps/v1beta1:go_default_library",
"//pkg/client/clientset_generated/clientset/fake:go_default_library",
"//pkg/client/informers/informers_generated/externalversions:go_default_library",
"//pkg/controller:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
"//vendor/k8s.io/client-go/testing:go_default_library",
],
)
go_library(
name = "go_default_library",
srcs = ["controller_history.go"],
tags = ["automanaged"],
deps = [
"//pkg/apis/apps/v1beta1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/informers/informers_generated/externalversions/apps/v1beta1:go_default_library",
"//pkg/client/listers/apps/v1beta1:go_default_library",
"//pkg/client/retry:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/util/hash:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)

16
pkg/controller/history/OWNERS Executable file
View File

@@ -0,0 +1,16 @@
approvers:
- bprashanth
- enisoc
- foxish
- janetkuo
- kargakis
- kow3ns
- smarterclayton
reviewers:
- bprashanth
- enisoc
- foxish
- janetkuo
- kargakis
- kow3ns
- smarterclayton

View File

@@ -0,0 +1,490 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package history
import (
"bytes"
"fmt"
"hash/fnv"
"sort"
"strconv"
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
appsinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/apps/v1beta1"
appslisters "k8s.io/kubernetes/pkg/client/listers/apps/v1beta1"
"k8s.io/kubernetes/pkg/controller"
hashutil "k8s.io/kubernetes/pkg/util/hash"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/client/retry"
)
// ControllerRevisionHashLabel is the label used to indicate the hash value of a ControllerRevision's Data.
const ControllerRevisionHashLabel = "controller.kubernetes.io/hash"
// ControllerRevisionName returns the Name for a ControllerRevision in the form prefix-hash. If the length
// of prefix is greater than 223 bytes, it is truncated to allow for a name that is no larger than 253 bytes.
func ControllerRevisionName(prefix string, hash uint32) string {
if len(prefix) > 223 {
prefix = prefix[:223]
}
return fmt.Sprintf("%s-%d", prefix, hash)
}
// NewControllerRevision returns the a ControllerRevision with a ControllerRef pointing parent and indicating that
// parent is of parentKind. The ControllerRevision has labels matching selector, contains Data equal to data, and
// has a Revision equal to revision. If the returned error is nil, the returned ControllerRevision is valid. If the
// returned error is not nil, the returned ControllerRevision is invalid for use.
func NewControllerRevision(parent metav1.Object,
parentKind schema.GroupVersionKind,
selector labels.Selector,
data runtime.RawExtension,
revision int64) (*apps.ControllerRevision, error) {
labelMap, err := labels.ConvertSelectorToLabelsMap(selector.String())
if err != nil {
return nil, err
}
blockOwnerDeletion := true
isController := true
cr := &apps.ControllerRevision{
ObjectMeta: metav1.ObjectMeta{
Labels: labelMap,
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: parentKind.GroupVersion().String(),
Kind: parentKind.Kind,
Name: parent.GetName(),
UID: parent.GetUID(),
BlockOwnerDeletion: &blockOwnerDeletion,
Controller: &isController,
},
},
},
Data: data,
Revision: revision,
}
hash := HashControllerRevision(cr, nil)
cr.Name = ControllerRevisionName(parent.GetName(), hash)
cr.Labels[ControllerRevisionHashLabel] = strconv.FormatInt(int64(hash), 10)
return cr, nil
}
// HashControllerRevision hashes the contents of revision's Data using FNV hashing. If probe is not nil, the byte value
// of probe is added written to the hash as well.
func HashControllerRevision(revision *apps.ControllerRevision, probe *uint32) uint32 {
hf := fnv.New32()
if len(revision.Data.Raw) > 0 {
hf.Write(revision.Data.Raw)
}
if revision.Data.Object != nil {
hashutil.DeepHashObject(hf, revision.Data.Object)
}
if probe != nil {
hf.Write([]byte(strconv.FormatInt(int64(*probe), 10)))
}
return hf.Sum32()
}
// SortControllerRevisions sorts revisions by their Revision.
func SortControllerRevisions(revisions []*apps.ControllerRevision) {
sort.Sort(byRevision(revisions))
}
// EqualRevision returns true if lhs and rhs are either both nil, or both point to non-nil ControllerRevisions that
// contain semantically equivalent data. Otherwise this method returns false.
func EqualRevision(lhs *apps.ControllerRevision, rhs *apps.ControllerRevision) bool {
var lhsHash, rhsHash *uint32
if lhs == nil || rhs == nil {
return lhs == rhs
}
if hs, found := lhs.Labels[ControllerRevisionHashLabel]; found {
hash, err := strconv.ParseInt(hs, 10, 32)
if err == nil {
lhsHash = new(uint32)
*lhsHash = uint32(hash)
}
}
if hs, found := rhs.Labels[ControllerRevisionHashLabel]; found {
hash, err := strconv.ParseInt(hs, 10, 32)
if err == nil {
rhsHash = new(uint32)
*rhsHash = uint32(hash)
}
}
if lhsHash != nil && rhsHash != nil && *lhsHash != *rhsHash {
return false
}
return bytes.Equal(lhs.Data.Raw, rhs.Data.Raw) && apiequality.Semantic.DeepEqual(lhs.Data.Object, rhs.Data.Object)
}
// FindEqualRevisions returns all ControllerRevisions in revisions that are equal to needle using EqualRevision as the
// equality test. The returned slice preserves the order of revisions.
func FindEqualRevisions(revisions []*apps.ControllerRevision, needle *apps.ControllerRevision) []*apps.ControllerRevision {
var eq []*apps.ControllerRevision
for i := range revisions {
if EqualRevision(revisions[i], needle) {
eq = append(eq, revisions[i])
}
}
return eq
}
// byRevision implements sort.Interface to allow ControllerRevisions to be sorted by Revision.
type byRevision []*apps.ControllerRevision
func (br byRevision) Len() int {
return len(br)
}
func (br byRevision) Less(i, j int) bool {
return br[i].Revision < br[j].Revision
}
func (br byRevision) Swap(i, j int) {
br[i], br[j] = br[j], br[i]
}
// Interface provides an interface allowing for management of a Controller's history as realized by recorded
// ControllerRevisions. An instance of Interface can be retrieved from NewHistory. Implementations must treat all
// pointer parameters as "in" parameter, and they must not be mutated.
type Interface interface {
// ListControllerRevisions lists all ControllerRevisions matching selector and owned by parent or no other
// controller. If the returned error is nil the returned slice of ControllerRevisions is valid. If the
// returned error is not nil, the returned slice is not valid.
ListControllerRevisions(parent metav1.Object, selector labels.Selector) ([]*apps.ControllerRevision, error)
// CreateControllerRevision attempts to create the revision as owned by parent via a ControllerRef. If name
// collision occurs, a unique identifier is added to the hash of the revision and it is renamed using
// ControllerRevisionName. Implementations may cease to attempt to retry creation after some number of attempts
// and return an error. If the returned error is not nil, creation failed. If the returned error is nil, the
// returned ControllerRevision has been created.
CreateControllerRevision(parent metav1.Object, revision *apps.ControllerRevision) (*apps.ControllerRevision, error)
// DeleteControllerRevision attempts to delete revision. If the returned error is not nil, deletion has failed.
DeleteControllerRevision(revision *apps.ControllerRevision) error
// UpdateControllerRevision updates revision such that its Revision is equal to newRevision. Implementations
// may retry on conflict. If the returned error is nil, the update was successful and returned ControllerRevision
// is valid. If the returned error is not nil, the update failed and the returned ControllerRevision is invalid.
UpdateControllerRevision(revision *apps.ControllerRevision, newRevision int64) (*apps.ControllerRevision, error)
// AdoptControllerRevision attempts to adopt revision by adding a ControllerRef indicating that the parent
// Object of parentKind is the owner of revision. If revision is already owned, an error is returned. If the
// resource patch fails, an error is returned. If no error is returned, the returned ControllerRevision is
// valid.
AdoptControllerRevision(parent metav1.Object, parentKind schema.GroupVersionKind, revision *apps.ControllerRevision) (*apps.ControllerRevision, error)
// ReleaseControllerRevision attempts to release parent's ownership of revision by removing parent from the
// OwnerReferences of revision. If an error is returned, parent remains the owner of revision. If no error is
// returned, the returned ControllerRevision is valid.
ReleaseControllerRevision(parent metav1.Object, revision *apps.ControllerRevision) (*apps.ControllerRevision, error)
}
// NewHistory returns an instance of Interface that uses client to communicate with the API Server and lister to list
// ControllerRevisions. This method should be used to create an Interface for all scenarios other than testing.
func NewHistory(client clientset.Interface, lister appslisters.ControllerRevisionLister) Interface {
return &realHistory{client, lister}
}
// NewFakeHistory returns an instance of Interface that uses informer to create, update, list, and delete
// ControllerRevisions. This method should be used to create an Interface for testing purposes.
func NewFakeHistory(informer appsinformers.ControllerRevisionInformer) Interface {
return &fakeHistory{informer.Informer().GetIndexer(), informer.Lister()}
}
type realHistory struct {
client clientset.Interface
lister appslisters.ControllerRevisionLister
}
func (rh *realHistory) ListControllerRevisions(parent metav1.Object, selector labels.Selector) ([]*apps.ControllerRevision, error) {
// List all revisions in the namespace that match the selector
history, err := rh.lister.ControllerRevisions(parent.GetNamespace()).List(selector)
if err != nil {
return nil, err
}
var owned []*apps.ControllerRevision
for i := range history {
ref := controller.GetControllerOf(history[i])
if ref == nil || ref.UID == parent.GetUID() {
owned = append(owned, history[i])
}
}
return owned, err
}
func (rh *realHistory) CreateControllerRevision(parent metav1.Object, revision *apps.ControllerRevision) (*apps.ControllerRevision, error) {
// Initialize the probe to 0
probe := uint32(0)
// Clone the input
any, err := scheme.Scheme.DeepCopy(revision)
if err != nil {
return nil, err
}
clone := any.(*apps.ControllerRevision)
// Continue to attempt to create the revision updating the name with a new hash on each iteration
for {
var hash uint32
// The first attempt uses no probe to resolve collisions
if probe > 0 {
hash = HashControllerRevision(revision, &probe)
} else {
hash = HashControllerRevision(revision, nil)
}
// Update the revisions name and labels
clone.Name = ControllerRevisionName(parent.GetName(), hash)
created, err := rh.client.Apps().ControllerRevisions(parent.GetNamespace()).Create(clone)
if errors.IsAlreadyExists(err) {
probe++
continue
}
return created, err
}
}
func (rh *realHistory) UpdateControllerRevision(revision *apps.ControllerRevision, newRevision int64) (*apps.ControllerRevision, error) {
obj, err := scheme.Scheme.DeepCopy(revision)
if err != nil {
return nil, err
}
clone := obj.(*apps.ControllerRevision)
err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
if clone.Revision == newRevision {
return nil
}
clone.Revision = newRevision
updated, updateErr := rh.client.Apps().ControllerRevisions(clone.Namespace).Update(clone)
if updateErr == nil {
return nil
}
if updated != nil {
clone = updated
}
if updated, err := rh.lister.ControllerRevisions(clone.Namespace).Get(clone.Name); err == nil {
// make a copy so we don't mutate the shared cache
obj, err := scheme.Scheme.DeepCopy(updated)
if err != nil {
return err
}
clone = obj.(*apps.ControllerRevision)
}
return updateErr
})
return clone, err
}
func (rh *realHistory) DeleteControllerRevision(revision *apps.ControllerRevision) error {
return rh.client.Apps().ControllerRevisions(revision.Namespace).Delete(revision.Name, nil)
}
func (rh *realHistory) AdoptControllerRevision(parent metav1.Object, parentKind schema.GroupVersionKind, revision *apps.ControllerRevision) (*apps.ControllerRevision, error) {
// Return an error if the parent does not own the revision
if owner := controller.GetControllerOf(revision); owner != nil {
return nil, fmt.Errorf("attempt to adopt revision owned by %v", owner)
}
// Use strategic merge patch to add an owner reference indicating a controller ref
return rh.client.Apps().ControllerRevisions(parent.GetNamespace()).Patch(revision.GetName(),
types.StrategicMergePatchType, []byte(fmt.Sprintf(
`{"metadata":{"ownerReferences":[{"apiVersion":"%s","kind":"%s","name":"%s","uid":"%s","controller":true,"blockOwnerDeletion":true}],"uid":"%s"}}`,
parentKind.GroupVersion().String(), parentKind.Kind,
parent.GetName(), parent.GetUID(), revision.UID)))
}
func (rh *realHistory) ReleaseControllerRevision(parent metav1.Object, revision *apps.ControllerRevision) (*apps.ControllerRevision, error) {
// Use strategic merge patch to add an owner reference indicating a controller ref
released, err := rh.client.Apps().ControllerRevisions(revision.GetNamespace()).Patch(revision.GetName(),
types.StrategicMergePatchType,
[]byte(fmt.Sprintf(`{"metadata":{"ownerReferences":[{"$patch":"delete","uid":"%s"}],"uid":"%s"}}`, parent.GetUID(), revision.UID)))
if err != nil {
if errors.IsNotFound(err) {
// We ignore deleted revisions
return nil, nil
}
if errors.IsInvalid(err) {
// We ignore cases where the parent no longer owns the revision or where the revision has no
// owner.
return nil, nil
}
}
return released, err
}
type fakeHistory struct {
indexer cache.Indexer
lister appslisters.ControllerRevisionLister
}
func (fh *fakeHistory) ListControllerRevisions(parent metav1.Object, selector labels.Selector) ([]*apps.ControllerRevision, error) {
history, err := fh.lister.ControllerRevisions(parent.GetNamespace()).List(selector)
if err != nil {
return nil, err
}
var owned []*apps.ControllerRevision
for i := range history {
ref := controller.GetControllerOf(history[i])
if ref == nil || ref.UID == parent.GetUID() {
owned = append(owned, history[i])
}
}
return owned, err
}
func (fh *fakeHistory) addRevision(revision *apps.ControllerRevision) (*apps.ControllerRevision, error) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(revision)
if err != nil {
return nil, err
}
obj, found, err := fh.indexer.GetByKey(key)
if err != nil {
return nil, err
}
if found {
foundRevision := obj.(*apps.ControllerRevision)
return foundRevision, errors.NewAlreadyExists(apps.Resource("controllerrevision"), revision.Name)
}
return revision, fh.indexer.Update(revision)
}
func (fh *fakeHistory) CreateControllerRevision(parent metav1.Object, revision *apps.ControllerRevision) (*apps.ControllerRevision, error) {
// Initialize the probe to 0
probe := uint32(0)
// Clone the input
any, err := scheme.Scheme.DeepCopy(revision)
if err != nil {
return nil, err
}
clone := any.(*apps.ControllerRevision)
clone.Namespace = parent.GetNamespace()
// Continue to attempt to create the revision updating the name with a new hash on each iteration
for {
var hash uint32
// The first attempt uses no probe to resolve collisions
if probe > 0 {
hash = HashControllerRevision(revision, &probe)
} else {
hash = HashControllerRevision(revision, nil)
}
// Update the revisions name and labels
clone.Name = ControllerRevisionName(parent.GetName(), hash)
created, err := fh.addRevision(clone)
if errors.IsAlreadyExists(err) {
probe++
continue
}
return created, err
}
}
func (fh *fakeHistory) DeleteControllerRevision(revision *apps.ControllerRevision) error {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(revision)
if err != nil {
return err
}
obj, found, err := fh.indexer.GetByKey(key)
if err != nil {
return err
}
if !found {
return errors.NewNotFound(apps.Resource("controllerrevisions"), revision.Name)
}
return fh.indexer.Delete(obj)
}
func (fh *fakeHistory) UpdateControllerRevision(revision *apps.ControllerRevision, newRevision int64) (*apps.ControllerRevision, error) {
obj, err := scheme.Scheme.DeepCopy(revision)
if err != nil {
return nil, err
}
clone := obj.(*apps.ControllerRevision)
clone.Revision = newRevision
return clone, fh.indexer.Update(clone)
}
func (fh *fakeHistory) AdoptControllerRevision(parent metav1.Object, parentKind schema.GroupVersionKind, revision *apps.ControllerRevision) (*apps.ControllerRevision, error) {
blockOwnerDeletion := true
isController := true
if owner := controller.GetControllerOf(revision); owner != nil {
return nil, fmt.Errorf("attempt to adopt revision owned by %v", owner)
}
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(revision)
if err != nil {
return nil, err
}
_, found, err := fh.indexer.GetByKey(key)
if err != nil {
return nil, err
}
if !found {
return nil, errors.NewNotFound(apps.Resource("controllerrevisions"), revision.Name)
}
obj2, err := scheme.Scheme.DeepCopy(revision)
if err != nil {
return nil, err
}
clone := obj2.(*apps.ControllerRevision)
clone.OwnerReferences = append(clone.OwnerReferences, metav1.OwnerReference{
APIVersion: parentKind.GroupVersion().String(),
Kind: parentKind.Kind,
Name: parent.GetName(),
UID: parent.GetUID(),
BlockOwnerDeletion: &blockOwnerDeletion,
Controller: &isController,
})
return clone, fh.indexer.Update(clone)
}
func (fh *fakeHistory) ReleaseControllerRevision(parent metav1.Object, revision *apps.ControllerRevision) (*apps.ControllerRevision, error) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(revision)
if err != nil {
return nil, err
}
_, found, err := fh.indexer.GetByKey(key)
if err != nil {
return nil, err
}
if !found {
return nil, nil
}
obj2, err := scheme.Scheme.DeepCopy(revision)
if err != nil {
return nil, err
}
clone := obj2.(*apps.ControllerRevision)
refs := clone.OwnerReferences
clone.OwnerReferences = nil
for i := range refs {
if refs[i].UID != parent.GetUID() {
clone.OwnerReferences = append(clone.OwnerReferences, refs[i])
}
}
return clone, fh.indexer.Update(clone)
}

File diff suppressed because it is too large Load Diff

View File

@@ -14,6 +14,7 @@ go_library(
"stateful_pod_control.go",
"stateful_set.go",
"stateful_set_control.go",
"stateful_set_status_updater.go",
"stateful_set_utils.go",
],
tags = ["automanaged"],
@@ -29,12 +30,15 @@ go_library(
"//pkg/client/listers/core/v1:go_default_library",
"//pkg/client/retry:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/history:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
@@ -50,6 +54,7 @@ go_test(
srcs = [
"stateful_pod_control_test.go",
"stateful_set_control_test.go",
"stateful_set_status_updater_test.go",
"stateful_set_test.go",
"stateful_set_utils_test.go",
],
@@ -68,6 +73,7 @@ go_test(
"//pkg/client/listers/apps/v1beta1:go_default_library",
"//pkg/client/listers/core/v1:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/history:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
@@ -77,6 +83,7 @@ go_test(
"//vendor/k8s.io/client-go/testing:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
"//vendor/k8s.io/metrics/pkg/client/clientset_generated/clientset/scheme:go_default_library",
],
)

View File

@@ -49,10 +49,6 @@ type StatefulPodControlInterface interface {
// DeleteStatefulPod deletes a Pod in a StatefulSet. The pods PVCs are not deleted. If the delete is successful,
// the returned error is nil.
DeleteStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error
// UpdateStatefulSetStatus updates the status of a StatefulSet. set is an in-out parameter, and any
// updates made to the set are made visible as mutations to the parameter. If the method is successful, the
// returned error is nil, and set has its status updated.
UpdateStatefulSetStatus(set *apps.StatefulSet, replicas int32, generation int64) error
}
func NewRealStatefulPodControl(
@@ -93,7 +89,6 @@ func (spc *realStatefulPodControl) CreateStatefulPod(set *apps.StatefulSet, pod
func (spc *realStatefulPodControl) UpdateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error {
attemptedUpdate := false
err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
// assume the Pod is consistent
consistent := true
@@ -149,30 +144,6 @@ func (spc *realStatefulPodControl) DeleteStatefulPod(set *apps.StatefulSet, pod
return err
}
func (spc *realStatefulPodControl) UpdateStatefulSetStatus(set *apps.StatefulSet, replicas int32, generation int64) error {
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
set.Status.Replicas = replicas
set.Status.ObservedGeneration = &generation
_, updateErr := spc.client.Apps().StatefulSets(set.Namespace).UpdateStatus(set)
if updateErr == nil {
return nil
}
if updated, err := spc.setLister.StatefulSets(set.Namespace).Get(set.Name); err == nil {
// make a copy so we don't mutate the shared cache
if copy, err := scheme.Scheme.DeepCopy(updated); err == nil {
set = copy.(*apps.StatefulSet)
} else {
utilruntime.HandleError(fmt.Errorf("error copying updated StatefulSet: %v", err))
}
} else {
utilruntime.HandleError(fmt.Errorf("error getting updated StatefulSet %s/%s from lister: %v", set.Namespace, set.Name, err))
}
return updateErr
})
}
// recordPodEvent records an event for verb applied to a Pod in a StatefulSet. If err is nil the generated event will
// have a reason of v1.EventTypeNormal. If err is not nil the generated event will have a reason of v1.EventTypeWarning.
func (spc *realStatefulPodControl) recordPodEvent(verb string, set *apps.StatefulSet, pod *v1.Pod, err error) {

View File

@@ -29,9 +29,7 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/api/v1"
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
appslisters "k8s.io/kubernetes/pkg/client/listers/apps/v1beta1"
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
)
@@ -461,116 +459,6 @@ func TestStatefulPodControlDeleteFailure(t *testing.T) {
}
}
func TestStatefulPodControlUpdatesSetStatus(t *testing.T) {
recorder := record.NewFakeRecorder(10)
set := newStatefulSet(3)
fakeClient := &fake.Clientset{}
control := NewRealStatefulPodControl(fakeClient, nil, nil, nil, recorder)
fakeClient.AddReactor("update", "statefulsets", func(action core.Action) (bool, runtime.Object, error) {
update := action.(core.UpdateAction)
return true, update.GetObject(), nil
})
if err := control.UpdateStatefulSetStatus(set, 2, 1); err != nil {
t.Errorf("Error returned on successful status update: %s", err)
}
if set.Status.Replicas != 2 {
t.Errorf("UpdateStatefulSetStatus mutated the sets replicas %d", set.Status.Replicas)
}
events := collectEvents(recorder.Events)
if eventCount := len(events); eventCount != 0 {
t.Errorf("Expected 0 events for successful status update %d", eventCount)
}
}
func TestStatefulPodControlUpdatesObservedGeneration(t *testing.T) {
recorder := record.NewFakeRecorder(10)
set := newStatefulSet(3)
fakeClient := &fake.Clientset{}
control := NewRealStatefulPodControl(fakeClient, nil, nil, nil, recorder)
fakeClient.AddReactor("update", "statefulsets", func(action core.Action) (bool, runtime.Object, error) {
update := action.(core.UpdateAction)
sts := update.GetObject().(*apps.StatefulSet)
if sts.Status.ObservedGeneration == nil || *sts.Status.ObservedGeneration != int64(3) {
t.Errorf("expected observedGeneration to be synced with generation for statefulset %q", sts.Name)
}
return true, sts, nil
})
if err := control.UpdateStatefulSetStatus(set, 2, 3); err != nil {
t.Errorf("Error returned on successful status update: %s", err)
}
}
func TestStatefulPodControlUpdateReplicasFailure(t *testing.T) {
recorder := record.NewFakeRecorder(10)
set := newStatefulSet(3)
fakeClient := &fake.Clientset{}
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
indexer.Add(set)
setLister := appslisters.NewStatefulSetLister(indexer)
control := NewRealStatefulPodControl(fakeClient, setLister, nil, nil, recorder)
fakeClient.AddReactor("update", "statefulsets", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, apierrors.NewInternalError(errors.New("API server down"))
})
if err := control.UpdateStatefulSetStatus(set, 2, 1); err == nil {
t.Error("Failed update did not return error")
}
events := collectEvents(recorder.Events)
if eventCount := len(events); eventCount != 0 {
t.Errorf("Expected 0 events for successful status update %d", eventCount)
}
}
func TestStatefulPodControlUpdateReplicasConflict(t *testing.T) {
recorder := record.NewFakeRecorder(10)
set := newStatefulSet(3)
conflict := false
fakeClient := &fake.Clientset{}
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
indexer.Add(set)
setLister := appslisters.NewStatefulSetLister(indexer)
control := NewRealStatefulPodControl(fakeClient, setLister, nil, nil, recorder)
fakeClient.AddReactor("update", "statefulsets", func(action core.Action) (bool, runtime.Object, error) {
update := action.(core.UpdateAction)
if !conflict {
conflict = true
return true, update.GetObject(), apierrors.NewConflict(action.GetResource().GroupResource(), set.Name, errors.New("Object already exists"))
} else {
return true, update.GetObject(), nil
}
})
if err := control.UpdateStatefulSetStatus(set, 2, 1); err != nil {
t.Errorf("UpdateStatefulSetStatus returned an error: %s", err)
}
if set.Status.Replicas != 2 {
t.Errorf("UpdateStatefulSetStatus mutated the sets replicas %d", set.Status.Replicas)
}
events := collectEvents(recorder.Events)
if eventCount := len(events); eventCount != 0 {
t.Errorf("Expected 0 events for successful status update %d", eventCount)
}
}
func TestStatefulPodControlUpdateReplicasConflictFailure(t *testing.T) {
recorder := record.NewFakeRecorder(10)
set := newStatefulSet(3)
fakeClient := &fake.Clientset{}
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
indexer.Add(set)
setLister := appslisters.NewStatefulSetLister(indexer)
control := NewRealStatefulPodControl(fakeClient, setLister, nil, nil, recorder)
fakeClient.AddReactor("update", "statefulsets", func(action core.Action) (bool, runtime.Object, error) {
update := action.(core.UpdateAction)
return true, update.GetObject(), apierrors.NewConflict(action.GetResource().GroupResource(), set.Name, errors.New("Object already exists"))
})
if err := control.UpdateStatefulSetStatus(set, 2, 1); err == nil {
t.Error("UpdateStatefulSetStatus failed to return an error on get failure")
}
events := collectEvents(recorder.Events)
if eventCount := len(events); eventCount != 0 {
t.Errorf("Expected 0 events for successful status update %d", eventCount)
}
}
func collectEvents(source <-chan string) []string {
done := false
events := make([]string, 0)

View File

@@ -40,6 +40,7 @@ import (
appslisters "k8s.io/kubernetes/pkg/client/listers/apps/v1beta1"
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/history"
"github.com/golang/glog"
)
@@ -80,6 +81,7 @@ func NewStatefulSetController(
podInformer coreinformers.PodInformer,
setInformer appsinformers.StatefulSetInformer,
pvcInformer coreinformers.PersistentVolumeClaimInformer,
revInformer appsinformers.ControllerRevisionInformer,
kubeClient clientset.Interface,
) *StatefulSetController {
eventBroadcaster := record.NewBroadcaster()
@@ -95,8 +97,9 @@ func NewStatefulSetController(
setInformer.Lister(),
podInformer.Lister(),
pvcInformer.Lister(),
recorder,
),
recorder),
NewRealStatefulSetStatusUpdater(kubeClient, setInformer.Lister()),
history.NewHistory(kubeClient, revInformer.Lister()),
),
pvcListerSynced: pvcInformer.Informer().HasSynced,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "statefulset"),
@@ -305,6 +308,32 @@ func (ssc *StatefulSetController) getPodsForStatefulSet(set *apps.StatefulSet, s
return cm.ClaimPods(pods, filter)
}
// adoptOrphanRevisions adopts any orphaned ControllerRevisions matched by set's Selector.
func (ssc *StatefulSetController) adoptOrphanRevisions(set *apps.StatefulSet) error {
revisions, err := ssc.control.ListRevisions(set)
if err != nil {
return err
}
hasOrphans := false
for i := range revisions {
if controller.GetControllerOf(revisions[i]) == nil {
hasOrphans = true
break
}
}
if hasOrphans {
fresh, err := ssc.kubeClient.AppsV1beta1().StatefulSets(set.Namespace).Get(set.Name, metav1.GetOptions{})
if err != nil {
return err
}
if fresh.UID != set.UID {
return fmt.Errorf("original StatefulSet %v/%v is gone: got uid %v, wanted %v", set.Namespace, set.Name, fresh.UID, set.UID)
}
return ssc.control.AdoptOrphanRevisions(set, revisions)
}
return nil
}
// getStatefulSetsForPod returns a list of StatefulSets that potentially match
// a given pod.
func (ssc *StatefulSetController) getStatefulSetsForPod(pod *v1.Pod) []*apps.StatefulSet {
@@ -406,6 +435,10 @@ func (ssc *StatefulSetController) sync(key string) error {
return nil
}
if err := ssc.adoptOrphanRevisions(set); err != nil {
return err
}
pods, err := ssc.getPodsForStatefulSet(set, selector)
if err != nil {
return err

View File

@@ -17,12 +17,14 @@ limitations under the License.
package statefulset
import (
"fmt"
"math"
"sort"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/kubernetes/pkg/api/v1"
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
"k8s.io/kubernetes/pkg/controller/history"
"github.com/golang/glog"
)
@@ -36,18 +38,30 @@ type StatefulSetControlInterface interface {
// Implementors should sink any errors that they do not wish to trigger a retry, and they may feel free to
// exit exceptionally at any point provided they wish the update to be re-run at a later point in time.
UpdateStatefulSet(set *apps.StatefulSet, pods []*v1.Pod) error
// ListRevisions returns a array of the ControllerRevisions that represent the revisions of set. If the returned
// error is nil, the returns slice of ControllerRevisions is valid.
ListRevisions(set *apps.StatefulSet) ([]*apps.ControllerRevision, error)
// AdoptOrphanRevisions adopts any orphaned ControllerRevisions that match set's Selector. If all adoptions are
// successful the returned error is nil.
AdoptOrphanRevisions(set *apps.StatefulSet, revisions []*apps.ControllerRevision) error
}
// NewDefaultStatefulSetControl returns a new instance of the default implementation StatefulSetControlInterface that
// implements the documented semantics for StatefulSets. podControl is the PodControlInterface used to create, update,
// and delete Pods and to create PersistentVolumeClaims. You should use an instance returned from
// NewRealStatefulPodControl() for any scenario other than testing.
func NewDefaultStatefulSetControl(podControl StatefulPodControlInterface) StatefulSetControlInterface {
return &defaultStatefulSetControl{podControl}
// and delete Pods and to create PersistentVolumeClaims. statusUpdater is the StatefulSetStatusUpdaterInterface used
// to update the status of StatefulSets. You should use an instance returned from NewRealStatefulPodControl() for any
// scenario other than testing.
func NewDefaultStatefulSetControl(
podControl StatefulPodControlInterface,
statusUpdater StatefulSetStatusUpdaterInterface,
controllerHistory history.Interface) StatefulSetControlInterface {
return &defaultStatefulSetControl{podControl, statusUpdater, controllerHistory}
}
type defaultStatefulSetControl struct {
podControl StatefulPodControlInterface
podControl StatefulPodControlInterface
statusUpdater StatefulSetStatusUpdaterInterface
controllerHistory history.Interface
}
// UpdateStatefulSet executes the core logic loop for a stateful set, applying the predictable and
@@ -57,20 +71,223 @@ type defaultStatefulSetControl struct {
// in no particular order. Clients using the burst strategy should be careful to ensure they
// understand the consistency implications of having unpredictable numbers of pods available.
func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, pods []*v1.Pod) error {
// list all revisions and sort them
revisions, err := ssc.ListRevisions(set)
if err != nil {
return err
}
history.SortControllerRevisions(revisions)
// get the current, and update revisions
currentRevision, updateRevision, err := ssc.getStatefulSetRevisions(set, revisions)
if err != nil {
return err
}
// perform the main update function and get the status
status, err := ssc.updateStatefulSet(set, currentRevision, updateRevision, pods)
if err != nil {
return err
}
// update the set's status
err = ssc.updateStatefulSetStatus(set, status)
if err != nil {
return err
}
glog.V(4).Infof("StatefulSet %s/%s pod status replicas=%d ready=%d current=%d updated=%d",
set.Namespace,
set.Name,
status.Replicas,
status.ReadyReplicas,
status.CurrentReplicas,
status.UpdatedReplicas)
glog.V(4).Infof("StatefulSet %s/%s revisions current=%s update=%s",
set.Namespace,
set.Name,
status.CurrentRevision,
status.UpdateRevision)
// maintain the set's revision history limit
return ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision)
}
func (ssc *defaultStatefulSetControl) ListRevisions(set *apps.StatefulSet) ([]*apps.ControllerRevision, error) {
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil {
return nil, err
}
return ssc.controllerHistory.ListControllerRevisions(set, selector)
}
func (ssc *defaultStatefulSetControl) AdoptOrphanRevisions(
set *apps.StatefulSet,
revisions []*apps.ControllerRevision) error {
for i := range revisions {
adopted, err := ssc.controllerHistory.AdoptControllerRevision(set, controllerKind, revisions[i])
if err != nil {
return err
}
revisions[i] = adopted
}
return nil
}
// truncateHistory truncates any non-live ControllerRevisions in revisions from set's history. The UpdateRevision and
// CurrentRevision in set's Status are considered to be live. Any revisions associated with the Pods in pods are also
// considered to be live. Non-live revisions are deleted, starting with the revision with the lowest Revision, until
// only RevisionHistoryLimit revisions remain. If the returned error is nil the operation was successful. This method
// expects that revisions is sorted when supplied.
func (ssc *defaultStatefulSetControl) truncateHistory(
set *apps.StatefulSet,
pods []*v1.Pod,
revisions []*apps.ControllerRevision,
current *apps.ControllerRevision,
update *apps.ControllerRevision) error {
history := make([]*apps.ControllerRevision, 0, len(revisions))
// mark all live revisions
live := map[string]bool{current.Name: true, update.Name: true}
for i := range pods {
live[getPodRevision(pods[i])] = true
}
// collect live revisions and historic revisions
for i := range revisions {
if !live[revisions[i].Name] {
history = append(history, revisions[i])
}
}
historyLen := len(history)
historyLimit := int(*set.Spec.RevisionHistoryLimit)
if historyLen <= historyLimit {
return nil
}
// delete any non-live history to maintain the revision limit.
history = history[:(historyLen - historyLimit)]
for i := 0; i < len(history); i++ {
if err := ssc.controllerHistory.DeleteControllerRevision(history[i]); err != nil {
return err
}
}
return nil
}
// getStatefulSetRevisions returns the current and update ControllerRevisions for set. This method may create a new revision,
// or modify the Revision of an existing revision if an update to set is detected. This method expects that revisions
// is sorted when supplied.
func (ssc *defaultStatefulSetControl) getStatefulSetRevisions(
set *apps.StatefulSet,
revisions []*apps.ControllerRevision) (*apps.ControllerRevision, *apps.ControllerRevision, error) {
var currentRevision, updateRevision *apps.ControllerRevision
revisionCount := len(revisions)
history.SortControllerRevisions(revisions)
// create a new revision from the current set
updateRevision, err := newRevision(set, nextRevision(revisions))
if err != nil {
return nil, nil, err
}
// find any equivalent revisions
equalRevisions := history.FindEqualRevisions(revisions, updateRevision)
equalCount := len(equalRevisions)
if equalCount > 0 && history.EqualRevision(revisions[revisionCount-1], equalRevisions[equalCount-1]) {
// if the equivalent revision is immediately prior the update revision has not changed
updateRevision = revisions[revisionCount-1]
} else if equalCount > 0 {
// if the equivalent revision is not immediately prior we will roll back by incrementing the
// Revision of the equivalent revision
updateRevision, err = ssc.controllerHistory.UpdateControllerRevision(
equalRevisions[equalCount-1],
updateRevision.Revision)
if err != nil {
return nil, nil, err
}
} else {
//if there is no equivalent revision we create a new one
updateRevision, err = ssc.controllerHistory.CreateControllerRevision(set, updateRevision)
if err != nil {
return nil, nil, err
}
}
// attempt to find the revision that corresponds to the current revision
for i := range revisions {
if revisions[i].Name == set.Status.CurrentRevision {
currentRevision = revisions[i]
}
}
// if the current revision is nil we initialize the history by setting it to the update revision
if currentRevision == nil {
currentRevision = updateRevision
}
return currentRevision, updateRevision, nil
}
// updateStatefulSet performs the update function for a StatefulSet. This method creates, updates, and deletes Pods in
// the set in order to conform the system to the target state for the set. The target state always contains
// set.Spec.Replicas Pods with a Ready Condition. If the UpdateStrategy.Type for the set is
// RollingUpdateStatefulSetStrategyType then all Pods in the set must be at set.Status.CurrentRevision.
// If the UpdateStrategy.Type for the set is OnDeleteStatefulSetStrategyType, the target state implies nothing about
// the revisions of Pods in the set. If the UpdateStrategy.Type for the set is PartitionStatefulSetStrategyType, then
// all Pods with ordinal less than UpdateStrategy.Partition.Ordinal must be at Status.CurrentRevision and all other
// Pods must be at Status.UpdateRevision. If the returned error is nil, the returned StatefulSetStatus is valid and the
// update must be recorded. If the error is not nil, the method should be retried until successful.
func (ssc *defaultStatefulSetControl) updateStatefulSet(
set *apps.StatefulSet,
currentRevision *apps.ControllerRevision,
updateRevision *apps.ControllerRevision,
pods []*v1.Pod) (*apps.StatefulSetStatus, error) {
// get the current and update revisions of the set.
currentSet, err := applyRevision(set, currentRevision)
if err != nil {
return nil, err
}
updateSet, err := applyRevision(set, updateRevision)
if err != nil {
return nil, err
}
// set the generation, and revisions in the returned status
status := apps.StatefulSetStatus{}
status.ObservedGeneration = new(int64)
*status.ObservedGeneration = set.Generation
status.CurrentRevision = currentRevision.Name
status.UpdateRevision = updateRevision.Name
replicaCount := int(*set.Spec.Replicas)
// slice that will contain all Pods such that 0 <= getOrdinal(pod) < set.Spec.Replicas
replicas := make([]*v1.Pod, replicaCount)
// slice that will contain all Pods such that set.Spec.Replicas <= getOrdinal(pod)
condemned := make([]*v1.Pod, 0, len(pods))
ready := 0
unhealthy := 0
firstUnhealthyOrdinal := math.MaxInt32
var firstUnhealthyPod *v1.Pod
// First we partition pods into two lists valid replicas and condemned Pods
for i := range pods {
//count the number of running and ready replicas
status.Replicas++
// count the number of running and ready replicas
if isRunningAndReady(pods[i]) {
ready++
status.ReadyReplicas++
}
// count the number of current and update replicas
if isCreated(pods[i]) && !isTerminating(pods[i]) {
if getPodRevision(pods[i]) == currentRevision.Name {
status.CurrentReplicas++
} else if getPodRevision(pods[i]) == updateRevision.Name {
status.UpdatedReplicas++
}
}
if ord := getOrdinal(pods[i]); 0 <= ord && ord < replicaCount {
// if the ordinal of the pod is within the range of the current number of replicas,
// insert it at the indirection of its ordinal
@@ -83,45 +300,53 @@ func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, p
// If the ordinal could not be parsed (ord < 0), ignore the Pod.
}
// for any empty indices in the sequence [0,set.Spec.Replicas) create a new Pod
// for any empty indices in the sequence [0,set.Spec.Replicas) create a new Pod at the correct revision
for ord := 0; ord < replicaCount; ord++ {
if replicas[ord] == nil {
replicas[ord] = newStatefulSetPod(set, ord)
}
}
// count the number of unhealthy pods
for i := range replicas {
if !isHealthy(replicas[i]) {
unhealthy++
}
}
for i := range condemned {
if !isHealthy(condemned[i]) {
unhealthy++
replicas[ord] = newVersionedStatefulSetPod(
currentSet,
updateSet,
currentRevision.Name,
updateRevision.Name, ord)
}
}
// sort the condemned Pods by their ordinals
sort.Sort(ascendingOrdinal(condemned))
// if the current number of replicas has changed update the statefulSets replicas
if set.Status.Replicas != int32(ready) || set.Status.ObservedGeneration == nil || set.Generation > *set.Status.ObservedGeneration {
obj, err := scheme.Scheme.Copy(set)
if err != nil {
return fmt.Errorf("unable to copy set: %v", err)
// find the first unhealthy Pod
for i := range replicas {
if !isHealthy(replicas[i]) {
unhealthy++
if ord := getOrdinal(replicas[i]); ord < firstUnhealthyOrdinal {
firstUnhealthyOrdinal = ord
firstUnhealthyPod = replicas[i]
}
}
set = obj.(*apps.StatefulSet)
}
if err := ssc.podControl.UpdateStatefulSetStatus(set, int32(ready), set.Generation); err != nil {
return err
for i := range condemned {
if !isHealthy(condemned[i]) {
unhealthy++
if ord := getOrdinal(condemned[i]); ord < firstUnhealthyOrdinal {
firstUnhealthyOrdinal = ord
firstUnhealthyPod = condemned[i]
}
}
}
if unhealthy > 0 {
glog.V(4).Infof("StatefulSet %s/%s has %d unhealthy Pods starting with %s",
set.Namespace,
set.Name,
unhealthy,
firstUnhealthyPod.Name)
}
// If the StatefulSet is being deleted, don't do anything other than updating
// status.
if set.DeletionTimestamp != nil {
return nil
return &status, nil
}
monotonic := !allowsBurst(set)
@@ -130,20 +355,41 @@ func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, p
for i := range replicas {
// delete and recreate failed pods
if isFailed(replicas[i]) {
glog.V(2).Infof("StatefulSet %s is recreating failed Pod %s", set.Name, replicas[i].Name)
glog.V(4).Infof("StatefulSet %s/%s is recreating failed Pod %s",
set.Namespace,
set.Name,
replicas[i].Name)
if err := ssc.podControl.DeleteStatefulPod(set, replicas[i]); err != nil {
return err
return &status, err
}
replicas[i] = newStatefulSetPod(set, i)
if getPodRevision(replicas[i]) == currentRevision.Name {
status.CurrentReplicas--
} else if getPodRevision(replicas[i]) == updateRevision.Name {
status.UpdatedReplicas--
}
status.Replicas--
replicas[i] = newVersionedStatefulSetPod(
currentSet,
updateSet,
currentRevision.Name,
updateRevision.Name,
i)
}
// If we find a Pod that has not been created we create the Pod
if !isCreated(replicas[i]) {
if err := ssc.podControl.CreateStatefulPod(set, replicas[i]); err != nil {
return err
return &status, err
}
status.Replicas++
if getPodRevision(replicas[i]) == currentRevision.Name {
status.CurrentReplicas++
} else if getPodRevision(replicas[i]) == updateRevision.Name {
status.UpdatedReplicas++
}
// if the set does not allow bursting, return immediately
if monotonic {
return nil
return &status, nil
}
// pod created, no more work possible for this round
continue
@@ -151,15 +397,23 @@ func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, p
// If we find a Pod that is currently terminating, we must wait until graceful deletion
// completes before we continue to make progress.
if isTerminating(replicas[i]) && monotonic {
glog.V(2).Infof("StatefulSet %s is waiting for Pod %s to Terminate", set.Name, replicas[i].Name)
return nil
glog.V(4).Infof(
"StatefulSet %s/%s is waiting for Pod %s to Terminate",
set.Namespace,
set.Name,
replicas[i].Name)
return &status, nil
}
// If we have a Pod that has been created but is not running and ready we can not make progress.
// We must ensure that all for each Pod, when we create it, all of its predecessors, with respect to its
// ordinal, are Running and Ready.
if !isRunningAndReady(replicas[i]) && monotonic {
glog.V(2).Infof("StatefulSet %s is waiting for Pod %s to be Running and Ready", set.Name, replicas[i].Name)
return nil
glog.V(4).Infof(
"StatefulSet %s/%s is waiting for Pod %s to be Running and Ready",
set.Namespace,
set.Name,
replicas[i].Name)
return &status, nil
}
// Enforce the StatefulSet invariants
if identityMatches(set, replicas[i]) && storageMatches(set, replicas[i]) {
@@ -168,31 +422,120 @@ func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, p
// Make a deep copy so we don't mutate the shared cache
copy, err := scheme.Scheme.DeepCopy(replicas[i])
if err != nil {
return err
return &status, err
}
replica := copy.(*v1.Pod)
if err := ssc.podControl.UpdateStatefulPod(set, replica); err != nil {
return err
if err := ssc.podControl.UpdateStatefulPod(updateSet, replica); err != nil {
return &status, err
}
}
// At this point, all of the current Replicas are Running and Ready, we can consider termination.
// We will wait for all predecessors to be Running and Ready prior to attempting a deletion.
// We will terminate Pods in a monotonically decreasing order over [len(pods),set.Spec.Replicas).
// Note that we do not resurrect Pods in this interval.
if unhealthy > 0 && monotonic {
glog.V(2).Infof("StatefulSet %s is waiting on %d Pods", set.Name, unhealthy)
return nil
}
// Note that we do not resurrect Pods in this interval. Also not that scaling will take precedence over
// updates.
for target := len(condemned) - 1; target >= 0; target-- {
glog.V(2).Infof("StatefulSet %s terminating Pod %s", set.Name, condemned[target])
// wait for terminating pods to expire
if isTerminating(condemned[target]) {
glog.V(4).Infof(
"StatefulSet %s/%s is waiting for Pod %s to Terminate prior to scale down",
set.Namespace,
set.Name,
condemned[target].Name)
// block if we are in monotonic mode
if monotonic {
return &status, nil
}
continue
}
// if we are in monotonic mode and the condemned target is not the first unhealthy Pod block
if !isRunningAndReady(condemned[target]) && monotonic && condemned[target] != firstUnhealthyPod {
glog.V(4).Infof(
"StatefulSet %s/%s is waiting for Pod %s to be Running and Ready prior to scale down",
set.Namespace,
set.Name,
firstUnhealthyPod.Name)
return &status, nil
}
glog.V(4).Infof("StatefulSet %s/%s terminating Pod %s for scale dowm",
set.Namespace,
set.Name,
condemned[target].Name)
if err := ssc.podControl.DeleteStatefulPod(set, condemned[target]); err != nil {
return err
return &status, err
}
if getPodRevision(condemned[target]) == currentRevision.Name {
status.CurrentReplicas--
} else if getPodRevision(condemned[target]) == updateRevision.Name {
status.UpdatedReplicas--
}
if monotonic {
return nil
return &status, nil
}
}
// for the OnDelete strategy we short circuit. Pods will be updated when they are manually deleted.
if set.Spec.UpdateStrategy.Type == apps.OnDeleteStatefulSetStrategyType {
return &status, nil
}
// we compute the minimum ordinal of the target sequence for a destructive update based on the strategy.
updateMin := 0
if set.Spec.UpdateStrategy.Type == apps.PartitionStatefulSetStrategyType {
updateMin = int(set.Spec.UpdateStrategy.Partition.Ordinal)
}
// we terminate the Pod with the largest ordinal that does not match the update revision.
for target := len(replicas) - 1; target >= updateMin; target-- {
// all replicas should be healthy before an update progresses we allow termination of the firstUnhealthy
// Pod in any state allow for rolling back a failed update.
if !isRunningAndReady(replicas[target]) && replicas[target] != firstUnhealthyPod {
glog.V(4).Infof(
"StatefulSet %s/%s is waiting for Pod %s to be Running and Ready prior to update",
set.Namespace,
set.Name,
firstUnhealthyPod.Name)
return &status, nil
}
if getPodRevision(replicas[target]) != updateRevision.Name {
glog.V(4).Infof("StatefulSet %s/%s terminating Pod %s for update",
set.Namespace,
set.Name,
replicas[target].Name)
err := ssc.podControl.DeleteStatefulPod(set, replicas[target])
status.CurrentReplicas--
return &status, err
}
}
return &status, nil
}
// updateStatefulSetStatus updates set's Status to be equal to status. If status indicates a complete update, it is
// mutated to indicate completion. If status is semantically equivalent to set's Status no update is performed. If the
// returned error is nil, the update is successful.
func (ssc *defaultStatefulSetControl) updateStatefulSetStatus(
set *apps.StatefulSet,
status *apps.StatefulSetStatus) error {
// complete any in progress rolling update if necessary
completeRollingUpdate(set, status)
// if the status is not inconsistent do not perform an update
if !inconsistentStatus(set, status) {
return nil
}
// copy set and update its status
obj, err := scheme.Scheme.Copy(set)
if err != nil {
return err
}
set = obj.(*apps.StatefulSet)
if err := ssc.statusUpdater.UpdateStatefulSetStatus(set, status); err != nil {
return err
}
return nil
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,78 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package statefulset
import (
"fmt"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/kubernetes/scheme"
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
appslisters "k8s.io/kubernetes/pkg/client/listers/apps/v1beta1"
"k8s.io/kubernetes/pkg/client/retry"
)
// StatefulSetStatusUpdaterInterface is an interface used to update the StatefulSetStatus associated with a StatefulSet.
// For any use other than testing, clients should create an instance using NewRealStatefulSetStatusUpdater.
type StatefulSetStatusUpdaterInterface interface {
// UpdateStatefulSetStatus sets the set's Status to status. Implementations are required to retry on conflicts,
// but fail on other errors. If the returned error is nil set's Status has been successfully set to status.
UpdateStatefulSetStatus(set *apps.StatefulSet, status *apps.StatefulSetStatus) error
}
// NewRealStatefulSetStatusUpdater returns a StatefulSetStatusUpdaterInterface that updates the Status of a StatefulSet,
// using the supplied client and setLister.
func NewRealStatefulSetStatusUpdater(
client clientset.Interface,
setLister appslisters.StatefulSetLister) StatefulSetStatusUpdaterInterface {
return &realStatefulSetStatusUpdater{client, setLister}
}
type realStatefulSetStatusUpdater struct {
client clientset.Interface
setLister appslisters.StatefulSetLister
}
func (ssu *realStatefulSetStatusUpdater) UpdateStatefulSetStatus(
set *apps.StatefulSet,
status *apps.StatefulSetStatus) error {
// don't wait due to limited number of clients, but backoff after the default number of steps
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
set.Status = *status
_, updateErr := ssu.client.Apps().StatefulSets(set.Namespace).UpdateStatus(set)
if updateErr == nil {
return nil
}
if updated, err := ssu.setLister.StatefulSets(set.Namespace).Get(set.Name); err == nil {
// make a copy so we don't mutate the shared cache
if copy, err := scheme.Scheme.DeepCopy(updated); err == nil {
set = copy.(*apps.StatefulSet)
} else {
utilruntime.HandleError(fmt.Errorf("error copying updated StatefulSet: %v", err))
}
} else {
utilruntime.HandleError(fmt.Errorf("error getting updated StatefulSet %s/%s from lister: %v", set.Namespace, set.Name, err))
}
return updateErr
})
}
var _ StatefulSetStatusUpdaterInterface = &realStatefulSetStatusUpdater{}

View File

@@ -0,0 +1,141 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package statefulset
import (
"errors"
"testing"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
appslisters "k8s.io/kubernetes/pkg/client/listers/apps/v1beta1"
)
func TestStatefulSetUpdaterUpdatesSetStatus(t *testing.T) {
set := newStatefulSet(3)
status := apps.StatefulSetStatus{ObservedGeneration: func() *int64 {
i := int64(1)
return &i
}(), Replicas: 2}
fakeClient := &fake.Clientset{}
updater := NewRealStatefulSetStatusUpdater(fakeClient, nil)
fakeClient.AddReactor("update", "statefulsets", func(action core.Action) (bool, runtime.Object, error) {
update := action.(core.UpdateAction)
return true, update.GetObject(), nil
})
if err := updater.UpdateStatefulSetStatus(set, &status); err != nil {
t.Errorf("Error returned on successful status update: %s", err)
}
if set.Status.Replicas != 2 {
t.Errorf("UpdateStatefulSetStatus mutated the sets replicas %d", set.Status.Replicas)
}
}
func TestStatefulSetStatusUpdaterUpdatesObservedGeneration(t *testing.T) {
set := newStatefulSet(3)
status := apps.StatefulSetStatus{ObservedGeneration: func() *int64 {
i := int64(3)
return &i
}(), Replicas: 2}
fakeClient := &fake.Clientset{}
updater := NewRealStatefulSetStatusUpdater(fakeClient, nil)
fakeClient.AddReactor("update", "statefulsets", func(action core.Action) (bool, runtime.Object, error) {
update := action.(core.UpdateAction)
sts := update.GetObject().(*apps.StatefulSet)
if sts.Status.ObservedGeneration == nil || *sts.Status.ObservedGeneration != int64(3) {
t.Errorf("expected observedGeneration to be synced with generation for statefulset %q", sts.Name)
}
return true, sts, nil
})
if err := updater.UpdateStatefulSetStatus(set, &status); err != nil {
t.Errorf("Error returned on successful status update: %s", err)
}
}
func TestStatefulSetStatusUpdaterUpdateReplicasFailure(t *testing.T) {
set := newStatefulSet(3)
status := apps.StatefulSetStatus{ObservedGeneration: func() *int64 {
i := int64(3)
return &i
}(), Replicas: 2}
fakeClient := &fake.Clientset{}
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
indexer.Add(set)
setLister := appslisters.NewStatefulSetLister(indexer)
updater := NewRealStatefulSetStatusUpdater(fakeClient, setLister)
fakeClient.AddReactor("update", "statefulsets", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, apierrors.NewInternalError(errors.New("API server down"))
})
if err := updater.UpdateStatefulSetStatus(set, &status); err == nil {
t.Error("Failed update did not return error")
}
}
func TestStatefulSetStatusUpdaterUpdateReplicasConflict(t *testing.T) {
set := newStatefulSet(3)
status := apps.StatefulSetStatus{ObservedGeneration: func() *int64 {
i := int64(3)
return &i
}(), Replicas: 2}
conflict := false
fakeClient := &fake.Clientset{}
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
indexer.Add(set)
setLister := appslisters.NewStatefulSetLister(indexer)
updater := NewRealStatefulSetStatusUpdater(fakeClient, setLister)
fakeClient.AddReactor("update", "statefulsets", func(action core.Action) (bool, runtime.Object, error) {
update := action.(core.UpdateAction)
if !conflict {
conflict = true
return true, update.GetObject(), apierrors.NewConflict(action.GetResource().GroupResource(), set.Name, errors.New("Object already exists"))
} else {
return true, update.GetObject(), nil
}
})
if err := updater.UpdateStatefulSetStatus(set, &status); err != nil {
t.Errorf("UpdateStatefulSetStatus returned an error: %s", err)
}
if set.Status.Replicas != 2 {
t.Errorf("UpdateStatefulSetStatus mutated the sets replicas %d", set.Status.Replicas)
}
}
func TestStatefulSetStatusUpdaterUpdateReplicasConflictFailure(t *testing.T) {
set := newStatefulSet(3)
status := apps.StatefulSetStatus{ObservedGeneration: func() *int64 {
i := int64(3)
return &i
}(), Replicas: 2}
fakeClient := &fake.Clientset{}
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
indexer.Add(set)
setLister := appslisters.NewStatefulSetLister(indexer)
updater := NewRealStatefulSetStatusUpdater(fakeClient, setLister)
fakeClient.AddReactor("update", "statefulsets", func(action core.Action) (bool, runtime.Object, error) {
update := action.(core.UpdateAction)
return true, update.GetObject(), apierrors.NewConflict(action.GetResource().GroupResource(), set.Name, errors.New("Object already exists"))
})
if err := updater.UpdateStatefulSetStatus(set, &status); err == nil {
t.Error("UpdateStatefulSetStatus failed to return an error on get failure")
}
}

View File

@@ -29,6 +29,7 @@ import (
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/history"
)
func alwaysReady() bool { return true }
@@ -578,15 +579,18 @@ func newFakeStatefulSetController(initialObjects ...runtime.Object) (*StatefulSe
client := fake.NewSimpleClientset(initialObjects...)
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
fpc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets())
ssu := newFakeStatefulSetStatusUpdater(informerFactory.Apps().V1beta1().StatefulSets())
ssc := NewStatefulSetController(
informerFactory.Core().V1().Pods(),
informerFactory.Apps().V1beta1().StatefulSets(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Apps().V1beta1().ControllerRevisions(),
client,
)
ssh := history.NewFakeHistory(informerFactory.Apps().V1beta1().ControllerRevisions())
ssc.podListerSynced = alwaysReady
ssc.setListerSynced = alwaysReady
ssc.control = NewDefaultStatefulSetControl(fpc)
ssc.control = NewDefaultStatefulSetControl(fpc, ssu, ssh)
return ssc, fpc
}
@@ -614,7 +618,7 @@ func scaleUpStatefulSetController(set *apps.StatefulSet, ssc *StatefulSetControl
if err != nil {
return err
}
for set.Status.Replicas < *set.Spec.Replicas {
for set.Status.ReadyReplicas < *set.Spec.Replicas {
pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
ord := len(pods) - 1
pod := getPodAtOrdinal(pods, ord)

View File

@@ -17,15 +17,23 @@ limitations under the License.
package statefulset
import (
"encoding/json"
"fmt"
"regexp"
"strconv"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/history"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/client-go/kubernetes/scheme"
"github.com/golang/glog"
)
@@ -36,6 +44,7 @@ const maxUpdateRetries = 10
// updateConflictError is the error used to indicate that the maximum number of retries against the API server have
// been attempted and we need to back off
var updateConflictError = fmt.Errorf("aborting update after %d attempts", maxUpdateRetries)
var patchCodec = api.Codecs.LegacyCodec(apps.SchemeGroupVersion)
// overlappingStatefulSets sorts a list of StatefulSets by creation timestamp, using their names as a tie breaker.
// Generally used to tie break between StatefulSets that have overlapping selectors.
@@ -246,6 +255,23 @@ func newControllerRef(set *apps.StatefulSet) *metav1.OwnerReference {
}
}
// setPodRevision sets the revision of Pod to revision by adding the StatefulSetRevisionLabel
func setPodRevision(pod *v1.Pod, revision string) {
if pod.Labels == nil {
pod.Labels = make(map[string]string)
}
pod.Labels[apps.StatefulSetRevisionLabel] = revision
}
// getPodRevision gets the revision of Pod by inspecting the StatefulSetRevisionLabel. If pod has no revision the empty
// string is returned.
func getPodRevision(pod *v1.Pod) string {
if pod.Labels == nil {
return ""
}
return pod.Labels[apps.StatefulSetRevisionLabel]
}
// newStatefulSetPod returns a new Pod conforming to the set's Spec with an identity generated from ordinal.
func newStatefulSetPod(set *apps.StatefulSet, ordinal int) *v1.Pod {
pod, _ := controller.GetPodFromTemplate(&set.Spec.Template, set, newControllerRef(set))
@@ -255,6 +281,122 @@ func newStatefulSetPod(set *apps.StatefulSet, ordinal int) *v1.Pod {
return pod
}
// newVersionedStatefulSetPod creates a new Pod for a StatefulSet. currentSet is the representation of the set at the
// current revision. updateSet is the representation of the set at the updateRevision. currentRevision is the name of
// the current revision. updateRevision is the name of the update revision. ordinal is the ordinal of the Pod. If the
// returned error is nil, the returned Pod is valid.
func newVersionedStatefulSetPod(currentSet, updateSet *apps.StatefulSet, currentRevision, updateRevision string, ordinal int) *v1.Pod {
if (currentSet.Spec.UpdateStrategy.Type == apps.RollingUpdateStatefulSetStrategyType &&
ordinal < int(currentSet.Status.CurrentReplicas)) ||
(currentSet.Spec.UpdateStrategy.Type == apps.PartitionStatefulSetStrategyType &&
ordinal < int(currentSet.Spec.UpdateStrategy.Partition.Ordinal)) {
pod := newStatefulSetPod(currentSet, ordinal)
setPodRevision(pod, currentRevision)
return pod
}
pod := newStatefulSetPod(updateSet, ordinal)
setPodRevision(pod, updateRevision)
return pod
}
// getPatch returns a strategic merge patch that can be applied to restore a StatefulSet to a
// previous version. If the returned error is nil the patch is valid. The current state that we save is just the
// PodSpecTemplate. We can modify this later to encompass more state (or less) and remain compatible with previously
// recorded patches.
func getPatch(set *apps.StatefulSet) ([]byte, error) {
str, err := runtime.Encode(patchCodec, set)
if err != nil {
return nil, err
}
var raw map[string]interface{}
json.Unmarshal([]byte(str), &raw)
objCopy := make(map[string]interface{})
specCopy := make(map[string]interface{})
spec := raw["spec"].(map[string]interface{})
template := spec["template"].(map[string]interface{})
specCopy["template"] = template
template["$patch"] = "replace"
objCopy["spec"] = specCopy
patch, err := json.Marshal(objCopy)
return patch, err
}
// newRevision creates a new ControllerRevision containing a patch that reapplies the target state of set.
// The Revision of the returned ControllerRevision is set to revision. If the returned error is nil, the returned
// ControllerRevision is valid. StatefulSet revisions are stored as patches that re-apply the current state of set
// to a new StatefulSet using a strategic merge patch to replace the saved state of the new StatefulSet.
func newRevision(set *apps.StatefulSet, revision int64) (*apps.ControllerRevision, error) {
patch, err := getPatch(set)
if err != nil {
return nil, err
}
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil {
return nil, err
}
return history.NewControllerRevision(set,
controllerKind,
selector,
runtime.RawExtension{Raw: patch},
revision)
}
// applyRevision returns a new StatefulSet constructed by restoring the state in revision to set. If the returned error
// is nil, the returned StatefulSet is valid.
func applyRevision(set *apps.StatefulSet, revision *apps.ControllerRevision) (*apps.StatefulSet, error) {
obj, err := scheme.Scheme.DeepCopy(set)
if err != nil {
return nil, err
}
clone := obj.(*apps.StatefulSet)
patched, err := strategicpatch.StrategicMergePatch([]byte(runtime.EncodeOrDie(patchCodec, clone)), revision.Data.Raw, clone)
if err != nil {
return nil, err
}
err = json.Unmarshal(patched, clone)
if err != nil {
return nil, err
}
return clone, nil
}
// nextRevision finds the next valid revision number based on revisions. If the length of revisions
// is 0 this is 1. Otherwise, it is 1 greater than the largest revision's Revision. This method
// assumes that revisions has been sorted by Revision.
func nextRevision(revisions []*apps.ControllerRevision) int64 {
count := len(revisions)
if count <= 0 {
return 1
}
return revisions[count-1].Revision + 1
}
// inconsistentStatus returns true if the ObservedGeneration of status is greater than set's
// Generation or if any of the status's fields do not match those of set's status.
func inconsistentStatus(set *apps.StatefulSet, status *apps.StatefulSetStatus) bool {
return set.Status.ObservedGeneration == nil ||
*status.ObservedGeneration > *set.Status.ObservedGeneration ||
status.Replicas != set.Status.Replicas ||
status.CurrentReplicas != set.Status.CurrentReplicas ||
status.ReadyReplicas != set.Status.ReadyReplicas ||
status.UpdatedReplicas != set.Status.UpdatedReplicas ||
status.CurrentRevision != set.Status.CurrentRevision ||
status.UpdateRevision != set.Status.UpdateRevision
}
// completeRollingUpdate completes a rolling update when all of set's replica Pods have been updated
// to the updateRevision. status's currentRevision is set to updateRevision and its' updateRevision
// is set to the empty string. status's currentReplicas is set to updateReplicas and its updateReplicas
// are set to 0.
func completeRollingUpdate(set *apps.StatefulSet, status *apps.StatefulSetStatus) {
if set.Spec.UpdateStrategy.Type == apps.RollingUpdateStatefulSetStrategyType &&
status.UpdatedReplicas == status.Replicas &&
status.ReadyReplicas == status.Replicas {
status.CurrentReplicas = status.UpdatedReplicas
status.CurrentRevision = status.UpdateRevision
}
}
// ascendingOrdinal is a sort.Interface that Sorts a list of Pods based on the ordinals extracted
// from the Pod. Pod's that have not been constructed by StatefulSet's have an ordinal of -1, and are therefore pushed
// to the front of the list.

View File

@@ -32,6 +32,7 @@ import (
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/history"
)
func TestGetParentNameAndOrdinal(t *testing.T) {
@@ -287,6 +288,26 @@ func TestNewPodControllerRef(t *testing.T) {
}
}
func TestCreateApplyRevision(t *testing.T) {
set := newStatefulSet(1)
revision, err := newRevision(set, 1)
if err != nil {
t.Fatal(err)
}
set.Spec.Template.Spec.Containers[0].Name = "foo"
restoredSet, err := applyRevision(set, revision)
if err != nil {
t.Fatal(err)
}
restoredRevision, err := newRevision(restoredSet, 2)
if err != nil {
t.Fatal(err)
}
if !history.EqualRevision(revision, restoredRevision) {
t.Errorf("wanted %v got %v", string(revision.Data.Raw), string(restoredRevision.Data.Raw))
}
}
func newPVC(name string) v1.PersistentVolumeClaim {
return v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
@@ -354,6 +375,11 @@ func newStatefulSetWithVolumes(replicas int, name string, petMounts []v1.VolumeM
Template: template,
VolumeClaimTemplates: claims,
ServiceName: "governingsvc",
UpdateStrategy: apps.StatefulSetUpdateStrategy{Type: apps.RollingUpdateStatefulSetStrategyType},
RevisionHistoryLimit: func() *int32 {
limit := int32(2)
return &limit
}(),
},
}
}