Merge pull request #17940 from soltysh/job_deadline
Added ActiveDeadlineSeconds to jobs
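With this change a Job gains spec.activeDeadlineSeconds. On its first sync the controller stamps status.startTime; once the elapsed active time exceeds the deadline, it deletes the remaining active pods, counts them as failed, and appends a JobFailed condition with reason DeadlineExceeded. A minimal sketch of how a caller might populate the new field, using the types as they appear in this diff (illustrative only, not part of the commit):

```go
import (
	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/apis/extensions"
)

// Illustrative: give the job at most 10 minutes of activity before the
// controller terminates it with a JobFailed/DeadlineExceeded condition.
func newDeadlineJob() *extensions.Job {
	deadline := int64(600)
	return &extensions.Job{
		ObjectMeta: api.ObjectMeta{Name: "example", Namespace: api.NamespaceDefault},
		Spec: extensions.JobSpec{
			ActiveDeadlineSeconds: &deadline,
		},
	}
}
```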
--- a/pkg/controller/job/controller.go
+++ b/pkg/controller/job/controller.go
@@ -64,6 +64,8 @@ type JobController struct {
 	// Jobs that need to be updated
 	queue *workqueue.Type
+
+	recorder record.EventRecorder
 }
 
 func NewJobController(kubeClient client.Interface, resyncPeriod controller.ResyncPeriodFunc) *JobController {
@@ -75,10 +77,11 @@ func NewJobController(kubeClient client.Interface, resyncPeriod controller.Resyn
 		kubeClient: kubeClient,
 		podControl: controller.RealPodControl{
 			KubeClient: kubeClient,
-			Recorder:   eventBroadcaster.NewRecorder(api.EventSource{Component: "job"}),
+			Recorder:   eventBroadcaster.NewRecorder(api.EventSource{Component: "job-controller"}),
 		},
 		expectations: controller.NewControllerExpectations(),
 		queue:        workqueue.New(),
+		recorder:     eventBroadcaster.NewRecorder(api.EventSource{Component: "job-controller"}),
 	}
 
 	jm.jobStore.Store, jm.jobController = framework.NewInformer(
@@ -322,16 +325,52 @@ func (jm *JobController) syncJob(key string) error {
 	activePods := controller.FilterActivePods(podList.Items)
 	active := len(activePods)
 	succeeded, failed := getStatus(podList.Items)
-	if jobNeedsSync {
-		active = jm.manageJob(activePods, succeeded, &job)
-	}
-	completions := succeeded
-	if completions == *job.Spec.Completions {
-		job.Status.Conditions = append(job.Status.Conditions, newCondition())
-	}
+	conditions := len(job.Status.Conditions)
+	if job.Status.StartTime == nil {
+		now := unversioned.Now()
+		job.Status.StartTime = &now
+	}
+	if pastActiveDeadline(&job) {
+		// if job was finished previously, we don't want to redo the termination
+		if isJobFinished(&job) {
+			return nil
+		}
+		// TODO: below code should be replaced with pod termination resulting in
+		// pod failures, rather than killing pods. Unfortunately none such solution
+		// exists ATM. There's an open discussion in the topic in
+		// https://github.com/kubernetes/kubernetes/issues/14602 which might give
+		// some sort of solution to above problem.
+		// kill remaining active pods
+		wait := sync.WaitGroup{}
+		wait.Add(active)
+		for i := 0; i < active; i++ {
+			go func(ix int) {
+				defer wait.Done()
+				if err := jm.podControl.DeletePod(job.Namespace, activePods[ix].Name); err != nil {
+					defer util.HandleError(err)
+				}
+			}(i)
+		}
+		wait.Wait()
+		// update status values accordingly
+		failed += active
+		active = 0
+		job.Status.Conditions = append(job.Status.Conditions, newCondition(extensions.JobFailed, "DeadlineExceeded", "Job was active longer than specified deadline"))
+		jm.recorder.Event(&job, api.EventTypeNormal, "DeadlineExceeded", "Job was active longer than specified deadline")
+	} else {
+		if jobNeedsSync {
+			active = jm.manageJob(activePods, succeeded, &job)
+		}
+		completions := succeeded
+		if completions == *job.Spec.Completions {
+			job.Status.Conditions = append(job.Status.Conditions, newCondition(extensions.JobComplete, "", ""))
+			now := unversioned.Now()
+			job.Status.CompletionTime = &now
+		}
+	}
 
 	// no need to update the job if the status hasn't changed since last time
-	if job.Status.Active != active || job.Status.Succeeded != succeeded || job.Status.Failed != failed {
+	if job.Status.Active != active || job.Status.Succeeded != succeeded || job.Status.Failed != failed || len(job.Status.Conditions) != conditions {
 		job.Status.Active = active
 		job.Status.Succeeded = succeeded
 		job.Status.Failed = failed
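The termination path above fans out one goroutine per active pod and joins them with a sync.WaitGroup; a failed delete is handed to util.HandleError rather than aborting the other deletions. A self-contained sketch of the same pattern in plain Go (deletePod is a stand-in for podControl.DeletePod):

```go
package main

import (
	"fmt"
	"sync"
)

// deleteAll shows the fan-out/join pattern used above: one goroutine per
// pod, a WaitGroup to wait for all of them, and per-pod errors reported
// instead of stopping the remaining deletions.
func deleteAll(pods []string, deletePod func(name string) error) {
	wait := sync.WaitGroup{}
	wait.Add(len(pods))
	for i := 0; i < len(pods); i++ {
		go func(ix int) {
			defer wait.Done()
			if err := deletePod(pods[ix]); err != nil {
				fmt.Println("delete failed:", err) // the controller routes this to util.HandleError
			}
		}(i)
	}
	wait.Wait()
}

func main() {
	deleteAll([]string{"pod-0", "pod-1", "pod-2"}, func(name string) error {
		fmt.Println("deleting", name)
		return nil
	})
}
```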
@@ -344,21 +383,38 @@ func (jm *JobController) syncJob(key string) error {
 	return nil
 }
 
-func newCondition() extensions.JobCondition {
+// pastActiveDeadline checks if job has ActiveDeadlineSeconds field set and if it is exceeded.
+func pastActiveDeadline(job *extensions.Job) bool {
+	if job.Spec.ActiveDeadlineSeconds == nil || job.Status.StartTime == nil {
+		return false
+	}
+	now := unversioned.Now()
+	start := job.Status.StartTime.Time
+	duration := now.Time.Sub(start)
+	allowedDuration := time.Duration(*job.Spec.ActiveDeadlineSeconds) * time.Second
+	return duration >= allowedDuration
+}
+
+func newCondition(conditionType extensions.JobConditionType, reason, message string) extensions.JobCondition {
 	return extensions.JobCondition{
-		Type:               extensions.JobComplete,
+		Type:               conditionType,
 		Status:             api.ConditionTrue,
 		LastProbeTime:      unversioned.Now(),
 		LastTransitionTime: unversioned.Now(),
+		Reason:             reason,
+		Message:            message,
 	}
 }
 
 // getStatus returns no of succeeded and failed pods running a job
 func getStatus(pods []api.Pod) (succeeded, failed int) {
 	succeeded = filterPods(pods, api.PodSucceeded)
 	failed = filterPods(pods, api.PodFailed)
 	return
 }
 
 // manageJob is the core method responsible for managing the number of running
 // pods according to what is specified in the job.Spec.
 func (jm *JobController) manageJob(activePods []*api.Pod, succeeded int, job *extensions.Job) int {
 	var activeLock sync.Mutex
 	active := len(activePods)
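pastActiveDeadline is a pure wall-clock comparison: a nil deadline or start time means "no deadline", and the >= makes a deadline of exactly N seconds count as expired at N. The same arithmetic with standard-library types (a sketch, not the controller's code):

```go
package main

import (
	"fmt"
	"time"
)

// pastDeadline reproduces the arithmetic of pastActiveDeadline with plain
// time types: unset fields mean no deadline, otherwise compare elapsed
// time against the allowed duration (>= so the boundary counts as expired).
func pastDeadline(start *time.Time, activeDeadlineSeconds *int64, now time.Time) bool {
	if activeDeadlineSeconds == nil || start == nil {
		return false
	}
	allowed := time.Duration(*activeDeadlineSeconds) * time.Second
	return now.Sub(*start) >= allowed
}

func main() {
	deadline := int64(10)
	start := time.Now().Add(-15 * time.Second)               // started 15s ago, 10s allowed
	fmt.Println(pastDeadline(&start, &deadline, time.Now())) // true
}
```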
@@ -447,7 +503,7 @@ func filterPods(pods []api.Pod, phase api.PodPhase) int {
 
 func isJobFinished(j *extensions.Job) bool {
 	for _, c := range j.Status.Conditions {
-		if c.Type == extensions.JobComplete && c.Status == api.ConditionTrue {
+		if (c.Type == extensions.JobComplete || c.Type == extensions.JobFailed) && c.Status == api.ConditionTrue {
 			return true
 		}
 	}
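Counting JobFailed as a terminal condition is what makes the early return in syncJob work: without it, a deadline-exceeded job would be re-terminated on every resync, appending a duplicate condition each time. A compact model of the check, with hypothetical types standing in for extensions.JobCondition and api.ConditionTrue:

```go
package main

import "fmt"

// Hypothetical stand-ins for extensions.JobConditionType / api.ConditionStatus.
type jobConditionType string

const (
	jobComplete jobConditionType = "Complete"
	jobFailed   jobConditionType = "Failed"
)

type jobCondition struct {
	Type   jobConditionType
	Status string // "True" mirrors api.ConditionTrue
}

// finished reports whether any terminal condition is true, matching the
// updated isJobFinished above.
func finished(conds []jobCondition) bool {
	for _, c := range conds {
		if (c.Type == jobComplete || c.Type == jobFailed) && c.Status == "True" {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(finished([]jobCondition{{jobFailed, "True"}}))    // true
	fmt.Println(finished([]jobCondition{{jobComplete, "False"}})) // false
}
```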
--- a/pkg/controller/job/controller_test.go
+++ b/pkg/controller/job/controller_test.go
@@ -23,6 +23,7 @@ import (
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/testapi"
+	"k8s.io/kubernetes/pkg/api/unversioned"
 	"k8s.io/kubernetes/pkg/apis/extensions"
 	client "k8s.io/kubernetes/pkg/client/unversioned"
 	"k8s.io/kubernetes/pkg/client/unversioned/testclient"
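The new tests don't fake the clock; they backdate status.startTime by tc.startTime seconds so that the real unversioned.Now() inside pastActiveDeadline already sees the deadline as expired. The trick in isolation (a sketch with standard-library time):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Mirrors: start := unversioned.Unix(unversioned.Now().Time.Unix()-tc.startTime, 0)
	startedSecondsAgo := int64(15)
	start := time.Unix(time.Now().Unix()-startedSecondsAgo, 0)
	fmt.Printf("job appears to have been active for ~%v\n", time.Since(start).Round(time.Second))
}
```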
@@ -209,22 +210,158 @@ func TestControllerSyncJob(t *testing.T) {
 		if actual.Status.Failed != tc.expectedFailed {
 			t.Errorf("%s: unexpected number of failed pods. Expected %d, saw %d\n", name, tc.expectedFailed, actual.Status.Failed)
 		}
+		if actual.Status.StartTime == nil {
+			t.Errorf("%s: .status.startTime was not set", name)
+		}
 		// validate conditions
-		if tc.expectedComplete {
-			completed := false
-			for _, v := range actual.Status.Conditions {
-				if v.Type == extensions.JobComplete && v.Status == api.ConditionTrue {
-					completed = true
-					break
-				}
-			}
-			if !completed {
-				t.Errorf("%s: expected completion condition. Got %v", name, actual.Status.Conditions)
-			}
+		if tc.expectedComplete && !getCondition(actual, extensions.JobComplete) {
+			t.Errorf("%s: expected completion condition. Got %#v", name, actual.Status.Conditions)
 		}
 	}
 }
+
+func TestSyncJobPastDeadline(t *testing.T) {
+	testCases := map[string]struct {
+		// job setup
+		parallelism           int
+		completions           int
+		activeDeadlineSeconds int64
+		startTime             int64
+
+		// pod setup
+		activePods    int
+		succeededPods int
+		failedPods    int
+
+		// expectations
+		expectedDeletions int
+		expectedActive    int
+		expectedSucceeded int
+		expectedFailed    int
+	}{
+		"activeDeadlineSeconds less than single pod execution": {
+			1, 1, 10, 15,
+			1, 0, 0,
+			1, 0, 0, 1,
+		},
+		"activeDeadlineSeconds bigger than single pod execution": {
+			1, 2, 10, 15,
+			1, 1, 0,
+			1, 0, 1, 1,
+		},
+		"activeDeadlineSeconds times-out before any pod starts": {
+			1, 1, 10, 10,
+			0, 0, 0,
+			0, 0, 0, 0,
+		},
+	}
+
+	for name, tc := range testCases {
+		// job manager setup
+		client := client.NewOrDie(&client.Config{Host: "", GroupVersion: testapi.Default.GroupVersion()})
+		manager := NewJobController(client, controller.NoResyncPeriodFunc)
+		fakePodControl := controller.FakePodControl{}
+		manager.podControl = &fakePodControl
+		manager.podStoreSynced = alwaysReady
+		var actual *extensions.Job
+		manager.updateHandler = func(job *extensions.Job) error {
+			actual = job
+			return nil
+		}
+
+		// job & pods setup
+		job := newJob(tc.parallelism, tc.completions)
+		job.Spec.ActiveDeadlineSeconds = &tc.activeDeadlineSeconds
+		start := unversioned.Unix(unversioned.Now().Time.Unix()-tc.startTime, 0)
+		job.Status.StartTime = &start
+		manager.jobStore.Store.Add(job)
+		for _, pod := range newPodList(tc.activePods, api.PodRunning, job) {
+			manager.podStore.Store.Add(&pod)
+		}
+		for _, pod := range newPodList(tc.succeededPods, api.PodSucceeded, job) {
+			manager.podStore.Store.Add(&pod)
+		}
+		for _, pod := range newPodList(tc.failedPods, api.PodFailed, job) {
+			manager.podStore.Store.Add(&pod)
+		}
+
+		// run
+		err := manager.syncJob(getKey(job, t))
+		if err != nil {
+			t.Errorf("%s: unexpected error when syncing jobs %v", name, err)
+		}
+
+		// validate created/deleted pods
+		if len(fakePodControl.Templates) != 0 {
+			t.Errorf("%s: unexpected number of creates. Expected 0, saw %d\n", name, len(fakePodControl.Templates))
+		}
+		if len(fakePodControl.DeletePodName) != tc.expectedDeletions {
+			t.Errorf("%s: unexpected number of deletes. Expected %d, saw %d\n", name, tc.expectedDeletions, len(fakePodControl.DeletePodName))
+		}
+		// validate status
+		if actual.Status.Active != tc.expectedActive {
+			t.Errorf("%s: unexpected number of active pods. Expected %d, saw %d\n", name, tc.expectedActive, actual.Status.Active)
+		}
+		if actual.Status.Succeeded != tc.expectedSucceeded {
+			t.Errorf("%s: unexpected number of succeeded pods. Expected %d, saw %d\n", name, tc.expectedSucceeded, actual.Status.Succeeded)
+		}
+		if actual.Status.Failed != tc.expectedFailed {
+			t.Errorf("%s: unexpected number of failed pods. Expected %d, saw %d\n", name, tc.expectedFailed, actual.Status.Failed)
+		}
+		if actual.Status.StartTime == nil {
+			t.Errorf("%s: .status.startTime was not set", name)
+		}
+		// validate conditions
+		if !getCondition(actual, extensions.JobFailed) {
+			t.Errorf("%s: expected fail condition. Got %#v", name, actual.Status.Conditions)
+		}
+	}
+}
+
+func getCondition(job *extensions.Job, condition extensions.JobConditionType) bool {
+	for _, v := range job.Status.Conditions {
+		if v.Type == condition && v.Status == api.ConditionTrue {
+			return true
+		}
+	}
+	return false
+}
+
+func TestSyncPastDeadlineJobFinished(t *testing.T) {
+	client := client.NewOrDie(&client.Config{Host: "", GroupVersion: testapi.Default.GroupVersion()})
+	manager := NewJobController(client, controller.NoResyncPeriodFunc)
+	fakePodControl := controller.FakePodControl{}
+	manager.podControl = &fakePodControl
+	manager.podStoreSynced = alwaysReady
+	var actual *extensions.Job
+	manager.updateHandler = func(job *extensions.Job) error {
+		actual = job
+		return nil
+	}
+
+	job := newJob(1, 1)
+	activeDeadlineSeconds := int64(10)
+	job.Spec.ActiveDeadlineSeconds = &activeDeadlineSeconds
+	start := unversioned.Unix(unversioned.Now().Time.Unix()-15, 0)
+	job.Status.StartTime = &start
+	job.Status.Conditions = append(job.Status.Conditions, newCondition(extensions.JobFailed, "DeadlineExceeded", "Job was active longer than specified deadline"))
+	manager.jobStore.Store.Add(job)
+	err := manager.syncJob(getKey(job, t))
+	if err != nil {
+		t.Errorf("Unexpected error when syncing jobs %v", err)
+	}
+	if len(fakePodControl.Templates) != 0 {
+		t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", 0, len(fakePodControl.Templates))
+	}
+	if len(fakePodControl.DeletePodName) != 0 {
+		t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", 0, len(fakePodControl.DeletePodName))
+	}
+	if actual != nil {
+		t.Error("Unexpected job modification")
+	}
+}
 
 func TestSyncJobDeleted(t *testing.T) {
 	client := client.NewOrDie(&client.Config{Host: "", GroupVersion: testapi.Default.GroupVersion()})
 	manager := NewJobController(client, controller.NoResyncPeriodFunc)