mirror of
https://github.com/optim-enterprises-bv/kubernetes.git
synced 2025-11-02 03:08:15 +00:00
Slow-start batch pod creation of rs, rc, ds, jobs
Prevent too-large replicas from generating enormous numbers of events by creating only a few pods at a time, then increasing the batch size when pod creations succeed. Stop creating batches of pods when any pod creation errors are encountered.
This commit is contained in:
@@ -39,6 +39,7 @@ import (
|
||||
corelisters "k8s.io/client-go/listers/core/v1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/tools/record"
|
||||
"k8s.io/client-go/util/integer"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
"k8s.io/kubernetes/pkg/util/metrics"
|
||||
@@ -652,34 +653,60 @@ func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, job *b
|
||||
|
||||
active += diff
|
||||
wait := sync.WaitGroup{}
|
||||
wait.Add(int(diff))
|
||||
for i := int32(0); i < diff; i++ {
|
||||
go func() {
|
||||
defer wait.Done()
|
||||
err := jm.podControl.CreatePodsWithControllerRef(job.Namespace, &job.Spec.Template, job, metav1.NewControllerRef(job, controllerKind))
|
||||
if err != nil && errors.IsTimeout(err) {
|
||||
// Pod is created but its initialization has timed out.
|
||||
// If the initialization is successful eventually, the
|
||||
// controller will observe the creation via the informer.
|
||||
// If the initialization fails, or if the pod keeps
|
||||
// uninitialized for a long time, the informer will not
|
||||
// receive any update, and the controller will create a new
|
||||
// pod when the expectation expires.
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
defer utilruntime.HandleError(err)
|
||||
|
||||
// Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
|
||||
// and double with each successful iteration in a kind of "slow start".
|
||||
// This handles attempts to start large numbers of pods that would
|
||||
// likely all fail with the same error. For example a project with a
|
||||
// low quota that attempts to create a large number of pods will be
|
||||
// prevented from spamming the API service with the pod create requests
|
||||
// after one of its pods fails. Conveniently, this also prevents the
|
||||
// event spam that those failures would generate.
|
||||
for batchSize := int32(integer.IntMin(int(diff), controller.SlowStartInitialBatchSize)); diff > 0; batchSize = integer.Int32Min(2*batchSize, diff) {
|
||||
errorCount := len(errCh)
|
||||
wait.Add(int(batchSize))
|
||||
for i := int32(0); i < batchSize; i++ {
|
||||
go func() {
|
||||
defer wait.Done()
|
||||
err := jm.podControl.CreatePodsWithControllerRef(job.Namespace, &job.Spec.Template, job, metav1.NewControllerRef(job, controllerKind))
|
||||
if err != nil && errors.IsTimeout(err) {
|
||||
// Pod is created but its initialization has timed out.
|
||||
// If the initialization is successful eventually, the
|
||||
// controller will observe the creation via the informer.
|
||||
// If the initialization fails, or if the pod keeps
|
||||
// uninitialized for a long time, the informer will not
|
||||
// receive any update, and the controller will create a new
|
||||
// pod when the expectation expires.
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
defer utilruntime.HandleError(err)
|
||||
// Decrement the expected number of creates because the informer won't observe this pod
|
||||
glog.V(2).Infof("Failed creation, decrementing expectations for job %q/%q", job.Namespace, job.Name)
|
||||
jm.expectations.CreationObserved(jobKey)
|
||||
activeLock.Lock()
|
||||
active--
|
||||
activeLock.Unlock()
|
||||
errCh <- err
|
||||
}
|
||||
}()
|
||||
}
|
||||
wait.Wait()
|
||||
// any skipped pods that we never attempted to start shouldn't be expected.
|
||||
skippedPods := diff - batchSize
|
||||
if errorCount < len(errCh) && skippedPods > 0 {
|
||||
glog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for job %q/%q", skippedPods, job.Namespace, job.Name)
|
||||
active -= skippedPods
|
||||
for i := int32(0); i < skippedPods; i++ {
|
||||
// Decrement the expected number of creates because the informer won't observe this pod
|
||||
glog.V(2).Infof("Failed creation, decrementing expectations for job %q/%q", job.Namespace, job.Name)
|
||||
jm.expectations.CreationObserved(jobKey)
|
||||
activeLock.Lock()
|
||||
active--
|
||||
activeLock.Unlock()
|
||||
errCh <- err
|
||||
}
|
||||
}()
|
||||
// The skipped pods will be retried later. The next controller resync will
|
||||
// retry the slow start process.
|
||||
break
|
||||
}
|
||||
diff -= batchSize
|
||||
}
|
||||
wait.Wait()
|
||||
}
|
||||
|
||||
select {
|
||||
|
||||
Reference in New Issue
Block a user