mirror of
https://github.com/optim-enterprises-bv/kubernetes.git
synced 2025-11-02 03:08:15 +00:00
Making the LoggingConfiguration part of the versioned component-base/config API
had the theoretical advantage that components could have offered different
configuration APIs with experimental features limited to alpha versions (for
example, sanitization offered only in a v1alpha1.KubeletConfiguration). Some
components could have decided to only use stable logging options.
In practice, this wasn't done. Furthermore, we don't want different components
to make different choices regarding which logging features they offer to
users. It should always be the same everywhere, for the sake of consistency.
This can be achieved with a saner Go API by dropping the distinction between
internal and external LoggingConfiguration types. Different stability levels of
individual fields have to be covered by documentation (done) and potentially
feature gates (not currently done).
Advantages:
- everything related to logging is under component-base/logs;
previously this was scattered across different packages and
different files under "logs" (why some code was in logs/config.go
vs. logs/options.go vs. logs/logs.go always confused me again
and again when coming back to the code):
- long-term config and command line API are clearly separated
into the "api" package underneath that
- logs/logs.go itself only deals with legacy global flags and
logging configuration
- removal of separate Go APIs like logs.BindLoggingFlags and
logs.Options
- LogRegistry becomes an implementation detail, with less code
and less exported functionality (only registration needs to
be exported, querying is internal)
358 lines
13 KiB
Go
358 lines
13 KiB
Go
/*
|
|
Copyright 2014 The Kubernetes Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
// Package app implements a Server object for running the scheduler.
|
|
package app
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net/http"
|
|
"os"
|
|
goruntime "runtime"
|
|
|
|
"github.com/spf13/cobra"
|
|
|
|
utilerrors "k8s.io/apimachinery/pkg/util/errors"
|
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
|
"k8s.io/apiserver/pkg/authentication/authenticator"
|
|
"k8s.io/apiserver/pkg/authorization/authorizer"
|
|
genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
|
|
apirequest "k8s.io/apiserver/pkg/endpoints/request"
|
|
"k8s.io/apiserver/pkg/server"
|
|
genericfilters "k8s.io/apiserver/pkg/server/filters"
|
|
"k8s.io/apiserver/pkg/server/healthz"
|
|
"k8s.io/apiserver/pkg/server/mux"
|
|
"k8s.io/apiserver/pkg/server/routes"
|
|
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
|
"k8s.io/client-go/informers"
|
|
"k8s.io/client-go/kubernetes/scheme"
|
|
"k8s.io/client-go/tools/events"
|
|
"k8s.io/client-go/tools/leaderelection"
|
|
cliflag "k8s.io/component-base/cli/flag"
|
|
"k8s.io/component-base/cli/globalflag"
|
|
"k8s.io/component-base/configz"
|
|
"k8s.io/component-base/logs"
|
|
logsapi "k8s.io/component-base/logs/api/v1"
|
|
"k8s.io/component-base/metrics/legacyregistry"
|
|
"k8s.io/component-base/term"
|
|
"k8s.io/component-base/version"
|
|
"k8s.io/component-base/version/verflag"
|
|
"k8s.io/klog/v2"
|
|
schedulerserverconfig "k8s.io/kubernetes/cmd/kube-scheduler/app/config"
|
|
"k8s.io/kubernetes/cmd/kube-scheduler/app/options"
|
|
"k8s.io/kubernetes/pkg/scheduler"
|
|
kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
|
|
"k8s.io/kubernetes/pkg/scheduler/apis/config/latest"
|
|
"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
|
|
"k8s.io/kubernetes/pkg/scheduler/metrics/resources"
|
|
"k8s.io/kubernetes/pkg/scheduler/profile"
|
|
)
|
|
|
|
func init() {
|
|
utilruntime.Must(logsapi.AddFeatureGates(utilfeature.DefaultMutableFeatureGate))
|
|
}
|
|
|
|
// Option configures a framework.Registry.
|
|
type Option func(runtime.Registry) error
|
|
|
|
// NewSchedulerCommand creates a *cobra.Command object with default parameters and registryOptions
|
|
func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
|
|
opts := options.NewOptions()
|
|
|
|
cmd := &cobra.Command{
|
|
Use: "kube-scheduler",
|
|
Long: `The Kubernetes scheduler is a control plane process which assigns
|
|
Pods to Nodes. The scheduler determines which Nodes are valid placements for
|
|
each Pod in the scheduling queue according to constraints and available
|
|
resources. The scheduler then ranks each valid Node and binds the Pod to a
|
|
suitable Node. Multiple different schedulers may be used within a cluster;
|
|
kube-scheduler is the reference implementation.
|
|
See [scheduling](https://kubernetes.io/docs/concepts/scheduling-eviction/)
|
|
for more information about scheduling and the kube-scheduler component.`,
|
|
RunE: func(cmd *cobra.Command, args []string) error {
|
|
return runCommand(cmd, opts, registryOptions...)
|
|
},
|
|
Args: func(cmd *cobra.Command, args []string) error {
|
|
for _, arg := range args {
|
|
if len(arg) > 0 {
|
|
return fmt.Errorf("%q does not take any arguments, got %q", cmd.CommandPath(), args)
|
|
}
|
|
}
|
|
return nil
|
|
},
|
|
}
|
|
|
|
nfs := opts.Flags
|
|
verflag.AddFlags(nfs.FlagSet("global"))
|
|
globalflag.AddGlobalFlags(nfs.FlagSet("global"), cmd.Name(), logs.SkipLoggingConfigurationFlags())
|
|
fs := cmd.Flags()
|
|
for _, f := range nfs.FlagSets {
|
|
fs.AddFlagSet(f)
|
|
}
|
|
|
|
cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
|
|
cliflag.SetUsageAndHelpFunc(cmd, *nfs, cols)
|
|
|
|
if err := cmd.MarkFlagFilename("config", "yaml", "yml", "json"); err != nil {
|
|
klog.ErrorS(err, "Failed to mark flag filename")
|
|
}
|
|
|
|
return cmd
|
|
}
|
|
|
|
// runCommand runs the scheduler.
|
|
func runCommand(cmd *cobra.Command, opts *options.Options, registryOptions ...Option) error {
|
|
verflag.PrintAndExitIfRequested()
|
|
|
|
// Activate logging as soon as possible, after that
|
|
// show flags with the final logging configuration.
|
|
if err := opts.Logs.ValidateAndApply(utilfeature.DefaultFeatureGate); err != nil {
|
|
fmt.Fprintf(os.Stderr, "%v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
cliflag.PrintFlags(cmd.Flags())
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
go func() {
|
|
stopCh := server.SetupSignalHandler()
|
|
<-stopCh
|
|
cancel()
|
|
}()
|
|
|
|
cc, sched, err := Setup(ctx, opts, registryOptions...)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return Run(ctx, cc, sched)
|
|
}
|
|
|
|
// Run executes the scheduler based on the given configuration. It only returns on error or when context is done.
|
|
func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error {
|
|
// To help debugging, immediately log version
|
|
klog.InfoS("Starting Kubernetes Scheduler", "version", version.Get())
|
|
|
|
klog.InfoS("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))
|
|
|
|
// Configz registration.
|
|
if cz, err := configz.New("componentconfig"); err == nil {
|
|
cz.Set(cc.ComponentConfig)
|
|
} else {
|
|
return fmt.Errorf("unable to register configz: %s", err)
|
|
}
|
|
|
|
// Start events processing pipeline.
|
|
cc.EventBroadcaster.StartRecordingToSink(ctx.Done())
|
|
defer cc.EventBroadcaster.Shutdown()
|
|
|
|
// Setup healthz checks.
|
|
var checks []healthz.HealthChecker
|
|
if cc.ComponentConfig.LeaderElection.LeaderElect {
|
|
checks = append(checks, cc.LeaderElection.WatchDog)
|
|
}
|
|
|
|
waitingForLeader := make(chan struct{})
|
|
isLeader := func() bool {
|
|
select {
|
|
case _, ok := <-waitingForLeader:
|
|
// if channel is closed, we are leading
|
|
return !ok
|
|
default:
|
|
// channel is open, we are waiting for a leader
|
|
return false
|
|
}
|
|
}
|
|
|
|
// Start up the healthz server.
|
|
if cc.SecureServing != nil {
|
|
handler := buildHandlerChain(newHealthzAndMetricsHandler(&cc.ComponentConfig, cc.InformerFactory, isLeader, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer)
|
|
// TODO: handle stoppedCh and listenerStoppedCh returned by c.SecureServing.Serve
|
|
if _, _, err := cc.SecureServing.Serve(handler, 0, ctx.Done()); err != nil {
|
|
// fail early for secure handlers, removing the old error loop from above
|
|
return fmt.Errorf("failed to start secure server: %v", err)
|
|
}
|
|
}
|
|
|
|
// Start all informers.
|
|
cc.InformerFactory.Start(ctx.Done())
|
|
// DynInformerFactory can be nil in tests.
|
|
if cc.DynInformerFactory != nil {
|
|
cc.DynInformerFactory.Start(ctx.Done())
|
|
}
|
|
|
|
// Wait for all caches to sync before scheduling.
|
|
cc.InformerFactory.WaitForCacheSync(ctx.Done())
|
|
// DynInformerFactory can be nil in tests.
|
|
if cc.DynInformerFactory != nil {
|
|
cc.DynInformerFactory.WaitForCacheSync(ctx.Done())
|
|
}
|
|
|
|
// If leader election is enabled, runCommand via LeaderElector until done and exit.
|
|
if cc.LeaderElection != nil {
|
|
cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
|
|
OnStartedLeading: func(ctx context.Context) {
|
|
close(waitingForLeader)
|
|
sched.Run(ctx)
|
|
},
|
|
OnStoppedLeading: func() {
|
|
select {
|
|
case <-ctx.Done():
|
|
// We were asked to terminate. Exit 0.
|
|
klog.InfoS("Requested to terminate, exiting")
|
|
os.Exit(0)
|
|
default:
|
|
// We lost the lock.
|
|
klog.ErrorS(nil, "Leaderelection lost")
|
|
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
|
|
}
|
|
},
|
|
}
|
|
leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
|
|
if err != nil {
|
|
return fmt.Errorf("couldn't create leader elector: %v", err)
|
|
}
|
|
|
|
leaderElector.Run(ctx)
|
|
|
|
return fmt.Errorf("lost lease")
|
|
}
|
|
|
|
// Leader election is disabled, so runCommand inline until done.
|
|
close(waitingForLeader)
|
|
sched.Run(ctx)
|
|
return fmt.Errorf("finished without leader elect")
|
|
}
|
|
|
|
// buildHandlerChain wraps the given handler with the standard filters.
|
|
func buildHandlerChain(handler http.Handler, authn authenticator.Request, authz authorizer.Authorizer) http.Handler {
|
|
requestInfoResolver := &apirequest.RequestInfoFactory{}
|
|
failedHandler := genericapifilters.Unauthorized(scheme.Codecs)
|
|
|
|
handler = genericapifilters.WithAuthorization(handler, authz, scheme.Codecs)
|
|
handler = genericapifilters.WithAuthentication(handler, authn, failedHandler, nil)
|
|
handler = genericapifilters.WithRequestInfo(handler, requestInfoResolver)
|
|
handler = genericapifilters.WithCacheControl(handler)
|
|
handler = genericfilters.WithHTTPLogging(handler)
|
|
handler = genericfilters.WithPanicRecovery(handler, requestInfoResolver)
|
|
|
|
return handler
|
|
}
|
|
|
|
func installMetricHandler(pathRecorderMux *mux.PathRecorderMux, informers informers.SharedInformerFactory, isLeader func() bool) {
|
|
configz.InstallHandler(pathRecorderMux)
|
|
pathRecorderMux.Handle("/metrics", legacyregistry.HandlerWithReset())
|
|
|
|
resourceMetricsHandler := resources.Handler(informers.Core().V1().Pods().Lister())
|
|
pathRecorderMux.HandleFunc("/metrics/resources", func(w http.ResponseWriter, req *http.Request) {
|
|
if !isLeader() {
|
|
return
|
|
}
|
|
resourceMetricsHandler.ServeHTTP(w, req)
|
|
})
|
|
}
|
|
|
|
// newHealthzAndMetricsHandler creates a healthz server from the config, and will also
|
|
// embed the metrics handler.
|
|
func newHealthzAndMetricsHandler(config *kubeschedulerconfig.KubeSchedulerConfiguration, informers informers.SharedInformerFactory, isLeader func() bool, checks ...healthz.HealthChecker) http.Handler {
|
|
pathRecorderMux := mux.NewPathRecorderMux("kube-scheduler")
|
|
healthz.InstallHandler(pathRecorderMux, checks...)
|
|
installMetricHandler(pathRecorderMux, informers, isLeader)
|
|
if config.EnableProfiling {
|
|
routes.Profiling{}.Install(pathRecorderMux)
|
|
if config.EnableContentionProfiling {
|
|
goruntime.SetBlockProfileRate(1)
|
|
}
|
|
routes.DebugFlags{}.Install(pathRecorderMux, "v", routes.StringFlagPutHandler(logs.GlogSetter))
|
|
}
|
|
return pathRecorderMux
|
|
}
|
|
|
|
func getRecorderFactory(cc *schedulerserverconfig.CompletedConfig) profile.RecorderFactory {
|
|
return func(name string) events.EventRecorder {
|
|
return cc.EventBroadcaster.NewRecorder(name)
|
|
}
|
|
}
|
|
|
|
// WithPlugin creates an Option based on plugin name and factory. Please don't remove this function: it is used to register out-of-tree plugins,
|
|
// hence there are no references to it from the kubernetes scheduler code base.
|
|
func WithPlugin(name string, factory runtime.PluginFactory) Option {
|
|
return func(registry runtime.Registry) error {
|
|
return registry.Register(name, factory)
|
|
}
|
|
}
|
|
|
|
// Setup creates a completed config and a scheduler based on the command args and options
|
|
func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions ...Option) (*schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error) {
|
|
if cfg, err := latest.Default(); err != nil {
|
|
return nil, nil, err
|
|
} else {
|
|
opts.ComponentConfig = cfg
|
|
}
|
|
|
|
if errs := opts.Validate(); len(errs) > 0 {
|
|
return nil, nil, utilerrors.NewAggregate(errs)
|
|
}
|
|
|
|
c, err := opts.Config()
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
// Get the completed config
|
|
cc := c.Complete()
|
|
|
|
outOfTreeRegistry := make(runtime.Registry)
|
|
for _, option := range outOfTreeRegistryOptions {
|
|
if err := option(outOfTreeRegistry); err != nil {
|
|
return nil, nil, err
|
|
}
|
|
}
|
|
|
|
recorderFactory := getRecorderFactory(&cc)
|
|
completedProfiles := make([]kubeschedulerconfig.KubeSchedulerProfile, 0)
|
|
// Create the scheduler.
|
|
sched, err := scheduler.New(cc.Client,
|
|
cc.InformerFactory,
|
|
cc.DynInformerFactory,
|
|
recorderFactory,
|
|
ctx.Done(),
|
|
scheduler.WithComponentConfigVersion(cc.ComponentConfig.TypeMeta.APIVersion),
|
|
scheduler.WithKubeConfig(cc.KubeConfig),
|
|
scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
|
|
scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
|
|
scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
|
|
scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
|
|
scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
|
|
scheduler.WithPodMaxInUnschedulablePodsDuration(cc.PodMaxInUnschedulablePodsDuration),
|
|
scheduler.WithExtenders(cc.ComponentConfig.Extenders...),
|
|
scheduler.WithParallelism(cc.ComponentConfig.Parallelism),
|
|
scheduler.WithBuildFrameworkCapturer(func(profile kubeschedulerconfig.KubeSchedulerProfile) {
|
|
// Profiles are processed during Framework instantiation to set default plugins and configurations. Capturing them for logging
|
|
completedProfiles = append(completedProfiles, profile)
|
|
}),
|
|
)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
if err := options.LogOrWriteConfig(opts.WriteConfigTo, &cc.ComponentConfig, completedProfiles); err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
return &cc, sched, nil
|
|
}
|