fix job controller hot loop

Change-Id: I55ce706381f1494e5cd2064177b938f56d9c356a
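What changed, in short: the job controller's worker pulled keys from an unbounded workqueue.Type and retried failures by immediately re-adding them (syncJob even slept and re-queued every key while the pod store was still syncing), so a persistently failing job could spin the control loop at full speed. This commit moves the controller onto a rate-limited workqueue: a clean sync calls Forget to reset the key's backoff, and a failed sync calls AddRateLimited so retries back off exponentially instead of hot-looping. Below is a minimal, self-contained sketch of that retry pattern. It is an illustration, not the controller's code: it uses the client-go copy of the same workqueue package, and the standalone processNextWorkItem helper with an injected sync function is invented for the example.

package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

// processNextWorkItem mirrors the commit's pattern: Done always,
// Forget on success, AddRateLimited (exponential backoff) on error.
func processNextWorkItem(q workqueue.RateLimitingInterface, sync func(key string) error) bool {
	key, quit := q.Get()
	if quit {
		return false
	}
	defer q.Done(key)

	err := sync(key.(string))
	if err == nil {
		q.Forget(key) // reset this key's backoff after a clean sync
		return true
	}

	fmt.Printf("sync of %v failed: %v; retrying with backoff\n", key, err)
	q.AddRateLimited(key) // re-queue after a rate-limiter-imposed delay
	return true
}

func main() {
	q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "example")
	defer q.ShutDown()

	q.Add("default/my-job")
	attempts := 0
	sync := func(key string) error {
		attempts++
		if attempts == 1 {
			return fmt.Errorf("transient error") // first attempt fails
		}
		return nil // the rate-limited retry succeeds
	}
	processNextWorkItem(q, sync) // fails, re-queues with backoff
	processNextWorkItem(q, sync) // blocks briefly (default base delay is 5ms), then succeeds
}

Deferring Done matters: if the same key is re-added while it is being processed, the queue re-delivers it after Done instead of losing the update.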
pkg/controller/endpoint/endpoints_controller.go

@@ -19,14 +19,13 @@ limitations under the License.
 package endpoint
 
 import (
+	"fmt"
 	"reflect"
 	"strconv"
 	"time"
 
 	"encoding/json"
 
-	"fmt"
-	"github.com/golang/glog"
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/endpoints"
 	"k8s.io/kubernetes/pkg/api/errors"
@@ -44,6 +43,8 @@ import (
 	"k8s.io/kubernetes/pkg/util/wait"
 	"k8s.io/kubernetes/pkg/util/workqueue"
 	"k8s.io/kubernetes/pkg/watch"
+
+	"github.com/golang/glog"
 )
 
 const (
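The endpoints-controller hunks above are housekeeping that accompanies the fix: "fmt" moves into the standard-library import group and "github.com/golang/glog" into its own third-party group at the bottom, with no behavior change. The job-controller file below carries the actual fix.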
pkg/controller/job/jobcontroller.go

@@ -17,12 +17,12 @@ limitations under the License.
 package job
 
 import (
+	"fmt"
 	"reflect"
 	"sort"
 	"sync"
 	"time"
 
-	"github.com/golang/glog"
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/unversioned"
 	"k8s.io/kubernetes/pkg/apis/batch"
@@ -39,6 +39,8 @@ import (
 	"k8s.io/kubernetes/pkg/util/wait"
 	"k8s.io/kubernetes/pkg/util/workqueue"
 	"k8s.io/kubernetes/pkg/watch"
+
+	"github.com/golang/glog"
 )
 
 type JobController struct {
@@ -71,7 +73,7 @@ type JobController struct {
 	podStore cache.StoreToPodLister
 
 	// Jobs that need to be updated
-	queue *workqueue.Type
+	queue workqueue.RateLimitingInterface
 
 	recorder record.EventRecorder
 }
@@ -93,7 +95,7 @@ func NewJobController(podInformer cache.SharedIndexInformer, kubeClient clientse
 			Recorder:   eventBroadcaster.NewRecorder(api.EventSource{Component: "job-controller"}),
 		},
 		expectations: controller.NewControllerExpectations(),
-		queue:        workqueue.NewNamed("job"),
+		queue:        workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "job"),
 		recorder:     eventBroadcaster.NewRecorder(api.EventSource{Component: "job-controller"}),
 	}
 
@@ -144,6 +146,12 @@ func NewJobControllerFromClient(kubeClient clientset.Interface, resyncPeriod con
 // Run the main goroutine responsible for watching and syncing jobs.
 func (jm *JobController) Run(workers int, stopCh <-chan struct{}) {
 	defer utilruntime.HandleCrash()
+	defer jm.queue.ShutDown()
+
+	if !cache.WaitForCacheSync(stopCh, jm.podStoreSynced) {
+		return
+	}
+
 	go jm.jobController.Run(stopCh)
 	for i := 0; i < workers; i++ {
 		go wait.Until(jm.worker, time.Second, stopCh)
@@ -155,7 +163,6 @@ func (jm *JobController) Run(workers int, stopCh <-chan struct{}) {
 
 	<-stopCh
 	glog.Infof("Shutting down Job Manager")
-	jm.queue.ShutDown()
 }
 
 // getPodJob returns the job managing the given pod.
@@ -166,7 +173,7 @@ func (jm *JobController) getPodJob(pod *api.Pod) *batch.Job {
 		return nil
 	}
 	if len(jobs) > 1 {
-		glog.Errorf("user error! more than one job is selecting pods with labels: %+v", pod.Labels)
+		utilruntime.HandleError(fmt.Errorf("user error! more than one job is selecting pods with labels: %+v", pod.Labels))
 		sort.Sort(byCreationTimestamp(jobs))
 	}
 	return &jobs[0]
@@ -184,7 +191,7 @@ func (jm *JobController) addPod(obj interface{}) {
 	if job := jm.getPodJob(pod); job != nil {
 		jobKey, err := controller.KeyFunc(job)
 		if err != nil {
-			glog.Errorf("Couldn't get key for job %#v: %v", job, err)
+			utilruntime.HandleError(fmt.Errorf("Couldn't get key for job %#v: %v", job, err))
 			return
 		}
 		jm.expectations.CreationObserved(jobKey)
@@ -236,19 +243,19 @@ func (jm *JobController) deletePod(obj interface{}) {
 	if !ok {
 		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
 		if !ok {
-			glog.Errorf("Couldn't get object from tombstone %+v", obj)
+			utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %+v", obj))
 			return
 		}
 		pod, ok = tombstone.Obj.(*api.Pod)
 		if !ok {
-			glog.Errorf("Tombstone contained object that is not a pod %+v", obj)
+			utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a pod %+v", obj))
 			return
 		}
 	}
 	if job := jm.getPodJob(pod); job != nil {
 		jobKey, err := controller.KeyFunc(job)
 		if err != nil {
-			glog.Errorf("Couldn't get key for job %#v: %v", job, err)
+			utilruntime.HandleError(fmt.Errorf("Couldn't get key for job %#v: %v", job, err))
 			return
 		}
 		jm.expectations.DeletionObserved(jobKey)
@@ -260,7 +267,7 @@ func (jm *JobController) deletePod(obj interface{}) {
 func (jm *JobController) enqueueController(obj interface{}) {
 	key, err := controller.KeyFunc(obj)
 	if err != nil {
-		glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
+		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
 		return
 	}
 
@@ -276,19 +283,27 @@ func (jm *JobController) enqueueController(obj interface{}) {
 // worker runs a worker thread that just dequeues items, processes them, and marks them done.
 // It enforces that the syncHandler is never invoked concurrently with the same key.
 func (jm *JobController) worker() {
-	for {
-		func() {
-			key, quit := jm.queue.Get()
-			if quit {
-				return
-			}
-			defer jm.queue.Done(key)
-			err := jm.syncHandler(key.(string))
-			if err != nil {
-				glog.Errorf("Error syncing job: %v", err)
-			}
-		}()
-	}
+	for jm.processNextWorkItem() {
+	}
+}
+
+func (jm *JobController) processNextWorkItem() bool {
+	key, quit := jm.queue.Get()
+	if quit {
+		return false
+	}
+	defer jm.queue.Done(key)
+
+	err := jm.syncHandler(key.(string))
+	if err == nil {
+		jm.queue.Forget(key)
+		return true
+	}
+
+	utilruntime.HandleError(fmt.Errorf("Error syncing job: %v", err))
+	jm.queue.AddRateLimited(key)
+
+	return true
 }
 
 // syncJob will sync the job with the given key if it has had its expectations fulfilled, meaning
@@ -300,14 +315,6 @@ func (jm *JobController) syncJob(key string) error {
 		glog.V(4).Infof("Finished syncing job %q (%v)", key, time.Now().Sub(startTime))
 	}()
 
-	if !jm.podStoreSynced() {
-		// Sleep so we give the pod reflector goroutine a chance to run.
-		time.Sleep(replicationcontroller.PodStoreSyncedPollPeriod)
-		glog.V(4).Infof("Waiting for pods controller to sync, requeuing job %v", key)
-		jm.queue.Add(key)
-		return nil
-	}
-
 	obj, exists, err := jm.jobStore.Store.GetByKey(key)
 	if !exists {
 		glog.V(4).Infof("Job has been deleted: %v", key)
@@ -315,8 +322,6 @@ func (jm *JobController) syncJob(key string) error {
 		return nil
 	}
 	if err != nil {
-		glog.Errorf("Unable to retrieve job %v from store: %v", key, err)
-		jm.queue.Add(key)
 		return err
 	}
 	job := *obj.(*batch.Job)
@@ -324,17 +329,10 @@ func (jm *JobController) syncJob(key string) error {
 	// Check the expectations of the job before counting active pods, otherwise a new pod can sneak in
 	// and update the expectations after we've retrieved active pods from the store. If a new pod enters
 	// the store after we've checked the expectation, the job sync is just deferred till the next relist.
-	jobKey, err := controller.KeyFunc(&job)
-	if err != nil {
-		glog.Errorf("Couldn't get key for job %#v: %v", job, err)
-		return err
-	}
-	jobNeedsSync := jm.expectations.SatisfiedExpectations(jobKey)
+	jobNeedsSync := jm.expectations.SatisfiedExpectations(key)
 	selector, _ := unversioned.LabelSelectorAsSelector(job.Spec.Selector)
 	pods, err := jm.podStore.Pods(job.Namespace).List(selector)
 	if err != nil {
-		glog.Errorf("Error getting pods for job %q: %v", key, err)
-		jm.queue.Add(key)
 		return err
 	}
 
@@ -418,8 +416,7 @@ func (jm *JobController) syncJob(key string) error {
 		job.Status.Failed = failed
 
 		if err := jm.updateHandler(&job); err != nil {
-			glog.Errorf("Failed to update job %v, requeuing.  Error: %v", job.Name, err)
-			jm.enqueueController(&job)
+			return err
 		}
 	}
 	return nil
@@ -464,7 +461,7 @@ func (jm *JobController) manageJob(activePods []*api.Pod, succeeded int32, job *
 	parallelism := *job.Spec.Parallelism
 	jobKey, err := controller.KeyFunc(job)
 	if err != nil {
-		glog.Errorf("Couldn't get key for job %#v: %v", job, err)
+		utilruntime.HandleError(fmt.Errorf("Couldn't get key for job %#v: %v", job, err))
 		return 0
 	}
 
@@ -516,7 +513,7 @@ func (jm *JobController) manageJob(activePods []*api.Pod, succeeded int32, job *
 		}
 		diff := wantActive - active
 		if diff < 0 {
-			glog.Errorf("More active than wanted: job %q, want %d, have %d", jobKey, wantActive, active)
+			utilruntime.HandleError(fmt.Errorf("More active than wanted: job %q, want %d, have %d", jobKey, wantActive, active))
 			diff = 0
 		}
 		jm.expectations.ExpectCreations(jobKey, int(diff))
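Two smaller fixes ride along in the hunks above. First, the per-sync poll of podStoreSynced (a time.Sleep plus self-re-queue inside syncJob, another busy loop) becomes a single cache.WaitForCacheSync gate at the top of Run, and queue.ShutDown() moves into a defer so workers blocked in queue.Get() are released on every exit path, including the new early return. Second, glog.Errorf calls on error paths that swallow the error now go through utilruntime.HandleError, the process-wide error handler chain. A compilable sketch of the resulting Run shape, with assumed parameter names (the real method reads these values off the JobController struct):

package controllerutil

import (
	"time"

	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
)

// run sketches the startup/shutdown shape the commit gives Run.
func run(workers int, stopCh <-chan struct{}, queue workqueue.RateLimitingInterface,
	worker func(), synced ...cache.InformerSynced) {
	defer utilruntime.HandleCrash()
	// Deferred so that workers parked in queue.Get() are unblocked on
	// every exit path, including the early return below.
	defer queue.ShutDown()

	// Block once, up front, until the informer caches have synced,
	// instead of re-queuing every key until they do.
	if !cache.WaitForCacheSync(stopCh, synced...) {
		return
	}

	for i := 0; i < workers; i++ {
		go wait.Until(worker, time.Second, stopCh)
	}
	<-stopCh
}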
pkg/controller/job/jobcontroller_test.go

@@ -476,12 +476,16 @@ func TestSyncJobUpdateRequeue(t *testing.T) {
 	fakePodControl := controller.FakePodControl{}
 	manager.podControl = &fakePodControl
 	manager.podStoreSynced = alwaysReady
-	manager.updateHandler = func(job *batch.Job) error { return fmt.Errorf("Fake error") }
+	updateError := fmt.Errorf("Update error")
+	manager.updateHandler = func(job *batch.Job) error {
+		manager.queue.AddRateLimited(getKey(job, t))
+		return updateError
+	}
 	job := newJob(2, 2)
 	manager.jobStore.Store.Add(job)
 	err := manager.syncJob(getKey(job, t))
-	if err != nil {
-		t.Errorf("Unxpected error when syncing jobs, got %v", err)
+	if err == nil || err != updateError {
+		t.Errorf("Expected error %v when syncing jobs, got %v", updateError, err)
 	}
 	t.Log("Waiting for a job in the queue")
 	key, _ := manager.queue.Get()
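A note on the test change: syncJob no longer re-queues on a status-update failure; it returns the error and lets processNextWorkItem apply the rate-limited retry. Because the unit test calls syncJob directly (no worker loop is running), the fake updateHandler re-queues the key itself via AddRateLimited so the subsequent manager.queue.Get() still has something to return, and the assertion now checks that the update error actually propagates out of syncJob.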