Merge pull request #18876 from erictune/dynamic-job

Auto commit by PR queue bot
This commit is contained in:
k8s-merge-robot
2016-01-25 08:06:22 -08:00
14 changed files with 325 additions and 91 deletions

View File

@@ -362,7 +362,33 @@ func (jm *JobController) syncJob(key string) error {
active = jm.manageJob(activePods, succeeded, &job)
}
completions := succeeded
if completions == *job.Spec.Completions {
complete := false
if job.Spec.Completions == nil {
// This type of job is complete when any pod exits with success.
// Each pod is capable of
// determining whether or not the entire Job is done. Subsequent pods are
// not expected to fail, but if they do, the failure is ignored. Once any
// pod succeeds, the controller waits for remaining pods to finish, and
// then the job is complete.
if succeeded > 0 && active == 0 {
complete = true
}
} else {
// Job specifies a number of completions. This type of job signals
// success by having that number of successes. Since we do not
// start more pods than there are remaining completions, there should
// not be any remaining active pods once this count is reached.
if completions >= *job.Spec.Completions {
complete = true
if active > 0 {
jm.recorder.Event(&job, api.EventTypeWarning, "TooManyActivePods", "Too many active pods running after completion count reached")
}
if completions > *job.Spec.Completions {
jm.recorder.Event(&job, api.EventTypeWarning, "TooManySucceededPods", "Too many succeeded pods running after completion count reached")
}
}
}
if complete {
job.Status.Conditions = append(job.Status.Conditions, newCondition(extensions.JobComplete, "", ""))
now := unversioned.Now()
job.Status.CompletionTime = &now
@@ -453,15 +479,31 @@ func (jm *JobController) manageJob(activePods []*api.Pod, succeeded int, job *ex
wait.Wait()
} else if active < parallelism {
// how many executions are left to run
diff := *job.Spec.Completions - succeeded
// limit to parallelism and count active pods as well
if diff > parallelism {
diff = parallelism
wantActive := 0
if job.Spec.Completions == nil {
// Job does not specify a number of completions. Therefore, number active
// should be equal to parallelism, unless the job has seen at least
// once success, in which leave whatever is running, running.
if succeeded > 0 {
wantActive = active
} else {
wantActive = parallelism
}
} else {
// Job specifies a specific number of completions. Therefore, number
// active should not ever exceed number of remaining completions.
wantActive = *job.Spec.Completions - succeeded
if wantActive > parallelism {
wantActive = parallelism
}
}
diff := wantActive - active
if diff < 0 {
glog.Errorf("More active than wanted: job %q, want %d, have %d", jobKey, wantActive, active)
diff = 0
}
diff -= active
jm.expectations.ExpectCreations(jobKey, diff)
glog.V(4).Infof("Too few pods running job %q, need %d, creating %d", jobKey, parallelism, diff)
glog.V(4).Infof("Too few pods running job %q, need %d, creating %d", jobKey, wantActive, diff)
active += diff
wait := sync.WaitGroup{}

View File

@@ -36,14 +36,12 @@ import (
var alwaysReady = func() bool { return true }
func newJob(parallelism, completions int) *extensions.Job {
return &extensions.Job{
j := &extensions.Job{
ObjectMeta: api.ObjectMeta{
Name: "foobar",
Namespace: api.NamespaceDefault,
},
Spec: extensions.JobSpec{
Parallelism: &parallelism,
Completions: &completions,
Selector: &extensions.LabelSelector{
MatchLabels: map[string]string{"foo": "bar"},
},
@@ -61,6 +59,19 @@ func newJob(parallelism, completions int) *extensions.Job {
},
},
}
// Special case: -1 for either completions or parallelism means leave nil (negative is not allowed
// in practice by validation.
if completions >= 0 {
j.Spec.Completions = &completions
} else {
j.Spec.Completions = nil
}
if parallelism >= 0 {
j.Spec.Parallelism = &parallelism
} else {
j.Spec.Parallelism = nil
}
return j
}
func getKey(job *extensions.Job, t *testing.T) string {
@@ -114,16 +125,31 @@ func TestControllerSyncJob(t *testing.T) {
nil, 0, 0, 0,
2, 0, 2, 0, 0, false,
},
"WQ job start": {
2, -1,
nil, 0, 0, 0,
2, 0, 2, 0, 0, false,
},
"correct # of pods": {
2, 5,
nil, 2, 0, 0,
0, 0, 2, 0, 0, false,
},
"WQ job: correct # of pods": {
2, -1,
nil, 2, 0, 0,
0, 0, 2, 0, 0, false,
},
"too few active pods": {
2, 5,
nil, 1, 1, 0,
1, 0, 2, 1, 0, false,
},
"too few active pods with a dynamic job": {
2, -1,
nil, 1, 0, 0,
1, 0, 2, 0, 0, false,
},
"too few active pods, with controller error": {
2, 5,
fmt.Errorf("Fake error"), 1, 1, 0,
@@ -149,6 +175,21 @@ func TestControllerSyncJob(t *testing.T) {
nil, 0, 5, 0,
0, 0, 0, 5, 0, true,
},
"WQ job finishing": {
2, -1,
nil, 1, 1, 0,
0, 0, 1, 1, 0, false,
},
"WQ job all finished": {
2, -1,
nil, 0, 2, 0,
0, 0, 0, 2, 0, true,
},
"WQ job all finished despite one failure": {
2, -1,
nil, 0, 1, 1,
0, 0, 0, 1, 1, true,
},
"more active pods than completions": {
2, 5,
nil, 10, 0, 0,