Merge pull request #132433 from michaelasp/configurablecle

feat: make CLE timers configurable
This commit is contained in:
Kubernetes Prow Robot
2025-06-30 12:08:31 -07:00
committed by GitHub
9 changed files with 133 additions and 24 deletions

View File

@@ -130,6 +130,9 @@ func TestAddFlags(t *testing.T) {
"--emulated-version=test=1.31",
"--emulation-forward-compatible=true",
"--runtime-config-emulation-forward-compatible=true",
"--coordinated-leadership-lease-duration=10s",
"--coordinated-leadership-renew-deadline=5s",
"--coordinated-leadership-retry-period=1s",
}
fs.Parse(args)
utilruntime.Must(componentGlobalsRegistry.Set())
@@ -307,6 +310,9 @@ func TestAddFlags(t *testing.T) {
},
AggregatorRejectForwardingRedirects: true,
SystemNamespaces: []string{"kube-system", "kube-public", "default", "kube-node-lease"},
CoordinatedLeadershipLeaseDuration: 10 * time.Second,
CoordinatedLeadershipRenewDeadline: 5 * time.Second,
CoordinatedLeadershipRetryPeriod: 1 * time.Second,
},
Extra: Extra{

View File

@@ -102,6 +102,11 @@ type Extra struct {
SystemNamespaces []string
VersionedInformers clientgoinformers.SharedInformerFactory
// Coordinated Leader Election timers
CoordinatedLeadershipLeaseDuration time.Duration
CoordinatedLeadershipRenewDeadline time.Duration
CoordinatedLeadershipRetryPeriod time.Duration
}
// BuildGenericConfig takes the generic controlplane apiserver options and produces
@@ -302,6 +307,10 @@ func CreateConfig(
ExtendExpiration: opts.Authentication.ServiceAccounts.ExtendExpiration,
VersionedInformers: versionedInformers,
CoordinatedLeadershipLeaseDuration: opts.CoordinatedLeadershipLeaseDuration,
CoordinatedLeadershipRenewDeadline: opts.CoordinatedLeadershipRenewDeadline,
CoordinatedLeadershipRetryPeriod: opts.CoordinatedLeadershipRetryPeriod,
},
}

View File

@@ -92,6 +92,10 @@ type Options struct {
SystemNamespaces []string
ServiceAccountSigningEndpoint string
CoordinatedLeadershipLeaseDuration time.Duration
CoordinatedLeadershipRenewDeadline time.Duration
CoordinatedLeadershipRetryPeriod time.Duration
}
// completedServerRunOptions is a private wrapper that enforces a call of Complete() before Run can be invoked.
@@ -125,6 +129,9 @@ func NewOptions() *Options {
EventTTL: 1 * time.Hour,
AggregatorRejectForwardingRedirects: true,
SystemNamespaces: []string{metav1.NamespaceSystem, metav1.NamespacePublic, metav1.NamespaceDefault},
CoordinatedLeadershipLeaseDuration: 15 * time.Second,
CoordinatedLeadershipRenewDeadline: 10 * time.Second,
CoordinatedLeadershipRetryPeriod: 2 * time.Second,
}
// Overwrite the default for storage data format.
@@ -202,6 +209,13 @@ func (s *Options) AddFlags(fss *cliflag.NamedFlagSets) {
fs.StringVar(&s.ServiceAccountSigningEndpoint, "service-account-signing-endpoint", s.ServiceAccountSigningEndpoint, ""+
"Path to socket where a external JWT signer is listening. This flag is mutually exclusive with --service-account-signing-key-file and --service-account-key-file. Requires enabling feature gate (ExternalServiceAccountTokenSigner)")
fs.DurationVar(&s.CoordinatedLeadershipLeaseDuration, "coordinated-leadership-lease-duration", s.CoordinatedLeadershipLeaseDuration,
"The duration of the lease used for Coordinated Leader Election.")
fs.DurationVar(&s.CoordinatedLeadershipRenewDeadline, "coordinated-leadership-renew-deadline", s.CoordinatedLeadershipRenewDeadline,
"The deadline for renewing a coordinated leader election lease.")
fs.DurationVar(&s.CoordinatedLeadershipRetryPeriod, "coordinated-leadership-retry-period", s.CoordinatedLeadershipRetryPeriod,
"The period for retrying to renew a coordinated leader election lease.")
}
func (o *Options) Complete(ctx context.Context, alternateDNS []string, alternateIPs []net.IP) (CompletedOptions, error) {

View File

@@ -123,6 +123,9 @@ func TestAddFlags(t *testing.T) {
"--storage-backend=etcd3",
"--lease-reuse-duration-seconds=100",
"--emulated-version=test=1.31",
"--coordinated-leadership-lease-duration=10s",
"--coordinated-leadership-renew-deadline=5s",
"--coordinated-leadership-retry-period=1s",
}
fs.Parse(args)
utilruntime.Must(componentGlobalsRegistry.Set())
@@ -297,6 +300,9 @@ func TestAddFlags(t *testing.T) {
},
AggregatorRejectForwardingRedirects: true,
SystemNamespaces: []string{"kube-system", "kube-public", "default"},
CoordinatedLeadershipLeaseDuration: 10 * time.Second,
CoordinatedLeadershipRenewDeadline: 5 * time.Second,
CoordinatedLeadershipRetryPeriod: 1 * time.Second,
}
expected.Authentication.OIDC.UsernameClaim = "sub"

