mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-04 04:08:16 +00:00 
			
		
		
		
	Make scheduler optimistic about its bindings
This commit is contained in:
		@@ -105,20 +105,21 @@ func (s *Scheduler) Run() {
 | 
				
			|||||||
	go util.Until(s.scheduleOne, 0, s.config.StopEverything)
 | 
						go util.Until(s.scheduleOne, 0, s.config.StopEverything)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (s *Scheduler) scheduleOne() {
 | 
					func (s *Scheduler) schedule() (executeBinding func()) {
 | 
				
			||||||
	pod := s.config.NextPod()
 | 
						pod := s.config.NextPod()
 | 
				
			||||||
	glog.V(3).Infof("Attempting to schedule: %v", pod)
 | 
						glog.V(3).Infof("Attempting to schedule: %v", pod)
 | 
				
			||||||
	start := time.Now()
 | 
						start := time.Now()
 | 
				
			||||||
	defer func() {
 | 
						recordTime := func() {
 | 
				
			||||||
		metrics.E2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start))
 | 
							metrics.E2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start))
 | 
				
			||||||
	}()
 | 
						}
 | 
				
			||||||
	dest, err := s.config.Algorithm.Schedule(pod, s.config.MinionLister)
 | 
						dest, err := s.config.Algorithm.Schedule(pod, s.config.MinionLister)
 | 
				
			||||||
	metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInMicroseconds(start))
 | 
						metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInMicroseconds(start))
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		glog.V(1).Infof("Failed to schedule: %v", pod)
 | 
							glog.V(1).Infof("Failed to schedule: %v", pod)
 | 
				
			||||||
		s.config.Recorder.Eventf(pod, "failedScheduling", "Error scheduling: %v", err)
 | 
							s.config.Recorder.Eventf(pod, "failedScheduling", "Error scheduling: %v", err)
 | 
				
			||||||
		s.config.Error(pod, err)
 | 
							s.config.Error(pod, err)
 | 
				
			||||||
		return
 | 
							recordTime()
 | 
				
			||||||
 | 
							return func() {}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	b := &api.Binding{
 | 
						b := &api.Binding{
 | 
				
			||||||
		ObjectMeta: api.ObjectMeta{Namespace: pod.Namespace, Name: pod.Name},
 | 
							ObjectMeta: api.ObjectMeta{Namespace: pod.Namespace, Name: pod.Name},
 | 
				
			||||||
@@ -128,22 +129,32 @@ func (s *Scheduler) scheduleOne() {
 | 
				
			|||||||
		},
 | 
							},
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// We want to add the pod to the model iff the bind succeeds, but we don't want to race
 | 
						// Actually do the binding asynchronously with respect to the scheduling queue.
 | 
				
			||||||
	// with any deletions, which happen asyncronously.
 | 
						return func() {
 | 
				
			||||||
	s.config.Modeler.LockedAction(func() {
 | 
							defer recordTime()
 | 
				
			||||||
 | 
							defer util.HandleCrash()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							// Make an object representing our assumtion that the bind will succeed.
 | 
				
			||||||
 | 
							assumed := *pod
 | 
				
			||||||
 | 
							assumed.Spec.Host = dest
 | 
				
			||||||
 | 
							s.config.Modeler.AssumePod(&assumed)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		bindingStart := time.Now()
 | 
							bindingStart := time.Now()
 | 
				
			||||||
		err := s.config.Binder.Bind(b)
 | 
							err := s.config.Binder.Bind(b)
 | 
				
			||||||
		metrics.BindingLatency.Observe(metrics.SinceInMicroseconds(bindingStart))
 | 
							metrics.BindingLatency.Observe(metrics.SinceInMicroseconds(bindingStart))
 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
 | 
								// Remove our (now invalid) assumption
 | 
				
			||||||
 | 
								s.config.Modeler.ForgetPod(&assumed)
 | 
				
			||||||
			glog.V(1).Infof("Failed to bind pod: %v", err)
 | 
								glog.V(1).Infof("Failed to bind pod: %v", err)
 | 
				
			||||||
			s.config.Recorder.Eventf(pod, "failedScheduling", "Binding rejected: %v", err)
 | 
								s.config.Recorder.Eventf(pod, "failedScheduling", "Binding rejected: %v", err)
 | 
				
			||||||
			s.config.Error(pod, err)
 | 
								s.config.Error(pod, err)
 | 
				
			||||||
			return
 | 
								return
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		s.config.Recorder.Eventf(pod, "scheduled", "Successfully assigned %v to %v", pod.Name, dest)
 | 
							s.config.Recorder.Eventf(pod, "scheduled", "Successfully assigned %v to %v", pod.Name, dest)
 | 
				
			||||||
		// tell the model to assume that this binding took effect.
 | 
						}
 | 
				
			||||||
		assumed := *pod
 | 
					}
 | 
				
			||||||
		assumed.Spec.Host = dest
 | 
					
 | 
				
			||||||
		s.config.Modeler.AssumePod(&assumed)
 | 
					func (s *Scheduler) scheduleOne() {
 | 
				
			||||||
	})
 | 
						bind := s.schedule()
 | 
				
			||||||
 | 
						go bind()
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -112,6 +112,11 @@ func TestScheduler(t *testing.T) {
 | 
				
			|||||||
				AssumePodFunc: func(pod *api.Pod) {
 | 
									AssumePodFunc: func(pod *api.Pod) {
 | 
				
			||||||
					gotAssumedPod = pod
 | 
										gotAssumedPod = pod
 | 
				
			||||||
				},
 | 
									},
 | 
				
			||||||
 | 
									ForgetPodFunc: func(pod *api.Pod) {
 | 
				
			||||||
 | 
										if gotAssumedPod != nil && gotAssumedPod.Name == pod.Name && gotAssumedPod.Namespace == pod.Namespace {
 | 
				
			||||||
 | 
											gotAssumedPod = nil
 | 
				
			||||||
 | 
										}
 | 
				
			||||||
 | 
									},
 | 
				
			||||||
			},
 | 
								},
 | 
				
			||||||
			MinionLister: scheduler.FakeMinionLister(
 | 
								MinionLister: scheduler.FakeMinionLister(
 | 
				
			||||||
				api.NodeList{Items: []api.Node{{ObjectMeta: api.ObjectMeta{Name: "machine1"}}}},
 | 
									api.NodeList{Items: []api.Node{{ObjectMeta: api.ObjectMeta{Name: "machine1"}}}},
 | 
				
			||||||
@@ -138,7 +143,7 @@ func TestScheduler(t *testing.T) {
 | 
				
			|||||||
			}
 | 
								}
 | 
				
			||||||
			close(called)
 | 
								close(called)
 | 
				
			||||||
		})
 | 
							})
 | 
				
			||||||
		s.scheduleOne()
 | 
							s.schedule()()
 | 
				
			||||||
		if e, a := item.expectAssumedPod, gotAssumedPod; !reflect.DeepEqual(e, a) {
 | 
							if e, a := item.expectAssumedPod, gotAssumedPod; !reflect.DeepEqual(e, a) {
 | 
				
			||||||
			t.Errorf("%v: assumed pod: wanted %v, got %v", i, e, a)
 | 
								t.Errorf("%v: assumed pod: wanted %v, got %v", i, e, a)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
@@ -228,7 +233,7 @@ func TestSchedulerForgetAssumedPodAfterDelete(t *testing.T) {
 | 
				
			|||||||
	// scheduledPodStore: []
 | 
						// scheduledPodStore: []
 | 
				
			||||||
	// assumedPods: []
 | 
						// assumedPods: []
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	s.scheduleOne()
 | 
						s.schedule()()
 | 
				
			||||||
	// queuedPodStore: []
 | 
						// queuedPodStore: []
 | 
				
			||||||
	// scheduledPodStore: [foo:8080]
 | 
						// scheduledPodStore: [foo:8080]
 | 
				
			||||||
	// assumedPods: [foo:8080]
 | 
						// assumedPods: [foo:8080]
 | 
				
			||||||
@@ -282,7 +287,7 @@ func TestSchedulerForgetAssumedPodAfterDelete(t *testing.T) {
 | 
				
			|||||||
		close(called)
 | 
							close(called)
 | 
				
			||||||
	})
 | 
						})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	s.scheduleOne()
 | 
						s.schedule()()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	expectBind = &api.Binding{
 | 
						expectBind = &api.Binding{
 | 
				
			||||||
		ObjectMeta: api.ObjectMeta{Name: "bar"},
 | 
							ObjectMeta: api.ObjectMeta{Name: "bar"},
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user