mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-10-31 10:18:13 +00:00 
			
		
		
		
	NewSchedulerFromInterface implementation
This commit is contained in:
		| @@ -78,24 +78,20 @@ func Run(s *options.SchedulerServer) error { | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("unable to create kube client: %v", err) | ||||
| 	} | ||||
| 	config, err := createConfig(s, kubecli) | ||||
| 	recorder := createRecorder(kubecli, s) | ||||
| 	sched, err := createScheduler(s, kubecli, recorder) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("failed to create scheduler configuration: %v", err) | ||||
| 		return fmt.Errorf("error creating scheduler: %v", err) | ||||
| 	} | ||||
| 	sched := scheduler.New(config) | ||||
|  | ||||
| 	go startHTTP(s) | ||||
|  | ||||
| 	run := func(_ <-chan struct{}) { | ||||
| 		sched.Run() | ||||
| 		select {} | ||||
| 	} | ||||
|  | ||||
| 	if !s.LeaderElection.LeaderElect { | ||||
| 		run(nil) | ||||
| 		panic("unreachable") | ||||
| 	} | ||||
|  | ||||
| 	id, err := os.Hostname() | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("unable to get hostname: %v", err) | ||||
| @@ -109,7 +105,7 @@ func Run(s *options.SchedulerServer) error { | ||||
| 		Client: kubecli, | ||||
| 		LockConfig: resourcelock.ResourceLockConfig{ | ||||
| 			Identity:      id, | ||||
| 			EventRecorder: config.Recorder, | ||||
| 			EventRecorder: recorder, | ||||
| 		}, | ||||
| 	} | ||||
| 	leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{ | ||||
| @@ -127,6 +123,13 @@ func Run(s *options.SchedulerServer) error { | ||||
| 	panic("unreachable") | ||||
| } | ||||
|  | ||||
| func createRecorder(kubecli *clientset.Clientset, s *options.SchedulerServer) record.EventRecorder { | ||||
| 	eventBroadcaster := record.NewBroadcaster() | ||||
| 	eventBroadcaster.StartLogging(glog.Infof) | ||||
| 	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubecli.Core().Events("")}) | ||||
| 	return eventBroadcaster.NewRecorder(v1.EventSource{Component: s.SchedulerName}) | ||||
| } | ||||
|  | ||||
| func startHTTP(s *options.SchedulerServer) { | ||||
| 	mux := http.NewServeMux() | ||||
| 	healthz.InstallHandler(mux) | ||||
| @@ -171,33 +174,42 @@ func createClient(s *options.SchedulerServer) (*clientset.Clientset, error) { | ||||
| 	return cli, nil | ||||
| } | ||||
|  | ||||
| func createConfig(s *options.SchedulerServer, kubecli *clientset.Clientset) (*scheduler.Config, error) { | ||||
| 	configFactory := factory.NewConfigFactory(kubecli, s.SchedulerName, s.HardPodAffinitySymmetricWeight, s.FailureDomains) | ||||
| 	if _, err := os.Stat(s.PolicyConfigFile); err == nil { | ||||
| 		var ( | ||||
| 			policy     schedulerapi.Policy | ||||
| 			configData []byte | ||||
| 		) | ||||
| 		configData, err := ioutil.ReadFile(s.PolicyConfigFile) | ||||
| // schedulerConfigurator is an interface wrapper that provides default Configuration creation based on user | ||||
| // provided config file. | ||||
| type schedulerConfigurator struct { | ||||
| 	scheduler.Configurator | ||||
| 	policyFile        string | ||||
| 	algorithmProvider string | ||||
| } | ||||
|  | ||||
| func (sc schedulerConfigurator) Create() (*scheduler.Config, error) { | ||||
| 	if _, err := os.Stat(sc.policyFile); err != nil { | ||||
| 		return sc.Configurator.CreateFromProvider(sc.algorithmProvider) | ||||
| 	} | ||||
|  | ||||
| 	// policy file is valid, try to create a configuration from it. | ||||
| 	var policy schedulerapi.Policy | ||||
| 	configData, err := ioutil.ReadFile(sc.policyFile) | ||||
| 	if err != nil { | ||||
| 		return nil, fmt.Errorf("unable to read policy config: %v", err) | ||||
| 	} | ||||
| 	if err := runtime.DecodeInto(latestschedulerapi.Codec, configData, &policy); err != nil { | ||||
| 		return nil, fmt.Errorf("invalid configuration: %v", err) | ||||
| 	} | ||||
| 		return configFactory.CreateFromConfig(policy) | ||||
| 	} | ||||
|  | ||||
| 	// if the config file isn't provided, use the specified (or default) provider | ||||
| 	config, err := configFactory.CreateFromProvider(s.AlgorithmProvider) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	eventBroadcaster := record.NewBroadcaster() | ||||
| 	config.Recorder = eventBroadcaster.NewRecorder(v1.EventSource{Component: s.SchedulerName}) | ||||
| 	eventBroadcaster.StartLogging(glog.Infof) | ||||
| 	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubecli.Core().Events("")}) | ||||
|  | ||||
| 	return config, nil | ||||
| 	return sc.CreateFromConfig(policy) | ||||
| } | ||||
|  | ||||
| // createScheduler encapsulates the entire creation of a runnable scheduler. | ||||
| func createScheduler(s *options.SchedulerServer, kubecli *clientset.Clientset, recorder record.EventRecorder) (*scheduler.Scheduler, error) { | ||||
| 	configurator := factory.NewConfigFactory(kubecli, s.SchedulerName, s.HardPodAffinitySymmetricWeight, s.FailureDomains) | ||||
|  | ||||
| 	// Rebuild the configurator with a default Create(...) method. | ||||
| 	configurator = &schedulerConfigurator{ | ||||
| 		configurator, | ||||
| 		s.PolicyConfigFile, | ||||
| 		s.AlgorithmProvider} | ||||
|  | ||||
| 	return scheduler.NewFromConfigurator(configurator, func(cfg *scheduler.Config) { | ||||
| 		cfg.Recorder = recorder | ||||
| 	}) | ||||
| } | ||||
|   | ||||
| @@ -51,6 +51,10 @@ type Scheduler struct { | ||||
| 	config *Config | ||||
| } | ||||
|  | ||||
| func (sched *Scheduler) StopEverything() { | ||||
| 	close(sched.config.StopEverything) | ||||
| } | ||||
|  | ||||
| // These are the functions which need to be provided in order to build a Scheduler configuration. | ||||
| // An implementation of this can be seen in factory.go. | ||||
| type Configurator interface { | ||||
| @@ -78,6 +82,7 @@ type Configurator interface { | ||||
| 	CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Config, error) | ||||
| } | ||||
|  | ||||
| // TODO over time we should make this struct a hidden implementation detail of the scheduler. | ||||
| type Config struct { | ||||
| 	// It is expected that changes made via SchedulerCache will be observed | ||||
| 	// by NodeLister and Algorithm. | ||||
| @@ -108,6 +113,7 @@ type Config struct { | ||||
| } | ||||
|  | ||||
| // New returns a new scheduler. | ||||
| // TODO replace this with NewFromConfigurator. | ||||
| func New(c *Config) *Scheduler { | ||||
| 	s := &Scheduler{ | ||||
| 		config: c, | ||||
| @@ -116,6 +122,25 @@ func New(c *Config) *Scheduler { | ||||
| 	return s | ||||
| } | ||||
|  | ||||
| // NewFromConfigurator returns a new scheduler that is created entirely by the Configurator.  Assumes Create() is implemented. | ||||
| // Supports intermediate Config mutation for now if you provide modifier functions which will run after Config is created. | ||||
| func NewFromConfigurator(c Configurator, modifiers ...func(c *Config)) (*Scheduler, error) { | ||||
| 	cfg, err := c.Create() | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	// Mutate it if any functions were provided, changes might be required for certain types of tests (i.e. change the recorder). | ||||
| 	for _, modifier := range modifiers { | ||||
| 		modifier(cfg) | ||||
| 	} | ||||
| 	// From this point on the config is immutable to the outside. | ||||
| 	s := &Scheduler{ | ||||
| 		config: cfg, | ||||
| 	} | ||||
| 	metrics.Register() | ||||
| 	return s, nil | ||||
| } | ||||
|  | ||||
| // Run begins watching and scheduling. It starts a goroutine and returns immediately. | ||||
| func (s *Scheduler) Run() { | ||||
| 	go wait.Until(s.scheduleOne, 0, s.config.StopEverything) | ||||
|   | ||||
| @@ -40,7 +40,7 @@ import ( | ||||
| // remove resources after finished. | ||||
| // Notes on rate limiter: | ||||
| //   - client rate limit is set to 5000. | ||||
| func mustSetupScheduler() (schedulerConfigFactory scheduler.Configurator, destroyFunc func()) { | ||||
| func mustSetupScheduler() (schedulerConfigurator scheduler.Configurator, destroyFunc func()) { | ||||
|  | ||||
| 	h := &framework.MasterHolder{Initialized: make(chan struct{})} | ||||
| 	s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { | ||||
| @@ -57,20 +57,23 @@ func mustSetupScheduler() (schedulerConfigFactory scheduler.Configurator, destro | ||||
| 		Burst:         5000, | ||||
| 	}) | ||||
|  | ||||
| 	schedulerConfigFactory = factory.NewConfigFactory(clientSet, v1.DefaultSchedulerName, v1.DefaultHardPodAffinitySymmetricWeight, v1.DefaultFailureDomains) | ||||
| 	schedulerConfigurator = factory.NewConfigFactory(clientSet, v1.DefaultSchedulerName, v1.DefaultHardPodAffinitySymmetricWeight, v1.DefaultFailureDomains) | ||||
|  | ||||
| 	schedulerConfig, err := schedulerConfigFactory.Create() | ||||
| 	if err != nil { | ||||
| 		panic("Couldn't create scheduler config") | ||||
| 	} | ||||
| 	eventBroadcaster := record.NewBroadcaster() | ||||
| 	schedulerConfig.Recorder = eventBroadcaster.NewRecorder(v1.EventSource{Component: "scheduler"}) | ||||
| 	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: clientSet.Core().Events("")}) | ||||
| 	scheduler.New(schedulerConfig).Run() | ||||
|  | ||||
| 	sched, err := scheduler.NewFromConfigurator(schedulerConfigurator, func(conf *scheduler.Config) { | ||||
| 		conf.Recorder = eventBroadcaster.NewRecorder(v1.EventSource{Component: "scheduler"}) | ||||
| 	}) | ||||
| 	if err != nil { | ||||
| 		glog.Fatalf("Error creating scheduler: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	sched.Run() | ||||
|  | ||||
| 	destroyFunc = func() { | ||||
| 		glog.Infof("destroying") | ||||
| 		close(schedulerConfig.StopEverything) | ||||
| 		sched.StopEverything() | ||||
| 		s.Close() | ||||
| 		glog.Infof("destroyed") | ||||
| 	} | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 jayunit100
					jayunit100