View File

@@ -117,6 +117,17 @@ func validateServiceAccountTokenSigningConfig(options *Options) []error {
return errors
}
func validateCoordinatedLeadershipFlags(options *Options) []error {
var errs []error
if options.CoordinatedLeadershipLeaseDuration <= options.CoordinatedLeadershipRenewDeadline {
errs = append(errs, fmt.Errorf("--coordinated-leadership-lease-duration must be greater than --coordinated-leadership-renew-deadline"))
}
if options.CoordinatedLeadershipRenewDeadline <= options.CoordinatedLeadershipRetryPeriod {
errs = append(errs, fmt.Errorf("--coordinated-leadership-renew-deadline must be greater than --coordinated-leadership-retry-period"))
}
return errs
}
// Validate checks Options and return a slice of found errs.
func (s *Options) Validate() []error {
var errs []error
@@ -135,6 +146,7 @@ func (s *Options) Validate() []error {
errs = append(errs, validateUnknownVersionInteroperabilityProxyFlags(s)...)
errs = append(errs, validateNodeSelectorAuthorizationFeature()...)
errs = append(errs, validateServiceAccountTokenSigningConfig(s)...)
errs = append(errs, validateCoordinatedLeadershipFlags(s)...)
return errs
}

View File

@@ -395,3 +395,68 @@ func TestValidateServiceAccountTokenSigningConfig(t *testing.T) {
})
}
}
func TestValidateCoordinatedLeadershipFlags(t *testing.T) {
tests := []struct {
name string
options *Options
expectedErrors map[string]bool
}{
{
name: "no errors",
options: &Options{
CoordinatedLeadershipLeaseDuration: 10,
CoordinatedLeadershipRenewDeadline: 5,
CoordinatedLeadershipRetryPeriod: 2,
},
},
{
name: "invalid lease duration",
options: &Options{
CoordinatedLeadershipLeaseDuration: 5,
CoordinatedLeadershipRenewDeadline: 5,
CoordinatedLeadershipRetryPeriod: 2,
},
expectedErrors: map[string]bool{
"--coordinated-leadership-lease-duration must be greater than --coordinated-leadership-renew-deadline": true,
},
},
{
name: "invalid retry period",
options: &Options{
CoordinatedLeadershipLeaseDuration: 10,
CoordinatedLeadershipRenewDeadline: 5,
CoordinatedLeadershipRetryPeriod: 5,
},
expectedErrors: map[string]bool{
"--coordinated-leadership-renew-deadline must be greater than --coordinated-leadership-retry-period": true,
},
},
{
name: "all errors",
options: &Options{
CoordinatedLeadershipLeaseDuration: 5,
CoordinatedLeadershipRenewDeadline: 5,
CoordinatedLeadershipRetryPeriod: 5,
},
expectedErrors: map[string]bool{
"--coordinated-leadership-lease-duration must be greater than --coordinated-leadership-renew-deadline": true,
"--coordinated-leadership-renew-deadline must be greater than --coordinated-leadership-retry-period": true,
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
errs := validateCoordinatedLeadershipFlags(test.options)
if len(errs) != len(test.expectedErrors) {
t.Errorf("Expected %d errors, but got %d", len(test.expectedErrors), len(errs))
}
for _, err := range errs {
if _, ok := test.expectedErrors[err.Error()]; !ok {
t.Errorf("Unexpected error: %v", err)
}
}
})
}
}

