mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-03 19:58:17 +00:00 
			
		
		
		
	Performance change to option enable client.QPS, client.Burst
and change default on max_requests_inflight.
This commit is contained in:
		@@ -165,7 +165,7 @@ func (s *APIServer) AddFlags(fs *pflag.FlagSet) {
 | 
				
			|||||||
	fs.StringVar(&s.ClusterName, "cluster_name", s.ClusterName, "The instance prefix for the cluster")
 | 
						fs.StringVar(&s.ClusterName, "cluster_name", s.ClusterName, "The instance prefix for the cluster")
 | 
				
			||||||
	fs.BoolVar(&s.EnableProfiling, "profiling", true, "Enable profiling via web interface host:port/debug/pprof/")
 | 
						fs.BoolVar(&s.EnableProfiling, "profiling", true, "Enable profiling via web interface host:port/debug/pprof/")
 | 
				
			||||||
	fs.StringVar(&s.ExternalHost, "external_hostname", "", "The hostname to use when generating externalized URLs for this master (e.g. Swagger API Docs.)")
 | 
						fs.StringVar(&s.ExternalHost, "external_hostname", "", "The hostname to use when generating externalized URLs for this master (e.g. Swagger API Docs.)")
 | 
				
			||||||
	fs.IntVar(&s.MaxRequestsInFlight, "max_requests_inflight", 20, "The maximum number of requests in flight at a given time.  When the server exceeds this, it rejects requests.  Zero for no limit.")
 | 
						fs.IntVar(&s.MaxRequestsInFlight, "max_requests_inflight", 400, "The maximum number of requests in flight at a given time.  When the server exceeds this, it rejects requests.  Zero for no limit.")
 | 
				
			||||||
	fs.StringVar(&s.LongRunningRequestRE, "long_running_request_regexp", "[.*\\/watch$][^\\/proxy.*]", "A regular expression matching long running requests which should be excluded from maximum inflight request handling.")
 | 
						fs.StringVar(&s.LongRunningRequestRE, "long_running_request_regexp", "[.*\\/watch$][^\\/proxy.*]", "A regular expression matching long running requests which should be excluded from maximum inflight request handling.")
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -99,6 +99,8 @@ func NewCMServer() *CMServer {
 | 
				
			|||||||
func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
 | 
					func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
 | 
				
			||||||
	fs.IntVar(&s.Port, "port", s.Port, "The port that the controller-manager's http service runs on")
 | 
						fs.IntVar(&s.Port, "port", s.Port, "The port that the controller-manager's http service runs on")
 | 
				
			||||||
	fs.Var(&s.Address, "address", "The IP address to serve on (set to 0.0.0.0 for all interfaces)")
 | 
						fs.Var(&s.Address, "address", "The IP address to serve on (set to 0.0.0.0 for all interfaces)")
 | 
				
			||||||
 | 
						s.ClientConfig.QPS = 20.0
 | 
				
			||||||
 | 
						s.ClientConfig.Burst = 30
 | 
				
			||||||
	client.BindClientConfigFlags(fs, &s.ClientConfig)
 | 
						client.BindClientConfigFlags(fs, &s.ClientConfig)
 | 
				
			||||||
	fs.StringVar(&s.CloudProvider, "cloud_provider", s.CloudProvider, "The provider for cloud services.  Empty string for no provider.")
 | 
						fs.StringVar(&s.CloudProvider, "cloud_provider", s.CloudProvider, "The provider for cloud services.  Empty string for no provider.")
 | 
				
			||||||
	fs.StringVar(&s.CloudConfigFile, "cloud_config", s.CloudConfigFile, "The path to the cloud provider configuration file.  Empty string for no configuration file.")
 | 
						fs.StringVar(&s.CloudConfigFile, "cloud_config", s.CloudConfigFile, "The path to the cloud provider configuration file.  Empty string for no configuration file.")
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -208,7 +208,7 @@ func contactOthers(state *State) {
 | 
				
			|||||||
		Host:   os.Getenv("KUBERNETES_RO_SERVICE_HOST") + ":" + os.Getenv("KUBERNETES_RO_SERVICE_PORT"),
 | 
							Host:   os.Getenv("KUBERNETES_RO_SERVICE_HOST") + ":" + os.Getenv("KUBERNETES_RO_SERVICE_PORT"),
 | 
				
			||||||
		Path:   "/api/v1beta1",
 | 
							Path:   "/api/v1beta1",
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	client := &client.Client{client.NewRESTClient(&masterRO, "v1beta1", latest.Codec, true, 0)}
 | 
						client := &client.Client{client.NewRESTClient(&masterRO, "v1beta1", latest.Codec, true, 5, 10)}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Do this repeatedly, in case there's some propagation delay with getting
 | 
						// Do this repeatedly, in case there's some propagation delay with getting
 | 
				
			||||||
	// newly started pods into the endpoints list.
 | 
						// newly started pods into the endpoints list.
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -27,6 +27,8 @@ type FlagSet interface {
 | 
				
			|||||||
	BoolVar(p *bool, name string, value bool, usage string)
 | 
						BoolVar(p *bool, name string, value bool, usage string)
 | 
				
			||||||
	UintVar(p *uint, name string, value uint, usage string)
 | 
						UintVar(p *uint, name string, value uint, usage string)
 | 
				
			||||||
	DurationVar(p *time.Duration, name string, value time.Duration, usage string)
 | 
						DurationVar(p *time.Duration, name string, value time.Duration, usage string)
 | 
				
			||||||
 | 
						Float32Var(p *float32, name string, value float32, usage string)
 | 
				
			||||||
 | 
						IntVar(p *int, name string, value int, usage string)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// BindClientConfigFlags registers a standard set of CLI flags for connecting to a Kubernetes API server.
 | 
					// BindClientConfigFlags registers a standard set of CLI flags for connecting to a Kubernetes API server.
 | 
				
			||||||
@@ -38,6 +40,8 @@ func BindClientConfigFlags(flags FlagSet, config *Config) {
 | 
				
			|||||||
	flags.StringVar(&config.CertFile, "client_certificate", config.CertFile, "Path to a client key file for TLS.")
 | 
						flags.StringVar(&config.CertFile, "client_certificate", config.CertFile, "Path to a client key file for TLS.")
 | 
				
			||||||
	flags.StringVar(&config.KeyFile, "client_key", config.KeyFile, "Path to a client key file for TLS.")
 | 
						flags.StringVar(&config.KeyFile, "client_key", config.KeyFile, "Path to a client key file for TLS.")
 | 
				
			||||||
	flags.StringVar(&config.CAFile, "certificate_authority", config.CAFile, "Path to a cert. file for the certificate authority.")
 | 
						flags.StringVar(&config.CAFile, "certificate_authority", config.CAFile, "Path to a cert. file for the certificate authority.")
 | 
				
			||||||
 | 
						flags.Float32Var(&config.QPS, "max_outgoing_qps", config.QPS, "Maximum number of queries per second that could be issued by this client.")
 | 
				
			||||||
 | 
						flags.IntVar(&config.Burst, "max_outgoing_burst", config.Burst, "Maximum throttled burst")
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func BindKubeletClientConfigFlags(flags FlagSet, config *KubeletConfig) {
 | 
					func BindKubeletClientConfigFlags(flags FlagSet, config *KubeletConfig) {
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -68,11 +68,31 @@ func (f *fakeFlagSet) DurationVar(p *time.Duration, name string, value time.Dura
 | 
				
			|||||||
	f.set.Insert(name)
 | 
						f.set.Insert(name)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (f *fakeFlagSet) Float32Var(p *float32, name string, value float32, usage string) {
 | 
				
			||||||
 | 
						if p == nil {
 | 
				
			||||||
 | 
							f.t.Errorf("unexpected nil pointer")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if usage == "" {
 | 
				
			||||||
 | 
							f.t.Errorf("unexpected empty usage")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						f.set.Insert(name)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (f *fakeFlagSet) IntVar(p *int, name string, value int, usage string) {
 | 
				
			||||||
 | 
						if p == nil {
 | 
				
			||||||
 | 
							f.t.Errorf("unexpected nil pointer")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if usage == "" {
 | 
				
			||||||
 | 
							f.t.Errorf("unexpected empty usage")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						f.set.Insert(name)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestBindClientConfigFlags(t *testing.T) {
 | 
					func TestBindClientConfigFlags(t *testing.T) {
 | 
				
			||||||
	flags := &fakeFlagSet{t, util.StringSet{}}
 | 
						flags := &fakeFlagSet{t, util.StringSet{}}
 | 
				
			||||||
	config := &Config{}
 | 
						config := &Config{}
 | 
				
			||||||
	BindClientConfigFlags(flags, config)
 | 
						BindClientConfigFlags(flags, config)
 | 
				
			||||||
	if len(flags.set) != 6 {
 | 
						if len(flags.set) != 8 {
 | 
				
			||||||
		t.Errorf("unexpected flag set: %#v", flags)
 | 
							t.Errorf("unexpected flag set: %#v", flags)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -80,6 +80,9 @@ type Config struct {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	// QPS indicates the maximum QPS to the master from this client.  If zero, QPS is unlimited.
 | 
						// QPS indicates the maximum QPS to the master from this client.  If zero, QPS is unlimited.
 | 
				
			||||||
	QPS float32
 | 
						QPS float32
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Maximum burst for throttle
 | 
				
			||||||
 | 
						Burst int
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type KubeletConfig struct {
 | 
					type KubeletConfig struct {
 | 
				
			||||||
@@ -181,6 +184,9 @@ func SetKubernetesDefaults(config *Config) error {
 | 
				
			|||||||
	if config.QPS == 0.0 {
 | 
						if config.QPS == 0.0 {
 | 
				
			||||||
		config.QPS = 5.0
 | 
							config.QPS = 5.0
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						if config.Burst == 0 {
 | 
				
			||||||
 | 
							config.Burst = 10
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -201,7 +207,7 @@ func RESTClientFor(config *Config) (*RESTClient, error) {
 | 
				
			|||||||
		return nil, err
 | 
							return nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	client := NewRESTClient(baseURL, config.Version, config.Codec, config.LegacyBehavior, config.QPS)
 | 
						client := NewRESTClient(baseURL, config.Version, config.Codec, config.LegacyBehavior, config.QPS, config.Burst)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	transport, err := TransportFor(config)
 | 
						transport, err := TransportFor(config)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -274,6 +274,7 @@ func TestSetKubernetesDefaults(t *testing.T) {
 | 
				
			|||||||
				Codec:          latest.Codec,
 | 
									Codec:          latest.Codec,
 | 
				
			||||||
				LegacyBehavior: (latest.Version == "v1beta1" || latest.Version == "v1beta2"),
 | 
									LegacyBehavior: (latest.Version == "v1beta1" || latest.Version == "v1beta2"),
 | 
				
			||||||
				QPS:            5,
 | 
									QPS:            5,
 | 
				
			||||||
 | 
									Burst:          10,
 | 
				
			||||||
			},
 | 
								},
 | 
				
			||||||
			false,
 | 
								false,
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -61,7 +61,7 @@ type RESTClient struct {
 | 
				
			|||||||
// such as Get, Put, Post, and Delete on specified paths.  Codec controls encoding and
 | 
					// such as Get, Put, Post, and Delete on specified paths.  Codec controls encoding and
 | 
				
			||||||
// decoding of responses from the server. If this client should use the older, legacy
 | 
					// decoding of responses from the server. If this client should use the older, legacy
 | 
				
			||||||
// API conventions from Kubernetes API v1beta1 and v1beta2, set legacyBehavior true.
 | 
					// API conventions from Kubernetes API v1beta1 and v1beta2, set legacyBehavior true.
 | 
				
			||||||
func NewRESTClient(baseURL *url.URL, apiVersion string, c runtime.Codec, legacyBehavior bool, maxQPS float32) *RESTClient {
 | 
					func NewRESTClient(baseURL *url.URL, apiVersion string, c runtime.Codec, legacyBehavior bool, maxQPS float32, maxBurst int) *RESTClient {
 | 
				
			||||||
	base := *baseURL
 | 
						base := *baseURL
 | 
				
			||||||
	if !strings.HasSuffix(base.Path, "/") {
 | 
						if !strings.HasSuffix(base.Path, "/") {
 | 
				
			||||||
		base.Path += "/"
 | 
							base.Path += "/"
 | 
				
			||||||
@@ -71,7 +71,7 @@ func NewRESTClient(baseURL *url.URL, apiVersion string, c runtime.Codec, legacyB
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	var throttle util.RateLimiter
 | 
						var throttle util.RateLimiter
 | 
				
			||||||
	if maxQPS > 0 {
 | 
						if maxQPS > 0 {
 | 
				
			||||||
		throttle = util.NewTokenBucketRateLimiter(maxQPS, 10)
 | 
							throttle = util.NewTokenBucketRateLimiter(maxQPS, maxBurst)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return &RESTClient{
 | 
						return &RESTClient{
 | 
				
			||||||
		baseURL:    &base,
 | 
							baseURL:    &base,
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -48,7 +48,7 @@ func getFakeClient(t *testing.T, validURLs []string) (ClientPosterFunc, *httptes
 | 
				
			|||||||
	return func(mapping *meta.RESTMapping) (RESTClientPoster, error) {
 | 
						return func(mapping *meta.RESTMapping) (RESTClientPoster, error) {
 | 
				
			||||||
		fakeCodec := runtime.CodecFor(api.Scheme, "v1beta1")
 | 
							fakeCodec := runtime.CodecFor(api.Scheme, "v1beta1")
 | 
				
			||||||
		fakeUri, _ := url.Parse(server.URL + "/api/v1beta1")
 | 
							fakeUri, _ := url.Parse(server.URL + "/api/v1beta1")
 | 
				
			||||||
		return client.NewRESTClient(fakeUri, "v1beta1", fakeCodec, true, 0), nil
 | 
							return client.NewRESTClient(fakeUri, "v1beta1", fakeCodec, true, 5, 10), nil
 | 
				
			||||||
	}, server
 | 
						}, server
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -67,6 +67,8 @@ func NewSchedulerServer() *SchedulerServer {
 | 
				
			|||||||
func (s *SchedulerServer) AddFlags(fs *pflag.FlagSet) {
 | 
					func (s *SchedulerServer) AddFlags(fs *pflag.FlagSet) {
 | 
				
			||||||
	fs.IntVar(&s.Port, "port", s.Port, "The port that the scheduler's http service runs on")
 | 
						fs.IntVar(&s.Port, "port", s.Port, "The port that the scheduler's http service runs on")
 | 
				
			||||||
	fs.Var(&s.Address, "address", "The IP address to serve on (set to 0.0.0.0 for all interfaces)")
 | 
						fs.Var(&s.Address, "address", "The IP address to serve on (set to 0.0.0.0 for all interfaces)")
 | 
				
			||||||
 | 
						s.ClientConfig.QPS = 20.0
 | 
				
			||||||
 | 
						s.ClientConfig.Burst = 100
 | 
				
			||||||
	client.BindClientConfigFlags(fs, &s.ClientConfig)
 | 
						client.BindClientConfigFlags(fs, &s.ClientConfig)
 | 
				
			||||||
	fs.StringVar(&s.AlgorithmProvider, "algorithm_provider", s.AlgorithmProvider, "The scheduling algorithm provider to use")
 | 
						fs.StringVar(&s.AlgorithmProvider, "algorithm_provider", s.AlgorithmProvider, "The scheduling algorithm provider to use")
 | 
				
			||||||
	fs.StringVar(&s.PolicyConfigFile, "policy_config_file", s.PolicyConfigFile, "File with scheduler policy configuration")
 | 
						fs.StringVar(&s.PolicyConfigFile, "policy_config_file", s.PolicyConfigFile, "File with scheduler policy configuration")
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user