refactor replicaset sync call tree
@@ -437,10 +437,8 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *exte
		utilruntime.HandleError(fmt.Errorf("Couldn't get key for ReplicaSet %#v: %v", rs, err))
		return nil
	}
	var errCh chan error
	if diff < 0 {
		diff *= -1
		errCh = make(chan error, diff)
		if diff > rsc.burstReplicas {
			diff = rsc.burstReplicas
		}
@@ -450,7 +448,6 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *exte
		// into a performance bottleneck. We should generate a UID for the pod
		// beforehand and store it via ExpectCreations.
		rsc.expectations.ExpectCreations(rsKey, diff)
		var wg sync.WaitGroup
		glog.V(2).Infof("Too few %q/%q replicas, need %d, creating %d", rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff)
		// Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
		// and double with each successful iteration in a kind of "slow start".
@@ -460,105 +457,85 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *exte
		// prevented from spamming the API service with the pod create requests
		// after one of its pods fails. Conveniently, this also prevents the
		// event spam that those failures would generate.
		for batchSize := integer.IntMin(diff, controller.SlowStartInitialBatchSize); diff > 0; batchSize = integer.IntMin(2*batchSize, diff) {
			errorCount := len(errCh)
			wg.Add(batchSize)
			for i := 0; i < batchSize; i++ {
				go func() {
					defer wg.Done()
					var err error
					boolPtr := func(b bool) *bool { return &b }
					controllerRef := &metav1.OwnerReference{
						APIVersion: controllerKind.GroupVersion().String(),
						Kind: controllerKind.Kind,
						Name: rs.Name,
						UID: rs.UID,
						BlockOwnerDeletion: boolPtr(true),
						Controller: boolPtr(true),
					}
					err = rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, controllerRef)
					if err != nil && errors.IsTimeout(err) {
						// Pod is created but its initialization has timed out.
						// If the initialization is successful eventually, the
						// controller will observe the creation via the informer.
						// If the initialization fails, or if the pod keeps
						// uninitialized for a long time, the informer will not
						// receive any update, and the controller will create a new
						// pod when the expectation expires.
						return
					}
					if err != nil {
						// Decrement the expected number of creates because the informer won't observe this pod
						glog.V(2).Infof("Failed creation, decrementing expectations for replica set %q/%q", rs.Namespace, rs.Name)
						rsc.expectations.CreationObserved(rsKey)
						errCh <- err
					}
				}()
		successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
			boolPtr := func(b bool) *bool { return &b }
			controllerRef := &metav1.OwnerReference{
				APIVersion: controllerKind.GroupVersion().String(),
				Kind: controllerKind.Kind,
				Name: rs.Name,
				UID: rs.UID,
				BlockOwnerDeletion: boolPtr(true),
				Controller: boolPtr(true),
			}
			wg.Wait()
			// any skipped pods that we never attempted to start shouldn't be expected.
			skippedPods := diff - batchSize
			if errorCount < len(errCh) && skippedPods > 0 {
				glog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for replica set %q/%q", skippedPods, rs.Namespace, rs.Name)
				for i := 0; i < skippedPods; i++ {
					// Decrement the expected number of creates because the informer won't observe this pod
					rsc.expectations.CreationObserved(rsKey)
				}
				// The skipped pods will be retried later. The next controller resync will
				// retry the slow start process.
				break
			err := rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, controllerRef)
			if err != nil && errors.IsTimeout(err) {
				// Pod is created but its initialization has timed out.
				// If the initialization is successful eventually, the
				// controller will observe the creation via the informer.
				// If the initialization fails, or if the pod keeps
				// uninitialized for a long time, the informer will not
				// receive any update, and the controller will create a new
				// pod when the expectation expires.
				return nil
			}
			return err
		})

		// Any skipped pods that we never attempted to start shouldn't be expected.
		// The skipped pods will be retried later. The next controller resync will
		// retry the slow start process.
		if skippedPods := diff - successfulCreations; skippedPods > 0 {
			glog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for replica set %v/%v", skippedPods, rs.Namespace, rs.Name)
			for i := 0; i < skippedPods; i++ {
				// Decrement the expected number of creates because the informer won't observe this pod
				rsc.expectations.CreationObserved(rsKey)
			}
			diff -= batchSize
		}
		return err
	} else if diff > 0 {
		if diff > rsc.burstReplicas {
			diff = rsc.burstReplicas
		}
		errCh = make(chan error, diff)
		glog.V(2).Infof("Too many %q/%q replicas, need %d, deleting %d", rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff)
		// No need to sort pods if we are about to delete all of them
		if *(rs.Spec.Replicas) != 0 {
			// Sort the pods in the order such that not-ready < ready, unscheduled
			// < scheduled, and pending < running. This ensures that we delete pods
			// in the earlier stages whenever possible.
			sort.Sort(controller.ActivePods(filteredPods))
		}

		// Choose which Pods to delete, preferring those in earlier phases of startup.
		podsToDelete := getPodsToDelete(filteredPods, diff)

		// Snapshot the UIDs (ns/name) of the pods we're expecting to see
		// deleted, so we know to record their expectations exactly once either
		// when we see it as an update of the deletion timestamp, or as a delete.
		// Note that if the labels on a pod/rs change in a way that the pod gets
		// orphaned, the rs will only wake up after the expectations have
		// expired even if other pods are deleted.
		deletedPodKeys := []string{}
		for i := 0; i < diff; i++ {
			deletedPodKeys = append(deletedPodKeys, controller.PodKey(filteredPods[i]))
		}
		rsc.expectations.ExpectDeletions(rsKey, deletedPodKeys)
		rsc.expectations.ExpectDeletions(rsKey, getPodKeys(podsToDelete))

		errCh := make(chan error, diff)
		var wg sync.WaitGroup
		wg.Add(diff)
		for i := 0; i < diff; i++ {
			go func(ix int) {
		for _, pod := range podsToDelete {
			go func(targetPod *v1.Pod) {
				defer wg.Done()
				if err := rsc.podControl.DeletePod(rs.Namespace, filteredPods[ix].Name, rs); err != nil {
				if err := rsc.podControl.DeletePod(rs.Namespace, targetPod.Name, rs); err != nil {
					// Decrement the expected number of deletes because the informer won't observe this deletion
					podKey := controller.PodKey(filteredPods[ix])
					podKey := controller.PodKey(targetPod)
					glog.V(2).Infof("Failed to delete %v, decrementing expectations for controller %q/%q", podKey, rs.Namespace, rs.Name)
					rsc.expectations.DeletionObserved(rsKey, podKey)
					errCh <- err
				}
			}(i)
			}(pod)
		}
		wg.Wait()

		select {
		case err := <-errCh:
			// all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit.
			if err != nil {
				return err
			}
		default:
		}
	}

	select {
	case err := <-errCh:
		// all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit.
		if err != nil {
			return err
		}
	default:
	}
	return nil
}
@@ -606,22 +583,10 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
			filteredPods = append(filteredPods, pod)
		}
	}
	// If any adoptions are attempted, we should first recheck for deletion with
	// an uncached quorum read sometime after listing Pods (see #42639).
	canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) {
		fresh, err := rsc.kubeClient.ExtensionsV1beta1().ReplicaSets(rs.Namespace).Get(rs.Name, metav1.GetOptions{})
		if err != nil {
			return nil, err
		}
		if fresh.UID != rs.UID {
			return nil, fmt.Errorf("original ReplicaSet %v/%v is gone: got uid %v, wanted %v", rs.Namespace, rs.Name, fresh.UID, rs.UID)
		}
		return fresh, nil
	})
	cm := controller.NewPodControllerRefManager(rsc.podControl, rs, selector, controllerKind, canAdoptFunc)

	// NOTE: filteredPods are pointing to objects from cache - if you need to
	// modify them, you need to copy it first.
	filteredPods, err = cm.ClaimPods(filteredPods)
	filteredPods, err = rsc.claimPods(rs, selector, filteredPods)
	if err != nil {
		return err
	}
@@ -630,9 +595,7 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
	if rsNeedsSync && rs.DeletionTimestamp == nil {
		manageReplicasErr = rsc.manageReplicas(filteredPods, rs)
	}

	rs = rs.DeepCopy()

	newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)

	// Always updates status as pods come up or die.
@@ -650,3 +613,77 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
	}
	return manageReplicasErr
}

func (rsc *ReplicaSetController) claimPods(rs *extensions.ReplicaSet, selector labels.Selector, filteredPods []*v1.Pod) ([]*v1.Pod, error) {
	// If any adoptions are attempted, we should first recheck for deletion with
	// an uncached quorum read sometime after listing Pods (see #42639).
	canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) {
		fresh, err := rsc.kubeClient.ExtensionsV1beta1().ReplicaSets(rs.Namespace).Get(rs.Name, metav1.GetOptions{})
		if err != nil {
			return nil, err
		}
		if fresh.UID != rs.UID {
			return nil, fmt.Errorf("original ReplicaSet %v/%v is gone: got uid %v, wanted %v", rs.Namespace, rs.Name, fresh.UID, rs.UID)
		}
		return fresh, nil
	})
	cm := controller.NewPodControllerRefManager(rsc.podControl, rs, selector, controllerKind, canAdoptFunc)
	return cm.ClaimPods(filteredPods)
}

// slowStartBatch tries to call the provided function a total of 'count' times,
// starting slow to check for errors, then speeding up if calls succeed.
//
// It groups the calls into batches, starting with a group of initialBatchSize.
// Within each batch, it may call the function multiple times concurrently.
//
// If a whole batch succeeds, the next batch may get exponentially larger.
// If there are any failures in a batch, all remaining batches are skipped
// after waiting for the current batch to complete.
//
// It returns the number of successful calls to the function.
func slowStartBatch(count int, initialBatchSize int, fn func() error) (int, error) {
	remaining := count
	successes := 0
	for batchSize := integer.IntMin(remaining, initialBatchSize); batchSize > 0; batchSize = integer.IntMin(2*batchSize, remaining) {
		errCh := make(chan error, batchSize)
		var wg sync.WaitGroup
		wg.Add(batchSize)
		for i := 0; i < batchSize; i++ {
			go func() {
				defer wg.Done()
				if err := fn(); err != nil {
					errCh <- err
				}
			}()
		}
		wg.Wait()
		curSuccesses := batchSize - len(errCh)
		successes += curSuccesses
		if len(errCh) > 0 {
			return successes, <-errCh
		}
		remaining -= batchSize
	}
	return successes, nil
}

func getPodsToDelete(filteredPods []*v1.Pod, diff int) []*v1.Pod {
	// No need to sort pods if we are about to delete all of them.
	// diff will always be <= len(filteredPods), so not need to handle > case.
	if diff < len(filteredPods) {
		// Sort the pods in the order such that not-ready < ready, unscheduled
		// < scheduled, and pending < running. This ensures that we delete pods
		// in the earlier stages whenever possible.
		sort.Sort(controller.ActivePods(filteredPods))
	}
	return filteredPods[:diff]
}

func getPodKeys(pods []*v1.Pod) []string {
	podKeys := make([]string, 0, len(pods))
	for _, pod := range pods {
		podKeys = append(podKeys, controller.PodKey(pod))
	}
	return podKeys
}
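For readers skimming the diff, here is a minimal, self-contained sketch (not part of the commit) of how the new slowStartBatch helper behaves. It copies the committed batching logic into a runnable program with a stand-in "create" function that starts failing on its sixth call; the names main, slowStartBatchSketch, and intMin are illustrative only.

// Illustrative sketch only, not part of the commit: a trimmed-down copy of
// slowStartBatch driving a hypothetical create function, to show how batch
// sizes grow while calls succeed and how the first failing batch stops
// further attempts.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func intMin(a, b int) int {
	if a < b {
		return a
	}
	return b
}

// slowStartBatchSketch mirrors the committed helper: call fn count times in
// batches that start at initialBatchSize and double after each fully
// successful batch, stopping after the first batch that reports an error.
// It returns the number of successful calls.
func slowStartBatchSketch(count, initialBatchSize int, fn func() error) (int, error) {
	remaining := count
	successes := 0
	for batchSize := intMin(remaining, initialBatchSize); batchSize > 0; batchSize = intMin(2*batchSize, remaining) {
		errCh := make(chan error, batchSize)
		var wg sync.WaitGroup
		wg.Add(batchSize)
		for i := 0; i < batchSize; i++ {
			go func() {
				defer wg.Done()
				if err := fn(); err != nil {
					errCh <- err
				}
			}()
		}
		wg.Wait()
		successes += batchSize - len(errCh)
		if len(errCh) > 0 {
			return successes, <-errCh
		}
		remaining -= batchSize
	}
	return successes, nil
}

func main() {
	var calls int64
	// Hypothetical workload: the first 5 "creates" succeed, everything after fails.
	created, err := slowStartBatchSketch(10, 1, func() error {
		if atomic.AddInt64(&calls, 1) > 5 {
			return fmt.Errorf("simulated create failure")
		}
		return nil
	})
	// Batches run with sizes 1, 2, 4; the third batch sees failures, so the
	// remaining work is skipped: created == 5, err != nil.
	fmt.Printf("created %d of 10, err: %v\n", created, err)
}

This slow-start behavior is what the refactored manageReplicas relies on: when every create in a burst would fail the same way, the controller stops after a small batch instead of spamming the API service, and the skipped creations are simply not expected and get retried on the next sync.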