View File

@@ -190,6 +190,10 @@ func (c completedConfig) New(name string, delegationTarget genericapiserver.Dele
go controller.Run(ctx, workers)
go gccontroller.Run(ctx)
}, err
}, leaderelection.LeaderElectionTimers{
LeaseDuration: c.CoordinatedLeadershipLeaseDuration,
RenewDeadline: c.CoordinatedLeadershipRenewDeadline,
RetryPeriod: c.CoordinatedLeadershipRetryPeriod,
})
return nil
})

View File

@@ -29,12 +29,11 @@ import (
"k8s.io/klog/v2"
)
var (
// TODO: Eventually these should be configurable
LeaseDuration = 15 * time.Second
RenewDeadline = 10 * time.Second
RetryPeriod = 2 * time.Second
)
type LeaderElectionTimers struct {
LeaseDuration time.Duration
RenewDeadline time.Duration
RetryPeriod time.Duration
}
type NewRunner func() (func(ctx context.Context, workers int), error)
@@ -43,7 +42,7 @@ type NewRunner func() (func(ctx context.Context, workers int), error)
// controller instance's Run method each time.
// RunWithLeaderElection only returns when the context is done, or initial
// leader election fails.
func RunWithLeaderElection(ctx context.Context, config *rest.Config, newRunnerFn NewRunner) {
func RunWithLeaderElection(ctx context.Context, config *rest.Config, newRunnerFn NewRunner, timers LeaderElectionTimers) {
hostname, err := os.Hostname()
if err != nil {
klog.Infof("Error parsing hostname: %v", err)
@@ -83,9 +82,9 @@ func RunWithLeaderElection(ctx context.Context, config *rest.Config, newRunnerFn
le, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: LeaseDuration,
RenewDeadline: RenewDeadline,
RetryPeriod: RetryPeriod,
LeaseDuration: timers.LeaseDuration,
RenewDeadline: timers.RenewDeadline,
RetryPeriod: timers.RetryPeriod,
Callbacks: callbacks,
Name: controllerName,
ReleaseOnCancel: true,
@@ -95,5 +94,5 @@ func RunWithLeaderElection(ctx context.Context, config *rest.Config, newRunnerFn
return
}
le.Run(ctx)
}, RetryPeriod, ctx.Done())
}, timers.RetryPeriod, ctx.Done())
}

View File

@@ -37,25 +37,19 @@ import (
)
func TestCoordinatedLeaderElectionLeaseTransfer(t *testing.T) {
// Reset the coordinated leader election variables after the test
defaultLeaseDuration := leaderelection.LeaseDuration
defaultRenewDeadline := leaderelection.RenewDeadline
defaultRetryPeriod := leaderelection.RetryPeriod
defer func() {
leaderelection.LeaseDuration = defaultLeaseDuration
leaderelection.RenewDeadline = defaultRenewDeadline
leaderelection.RetryPeriod = defaultRetryPeriod
}()
// Use shorter interval for lease duration in integration test
leaderelection.LeaseDuration = 5 * time.Second
leaderelection.RenewDeadline = 3 * time.Second
leaderelection.RetryPeriod = 2 * time.Second
timers := leaderelection.LeaderElectionTimers{
LeaseDuration: 5 * time.Second,
RenewDeadline: 3 * time.Second,
RetryPeriod: 2 * time.Second,
}
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.CoordinatedLeaderElection, true)
etcd := framework.SharedEtcd()
flags := []string{fmt.Sprintf("--runtime-config=%s=true", v1beta1.SchemeGroupVersion)}
// Set the timers on the apiserver .
flags = append(flags, fmt.Sprintf("--coordinated-leadership-lease-duration=%s", timers.LeaseDuration.String()), fmt.Sprintf("--coordinated-leadership-renew-deadline=%s", timers.RenewDeadline.String()), fmt.Sprintf("--coordinated-leadership-retry-period=%s", timers.RetryPeriod.String()))
server := apiservertesting.StartTestServerOrDie(t, apiservertesting.NewDefaultTestServerOptions(), flags, etcd)
defer server.TearDownFn()