kubectl: Support scaling deployments

This commit adds support for using kubectl scale to scale deployments. Makes use of the
deployments/scale endpoint instead of updating deployment.spec.replicas directly.
This commit is contained in:
Michail Kargakis
2015-11-13 13:44:03 +01:00
parent a7425bf070
commit 99fc35880b
11 changed files with 558 additions and 200 deletions

View File

@@ -28,74 +28,7 @@ import (
"k8s.io/kubernetes/pkg/util/wait"
)
// ScalePrecondition describes a condition that must be true for the scale to take place
// If CurrentSize == -1, it is ignored.
// If CurrentResourceVersion is the empty string, it is ignored.
// Otherwise they must equal the values in the replication controller for it to be valid.
type ScalePrecondition struct {
Size int
ResourceVersion string
}
// A PreconditionError is returned when a replication controller fails to match
// the scale preconditions passed to kubectl.
type PreconditionError struct {
Precondition string
ExpectedValue string
ActualValue string
}
func (pe PreconditionError) Error() string {
return fmt.Sprintf("Expected %s to be %s, was %s", pe.Precondition, pe.ExpectedValue, pe.ActualValue)
}
type ControllerScaleErrorType int
const (
ControllerScaleGetFailure ControllerScaleErrorType = iota
ControllerScaleUpdateFailure
ControllerScaleUpdateInvalidFailure
)
// A ControllerScaleError is returned when a scale request passes
// preconditions but fails to actually scale the controller.
type ControllerScaleError struct {
FailureType ControllerScaleErrorType
ResourceVersion string
ActualError error
}
func (c ControllerScaleError) Error() string {
return fmt.Sprintf(
"Scaling the controller failed with: %s; Current resource version %s",
c.ActualError, c.ResourceVersion)
}
// ValidateReplicationController ensures that the preconditions match. Returns nil if they are valid, an error otherwise
func (precondition *ScalePrecondition) ValidateReplicationController(controller *api.ReplicationController) error {
if precondition.Size != -1 && controller.Spec.Replicas != precondition.Size {
return PreconditionError{"replicas", strconv.Itoa(precondition.Size), strconv.Itoa(controller.Spec.Replicas)}
}
if precondition.ResourceVersion != "" && controller.ResourceVersion != precondition.ResourceVersion {
return PreconditionError{"resource version", precondition.ResourceVersion, controller.ResourceVersion}
}
return nil
}
// ValidateJob ensures that the preconditions match. Returns nil if they are valid, an error otherwise
func (precondition *ScalePrecondition) ValidateJob(job *extensions.Job) error {
if precondition.Size != -1 && job.Spec.Parallelism == nil {
return PreconditionError{"parallelism", strconv.Itoa(precondition.Size), "nil"}
}
if precondition.Size != -1 && *job.Spec.Parallelism != precondition.Size {
return PreconditionError{"parallelism", strconv.Itoa(precondition.Size), strconv.Itoa(*job.Spec.Parallelism)}
}
if precondition.ResourceVersion != "" && job.ResourceVersion != precondition.ResourceVersion {
return PreconditionError{"resource version", precondition.ResourceVersion, job.ResourceVersion}
}
return nil
}
// Scaler provides an interface for resources that can be scaled.
type Scaler interface {
// Scale scales the named resource after checking preconditions. It optionally
// retries in the event of resource version mismatch (if retry is not nil),
@@ -111,16 +44,54 @@ func ScalerFor(kind string, c client.Interface) (Scaler, error) {
case "ReplicationController":
return &ReplicationControllerScaler{c}, nil
case "Job":
return &JobScaler{c}, nil
return &JobScaler{c.Extensions()}, nil
case "Deployment":
return &DeploymentScaler{c.Extensions()}, nil
}
return nil, fmt.Errorf("no scaler has been implemented for %q", kind)
}
type ReplicationControllerScaler struct {
c client.Interface
// ScalePrecondition describes a condition that must be true for the scale to take place
// If CurrentSize == -1, it is ignored.
// If CurrentResourceVersion is the empty string, it is ignored.
// Otherwise they must equal the values in the resource for it to be valid.
type ScalePrecondition struct {
Size int
ResourceVersion string
}
type JobScaler struct {
c client.Interface
// A PreconditionError is returned when a resource fails to match
// the scale preconditions passed to kubectl.
type PreconditionError struct {
Precondition string
ExpectedValue string
ActualValue string
}
func (pe PreconditionError) Error() string {
return fmt.Sprintf("Expected %s to be %s, was %s", pe.Precondition, pe.ExpectedValue, pe.ActualValue)
}
type ScaleErrorType int
const (
ScaleGetFailure ScaleErrorType = iota
ScaleUpdateFailure
ScaleUpdateInvalidFailure
)
// A ScaleError is returned when a scale request passes
// preconditions but fails to actually scale the controller.
type ScaleError struct {
FailureType ScaleErrorType
ResourceVersion string
ActualError error
}
func (c ScaleError) Error() string {
return fmt.Sprintf(
"Scaling the resource failed with: %v; Current resource version %s",
c.ActualError, c.ResourceVersion)
}
// RetryParams encapsulates the retry parameters used by kubectl's scaler.
@@ -136,15 +107,15 @@ func NewRetryParams(interval, timeout time.Duration) *RetryParams {
func ScaleCondition(r Scaler, precondition *ScalePrecondition, namespace, name string, count uint) wait.ConditionFunc {
return func() (bool, error) {
err := r.ScaleSimple(namespace, name, precondition, count)
switch e, _ := err.(ControllerScaleError); err.(type) {
switch e, _ := err.(ScaleError); err.(type) {
case nil:
return true, nil
case ControllerScaleError:
case ScaleError:
// if it's invalid we shouldn't keep waiting
if e.FailureType == ControllerScaleUpdateInvalidFailure {
if e.FailureType == ScaleUpdateInvalidFailure {
return false, err
}
if e.FailureType == ControllerScaleUpdateFailure {
if e.FailureType == ScaleUpdateFailure {
return false, nil
}
}
@@ -152,10 +123,25 @@ func ScaleCondition(r Scaler, precondition *ScalePrecondition, namespace, name s
}
}
// ValidateReplicationController ensures that the preconditions match. Returns nil if they are valid, an error otherwise
func (precondition *ScalePrecondition) ValidateReplicationController(controller *api.ReplicationController) error {
if precondition.Size != -1 && controller.Spec.Replicas != precondition.Size {
return PreconditionError{"replicas", strconv.Itoa(precondition.Size), strconv.Itoa(controller.Spec.Replicas)}
}
if len(precondition.ResourceVersion) != 0 && controller.ResourceVersion != precondition.ResourceVersion {
return PreconditionError{"resource version", precondition.ResourceVersion, controller.ResourceVersion}
}
return nil
}
type ReplicationControllerScaler struct {
c client.Interface
}
func (scaler *ReplicationControllerScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) error {
controller, err := scaler.c.ReplicationControllers(namespace).Get(name)
if err != nil {
return ControllerScaleError{ControllerScaleGetFailure, "Unknown", err}
return ScaleError{ScaleGetFailure, "Unknown", err}
}
if preconditions != nil {
if err := preconditions.ValidateReplicationController(controller); err != nil {
@@ -166,11 +152,10 @@ func (scaler *ReplicationControllerScaler) ScaleSimple(namespace, name string, p
// TODO: do retry on 409 errors here?
if _, err := scaler.c.ReplicationControllers(namespace).Update(controller); err != nil {
if errors.IsInvalid(err) {
return ControllerScaleError{ControllerScaleUpdateInvalidFailure, controller.ResourceVersion, err}
return ScaleError{ScaleUpdateInvalidFailure, controller.ResourceVersion, err}
}
return ControllerScaleError{ControllerScaleUpdateFailure, controller.ResourceVersion, err}
return ScaleError{ScaleUpdateFailure, controller.ResourceVersion, err}
}
// TODO: do a better job of printing objects here.
return nil
}
@@ -200,11 +185,29 @@ func (scaler *ReplicationControllerScaler) Scale(namespace, name string, newSize
return nil
}
// ValidateJob ensures that the preconditions match. Returns nil if they are valid, an error otherwise.
func (precondition *ScalePrecondition) ValidateJob(job *extensions.Job) error {
if precondition.Size != -1 && job.Spec.Parallelism == nil {
return PreconditionError{"parallelism", strconv.Itoa(precondition.Size), "nil"}
}
if precondition.Size != -1 && *job.Spec.Parallelism != precondition.Size {
return PreconditionError{"parallelism", strconv.Itoa(precondition.Size), strconv.Itoa(*job.Spec.Parallelism)}
}
if len(precondition.ResourceVersion) != 0 && job.ResourceVersion != precondition.ResourceVersion {
return PreconditionError{"resource version", precondition.ResourceVersion, job.ResourceVersion}
}
return nil
}
type JobScaler struct {
c client.ExtensionsInterface
}
// ScaleSimple is responsible for updating job's parallelism.
func (scaler *JobScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) error {
job, err := scaler.c.Extensions().Jobs(namespace).Get(name)
job, err := scaler.c.Jobs(namespace).Get(name)
if err != nil {
return ControllerScaleError{ControllerScaleGetFailure, "Unknown", err}
return ScaleError{ScaleGetFailure, "Unknown", err}
}
if preconditions != nil {
if err := preconditions.ValidateJob(job); err != nil {
@@ -213,12 +216,11 @@ func (scaler *JobScaler) ScaleSimple(namespace, name string, preconditions *Scal
}
parallelism := int(newSize)
job.Spec.Parallelism = &parallelism
if _, err := scaler.c.Extensions().Jobs(namespace).Update(job); err != nil {
if _, err := scaler.c.Jobs(namespace).Update(job); err != nil {
if errors.IsInvalid(err) {
return ControllerScaleError{ControllerScaleUpdateInvalidFailure, job.ResourceVersion, err}
return ScaleError{ScaleUpdateInvalidFailure, job.ResourceVersion, err}
}
return ControllerScaleError{ControllerScaleUpdateFailure, job.ResourceVersion, err}
return ScaleError{ScaleUpdateFailure, job.ResourceVersion, err}
}
return nil
}
@@ -239,7 +241,7 @@ func (scaler *JobScaler) Scale(namespace, name string, newSize uint, preconditio
return err
}
if waitForReplicas != nil {
job, err := scaler.c.Extensions().Jobs(namespace).Get(name)
job, err := scaler.c.Jobs(namespace).Get(name)
if err != nil {
return err
}
@@ -248,3 +250,65 @@ func (scaler *JobScaler) Scale(namespace, name string, newSize uint, preconditio
}
return nil
}
// ValidateDeployment ensures that the preconditions match. Returns nil if they are valid, an error otherwise.
func (precondition *ScalePrecondition) ValidateDeployment(deployment *extensions.Deployment) error {
if precondition.Size != -1 && deployment.Spec.Replicas != precondition.Size {
return PreconditionError{"replicas", strconv.Itoa(precondition.Size), strconv.Itoa(deployment.Spec.Replicas)}
}
if len(precondition.ResourceVersion) != 0 && deployment.ResourceVersion != precondition.ResourceVersion {
return PreconditionError{"resource version", precondition.ResourceVersion, deployment.ResourceVersion}
}
return nil
}
type DeploymentScaler struct {
c client.ExtensionsInterface
}
// ScaleSimple is responsible for updating a deployment's desired replicas count.
func (scaler *DeploymentScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) error {
deployment, err := scaler.c.Deployments(namespace).Get(name)
if err != nil {
return ScaleError{ScaleGetFailure, "Unknown", err}
}
if preconditions != nil {
if err := preconditions.ValidateDeployment(deployment); err != nil {
return err
}
}
scale := extensions.ScaleFromDeployment(deployment)
scale.Spec.Replicas = int(newSize)
if _, err := scaler.c.Scales(namespace).Update("Deployment", scale); err != nil {
if errors.IsInvalid(err) {
return ScaleError{ScaleUpdateInvalidFailure, deployment.ResourceVersion, err}
}
return ScaleError{ScaleUpdateFailure, deployment.ResourceVersion, err}
}
return nil
}
// Scale updates a deployment to a new size, with optional precondition check (if preconditions is not nil),
// optional retries (if retry is not nil), and then optionally waits for the status to reach desired count.
func (scaler *DeploymentScaler) Scale(namespace, name string, newSize uint, preconditions *ScalePrecondition, retry, waitForReplicas *RetryParams) error {
if preconditions == nil {
preconditions = &ScalePrecondition{-1, ""}
}
if retry == nil {
// Make it try only once, immediately
retry = &RetryParams{Interval: time.Millisecond, Timeout: time.Millisecond}
}
cond := ScaleCondition(scaler, preconditions, namespace, name, newSize)
if err := wait.Poll(retry.Interval, retry.Timeout, cond); err != nil {
return err
}
if waitForReplicas != nil {
deployment, err := scaler.c.Deployments(namespace).Get(name)
if err != nil {
return err
}
return wait.Poll(waitForReplicas.Interval, waitForReplicas.Timeout,
client.DeploymentHasDesiredReplicas(scaler.c, deployment))
}
return nil
}