diff --git a/cmd/kube-apiserver/app/options/options_test.go b/cmd/kube-apiserver/app/options/options_test.go index 9f9308c842f..629bb3e0005 100644 --- a/cmd/kube-apiserver/app/options/options_test.go +++ b/cmd/kube-apiserver/app/options/options_test.go @@ -130,6 +130,9 @@ func TestAddFlags(t *testing.T) { "--emulated-version=test=1.31", "--emulation-forward-compatible=true", "--runtime-config-emulation-forward-compatible=true", + "--coordinated-leadership-lease-duration=10s", + "--coordinated-leadership-renew-deadline=5s", + "--coordinated-leadership-retry-period=1s", } fs.Parse(args) utilruntime.Must(componentGlobalsRegistry.Set()) @@ -307,6 +310,9 @@ func TestAddFlags(t *testing.T) { }, AggregatorRejectForwardingRedirects: true, SystemNamespaces: []string{"kube-system", "kube-public", "default", "kube-node-lease"}, + CoordinatedLeadershipLeaseDuration: 10 * time.Second, + CoordinatedLeadershipRenewDeadline: 5 * time.Second, + CoordinatedLeadershipRetryPeriod: 1 * time.Second, }, Extra: Extra{ diff --git a/pkg/controlplane/apiserver/config.go b/pkg/controlplane/apiserver/config.go index d476d4bef96..b81cb296876 100644 --- a/pkg/controlplane/apiserver/config.go +++ b/pkg/controlplane/apiserver/config.go @@ -102,6 +102,11 @@ type Extra struct { SystemNamespaces []string VersionedInformers clientgoinformers.SharedInformerFactory + + // Coordinated Leader Election timers + CoordinatedLeadershipLeaseDuration time.Duration + CoordinatedLeadershipRenewDeadline time.Duration + CoordinatedLeadershipRetryPeriod time.Duration } // BuildGenericConfig takes the generic controlplane apiserver options and produces @@ -302,6 +307,10 @@ func CreateConfig( ExtendExpiration: opts.Authentication.ServiceAccounts.ExtendExpiration, VersionedInformers: versionedInformers, + + CoordinatedLeadershipLeaseDuration: opts.CoordinatedLeadershipLeaseDuration, + CoordinatedLeadershipRenewDeadline: opts.CoordinatedLeadershipRenewDeadline, + CoordinatedLeadershipRetryPeriod: opts.CoordinatedLeadershipRetryPeriod, }, } diff --git a/pkg/controlplane/apiserver/options/options.go b/pkg/controlplane/apiserver/options/options.go index e5c4daf376f..b409f595808 100644 --- a/pkg/controlplane/apiserver/options/options.go +++ b/pkg/controlplane/apiserver/options/options.go @@ -92,6 +92,10 @@ type Options struct { SystemNamespaces []string ServiceAccountSigningEndpoint string + + CoordinatedLeadershipLeaseDuration time.Duration + CoordinatedLeadershipRenewDeadline time.Duration + CoordinatedLeadershipRetryPeriod time.Duration } // completedServerRunOptions is a private wrapper that enforces a call of Complete() before Run can be invoked. @@ -125,6 +129,9 @@ func NewOptions() *Options { EventTTL: 1 * time.Hour, AggregatorRejectForwardingRedirects: true, SystemNamespaces: []string{metav1.NamespaceSystem, metav1.NamespacePublic, metav1.NamespaceDefault}, + CoordinatedLeadershipLeaseDuration: 15 * time.Second, + CoordinatedLeadershipRenewDeadline: 10 * time.Second, + CoordinatedLeadershipRetryPeriod: 2 * time.Second, } // Overwrite the default for storage data format. @@ -202,6 +209,13 @@ func (s *Options) AddFlags(fss *cliflag.NamedFlagSets) { fs.StringVar(&s.ServiceAccountSigningEndpoint, "service-account-signing-endpoint", s.ServiceAccountSigningEndpoint, ""+ "Path to socket where a external JWT signer is listening. This flag is mutually exclusive with --service-account-signing-key-file and --service-account-key-file. Requires enabling feature gate (ExternalServiceAccountTokenSigner)") + + fs.DurationVar(&s.CoordinatedLeadershipLeaseDuration, "coordinated-leadership-lease-duration", s.CoordinatedLeadershipLeaseDuration, + "The duration of the lease used for Coordinated Leader Election.") + fs.DurationVar(&s.CoordinatedLeadershipRenewDeadline, "coordinated-leadership-renew-deadline", s.CoordinatedLeadershipRenewDeadline, + "The deadline for renewing a coordinated leader election lease.") + fs.DurationVar(&s.CoordinatedLeadershipRetryPeriod, "coordinated-leadership-retry-period", s.CoordinatedLeadershipRetryPeriod, + "The period for retrying to renew a coordinated leader election lease.") } func (o *Options) Complete(ctx context.Context, alternateDNS []string, alternateIPs []net.IP) (CompletedOptions, error) { diff --git a/pkg/controlplane/apiserver/options/options_test.go b/pkg/controlplane/apiserver/options/options_test.go index af1bcc24377..91d450fd25b 100644 --- a/pkg/controlplane/apiserver/options/options_test.go +++ b/pkg/controlplane/apiserver/options/options_test.go @@ -123,6 +123,9 @@ func TestAddFlags(t *testing.T) { "--storage-backend=etcd3", "--lease-reuse-duration-seconds=100", "--emulated-version=test=1.31", + "--coordinated-leadership-lease-duration=10s", + "--coordinated-leadership-renew-deadline=5s", + "--coordinated-leadership-retry-period=1s", } fs.Parse(args) utilruntime.Must(componentGlobalsRegistry.Set()) @@ -297,6 +300,9 @@ func TestAddFlags(t *testing.T) { }, AggregatorRejectForwardingRedirects: true, SystemNamespaces: []string{"kube-system", "kube-public", "default"}, + CoordinatedLeadershipLeaseDuration: 10 * time.Second, + CoordinatedLeadershipRenewDeadline: 5 * time.Second, + CoordinatedLeadershipRetryPeriod: 1 * time.Second, } expected.Authentication.OIDC.UsernameClaim = "sub" diff --git a/pkg/controlplane/apiserver/options/validation.go b/pkg/controlplane/apiserver/options/validation.go index 2a63457799e..e4d7c3dc0e9 100644 --- a/pkg/controlplane/apiserver/options/validation.go +++ b/pkg/controlplane/apiserver/options/validation.go @@ -117,6 +117,17 @@ func validateServiceAccountTokenSigningConfig(options *Options) []error { return errors } +func validateCoordinatedLeadershipFlags(options *Options) []error { + var errs []error + if options.CoordinatedLeadershipLeaseDuration <= options.CoordinatedLeadershipRenewDeadline { + errs = append(errs, fmt.Errorf("--coordinated-leadership-lease-duration must be greater than --coordinated-leadership-renew-deadline")) + } + if options.CoordinatedLeadershipRenewDeadline <= options.CoordinatedLeadershipRetryPeriod { + errs = append(errs, fmt.Errorf("--coordinated-leadership-renew-deadline must be greater than --coordinated-leadership-retry-period")) + } + return errs +} + // Validate checks Options and return a slice of found errs. func (s *Options) Validate() []error { var errs []error @@ -135,6 +146,7 @@ func (s *Options) Validate() []error { errs = append(errs, validateUnknownVersionInteroperabilityProxyFlags(s)...) errs = append(errs, validateNodeSelectorAuthorizationFeature()...) errs = append(errs, validateServiceAccountTokenSigningConfig(s)...) + errs = append(errs, validateCoordinatedLeadershipFlags(s)...) return errs } diff --git a/pkg/controlplane/apiserver/options/validation_test.go b/pkg/controlplane/apiserver/options/validation_test.go index 097a62da11b..be3ff76a069 100644 --- a/pkg/controlplane/apiserver/options/validation_test.go +++ b/pkg/controlplane/apiserver/options/validation_test.go @@ -395,3 +395,68 @@ func TestValidateServiceAccountTokenSigningConfig(t *testing.T) { }) } } + +func TestValidateCoordinatedLeadershipFlags(t *testing.T) { + tests := []struct { + name string + options *Options + expectedErrors map[string]bool + }{ + { + name: "no errors", + options: &Options{ + CoordinatedLeadershipLeaseDuration: 10, + CoordinatedLeadershipRenewDeadline: 5, + CoordinatedLeadershipRetryPeriod: 2, + }, + }, + { + name: "invalid lease duration", + options: &Options{ + CoordinatedLeadershipLeaseDuration: 5, + CoordinatedLeadershipRenewDeadline: 5, + CoordinatedLeadershipRetryPeriod: 2, + }, + expectedErrors: map[string]bool{ + "--coordinated-leadership-lease-duration must be greater than --coordinated-leadership-renew-deadline": true, + }, + }, + { + name: "invalid retry period", + options: &Options{ + CoordinatedLeadershipLeaseDuration: 10, + CoordinatedLeadershipRenewDeadline: 5, + CoordinatedLeadershipRetryPeriod: 5, + }, + expectedErrors: map[string]bool{ + "--coordinated-leadership-renew-deadline must be greater than --coordinated-leadership-retry-period": true, + }, + }, + { + name: "all errors", + options: &Options{ + CoordinatedLeadershipLeaseDuration: 5, + CoordinatedLeadershipRenewDeadline: 5, + CoordinatedLeadershipRetryPeriod: 5, + }, + expectedErrors: map[string]bool{ + "--coordinated-leadership-lease-duration must be greater than --coordinated-leadership-renew-deadline": true, + "--coordinated-leadership-renew-deadline must be greater than --coordinated-leadership-retry-period": true, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + errs := validateCoordinatedLeadershipFlags(test.options) + if len(errs) != len(test.expectedErrors) { + t.Errorf("Expected %d errors, but got %d", len(test.expectedErrors), len(errs)) + } + for _, err := range errs { + if _, ok := test.expectedErrors[err.Error()]; !ok { + t.Errorf("Unexpected error: %v", err) + } + } + }) + } +} diff --git a/pkg/controlplane/apiserver/server.go b/pkg/controlplane/apiserver/server.go index c939850e98d..caa635febf4 100644 --- a/pkg/controlplane/apiserver/server.go +++ b/pkg/controlplane/apiserver/server.go @@ -190,6 +190,10 @@ func (c completedConfig) New(name string, delegationTarget genericapiserver.Dele go controller.Run(ctx, workers) go gccontroller.Run(ctx) }, err + }, leaderelection.LeaderElectionTimers{ + LeaseDuration: c.CoordinatedLeadershipLeaseDuration, + RenewDeadline: c.CoordinatedLeadershipRenewDeadline, + RetryPeriod: c.CoordinatedLeadershipRetryPeriod, }) return nil }) diff --git a/pkg/controlplane/controller/leaderelection/run_with_leaderelection.go b/pkg/controlplane/controller/leaderelection/run_with_leaderelection.go index 399fdb3faf0..2593bc76423 100644 --- a/pkg/controlplane/controller/leaderelection/run_with_leaderelection.go +++ b/pkg/controlplane/controller/leaderelection/run_with_leaderelection.go @@ -29,12 +29,11 @@ import ( "k8s.io/klog/v2" ) -var ( - // TODO: Eventually these should be configurable - LeaseDuration = 15 * time.Second - RenewDeadline = 10 * time.Second - RetryPeriod = 2 * time.Second -) +type LeaderElectionTimers struct { + LeaseDuration time.Duration + RenewDeadline time.Duration + RetryPeriod time.Duration +} type NewRunner func() (func(ctx context.Context, workers int), error) @@ -43,7 +42,7 @@ type NewRunner func() (func(ctx context.Context, workers int), error) // controller instance's Run method each time. // RunWithLeaderElection only returns when the context is done, or initial // leader election fails. -func RunWithLeaderElection(ctx context.Context, config *rest.Config, newRunnerFn NewRunner) { +func RunWithLeaderElection(ctx context.Context, config *rest.Config, newRunnerFn NewRunner, timers LeaderElectionTimers) { hostname, err := os.Hostname() if err != nil { klog.Infof("Error parsing hostname: %v", err) @@ -83,9 +82,9 @@ func RunWithLeaderElection(ctx context.Context, config *rest.Config, newRunnerFn le, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{ Lock: rl, - LeaseDuration: LeaseDuration, - RenewDeadline: RenewDeadline, - RetryPeriod: RetryPeriod, + LeaseDuration: timers.LeaseDuration, + RenewDeadline: timers.RenewDeadline, + RetryPeriod: timers.RetryPeriod, Callbacks: callbacks, Name: controllerName, ReleaseOnCancel: true, @@ -95,5 +94,5 @@ func RunWithLeaderElection(ctx context.Context, config *rest.Config, newRunnerFn return } le.Run(ctx) - }, RetryPeriod, ctx.Done()) + }, timers.RetryPeriod, ctx.Done()) } diff --git a/test/integration/apiserver/coordinatedleaderelection/leaderelection_test.go b/test/integration/apiserver/coordinatedleaderelection/leaderelection_test.go index a1618b209d0..d9ad391a9d2 100644 --- a/test/integration/apiserver/coordinatedleaderelection/leaderelection_test.go +++ b/test/integration/apiserver/coordinatedleaderelection/leaderelection_test.go @@ -37,25 +37,19 @@ import ( ) func TestCoordinatedLeaderElectionLeaseTransfer(t *testing.T) { - // Reset the coordinated leader election variables after the test - defaultLeaseDuration := leaderelection.LeaseDuration - defaultRenewDeadline := leaderelection.RenewDeadline - defaultRetryPeriod := leaderelection.RetryPeriod - defer func() { - leaderelection.LeaseDuration = defaultLeaseDuration - leaderelection.RenewDeadline = defaultRenewDeadline - leaderelection.RetryPeriod = defaultRetryPeriod - }() - // Use shorter interval for lease duration in integration test - leaderelection.LeaseDuration = 5 * time.Second - leaderelection.RenewDeadline = 3 * time.Second - leaderelection.RetryPeriod = 2 * time.Second + timers := leaderelection.LeaderElectionTimers{ + LeaseDuration: 5 * time.Second, + RenewDeadline: 3 * time.Second, + RetryPeriod: 2 * time.Second, + } featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.CoordinatedLeaderElection, true) etcd := framework.SharedEtcd() flags := []string{fmt.Sprintf("--runtime-config=%s=true", v1beta1.SchemeGroupVersion)} + // Set the timers on the apiserver . + flags = append(flags, fmt.Sprintf("--coordinated-leadership-lease-duration=%s", timers.LeaseDuration.String()), fmt.Sprintf("--coordinated-leadership-renew-deadline=%s", timers.RenewDeadline.String()), fmt.Sprintf("--coordinated-leadership-retry-period=%s", timers.RetryPeriod.String())) server := apiservertesting.StartTestServerOrDie(t, apiservertesting.NewDefaultTestServerOptions(), flags, etcd) defer server.TearDownFn()