feat: node ipam controller

Add node IPAM controller.

It supports two modes:
* RangeAllocator - classic mode, using the same in-tree range allocation that Kubernetes itself performs
* CloudAllocator - Talos is responsible for assigning the PodCIDRs (see the flags below)
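
For reference, CloudAllocator mode is enabled through the controller-manager flags used in the Makefile run target below; a minimal sketch (values are illustrative):

  --controllers=cloud-node,node-ipam-controller \
  --allocate-node-cidrs \
  --cidr-allocator-type=CloudAllocator \
  --node-cidr-mask-size-ipv4=24 --node-cidr-mask-size-ipv6=80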

Signed-off-by: Serge Logvinov <serge.logvinov@sinextra.dev>
Serge Logvinov
2024-05-06 18:12:16 +03:00
parent 3b20bb0d45
commit e1a0e0e5a4
33 changed files with 4071 additions and 198 deletions


@@ -19,9 +19,9 @@ run:
# won't be reported. Default value is empty list, but there is
# no need to include all autogenerated files, we confidently recognize
# autogenerated files. If it's not please let us know.
exclude-files:
- charts/
- docs/
exclude-dirs:
- charts
- docs
# list of build tags, all linters use it. Default is empty list.
build-tags:
@@ -199,6 +199,19 @@ issues:
# Default value for this option is true.
exclude-use-default: false
# Which dirs to exclude: issues from them won't be reported.
# Can use regexp here: `generated.*`, regexp is applied on full path,
# including the path prefix if one is set.
exclude-dirs:
# copied from kubernetes repo
- pkg/names
- pkg/nodeipam/config
- pkg/utils/controller/node
- pkg/nodeipam/ipam/cidrset
exclude-files:
- cmd/talos-cloud-controller-manager/options/nodeipamcontroller.go
- pkg/nodeipam/ipam/range_allocator.go
# Maximum issues count per one linter. Set to 0 to disable. Default is 50.
max-issues-per-linter: 0


@@ -72,8 +72,11 @@ build: ## Build
.PHONY: run
run: build
./talos-cloud-controller-manager-$(ARCH) --v=5 --kubeconfig=kubeconfig --cloud-config=hack/ccm-config.yaml --controllers=cloud-node \
--use-service-account-credentials --leader-elect=false --bind-address=127.0.0.1 --authorization-always-allow-paths=/healthz,/livez,/readyz,/metrics
./talos-cloud-controller-manager-$(ARCH) --v=5 --kubeconfig=kubeconfig --cloud-config=hack/ccm-config.yaml --controllers=cloud-node,node-ipam-controller \
--allocate-node-cidrs \
--node-cidr-mask-size-ipv4=24 --node-cidr-mask-size-ipv6=80 \
--cidr-allocator-type=CloudAllocator \
--use-service-account-credentials --leader-elect=false --bind-address=127.0.0.1 --secure-port=0 --authorization-always-allow-paths=/healthz,/livez,/readyz,/metrics
.PHONY: lint
lint: ## Lint Code


@@ -14,10 +14,6 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
// This file should be written by each cloud provider.
// For a minimal working example, please refer to k8s.io/cloud-provider/sample/basic_main.go
// For more details, please refer to k8s.io/kubernetes/cmd/cloud-controller-manager/main.go
// Package main provides the CCM implementation.
package main
@@ -26,6 +22,7 @@ import (
"github.com/spf13/pflag"
kcmnames "github.com/siderolabs/talos-cloud-controller-manager/pkg/names"
"github.com/siderolabs/talos-cloud-controller-manager/pkg/talos"
"k8s.io/apimachinery/pkg/util/wait"
@@ -48,8 +45,26 @@ func main() {
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
controllerInitializers := app.DefaultInitFuncConstructors
controllerAliases := names.CCMControllerAliases()
nodeIpamController := nodeIPAMController{}
nodeIpamController.nodeIPAMControllerOptions.NodeIPAMControllerConfiguration = &nodeIpamController.nodeIPAMControllerConfiguration
fss := cliflag.NamedFlagSets{}
command := app.NewCloudControllerManagerCommand(ccmOptions, cloudInitializer, app.DefaultInitFuncConstructors, names.CCMControllerAliases(), fss, wait.NeverStop)
nodeIpamController.nodeIPAMControllerOptions.AddFlags(fss.FlagSet(kcmnames.NodeIpamController))
controllerInitializers[kcmnames.NodeIpamController] = app.ControllerInitFuncConstructor{
// "node-controller" is the shared identity of all node controllers, including node, node lifecycle, and node ipam.
// See https://github.com/kubernetes/kubernetes/pull/72764#issuecomment-453300990 for more context.
InitContext: app.ControllerInitContext{
ClientName: "node-controller",
},
Constructor: nodeIpamController.startNodeIpamControllerWrapper,
}
app.ControllersDisabledByDefault.Insert(kcmnames.NodeIpamController)
controllerAliases["nodeipam"] = kcmnames.NodeIpamController
command := app.NewCloudControllerManagerCommand(ccmOptions, cloudInitializer, controllerInitializers, controllerAliases, fss, wait.NeverStop)
command.Flags().VisitAll(func(flag *pflag.Flag) {
if flag.Name == "cloud-provider" {


@@ -0,0 +1,250 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main // copy from kubernetes/cmd/cloud-controller-manager/nodeipamcontroller.go
import (
"context"
"errors"
"fmt"
"net"
"strings"
nodeipamcontrolleroptions "github.com/siderolabs/talos-cloud-controller-manager/cmd/talos-cloud-controller-manager/options"
nodeipamcontroller "github.com/siderolabs/talos-cloud-controller-manager/pkg/nodeipam"
nodeipamconfig "github.com/siderolabs/talos-cloud-controller-manager/pkg/nodeipam/config"
ipam "github.com/siderolabs/talos-cloud-controller-manager/pkg/nodeipam/ipam"
talosclient "github.com/siderolabs/talos-cloud-controller-manager/pkg/talosclient"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/cloud-provider/app"
cloudcontrollerconfig "k8s.io/cloud-provider/app/config"
genericcontrollermanager "k8s.io/controller-manager/app"
"k8s.io/controller-manager/controller"
"k8s.io/klog/v2"
netutils "k8s.io/utils/net"
)
const (
// defaultNodeMaskCIDRIPv4 is default mask size for IPv4 node cidr.
defaultNodeMaskCIDRIPv4 = 24
// defaultNodeMaskCIDRIPv6 is default mask size for IPv6 node cidr.
defaultNodeMaskCIDRIPv6 = 80
)
type nodeIPAMController struct {
nodeIPAMControllerConfiguration nodeipamconfig.NodeIPAMControllerConfiguration
nodeIPAMControllerOptions nodeipamcontrolleroptions.NodeIPAMControllerOptions
}
func (nodeIpamController *nodeIPAMController) startNodeIpamControllerWrapper(
initContext app.ControllerInitContext,
completedConfig *cloudcontrollerconfig.CompletedConfig,
cloud cloudprovider.Interface,
) app.InitFunc {
klog.V(4).InfoS("nodeIpamController.startNodeIpamControllerWrapper() called")
allErrors := nodeIpamController.nodeIPAMControllerOptions.Validate()
if len(allErrors) > 0 {
klog.Fatal("NodeIPAM controller values are not properly set.")
}
nodeIpamController.nodeIPAMControllerOptions.ApplyTo(&nodeIpamController.nodeIPAMControllerConfiguration) //nolint:errcheck
return func(ctx context.Context, controllerContext genericcontrollermanager.ControllerContext) (controller.Interface, bool, error) {
return startNodeIpamController(ctx, initContext, completedConfig, nodeIpamController.nodeIPAMControllerConfiguration, controllerContext, cloud)
}
}
func startNodeIpamController(
ctx context.Context,
initContext app.ControllerInitContext,
ccmConfig *cloudcontrollerconfig.CompletedConfig,
nodeIPAMConfig nodeipamconfig.NodeIPAMControllerConfiguration,
controllerCtx genericcontrollermanager.ControllerContext,
cloud cloudprovider.Interface,
) (controller.Interface, bool, error) {
// should we start nodeIPAM
if !ccmConfig.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs {
return nil, false, nil
}
talos, err := talosclient.New(ctx)
if err != nil {
return nil, false, err
}
if ccmConfig.ComponentConfig.KubeCloudShared.ClusterCIDR == "" {
clusterCIDRs, err := talos.GetPodCIDRs(ctx)
if err != nil {
return nil, false, err
}
ccmConfig.ComponentConfig.KubeCloudShared.ClusterCIDR = strings.Join(clusterCIDRs, ",")
}
// failure: bad cidrs in config
clusterCIDRs, dualStack, err := processCIDRs(ccmConfig.ComponentConfig.KubeCloudShared.ClusterCIDR)
if err != nil {
return nil, false, err
}
// failure: more than one cidr but they are not configured as dual stack
if len(clusterCIDRs) > 1 && !dualStack {
return nil, false, fmt.Errorf("len of ClusterCIDRs==%v and they are not configured as dual stack (at least one from each IPFamily", len(clusterCIDRs))
}
// failure: more than two cidrs is not allowed even with dual stack
if len(clusterCIDRs) > 2 {
return nil, false, fmt.Errorf("len of clusters is:%v > more than max allowed of 2", len(clusterCIDRs))
}
svcCIDRs, err := talos.GetServiceCIDRs(ctx)
if err != nil {
return nil, false, err
}
serviceCIDRs, err := netutils.ParseCIDRs(svcCIDRs)
if err != nil {
return nil, false, err
}
nodeIPAMConfig.ServiceCIDR = svcCIDRs[0]
if len(svcCIDRs) > 1 {
nodeIPAMConfig.SecondaryServiceCIDR = svcCIDRs[1]
}
nodeCIDRMaskSizes, err := setNodeCIDRMaskSizes(nodeIPAMConfig, clusterCIDRs)
if err != nil {
return nil, false, err
}
klog.V(4).InfoS("nodeIpamController called", "clusterCIDRs", clusterCIDRs, "serviceCIDRs", serviceCIDRs, "nodeCIDRMaskSizes", nodeCIDRMaskSizes)
nodeIpamController, err := nodeipamcontroller.NewNodeIpamController(
ctx,
controllerCtx.InformerFactory.Core().V1().Nodes(),
cloud,
controllerCtx.ClientBuilder.ClientOrDie(initContext.ClientName),
clusterCIDRs,
serviceCIDRs,
nodeCIDRMaskSizes,
ipam.CIDRAllocatorType(ccmConfig.ComponentConfig.KubeCloudShared.CIDRAllocatorType),
)
if err != nil {
return nil, true, err
}
go nodeIpamController.Run(ctx)
return nil, true, nil
}
// processCIDRs is a helper function that takes a comma-separated list of cidrs and returns
// a list of typed cidrs,
// a flag indicating whether the cidrs represent a dual-stack configuration,
// and an error if any of the cidrs fail to parse.
func processCIDRs(cidrsList string) ([]*net.IPNet, bool, error) {
cidrsSplit := strings.Split(strings.TrimSpace(cidrsList), ",")
cidrs, err := netutils.ParseCIDRs(cidrsSplit)
if err != nil {
return nil, false, err
}
// if cidrs has an error then the previous call will fail
// safe to ignore error checking on next call
dualstack, _ := netutils.IsDualStackCIDRs(cidrs) //nolint:errcheck
return cidrs, dualstack, nil
}
// setNodeCIDRMaskSizes returns the IPv4 and IPv6 node cidr mask sizes based on the values provided
// for --node-cidr-mask-size-ipv4 and --node-cidr-mask-size-ipv6 respectively. If a value is not provided,
// it returns the default IPv4 and IPv6 cidr mask sizes.
func setNodeCIDRMaskSizes(cfg nodeipamconfig.NodeIPAMControllerConfiguration, clusterCIDRs []*net.IPNet) ([]int, error) {
sortedSizes := func(maskSizeIPv4, maskSizeIPv6 int) []int {
nodeMaskCIDRs := make([]int, len(clusterCIDRs))
for idx, clusterCIDR := range clusterCIDRs {
if netutils.IsIPv6CIDR(clusterCIDR) {
nodeMaskCIDRs[idx] = maskSizeIPv6
} else {
nodeMaskCIDRs[idx] = maskSizeIPv4
}
}
return nodeMaskCIDRs
}
// --node-cidr-mask-size flag is incompatible with dual stack clusters.
ipv4Mask, ipv6Mask := defaultNodeMaskCIDRIPv4, defaultNodeMaskCIDRIPv6
isDualstack := len(clusterCIDRs) > 1
// case one: cluster is dualstack (i.e, more than one cidr)
if isDualstack {
// if --node-cidr-mask-size is set then fail; the user must configure the correct dual-stack mask sizes (or use the defaults)
if cfg.NodeCIDRMaskSize != 0 {
return nil, errors.New("usage of --node-cidr-mask-size is not allowed with dual-stack clusters")
}
if cfg.NodeCIDRMaskSizeIPv4 != 0 {
ipv4Mask = int(cfg.NodeCIDRMaskSizeIPv4)
}
if cfg.NodeCIDRMaskSizeIPv6 != 0 {
ipv6Mask = int(cfg.NodeCIDRMaskSizeIPv6)
}
return sortedSizes(ipv4Mask, ipv6Mask), nil
}
maskConfigured := cfg.NodeCIDRMaskSize != 0
maskV4Configured := cfg.NodeCIDRMaskSizeIPv4 != 0
maskV6Configured := cfg.NodeCIDRMaskSizeIPv6 != 0
isSingleStackIPv6 := netutils.IsIPv6CIDR(clusterCIDRs[0])
// original flag is set
if maskConfigured {
// original mask flag is still the main reference.
if maskV4Configured || maskV6Configured {
return nil, errors.New("usage of --node-cidr-mask-size-ipv4 and --node-cidr-mask-size-ipv6 is not allowed if --node-cidr-mask-size is set. For dual-stack clusters please unset it and use IPFamily specific flags") //nolint:lll
}
mask := int(cfg.NodeCIDRMaskSize)
return sortedSizes(mask, mask), nil
}
if maskV4Configured {
if isSingleStackIPv6 {
return nil, errors.New("usage of --node-cidr-mask-size-ipv4 is not allowed for a single-stack IPv6 cluster")
}
ipv4Mask = int(cfg.NodeCIDRMaskSizeIPv4)
}
// !maskV4Configured && !maskConfigured && maskV6Configured
if maskV6Configured {
if !isSingleStackIPv6 {
return nil, errors.New("usage of --node-cidr-mask-size-ipv6 is not allowed for a single-stack IPv4 cluster")
}
ipv6Mask = int(cfg.NodeCIDRMaskSizeIPv6)
}
return sortedSizes(ipv4Mask, ipv6Mask), nil
}
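
A worked illustration of the mask-size rules above (not part of the commit): a sketch that assumes it lives in the same package as setNodeCIDRMaskSizes, with made-up CIDR values.

package main

import (
    "fmt"
    "net"

    nodeipamconfig "github.com/siderolabs/talos-cloud-controller-manager/pkg/nodeipam/config"
)

// exampleMaskSizes shows the dual-stack defaults: with no mask flags set,
// each cluster CIDR gets the default size for its IP family, in the same order.
func exampleMaskSizes() {
    _, v4, _ := net.ParseCIDR("10.244.0.0/16")    // illustrative cluster CIDRs
    _, v6, _ := net.ParseCIDR("fd00:10:244::/56")

    sizes, err := setNodeCIDRMaskSizes(nodeipamconfig.NodeIPAMControllerConfiguration{}, []*net.IPNet{v4, v6})
    if err != nil {
        panic(err)
    }

    fmt.Println(sizes) // [24 80]
}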


@@ -0,0 +1,79 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package options // copy from kubernetes/cmd/kube-controller-manager/app/options/nodeipamcontroller.go
import (
"fmt"
"strings"
"github.com/spf13/pflag"
nodeipamconfig "github.com/siderolabs/talos-cloud-controller-manager/pkg/nodeipam/config"
)
// NodeIPAMControllerOptions holds the NodeIpamController options.
type NodeIPAMControllerOptions struct {
*nodeipamconfig.NodeIPAMControllerConfiguration
}
// AddFlags adds flags related to NodeIpamController for controller manager to the specified FlagSet.
func (o *NodeIPAMControllerOptions) AddFlags(fs *pflag.FlagSet) {
if o == nil {
return
}
fs.StringVar(&o.ServiceCIDR, "service-cluster-ip-range", o.ServiceCIDR, "CIDR Range for Services in cluster. Requires --allocate-node-cidrs to be true")
fs.Int32Var(&o.NodeCIDRMaskSize, "node-cidr-mask-size", o.NodeCIDRMaskSize, "Mask size for node cidr in cluster. Default is 24 for IPv4 and 80 for IPv6.")
fs.Int32Var(&o.NodeCIDRMaskSizeIPv4, "node-cidr-mask-size-ipv4", o.NodeCIDRMaskSizeIPv4, "Mask size for IPv4 node cidr in dual-stack cluster. Default is 24.")
fs.Int32Var(&o.NodeCIDRMaskSizeIPv6, "node-cidr-mask-size-ipv6", o.NodeCIDRMaskSizeIPv6, "Mask size for IPv6 node cidr in dual-stack cluster. Default is 80.")
}
// ApplyTo fills up NodeIpamController config with options.
func (o *NodeIPAMControllerOptions) ApplyTo(cfg *nodeipamconfig.NodeIPAMControllerConfiguration) error {
if o == nil {
return nil
}
// split the cidrs list and assign primary and secondary
serviceCIDRList := strings.Split(o.ServiceCIDR, ",")
if len(serviceCIDRList) > 0 {
cfg.ServiceCIDR = serviceCIDRList[0]
}
if len(serviceCIDRList) > 1 {
cfg.SecondaryServiceCIDR = serviceCIDRList[1]
}
cfg.NodeCIDRMaskSize = o.NodeCIDRMaskSize
cfg.NodeCIDRMaskSizeIPv4 = o.NodeCIDRMaskSizeIPv4
cfg.NodeCIDRMaskSizeIPv6 = o.NodeCIDRMaskSizeIPv6
return nil
}
// Validate checks validation of NodeIPAMControllerOptions.
func (o *NodeIPAMControllerOptions) Validate() []error {
if o == nil {
return nil
}
errs := make([]error, 0)
serviceCIDRList := strings.Split(o.ServiceCIDR, ",")
if len(serviceCIDRList) > 2 {
errs = append(errs, fmt.Errorf("--service-cluster-ip-range can not contain more than two entries"))
}
return errs
}
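
For illustration only (not part of the commit), a sketch of how these options are wired together, following the same embedding pattern main.go uses; the flag values are made up.

package main

import (
    "fmt"

    "github.com/spf13/pflag"

    nodeipamcontrolleroptions "github.com/siderolabs/talos-cloud-controller-manager/cmd/talos-cloud-controller-manager/options"
    nodeipamconfig "github.com/siderolabs/talos-cloud-controller-manager/pkg/nodeipam/config"
)

func exampleOptionsWiring() {
    // The options struct embeds a pointer to the configuration it mutates.
    cfg := nodeipamconfig.NodeIPAMControllerConfiguration{}
    opts := nodeipamcontrolleroptions.NodeIPAMControllerOptions{NodeIPAMControllerConfiguration: &cfg}

    fs := pflag.NewFlagSet("node-ipam-controller", pflag.ContinueOnError)
    opts.AddFlags(fs)

    // Hypothetical flag values.
    _ = fs.Parse([]string{
        "--service-cluster-ip-range=10.96.0.0/12,fd00:10:96::/112",
        "--node-cidr-mask-size-ipv4=24",
        "--node-cidr-mask-size-ipv6=80",
    })

    if errs := opts.Validate(); len(errs) > 0 {
        panic(errs[0])
    }

    final := nodeipamconfig.NodeIPAMControllerConfiguration{}
    _ = opts.ApplyTo(&final) // splits the service CIDR list into primary and secondary

    fmt.Println(final.ServiceCIDR, final.SecondaryServiceCIDR, final.NodeCIDRMaskSizeIPv4, final.NodeCIDRMaskSizeIPv6)
}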


@@ -61,12 +61,6 @@ global:
# Parameter is optional, by default is "false"
approveNodeCSR: true
# The list of endpoints to connect to the Talos API (control-plane)
# Parameter is optional, by default the controller will discover the control-plane endpoint
endpoints:
- 1.2.3.4
- 4.3.2.1
# Transformations rules for nodes
transformations:
# All rules are applied in order, all matched rules are applied to the node

go.mod

@@ -3,9 +3,10 @@ module github.com/siderolabs/talos-cloud-controller-manager
go 1.22.4
require (
github.com/cosi-project/runtime v0.4.5
github.com/cosi-project/runtime v0.5.0
github.com/siderolabs/go-retry v0.3.3
github.com/siderolabs/net v0.4.0
github.com/siderolabs/talos/pkg/machinery v1.7.4
github.com/siderolabs/talos/pkg/machinery v1.7.5
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.9.0
gopkg.in/yaml.v3 v3.0.1
@@ -15,13 +16,13 @@ require (
k8s.io/client-go v0.30.2
k8s.io/cloud-provider v0.30.2
k8s.io/component-base v0.30.2
k8s.io/klog/v2 v2.130.0
k8s.io/controller-manager v0.30.2
k8s.io/klog/v2 v2.130.1
k8s.io/utils v0.0.0-20240502163921-fe8a2dddb1d0
)
require (
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect
github.com/Masterminds/semver/v3 v3.2.1 // indirect
github.com/NYTimes/gziphandler v1.1.1 // indirect
github.com/ProtonMail/go-crypto v1.0.0 // indirect
github.com/ProtonMail/go-mime v0.0.0-20230322103455-7d82a3887f2f // indirect
@@ -34,8 +35,8 @@ require (
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cloudflare/circl v1.3.9 // indirect
github.com/containerd/go-cni v1.1.9 // indirect
github.com/containernetworking/cni v1.2.0 // indirect
github.com/containerd/go-cni v1.1.10 // indirect
github.com/containernetworking/cni v1.2.2 // indirect
github.com/coreos/go-semver v0.3.1 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
@@ -119,8 +120,8 @@ require (
golang.org/x/term v0.21.0 // indirect
golang.org/x/text v0.16.0 // indirect
golang.org/x/time v0.5.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240610135401-a8a62080eff3 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240610135401-a8a62080eff3 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240624140628-dc46fd24d27d // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240624140628-dc46fd24d27d // indirect
google.golang.org/grpc v1.64.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
@@ -128,9 +129,8 @@ require (
gopkg.in/yaml.v2 v2.4.0 // indirect
k8s.io/apiserver v0.30.2 // indirect
k8s.io/component-helpers v0.30.2 // indirect
k8s.io/controller-manager v0.30.2 // indirect
k8s.io/kms v0.30.2 // indirect
k8s.io/kube-openapi v0.0.0-20240521193020-835d969ad83a // indirect
k8s.io/kube-openapi v0.0.0-20240620174524-b456828f718b // indirect
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.30.3 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect

go.sum

@@ -3,8 +3,6 @@ cloud.google.com/go/compute/metadata v0.3.0 h1:Tz+eQXMEqDIKRsmY3cHTL6FVaynIjX2Qx
cloud.google.com/go/compute/metadata v0.3.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k=
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25UVaW/CKtUDjefjrs0SPonmDGUVOYP0=
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
github.com/Masterminds/semver/v3 v3.2.1 h1:RN9w6+7QoMeJVGyfmbcgs28Br8cvmnucEXnY0rYXWg0=
github.com/Masterminds/semver/v3 v3.2.1/go.mod h1:qvl/7zhW3nngYb5+80sSMF+FG2BjYrf8m9wsX0PNOMQ=
github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cqUQ3I=
github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c=
github.com/ProtonMail/go-crypto v0.0.0-20230717121422-5aa5874ade95/go.mod h1:EjAoLdwvbIOoOQr3ihjnSoLZRtE8azugULFRteWMNc0=
@@ -38,16 +36,16 @@ github.com/cloudflare/circl v1.3.9 h1:QFrlgFYf2Qpi8bSpVPK1HBvWpx16v/1TZivyo7pGuB
github.com/cloudflare/circl v1.3.9/go.mod h1:PDRU+oXvdD7KCtgKxW95M5Z8BpSCJXQORiZFnBQS5QU=
github.com/cncf/xds/go v0.0.0-20240318125728-8a4994d93e50 h1:DBmgJDC9dTfkVyGgipamEh2BpGYxScCH1TOF1LL1cXc=
github.com/cncf/xds/go v0.0.0-20240318125728-8a4994d93e50/go.mod h1:5e1+Vvlzido69INQaVO6d87Qn543Xr6nooe9Kz7oBFM=
github.com/containerd/go-cni v1.1.9 h1:ORi7P1dYzCwVM6XPN4n3CbkuOx/NZ2DOqy+SHRdo9rU=
github.com/containerd/go-cni v1.1.9/go.mod h1:XYrZJ1d5W6E2VOvjffL3IZq0Dz6bsVlERHbekNK90PM=
github.com/containernetworking/cni v1.2.0 h1:fEjhlfWwWAXEvlcMQu/i6z8DA0Kbu7EcmR5+zb6cm5I=
github.com/containernetworking/cni v1.2.0/go.mod h1:/r+vA/7vrynNfbvSP9g8tIKEoy6win7sALJAw4ZiJks=
github.com/containerd/go-cni v1.1.10 h1:c2U73nld7spSWfiJwSh/8W9DK+/qQwYM2rngIhCyhyg=
github.com/containerd/go-cni v1.1.10/go.mod h1:/Y/sL8yqYQn1ZG1om1OncJB1W4zN3YmjfP/ShCzG/OY=
github.com/containernetworking/cni v1.2.2 h1:9IbP6KJQQxVKo4hhnm8r50YcVKrJbJu3Dqw+Rbt1vYk=
github.com/containernetworking/cni v1.2.2/go.mod h1:DuLgF+aPd3DzcTQTtp/Nvl1Kim23oFKdm2okJzBQA5M=
github.com/coreos/go-semver v0.3.1 h1:yi21YpKnrx1gt5R+la8n5WgS0kCrsPp33dmEyHReZr4=
github.com/coreos/go-semver v0.3.1/go.mod h1:irMmmIw/7yzSRPWryHsK7EYSg09caPQL03VsM8rvUec=
github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8iXXhfZs=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/cosi-project/runtime v0.4.5 h1:WNEuUa6TDOhWMU3vno1/a1WErZNFrPzqNzwcoR4Aw8I=
github.com/cosi-project/runtime v0.4.5/go.mod h1:fucT88LYFzORKoXQ1SzITQ11nq0HlR10RAXW5jkPWHQ=
github.com/cosi-project/runtime v0.5.0 h1:Thin3w4ZQbRgHIv2h6a+zNGvp7N85eO8VEz0Kp859Jo=
github.com/cosi-project/runtime v0.5.0/go.mod h1:GH/EZSmCeMpOFNeZYZnlmdB14mj0zLyREA4PqOI0ubg=
github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/creack/pty v1.1.18 h1:n56/Zwd5o6whRC5PMGretI4IdRLlmBXYNjScPaBgsbY=
@@ -172,8 +170,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/onsi/ginkgo/v2 v2.17.2 h1:7eMhcy3GimbsA3hEnVKdw/PQM9XN9krpKVXsZdph0/g=
github.com/onsi/ginkgo/v2 v2.17.2/go.mod h1:nP2DPOQoNsQmsVyv5rDA8JkXQoCs6goXIvr/PRJ1eCc=
github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA=
github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To=
github.com/onsi/gomega v1.33.1 h1:dsYjIxxSR755MDmKVsaFQTE22ChNBcuuTWgkUDSubOk=
github.com/onsi/gomega v1.33.1/go.mod h1:U4R44UsT+9eLIaYRB2a5qajjtQYn0hauxvRm16AVYg0=
github.com/opencontainers/runtime-spec v1.2.0 h1:z97+pHb3uELt/yiAWD691HNHQIF07bE7dzrbT927iTk=
@@ -215,8 +213,8 @@ github.com/siderolabs/net v0.4.0 h1:1bOgVay/ijPkJz4qct98nHsiB/ysLQU0KLoBC4qLm7I=
github.com/siderolabs/net v0.4.0/go.mod h1:/ibG+Hm9HU27agp5r9Q3eZicEfjquzNzQNux5uEk0kM=
github.com/siderolabs/protoenc v0.2.1 h1:BqxEmeWQeMpNP3R6WrPqDatX8sM/r4t97OP8mFmg6GA=
github.com/siderolabs/protoenc v0.2.1/go.mod h1:StTHxjet1g11GpNAWiATgc8K0HMKiFSEVVFOa/H0otc=
github.com/siderolabs/talos/pkg/machinery v1.7.4 h1:/LP1m7iIzpuTuiG+0aWJpJCdrC4K48btT+CgLXYjUqk=
github.com/siderolabs/talos/pkg/machinery v1.7.4/go.mod h1:85iUG7/XS654ki2Rkk7kTEU8YsnNhj6vAr7TnpbOebk=
github.com/siderolabs/talos/pkg/machinery v1.7.5 h1:M02UZSDfN0BB4bXhTYDjEmVvAIX1GsAS45cyKh6+HHU=
github.com/siderolabs/talos/pkg/machinery v1.7.5/go.mod h1:OeamhNo92c3V96bddZNhcCgoRyzw2KWBtpma1lfchtg=
github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0=
github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/soheilhy/cmux v0.1.5 h1:jjzc5WVemNEDTLwv9tlmemhC73tI08BNOIGwBOo10Js=
@@ -370,10 +368,10 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto v0.0.0-20230822172742-b8732ec3820d h1:VBu5YqKPv6XiJ199exd8Br+Aetz+o08F+PLMnwJQHAY=
google.golang.org/genproto v0.0.0-20230822172742-b8732ec3820d/go.mod h1:yZTlhN0tQnXo3h00fuXNCxJdLdIdnVFVBaRJ5LWBbw4=
google.golang.org/genproto/googleapis/api v0.0.0-20240610135401-a8a62080eff3 h1:QW9+G6Fir4VcRXVH8x3LilNAb6cxBGLa6+GM4hRwexE=
google.golang.org/genproto/googleapis/api v0.0.0-20240610135401-a8a62080eff3/go.mod h1:kdrSS/OiLkPrNUpzD4aHgCq2rVuC/YRxok32HXZ4vRE=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240610135401-a8a62080eff3 h1:9Xyg6I9IWQZhRVfCWjKK+l6kI0jHcPesVlMnT//aHNo=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240610135401-a8a62080eff3/go.mod h1:EfXuqaE1J41VCDicxHzUDm+8rk+7ZdXzHV0IhO/I6s0=
google.golang.org/genproto/googleapis/api v0.0.0-20240624140628-dc46fd24d27d h1:Aqf0fiIdUQEj0Gn9mKFFXoQfTTEaNopWpfVyYADxiSg=
google.golang.org/genproto/googleapis/api v0.0.0-20240624140628-dc46fd24d27d/go.mod h1:Od4k8V1LQSizPRUK4OzZ7TBE/20k+jPczUDAEyvn69Y=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240624140628-dc46fd24d27d h1:k3zyW3BYYR30e8v3x0bTDdE9vpYFjZHK+HcyqkrppWk=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240624140628-dc46fd24d27d/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY=
google.golang.org/grpc v1.64.0 h1:KH3VH9y/MgNQg1dE7b3XfVK0GsPSIzJwdF617gUSbvY=
google.golang.org/grpc v1.64.0/go.mod h1:oxjF8E3FBnjp+/gVFYdWacaLDx9na1aqy9oovLpxQYg=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
@@ -409,12 +407,12 @@ k8s.io/component-helpers v0.30.2 h1:kDMYLiWEYeWU7H6jBI+Ua1i2hqNh0DzqDHNIppFC3po=
k8s.io/component-helpers v0.30.2/go.mod h1:tI0anfS6AbRqooaICkGg7UVAQLedOauVSQW9srDBnJw=
k8s.io/controller-manager v0.30.2 h1:tC7V7IdGUW2I4de3bXx4m2fS3naP7VlCYlECCajK9fU=
k8s.io/controller-manager v0.30.2/go.mod h1:CYltIHGhCgldEkXT5vS2JHCCWM1WyBI4kA2UfP9cZvY=
k8s.io/klog/v2 v2.130.0 h1:5nB3+3HpqKqXJIXNtJdtxcDCfaa9KL8StJgMzGJkUkM=
k8s.io/klog/v2 v2.130.0/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE=
k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk=
k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE=
k8s.io/kms v0.30.2 h1:VSZILO/tkzrz5Tu2j+yFQZ2Dc5JerQZX2GqhFJbQrfw=
k8s.io/kms v0.30.2/go.mod h1:GrMurD0qk3G4yNgGcsCEmepqf9KyyIrTXYR2lyUOJC4=
k8s.io/kube-openapi v0.0.0-20240521193020-835d969ad83a h1:zD1uj3Jf+mD4zmA7W+goE5TxDkI7OGJjBNBzq5fJtLA=
k8s.io/kube-openapi v0.0.0-20240521193020-835d969ad83a/go.mod h1:UxDHUPsUwTOOxSU+oXURfFBcAS6JwiRXTYqYwfuGowc=
k8s.io/kube-openapi v0.0.0-20240620174524-b456828f718b h1:Q9xmGWBvOGd8UJyccgpYlLosk/JlfP3xQLNkQlHJeXw=
k8s.io/kube-openapi v0.0.0-20240620174524-b456828f718b/go.mod h1:UxDHUPsUwTOOxSU+oXURfFBcAS6JwiRXTYqYwfuGowc=
k8s.io/utils v0.0.0-20240502163921-fe8a2dddb1d0 h1:jgGTlFYnhF1PM1Ax/lAlxUPE+KfCIXHaathvJg1C3ak=
k8s.io/utils v0.0.0-20240502163921-fe8a2dddb1d0/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.30.3 h1:2770sDpzrjjsAtVhSeUFseziht227YAWYHLGNM8QPwY=


@@ -0,0 +1,87 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package names // import "k8s.io/kubernetes/cmd/kube-controller-manager/names"
// Canonical controller names
//
// NAMING CONVENTIONS
// 1. naming should be consistent across the controllers
// 2. use of shortcuts should be avoided, unless they are well-known non-Kubernetes shortcuts
// 3. Kubernetes' resources should be written together without a hyphen ("-")
//
// CHANGE POLICY
// The controller names should be treated as IDs.
// They can only be changed if absolutely necessary. For example if an inappropriate name was chosen in the past, or if the scope of the controller changes.
// When a name is changed, the old name should be aliased in app.ControllerDescriptor#GetAliases, while preserving all old aliases.
// This is done to achieve backwards compatibility
//
// USE CASES
// The following places should use the controller name constants, when:
// 1. defining a new app.ControllerDescriptor so it can be used in app.NewControllerDescriptors or app.KnownControllers:
// 2. used anywhere inside the controller itself:
// 2.1. [TODO] logging should use a canonical controller name when referencing a controller (Eg. Starting X, Shutting down X)
// 2.2. [TODO] emitted events should have an EventSource.Component set to the controller name (usually when initializing an EventRecorder)
// 2.3. [TODO] registering ControllerManagerMetrics with ControllerStarted and ControllerStopped
// 2.4. [TODO] calling WaitForNamedCacheSync
// 3. defining controller options for "--help" command or generated documentation
// 3.1. controller name should be used to create a pflag.FlagSet when registering controller options (the name is rendered in a controller flag group header) in options.KubeControllerManagerOptions
// 3.2. when defined flag's help mentions a controller name
// 4. defining a new service account for a new controller (old controllers may have inconsistent service accounts to stay backwards compatible)
const (
ServiceAccountTokenController = "serviceaccount-token-controller"
EndpointsController = "endpoints-controller"
EndpointSliceController = "endpointslice-controller"
EndpointSliceMirroringController = "endpointslice-mirroring-controller"
ReplicationControllerController = "replicationcontroller-controller"
PodGarbageCollectorController = "pod-garbage-collector-controller"
ResourceQuotaController = "resourcequota-controller"
NamespaceController = "namespace-controller"
ServiceAccountController = "serviceaccount-controller"
GarbageCollectorController = "garbage-collector-controller"
DaemonSetController = "daemonset-controller"
JobController = "job-controller"
DeploymentController = "deployment-controller"
ReplicaSetController = "replicaset-controller"
HorizontalPodAutoscalerController = "horizontal-pod-autoscaler-controller"
DisruptionController = "disruption-controller"
StatefulSetController = "statefulset-controller"
CronJobController = "cronjob-controller"
CertificateSigningRequestSigningController = "certificatesigningrequest-signing-controller"
CertificateSigningRequestApprovingController = "certificatesigningrequest-approving-controller"
CertificateSigningRequestCleanerController = "certificatesigningrequest-cleaner-controller"
TTLController = "ttl-controller"
BootstrapSignerController = "bootstrap-signer-controller"
TokenCleanerController = "token-cleaner-controller"
NodeIpamController = "node-ipam-controller"
NodeLifecycleController = "node-lifecycle-controller"
TaintEvictionController = "taint-eviction-controller"
PersistentVolumeBinderController = "persistentvolume-binder-controller"
PersistentVolumeAttachDetachController = "persistentvolume-attach-detach-controller"
PersistentVolumeExpanderController = "persistentvolume-expander-controller"
ClusterRoleAggregationController = "clusterrole-aggregation-controller"
PersistentVolumeClaimProtectionController = "persistentvolumeclaim-protection-controller"
PersistentVolumeProtectionController = "persistentvolume-protection-controller"
TTLAfterFinishedController = "ttl-after-finished-controller"
RootCACertificatePublisherController = "root-ca-certificate-publisher-controller"
EphemeralVolumeController = "ephemeral-volume-controller"
StorageVersionGarbageCollectorController = "storageversion-garbage-collector-controller"
ResourceClaimController = "resourceclaim-controller"
LegacyServiceAccountTokenCleanerController = "legacy-serviceaccount-token-cleaner-controller"
ValidatingAdmissionPolicyStatusController = "validatingadmissionpolicy-status-controller"
ServiceCIDRController = "service-cidr-controller"
StorageVersionMigratorController = "storage-version-migrator-controller"
)


@@ -0,0 +1,19 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// +k8s:deepcopy-gen=package
package config // import "k8s.io/kubernetes/pkg/controller/nodeipam/config"


@@ -0,0 +1,34 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
// NodeIPAMControllerConfiguration contains elements describing NodeIPAMController.
type NodeIPAMControllerConfiguration struct {
// serviceCIDR is CIDR Range for Services in cluster.
ServiceCIDR string
// secondaryServiceCIDR is CIDR Range for Services in cluster. This is used in dual stack clusters. SecondaryServiceCIDR must be of different IP family than ServiceCIDR
SecondaryServiceCIDR string
// NodeCIDRMaskSize is the mask size for node cidr in single-stack cluster.
// This can be used only with single stack clusters and is incompatible with dual stack clusters.
NodeCIDRMaskSize int32
// NodeCIDRMaskSizeIPv4 is the mask size for IPv4 node cidr in dual-stack cluster.
// This can be used only with dual stack clusters and is incompatible with single stack clusters.
NodeCIDRMaskSizeIPv4 int32
// NodeCIDRMaskSizeIPv6 is the mask size for IPv6 node cidr in dual-stack cluster.
// This can be used only with dual stack clusters and is incompatible with single stack clusters.
NodeCIDRMaskSizeIPv6 int32
}
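
As a small, hypothetical example of how these fields combine for a dual-stack cluster (NodeCIDRMaskSize stays at its zero value; the per-family sizes are set instead), with made-up CIDR values:

package main

import (
    nodeipamconfig "github.com/siderolabs/talos-cloud-controller-manager/pkg/nodeipam/config"
)

// exampleDualStackConfig is illustrative only.
var exampleDualStackConfig = nodeipamconfig.NodeIPAMControllerConfiguration{
    ServiceCIDR:          "10.96.0.0/12",
    SecondaryServiceCIDR: "fd00:10:96::/112",
    NodeCIDRMaskSizeIPv4: 24,
    NodeCIDRMaskSizeIPv6: 80,
}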

pkg/nodeipam/doc.go

@@ -0,0 +1,19 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package nodeipam contains code for syncing cloud instances with the
// node registry.
package nodeipam // import "k8s.io/kubernetes/pkg/controller/nodeipam"


@@ -0,0 +1,150 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package ipam implement IPAM for node CIDR range allocation.
package ipam // copy from kubernetes/pkg/controller/nodeipam/ipam/cidr_allocator.go
import (
"context"
"fmt"
"net"
"time"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
informers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/klog/v2"
)
// CIDRAllocatorType is the type of the allocator to use.
type CIDRAllocatorType string
const (
// RangeAllocatorType is the allocator that uses an internal CIDR
// range allocator to do node CIDR range allocations.
RangeAllocatorType CIDRAllocatorType = "RangeAllocator"
// CloudAllocatorType is the allocator that uses cloud platform
// support to do node CIDR range allocations.
CloudAllocatorType CIDRAllocatorType = "CloudAllocator"
)
// TODO: figure out the good setting for those constants.
const (
// The amount of time the nodecontroller polls on the list nodes endpoint.
apiserverStartupGracePeriod = 10 * time.Minute
// The no. of NodeSpec updates NC can process concurrently.
cidrUpdateWorkers = 30
// cidrUpdateRetries is the no. of times a NodeSpec update will be retried before dropping it.
cidrUpdateRetries = 3
)
// nodePollInterval is used in listing node.
var nodePollInterval = 10 * time.Second
// CIDRAllocator is an interface implemented by things that know how
// to allocate/occupy/recycle CIDR for nodes.
type CIDRAllocator interface {
// AllocateOrOccupyCIDR looks at the given node, assigns it a valid
// CIDR if it doesn't currently have one, or marks the CIDR as used if
// the node already has one.
AllocateOrOccupyCIDR(ctx context.Context, node *v1.Node) error
// ReleaseCIDR releases the CIDR of the removed node.
ReleaseCIDR(logger klog.Logger, node *v1.Node) error
// Run starts all the working logic of the allocator.
Run(ctx context.Context)
}
// CIDRAllocatorParams holds the parameters required for creating a new
// cidr range allocator.
type CIDRAllocatorParams struct {
// ClusterCIDRs is list of cluster cidrs.
ClusterCIDRs []*net.IPNet
// ServiceCIDR is primary service cidr for cluster.
ServiceCIDR *net.IPNet
// SecondaryServiceCIDR is secondary service cidr for cluster.
SecondaryServiceCIDR *net.IPNet
// NodeCIDRMaskSizes is list of node cidr mask sizes.
NodeCIDRMaskSizes []int
}
// New creates a new CIDR range allocator.
func New(
ctx context.Context,
kubeClient clientset.Interface,
_ cloudprovider.Interface,
nodeInformer informers.NodeInformer,
allocatorType CIDRAllocatorType,
allocatorParams CIDRAllocatorParams,
) (CIDRAllocator, error) {
nodeList, err := listNodes(ctx, kubeClient)
if err != nil {
return nil, err
}
switch allocatorType {
case RangeAllocatorType:
return NewCIDRRangeAllocator(ctx, kubeClient, nodeInformer, allocatorParams, nodeList)
case CloudAllocatorType:
return NewCIDRCloudAllocator(ctx, kubeClient, nodeInformer, allocatorParams, nodeList)
default:
return nil, fmt.Errorf("invalid CIDR allocator type: %v", allocatorType)
}
}
func listNodes(ctx context.Context, kubeClient clientset.Interface) (*v1.NodeList, error) {
var nodeList *v1.NodeList
logger := klog.FromContext(ctx)
// We must poll because apiserver might not be up. This error causes
// controller manager to restart.
if pollErr := wait.PollUntilContextTimeout(ctx, nodePollInterval, apiserverStartupGracePeriod, true, func(ctx context.Context) (bool, error) {
var err error
nodeList, err = kubeClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{
FieldSelector: fields.Everything().String(),
LabelSelector: labels.Everything().String(),
})
if err != nil {
logger.Error(err, "Failed to list all nodes")
return false, nil
}
return true, nil
}); pollErr != nil {
return nil, fmt.Errorf("failed to list all nodes in %v, cannot proceed without updating CIDR map",
apiserverStartupGracePeriod)
}
return nodeList, nil
}
// ipnetToStringList converts a slice of net.IPNet into a list of CIDR in string format.
func ipnetToStringList(inCIDRs []*net.IPNet) []string {
outCIDRs := make([]string, len(inCIDRs))
for idx, inCIDR := range inCIDRs {
outCIDRs[idx] = inCIDR.String()
}
return outCIDRs
}
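
A rough sketch of constructing an allocator through New (not how this commit wires it; NewNodeIpamController does that). It assumes an existing kubernetes client and context; the cluster CIDR is illustrative.

package main

import (
    "context"
    "net"

    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"

    "github.com/siderolabs/talos-cloud-controller-manager/pkg/nodeipam/ipam"
)

func exampleNewAllocator(ctx context.Context, client kubernetes.Interface) error {
    _, clusterCIDR, _ := net.ParseCIDR("10.244.0.0/16") // illustrative value

    factory := informers.NewSharedInformerFactory(client, 0)

    // The cloud provider argument is unused by New, so nil is passed here.
    alloc, err := ipam.New(ctx, client, nil, factory.Core().V1().Nodes(), ipam.CloudAllocatorType, ipam.CIDRAllocatorParams{
        ClusterCIDRs:      []*net.IPNet{clusterCIDR},
        NodeCIDRMaskSizes: []int{24},
    })
    if err != nil {
        return err
    }

    factory.Start(ctx.Done())
    go alloc.Run(ctx)

    return nil
}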


@@ -0,0 +1,297 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cidrset
import (
"encoding/binary"
"errors"
"fmt"
"math/big"
"math/bits"
"net"
"sync"
)
// CidrSet manages a set of CIDR ranges from which blocks of IPs can
// be allocated.
type CidrSet struct {
sync.Mutex
// clusterCIDR is the CIDR assigned to the cluster
clusterCIDR *net.IPNet
// clusterMaskSize is the mask size, in bits, assigned to the cluster
// caches the mask size to avoid the penalty of calling clusterCIDR.Mask.Size()
clusterMaskSize int
// nodeMask is the network mask assigned to the nodes
nodeMask net.IPMask
// nodeMaskSize is the mask size, in bits, assigned to the nodes
// caches the mask size to avoid the penalty of calling nodeMask.Size()
nodeMaskSize int
// maxCIDRs is the maximum number of CIDRs that can be allocated
maxCIDRs int
// allocatedCIDRs counts the number of CIDRs allocated
allocatedCIDRs int
// nextCandidate points to the next CIDR that should be free
nextCandidate int
// used is a bitmap used to track the CIDRs allocated
used big.Int
// label is used to identify the metrics
label string
}
const (
// The subnet mask size cannot be greater than 16 more than the cluster mask size
// TODO: https://github.com/kubernetes/kubernetes/issues/44918
// clusterSubnetMaxDiff limited to 16 due to the uncompressed bitmap
// Due to this limitation the subnet mask for IPv6 cluster cidr needs to be >= 48
// as default mask size for IPv6 is 64.
clusterSubnetMaxDiff = 16
// halfIPv6Len is the half of the IPv6 length
halfIPv6Len = net.IPv6len / 2
)
var (
// ErrCIDRRangeNoCIDRsRemaining occurs when there is no more space
// to allocate CIDR ranges.
ErrCIDRRangeNoCIDRsRemaining = errors.New(
"CIDR allocation failed; there are no remaining CIDRs left to allocate in the accepted range")
// ErrCIDRSetSubNetTooBig occurs when the subnet mask size is too
// big compared to the CIDR mask size.
ErrCIDRSetSubNetTooBig = errors.New(
"New CIDR set failed; the node CIDR size is too big")
)
// NewCIDRSet creates a new CidrSet.
func NewCIDRSet(clusterCIDR *net.IPNet, subNetMaskSize int) (*CidrSet, error) {
clusterMask := clusterCIDR.Mask
clusterMaskSize, bits := clusterMask.Size()
if (clusterCIDR.IP.To4() == nil) && (subNetMaskSize-clusterMaskSize > clusterSubnetMaxDiff) {
return nil, ErrCIDRSetSubNetTooBig
}
// register CidrSet metrics
registerCidrsetMetrics()
maxCIDRs := getMaxCIDRs(subNetMaskSize, clusterMaskSize)
cidrSet := &CidrSet{
clusterCIDR: clusterCIDR,
nodeMask: net.CIDRMask(subNetMaskSize, bits),
clusterMaskSize: clusterMaskSize,
maxCIDRs: maxCIDRs,
nodeMaskSize: subNetMaskSize,
label: clusterCIDR.String(),
}
cidrSetMaxCidrs.WithLabelValues(cidrSet.label).Set(float64(maxCIDRs))
return cidrSet, nil
}
func (s *CidrSet) indexToCIDRBlock(index int) *net.IPNet {
var ip []byte
switch /*v4 or v6*/ {
case s.clusterCIDR.IP.To4() != nil:
{
j := uint32(index) << uint32(32-s.nodeMaskSize)
ipInt := (binary.BigEndian.Uint32(s.clusterCIDR.IP)) | j
ip = make([]byte, net.IPv4len)
binary.BigEndian.PutUint32(ip, ipInt)
}
case s.clusterCIDR.IP.To16() != nil:
{
// leftClusterIP | rightClusterIP
// 2001:0DB8:1234:0000:0000:0000:0000:0000
const v6NBits = 128
const halfV6NBits = v6NBits / 2
leftClusterIP := binary.BigEndian.Uint64(s.clusterCIDR.IP[:halfIPv6Len])
rightClusterIP := binary.BigEndian.Uint64(s.clusterCIDR.IP[halfIPv6Len:])
ip = make([]byte, net.IPv6len)
if s.nodeMaskSize <= halfV6NBits {
// We only care about left side IP
leftClusterIP |= uint64(index) << uint(halfV6NBits-s.nodeMaskSize)
} else {
if s.clusterMaskSize < halfV6NBits {
// see how many bits are needed to reach the left side
btl := uint(s.nodeMaskSize - halfV6NBits)
indexMaxBit := uint(64 - bits.LeadingZeros64(uint64(index)))
if indexMaxBit > btl {
leftClusterIP |= uint64(index) >> btl
}
}
// the right side is calculated the same way in either case; the
// subNetMaskSize affects both the left and right sides
rightClusterIP |= uint64(index) << uint(v6NBits-s.nodeMaskSize)
}
binary.BigEndian.PutUint64(ip[:halfIPv6Len], leftClusterIP)
binary.BigEndian.PutUint64(ip[halfIPv6Len:], rightClusterIP)
}
}
return &net.IPNet{
IP: ip,
Mask: s.nodeMask,
}
}
func (s *CidrSet) String() string {
return fmt.Sprintf("CIDRSet{used: %d}", s.allocatedCIDRs)
}
// AllocateNext allocates the next free CIDR range. This will set the range
// as occupied and return the allocated range.
func (s *CidrSet) AllocateNext() (*net.IPNet, error) {
s.Lock()
defer s.Unlock()
if s.allocatedCIDRs == s.maxCIDRs {
return nil, ErrCIDRRangeNoCIDRsRemaining
}
candidate := s.nextCandidate
var i int
for i = 0; i < s.maxCIDRs; i++ {
if s.used.Bit(candidate) == 0 {
break
}
candidate = (candidate + 1) % s.maxCIDRs
}
s.nextCandidate = (candidate + 1) % s.maxCIDRs
s.used.SetBit(&s.used, candidate, 1)
s.allocatedCIDRs++
// Update metrics
cidrSetAllocations.WithLabelValues(s.label).Inc()
cidrSetAllocationTriesPerRequest.WithLabelValues(s.label).Observe(float64(i))
cidrSetUsage.WithLabelValues(s.label).Set(float64(s.allocatedCIDRs) / float64(s.maxCIDRs))
return s.indexToCIDRBlock(candidate), nil
}
func (s *CidrSet) getBeginningAndEndIndices(cidr *net.IPNet) (begin, end int, err error) {
if cidr == nil {
return -1, -1, fmt.Errorf("error getting indices for cluster cidr %v, cidr is nil", s.clusterCIDR)
}
begin, end = 0, s.maxCIDRs-1
cidrMask := cidr.Mask
maskSize, _ := cidrMask.Size()
var ipSize int
if !s.clusterCIDR.Contains(cidr.IP.Mask(s.clusterCIDR.Mask)) && !cidr.Contains(s.clusterCIDR.IP.Mask(cidr.Mask)) {
return -1, -1, fmt.Errorf("cidr %v is out the range of cluster cidr %v", cidr, s.clusterCIDR)
}
if s.clusterMaskSize < maskSize {
ipSize = net.IPv4len
if cidr.IP.To4() == nil {
ipSize = net.IPv6len
}
begin, err = s.getIndexForIP(cidr.IP.Mask(s.nodeMask))
if err != nil {
return -1, -1, err
}
ip := make([]byte, ipSize)
if cidr.IP.To4() != nil {
ipInt := binary.BigEndian.Uint32(cidr.IP) | (^binary.BigEndian.Uint32(cidr.Mask))
binary.BigEndian.PutUint32(ip, ipInt)
} else {
// ipIntLeft | ipIntRight
// 2001:0DB8:1234:0000:0000:0000:0000:0000
ipIntLeft := binary.BigEndian.Uint64(cidr.IP[:net.IPv6len/2]) | (^binary.BigEndian.Uint64(cidr.Mask[:net.IPv6len/2]))
ipIntRight := binary.BigEndian.Uint64(cidr.IP[net.IPv6len/2:]) | (^binary.BigEndian.Uint64(cidr.Mask[net.IPv6len/2:]))
binary.BigEndian.PutUint64(ip[:net.IPv6len/2], ipIntLeft)
binary.BigEndian.PutUint64(ip[net.IPv6len/2:], ipIntRight)
}
end, err = s.getIndexForIP(net.IP(ip).Mask(s.nodeMask))
if err != nil {
return -1, -1, err
}
}
return begin, end, nil
}
// Release releases the given CIDR range.
func (s *CidrSet) Release(cidr *net.IPNet) error {
begin, end, err := s.getBeginningAndEndIndices(cidr)
if err != nil {
return err
}
s.Lock()
defer s.Unlock()
for i := begin; i <= end; i++ {
// Only change the counters if we change the bit to prevent
// double counting.
if s.used.Bit(i) != 0 {
s.used.SetBit(&s.used, i, 0)
s.allocatedCIDRs--
cidrSetReleases.WithLabelValues(s.label).Inc()
}
}
cidrSetUsage.WithLabelValues(s.label).Set(float64(s.allocatedCIDRs) / float64(s.maxCIDRs))
return nil
}
// Occupy marks the given CIDR range as used. Occupy succeeds even if the CIDR
// range was previously used.
func (s *CidrSet) Occupy(cidr *net.IPNet) (err error) {
begin, end, err := s.getBeginningAndEndIndices(cidr)
if err != nil {
return err
}
s.Lock()
defer s.Unlock()
for i := begin; i <= end; i++ {
// Only change the counters if we change the bit to prevent
// double counting.
if s.used.Bit(i) == 0 {
s.used.SetBit(&s.used, i, 1)
s.allocatedCIDRs++
cidrSetAllocations.WithLabelValues(s.label).Inc()
}
}
cidrSetUsage.WithLabelValues(s.label).Set(float64(s.allocatedCIDRs) / float64(s.maxCIDRs))
return nil
}
func (s *CidrSet) getIndexForIP(ip net.IP) (int, error) {
if ip.To4() != nil {
cidrIndex := (binary.BigEndian.Uint32(s.clusterCIDR.IP) ^ binary.BigEndian.Uint32(ip.To4())) >> uint32(32-s.nodeMaskSize)
if cidrIndex >= uint32(s.maxCIDRs) {
return 0, fmt.Errorf("CIDR: %v/%v is out of the range of CIDR allocator", ip, s.nodeMaskSize)
}
return int(cidrIndex), nil
}
if ip.To16() != nil {
bigIP := big.NewInt(0).SetBytes(s.clusterCIDR.IP)
bigIP = bigIP.Xor(bigIP, big.NewInt(0).SetBytes(ip))
cidrIndexBig := bigIP.Rsh(bigIP, uint(net.IPv6len*8-s.nodeMaskSize))
cidrIndex := cidrIndexBig.Uint64()
if cidrIndex >= uint64(s.maxCIDRs) {
return 0, fmt.Errorf("CIDR: %v/%v is out of the range of CIDR allocator", ip, s.nodeMaskSize)
}
return int(cidrIndex), nil
}
return 0, fmt.Errorf("invalid IP: %v", ip)
}
// getMaxCIDRs returns the max number of CIDRs that can be obtained by subdividing a mask of size `clusterMaskSize`
// into subnets with mask of size `subNetMaskSize`.
func getMaxCIDRs(subNetMaskSize, clusterMaskSize int) int {
return 1 << uint32(subNetMaskSize-clusterMaskSize)
}
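
To make the arithmetic concrete (illustrative, not part of the commit): splitting a /16 cluster CIDR into /24 node ranges gives 1 << (24-16) = 256 allocatable CIDRs, handed out sequentially.

package main

import (
    "fmt"
    "net"

    "github.com/siderolabs/talos-cloud-controller-manager/pkg/nodeipam/ipam/cidrset"
)

func main() {
    _, clusterCIDR, _ := net.ParseCIDR("10.244.0.0/16") // illustrative cluster CIDR

    set, err := cidrset.NewCIDRSet(clusterCIDR, 24) // 256 possible /24 node CIDRs
    if err != nil {
        panic(err)
    }

    first, _ := set.AllocateNext()  // 10.244.0.0/24
    second, _ := set.AllocateNext() // 10.244.1.0/24
    fmt.Println(first, second)

    // Releasing a range makes it available for reuse.
    _ = set.Release(first)
}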

File diff suppressed because it is too large


@@ -0,0 +1,89 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cidrset
import (
"sync"
"k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
)
const nodeIpamSubsystem = "node_ipam_controller"
var (
cidrSetAllocations = metrics.NewCounterVec(
&metrics.CounterOpts{
Subsystem: nodeIpamSubsystem,
Name: "cidrset_cidrs_allocations_total",
Help: "Counter measuring total number of CIDR allocations.",
StabilityLevel: metrics.ALPHA,
},
[]string{"clusterCIDR"},
)
cidrSetReleases = metrics.NewCounterVec(
&metrics.CounterOpts{
Subsystem: nodeIpamSubsystem,
Name: "cidrset_cidrs_releases_total",
Help: "Counter measuring total number of CIDR releases.",
StabilityLevel: metrics.ALPHA,
},
[]string{"clusterCIDR"},
)
// This is a gauge, as in theory, a limit can increase or decrease.
cidrSetMaxCidrs = metrics.NewGaugeVec(
&metrics.GaugeOpts{
Subsystem: nodeIpamSubsystem,
Name: "cirdset_max_cidrs",
Help: "Maximum number of CIDRs that can be allocated.",
StabilityLevel: metrics.ALPHA,
},
[]string{"clusterCIDR"},
)
cidrSetUsage = metrics.NewGaugeVec(
&metrics.GaugeOpts{
Subsystem: nodeIpamSubsystem,
Name: "cidrset_usage_cidrs",
Help: "Gauge measuring percentage of allocated CIDRs.",
StabilityLevel: metrics.ALPHA,
},
[]string{"clusterCIDR"},
)
cidrSetAllocationTriesPerRequest = metrics.NewHistogramVec(
&metrics.HistogramOpts{
Subsystem: nodeIpamSubsystem,
Name: "cidrset_allocation_tries_per_request",
Help: "Number of endpoints added on each Service sync",
StabilityLevel: metrics.ALPHA,
Buckets: metrics.ExponentialBuckets(1, 5, 5),
},
[]string{"clusterCIDR"},
)
)
var registerMetrics sync.Once
// registerCidrsetMetrics the metrics that are to be monitored.
func registerCidrsetMetrics() {
registerMetrics.Do(func() {
legacyregistry.MustRegister(cidrSetAllocations)
legacyregistry.MustRegister(cidrSetReleases)
legacyregistry.MustRegister(cidrSetMaxCidrs)
legacyregistry.MustRegister(cidrSetUsage)
legacyregistry.MustRegister(cidrSetAllocationTriesPerRequest)
})
}


@@ -0,0 +1,648 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ipam
import (
"context"
"fmt"
"net"
"net/netip"
"sync"
"time"
"github.com/siderolabs/talos-cloud-controller-manager/pkg/nodeipam/ipam/cidrset"
"github.com/siderolabs/talos-cloud-controller-manager/pkg/talosclient"
controllerutil "github.com/siderolabs/talos-cloud-controller-manager/pkg/utils/controller/node"
utilnode "github.com/siderolabs/talos-cloud-controller-manager/pkg/utils/node"
"github.com/siderolabs/talos/pkg/machinery/resources/network"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
informers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
cloudproviderapi "k8s.io/cloud-provider/api"
"k8s.io/klog/v2"
netutils "k8s.io/utils/net"
)
type cloudAllocator struct {
client clientset.Interface
// cluster cidrs as passed in during controller creation
clusterCIDRs []*net.IPNet
// for each entry in cidrSets we maintain a list of what is used and what is not
lock sync.Mutex
cidrSets map[netip.Prefix]*cidrset.CidrSet
// nodeLister is able to list/get nodes and is populated by the shared informer passed to controller
nodeLister corelisters.NodeLister
// nodesSynced returns true if the node shared informer has been synced at least once.
nodesSynced cache.InformerSynced
broadcaster record.EventBroadcaster
recorder record.EventRecorder
// queues are where incoming work is placed to de-dup and to allow "easy"
// rate limited requeues on errors
queue workqueue.RateLimitingInterface
}
var (
_ CIDRAllocator = &cloudAllocator{}
// UninitializedTaint is the taint that is added to nodes that are not yet initialized.
UninitializedTaint = &v1.Taint{
Key: cloudproviderapi.TaintExternalCloudProvider,
Effect: v1.TaintEffectNoSchedule,
}
)
// NewCIDRCloudAllocator returns a CIDRAllocator to allocate CIDRs for node (one from each of clusterCIDRs)
// Caller must ensure subNetMaskSize is not less than cluster CIDR mask size.
// Caller must always pass in a list of existing nodes so the new allocator
// can initialize its CIDR map; NodeList is only nil in testing.
// Caller must ensure that ClusterCIDRs are semantically correct e.g. (1 for non DualStack, 2 for DualStack etc..)
//
//nolint:wsl,predeclared,revive,errcheck
func NewCIDRCloudAllocator(
ctx context.Context,
client clientset.Interface,
nodeInformer informers.NodeInformer,
allocatorParams CIDRAllocatorParams,
nodeList *v1.NodeList,
) (CIDRAllocator, error) {
logger := klog.FromContext(ctx)
if client == nil {
logger.Error(nil, "kubeClient is nil when starting CIDRRangeAllocator")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cidrAllocator"})
// create a cidrSet for each cidr we operate on
// cidrSet are mapped to clusterCIDR by CIDR
cidrSets := make(map[netip.Prefix]*cidrset.CidrSet, len(allocatorParams.ClusterCIDRs))
for idx, cidr := range allocatorParams.ClusterCIDRs {
cidrSet, err := cidrset.NewCIDRSet(cidr, allocatorParams.NodeCIDRMaskSizes[idx])
if err != nil {
return nil, err
}
cidrSets[netip.MustParsePrefix(cidr.String())] = cidrSet
}
ra := &cloudAllocator{
client: client,
clusterCIDRs: allocatorParams.ClusterCIDRs,
cidrSets: cidrSets,
nodeLister: nodeInformer.Lister(),
nodesSynced: nodeInformer.Informer().HasSynced,
broadcaster: eventBroadcaster,
recorder: recorder,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cidrallocator_node"),
}
if allocatorParams.ServiceCIDR != nil {
ra.filterOutServiceRange(logger, allocatorParams.ServiceCIDR)
} else {
logger.Info("No Service CIDR provided. Skipping filtering out service addresses")
}
if allocatorParams.SecondaryServiceCIDR != nil {
ra.filterOutServiceRange(logger, allocatorParams.SecondaryServiceCIDR)
} else {
logger.Info("No Secondary Service CIDR provided. Skipping filtering out secondary service addresses")
}
if nodeList != nil {
for _, node := range nodeList.Items {
if len(node.Spec.PodCIDRs) == 0 {
logger.V(4).Info("Node has no CIDR, ignoring", "node", klog.KObj(&node))
continue
}
logger.V(4).Info("Node has CIDR, occupying it in CIDR map", "node", klog.KObj(&node), "podCIDRs", node.Spec.PodCIDRs)
if err := ra.occupyPodCIDRs(ctx, &node); err != nil {
// This will happen if:
// 1. We find garbage in the podCIDRs field. Retrying is useless.
// 2. CIDR out of range: This means a node CIDR has changed.
// This error will keep crashing controller-manager.
return nil, err
}
}
}
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
ra.queue.Add(key)
}
},
UpdateFunc: func(old, new interface{}) {
key, err := cache.MetaNamespaceKeyFunc(new)
if err == nil {
ra.queue.Add(key)
}
},
DeleteFunc: func(obj interface{}) {
// The informer cache no longer has the object, and since Node doesn't have a finalizer,
// we don't see the Update with DeletionTimestamp != 0.
// TODO: instead of executing the operation directly in the handler, build a small cache with key node.Name
// and value PodCIDRs use ReleaseCIDR on the reconcile loop so we can retry on `ReleaseCIDR` failures.
if err := ra.ReleaseCIDR(logger, obj.(*v1.Node)); err != nil {
utilruntime.HandleError(fmt.Errorf("error while processing CIDR Release: %w", err))
}
// IndexerInformer uses a delta nodeQueue, therefore for deletes we have to use this
// key function.
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
ra.queue.Add(key)
}
},
})
return ra, nil
}
// Run starts the CIDRAllocator.
//
//nolint:dupl
func (r *cloudAllocator) Run(ctx context.Context) {
defer utilruntime.HandleCrash()
// Start event processing pipeline.
r.broadcaster.StartStructuredLogging(3)
logger := klog.FromContext(ctx)
logger.Info("Sending events to api server")
r.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: r.client.CoreV1().Events("")})
defer r.broadcaster.Shutdown()
defer r.queue.ShutDown()
logger.Info("Starting range CIDR allocator")
defer logger.Info("Shutting down range CIDR allocator")
if !cache.WaitForNamedCacheSync("cidrallocator", ctx.Done(), r.nodesSynced) {
return
}
for i := 0; i < cidrUpdateWorkers; i++ {
go wait.UntilWithContext(ctx, r.runWorker, time.Second)
}
<-ctx.Done()
}
// runWorker is a long-running function that will continually call the
// processNextWorkItem function in order to read and process a message on the
// queue.
func (r *cloudAllocator) runWorker(ctx context.Context) {
for r.processNextNodeWorkItem(ctx) {
}
}
// processNextNodeWorkItem reads a single work item off the queue and
// attempts to process it by calling syncNode.
func (r *cloudAllocator) processNextNodeWorkItem(ctx context.Context) bool {
obj, shutdown := r.queue.Get()
if shutdown {
return false
}
// We wrap this block in a func so we can defer r.queue.Done.
err := func(logger klog.Logger, obj interface{}) error {
// We call Done here so the workNodeQueue knows we have finished
// processing this item. We also must remember to call Forget if we
// do not want this work item being re-queued. For example, we do
// not call Forget if a transient error occurs, instead the item is
// put back on the queue and attempted again after a back-off
// period.
defer r.queue.Done(obj)
var (
key string
ok bool
)
// We expect strings to come off the workNodeQueue. These are of the
// form namespace/name. We do this as the delayed nature of the
// workNodeQueue means the items in the informer cache may actually be
// more up to date than when the item was initially put onto the
// workNodeQueue.
if key, ok = obj.(string); !ok {
// As the item in the workNodeQueue is actually invalid, we call
// Forget here else we'd go into a loop of attempting to
// process a work item that is invalid.
r.queue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected string in workNodeQueue but got %#v", obj))
return nil
}
// Run syncNode, passing it the name of the
// Node resource to be synced.
if err := r.syncNode(ctx, key); err != nil {
// Put the item back on the queue to handle any transient errors.
r.queue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
}
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
r.queue.Forget(obj)
logger.Info("Successfully synced", "key", key)
for k, cidrSet := range r.cidrSets {
logger.V(5).Info("IPAM status", "node", key, "subnet", k.String(), "size", cidrSet.String())
}
return nil
}(klog.FromContext(ctx), obj)
if err != nil {
utilruntime.HandleError(err)
return true
}
return true
}
func (r *cloudAllocator) syncNode(ctx context.Context, key string) error {
logger := klog.FromContext(ctx)
startTime := time.Now()
defer func() {
logger.V(4).Info("Finished syncing Node request", "node", key, "elapsed", time.Since(startTime))
}()
node, err := r.nodeLister.Get(key)
if apierrors.IsNotFound(err) {
logger.V(3).Info("node has been deleted", "node", key)
// TODO: obtain the node object information to call ReleaseCIDR from here
// and retry if there is an error.
return nil
}
if err != nil {
return err
}
// Check the DeletionTimestamp to determine if object is under deletion.
if !node.DeletionTimestamp.IsZero() {
logger.V(3).Info("node is being deleted", "node", key)
return r.ReleaseCIDR(logger, node)
}
for _, taint := range node.Spec.Taints {
if taint.MatchTaint(UninitializedTaint) {
logger.V(4).Info("Node has uninitialized taint, skipping CIDR allocation", "node", klog.KObj(node))
return nil
}
}
return r.AllocateOrOccupyCIDR(ctx, node)
}
// marks node.PodCIDRs[...] as used in allocator's tracked cidrSet.
func (r *cloudAllocator) occupyPodCIDRs(ctx context.Context, node *v1.Node) error {
if len(node.Spec.PodCIDRs) == 0 || len(node.Spec.PodCIDRs) > 2 {
return nil
}
for _, cidr := range node.Spec.PodCIDRs {
_, podCIDR, err := netutils.ParseCIDRSloppy(cidr)
if err != nil {
return fmt.Errorf("failed to parse node %s, CIDR %s", node.Name, node.Spec.PodCIDR)
}
ok, err := r.occupyCIDR(podCIDR)
if err != nil {
return fmt.Errorf("failed to mark cidr[%v] as occupied for node: %v: %v", podCIDR, node.Name, err)
}
if !ok {
_, err := r.defineNodeGlobalCIDRs(ctx, node)
if err != nil {
return fmt.Errorf("failed to find a CIDRSet for node %s, CIDR %s", node.Name, cidr)
}
}
}
return nil
}
func (r *cloudAllocator) occupyCIDR(cidr *net.IPNet) (bool, error) {
ip, _ := netip.AddrFromSlice(cidr.IP)
for k := range r.cidrSets {
if k.Contains(ip) {
if err := r.cidrSets[k].Occupy(cidr); err != nil {
return false, fmt.Errorf("failed to mark cidr %v as occupied in subnet %s: %v", cidr, k.String(), err)
}
return true, nil
}
}
return false, nil
}
// WARNING: If you're adding any return calls or defer any more work from this
// function you have to make sure to update nodesInProcessing properly with the
// disposition of the node when the work is done.
func (r *cloudAllocator) AllocateOrOccupyCIDR(ctx context.Context, node *v1.Node) error {
if node == nil {
return nil
}
if len(node.Spec.PodCIDRs) > 0 {
return r.occupyPodCIDRs(ctx, node)
}
logger := klog.FromContext(ctx)
allocatedCIDRs := make([]*net.IPNet, len(r.clusterCIDRs))
globalCIDR, err := r.defineNodeGlobalCIDRs(ctx, node)
if err != nil {
return err
}
for idx, cidr := range r.clusterCIDRs {
ip := netip.MustParseAddr(cidr.IP.String())
if cidr.IP.To4() == nil && globalCIDR != "" {
ip = netip.MustParsePrefix(globalCIDR).Addr()
}
for k := range r.cidrSets {
if k.Contains(ip) {
podCIDR, err := r.cidrSets[k].AllocateNext()
if err != nil {
controllerutil.RecordNodeStatusChange(logger, r.recorder, node, "CIDRNotAvailable")
return fmt.Errorf("failed to allocate cidr from cluster cidr at idx:%v: %v", idx, err)
}
allocatedCIDRs[idx] = podCIDR
break
}
}
}
// queue the assignment
logger.V(4).Info("Putting node with CIDR into the work queue", "node", klog.KObj(node), "CIDRs", ipnetToStringList(allocatedCIDRs))
return r.updateCIDRsAllocation(ctx, node.Name, allocatedCIDRs)
}
// ReleaseCIDR marks node.podCIDRs[...] as unused in our tracked cidrSets.
func (r *cloudAllocator) ReleaseCIDR(logger klog.Logger, node *v1.Node) error {
if node == nil || len(node.Spec.PodCIDRs) == 0 {
return nil
}
logger.V(4).Info("Release CIDR for node", "CIDR", node.Spec.PodCIDRs, "node", klog.KObj(node))
return r.releaseCIDRs(node.Spec.PodCIDRs)
}
func (r *cloudAllocator) releaseCIDRs(addresses []string) error {
for _, cidr := range addresses {
_, podCIDR, err := netutils.ParseCIDRSloppy(cidr)
if err != nil {
return fmt.Errorf("failed to parse CIDR %s: %v", cidr, err)
}
for k := range r.cidrSets {
ip := netip.MustParseAddr(podCIDR.IP.String())
if k.Contains(ip) {
if err := r.cidrSets[k].Release(podCIDR); err != nil {
return fmt.Errorf("error when releasing CIDR %v: %v", cidr, err)
}
break
}
}
}
return nil
}
// Marks all CIDRs with subNetMaskSize that belong to serviceCIDR as used across all cidrs
// so that they won't be assignable.
func (r *cloudAllocator) filterOutServiceRange(logger klog.Logger, serviceCIDR *net.IPNet) {
// Checks if service CIDR has a nonempty intersection with cluster
// CIDR. It is the case if either clusterCIDR contains serviceCIDR with
// clusterCIDR's Mask applied (this means that clusterCIDR contains
// serviceCIDR) or vice versa (which means that serviceCIDR contains
// clusterCIDR).
_, err := r.occupyCIDR(serviceCIDR)
if err != nil {
logger.Error(err, "Error filtering out service cidr out cluster cidr", "CIDR", serviceCIDR)
}
}
// updateCIDRsAllocation assigns CIDR to Node and sends an update to the API server.
func (r *cloudAllocator) updateCIDRsAllocation(ctx context.Context, nodeName string, allocatedCIDRs []*net.IPNet) error {
logger := klog.FromContext(ctx)
cidrsString := ipnetToStringList(allocatedCIDRs)
node, err := r.nodeLister.Get(nodeName)
if err != nil {
logger.Error(err, "Failed while getting node for updating Node.Spec.PodCIDRs", "node", klog.KRef("", nodeName))
return err
}
// If the cidr list matches the proposed one, then we possibly already
// updated this node and just failed to ack the success.
if len(node.Spec.PodCIDRs) == len(allocatedCIDRs) {
match := true
for idx, cidr := range cidrsString {
if node.Spec.PodCIDRs[idx] != cidr {
match = false
break
}
}
if match {
logger.V(4).Info("Node already has allocated CIDR. It matches the proposed one", "node", klog.KObj(node), "podCIDRs", cidrsString)
return nil
}
}
// node has cidrs, release the reserved
if len(node.Spec.PodCIDRs) != 0 {
logger.Error(nil, "Node already has a CIDR allocated. Releasing the new one", "node", klog.KObj(node), "podCIDRs", node.Spec.PodCIDRs)
for _, cidr := range allocatedCIDRs {
addrs := []string{cidr.String()}
if releaseErr := r.releaseCIDRs(addrs); releaseErr != nil {
logger.Error(releaseErr, "Error when releasing CIDR", "CIDR", cidr)
}
}
return nil
}
// If we reached here, it means that the node has no CIDR currently assigned. So we set it.
for i := 0; i < cidrUpdateRetries; i++ {
if err = utilnode.PatchNodeCIDRs(ctx, r.client, types.NodeName(node.Name), cidrsString); err == nil {
logger.Info("Set node PodCIDR", "node", klog.KObj(node), "podCIDRs", cidrsString)
return nil
}
}
// failed release back to the pool
logger.Error(err, "Failed to update node PodCIDR after multiple attempts", "node", klog.KObj(node), "podCIDRs", cidrsString)
controllerutil.RecordNodeStatusChange(logger, r.recorder, node, "CIDRAssignmentFailed")
// We accept the fact that we may leak CIDRs here. This is safer than releasing
// them in case when we don't know if request went through.
// NodeController restart will return all falsely allocated CIDRs to the pool.
if !apierrors.IsServerTimeout(err) {
logger.Error(err, "CIDR assignment for node failed. Releasing allocated CIDR", "node", klog.KObj(node))
for _, cidr := range allocatedCIDRs {
addrs := []string{cidr.String()}
if releaseErr := r.releaseCIDRs(addrs); releaseErr != nil {
logger.Error(releaseErr, "Error when releasing CIDR", "CIDR", cidr)
}
}
}
return err
}
func (r *cloudAllocator) defineNodeGlobalCIDRs(ctx context.Context, node *v1.Node) (string, error) {
if node == nil {
return "", fmt.Errorf("node is nil")
}
nodeIPs, err := utilnode.GetNodeIPs(node)
if err != nil {
return "", fmt.Errorf("failed to get IPs for node %s: %v", node.Name, err)
}
if len(nodeIPs) == 0 {
return "", fmt.Errorf("node has no addresses")
}
logger := klog.FromContext(ctx)
logger.V(5).Info("Node has addresses", "node", klog.KObj(node), "addresses", nodeIPs)
talos, err := talosclient.New(ctx)
if err != nil {
return "", err
}
var ifaces []network.AddressStatusSpec
for _, ip := range nodeIPs {
ifaces, err = talos.GetNodeIfaces(ctx, ip.String())
if err == nil {
break
}
}
ipv6 := []netip.Addr{}
for _, nodeIP := range nodeIPs {
if nodeIP.Is6() && !nodeIP.IsPrivate() {
ipv6 = append(ipv6, nodeIP)
}
}
_, cidr := talosclient.NodeCIDRDiscovery(ipv6, ifaces)
logger.V(4).Info("Node has IPv6 CIDRs", "node", klog.KObj(node), "CIDRs", cidr)
if len(cidr) > 0 {
r.lock.Lock()
defer r.lock.Unlock()
subnets, err := netutils.ParseCIDRs(cidr)
if err != nil {
return "", err
}
for k := range r.cidrSets {
if k.Addr().Is4() || k.Addr().IsPrivate() {
continue
}
for _, subnet := range subnets {
if ip, ok := netip.AddrFromSlice(subnet.IP); ok && k.Contains(ip) {
return k.String(), nil
}
}
}
for _, subnet := range subnets {
logger.V(4).Info("Add IPv6 to CIDRSet", "node", klog.KObj(node), "CIDR", subnet)
err := r.addCIDRSet(subnet)
if err != nil {
return "", err
}
}
return cidr[0], nil
}
return "", nil
}
func (r *cloudAllocator) addCIDRSet(subnet *net.IPNet) error {
mask, _ := subnet.Mask.Size()
switch {
case mask < 64:
mask = 64
case mask > 120:
return fmt.Errorf("CIDRv6 is too small: %v", subnet.String())
default:
mask += 16
}
cidrSet, err := cidrset.NewCIDRSet(subnet, mask)
if err != nil {
return err
}
k := netip.MustParsePrefix(subnet.String())
if _, ok := r.cidrSets[k]; !ok {
r.cidrSets[netip.MustParsePrefix(subnet.String())] = cidrSet
}
return nil
}
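// Worked example (illustrative, not part of this change): with the rule in
// addCIDRSet above, a discovered /56 or /60 prefix is clamped and produces a
// cidrset handing out /64 node CIDRs; a discovered /64 yields /80 node CIDRs
// (16 extra bits); a /96 yields /112 node CIDRs; and anything longer than /120
// is rejected as too small to subdivide.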

pkg/nodeipam/ipam/doc.go Normal file

@@ -0,0 +1,30 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package ipam provides different allocators for assigning IP ranges to nodes.
// We currently support several kinds of IPAM allocators (these are denoted by
// the CIDRAllocatorType):
// - RangeAllocator is an allocator that assigns PodCIDRs to nodes and works
// in conjunction with the RouteController to configure the network to get
// connectivity.
// - CloudAllocator is an allocator that synchronizes PodCIDRs from IP
// range assignments provided by the underlying cloud platform.
// - (Alpha only) IPAMFromCluster is an allocator that has similar
// functionality to the RangeAllocator but also synchronizes cluster-managed
// ranges into the cloud platform.
// - (Alpha only) IPAMFromCloud is the same as CloudAllocator (synchronizes
// from cloud into the cluster.)
package ipam
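// For illustration only, a dispatcher over these allocator kinds could look
// like the sketch below; the constant names RangeAllocatorType and
// CloudAllocatorType are assumed from the descriptions above, and the actual
// wiring is done elsewhere in this package:
//
//	switch allocatorType {
//	case RangeAllocatorType:
//		return NewCIDRRangeAllocator(ctx, client, nodeInformer, allocatorParams, nodeList)
//	case CloudAllocatorType:
//		return NewCIDRCloudAllocator(ctx, client, nodeInformer, allocatorParams, nodeList)
//	default:
//		return nil, fmt.Errorf("unknown CIDR allocator type %q", allocatorType)
//	}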


@@ -0,0 +1,442 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ipam // copy from kubernetes/pkg/controller/nodeipam/ipam/range_allocator.go
import (
"context"
"fmt"
"net"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
netutils "k8s.io/utils/net"
"github.com/siderolabs/talos-cloud-controller-manager/pkg/nodeipam/ipam/cidrset"
controllerutil "github.com/siderolabs/talos-cloud-controller-manager/pkg/utils/controller/node"
nodeutil "github.com/siderolabs/talos-cloud-controller-manager/pkg/utils/node"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
informers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
)
type rangeAllocator struct {
client clientset.Interface
// cluster cidrs as passed in during controller creation
clusterCIDRs []*net.IPNet
// for each entry in clusterCIDRs we maintain a list of what is used and what is not
cidrSets []*cidrset.CidrSet
// nodeLister is able to list/get nodes and is populated by the shared informer passed to controller
nodeLister corelisters.NodeLister
// nodesSynced returns true if the node shared informer has been synced at least once.
nodesSynced cache.InformerSynced
broadcaster record.EventBroadcaster
recorder record.EventRecorder
// queues are where incoming work is placed to de-dup and to allow "easy"
// rate limited requeues on errors
queue workqueue.RateLimitingInterface
}
var _ CIDRAllocator = &rangeAllocator{}
// NewCIDRRangeAllocator returns a CIDRAllocator to allocate CIDRs for nodes (one from each of clusterCIDRs).
// Caller must ensure subNetMaskSize is not less than the cluster CIDR mask size.
// Caller must ensure that ClusterCIDRs are semantically correct (e.g. 1 for non-DualStack, 2 for DualStack).
// Caller must always pass in a list of existing nodes so the new allocator
// can initialize its CIDR map. NodeList is only nil in testing.
func NewCIDRRangeAllocator(ctx context.Context, client clientset.Interface, nodeInformer informers.NodeInformer, allocatorParams CIDRAllocatorParams, nodeList *v1.NodeList) (CIDRAllocator, error) {
logger := klog.FromContext(ctx)
if client == nil {
logger.Error(nil, "kubeClient is nil when starting CIDRRangeAllocator")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cidrAllocator"})
// create a cidrSet for each cidr we operate on
// cidrSet are mapped to clusterCIDR by index
cidrSets := make([]*cidrset.CidrSet, len(allocatorParams.ClusterCIDRs))
for idx, cidr := range allocatorParams.ClusterCIDRs {
cidrSet, err := cidrset.NewCIDRSet(cidr, allocatorParams.NodeCIDRMaskSizes[idx])
if err != nil {
return nil, err
}
cidrSets[idx] = cidrSet
}
ra := &rangeAllocator{
client: client,
clusterCIDRs: allocatorParams.ClusterCIDRs,
cidrSets: cidrSets,
nodeLister: nodeInformer.Lister(),
nodesSynced: nodeInformer.Informer().HasSynced,
broadcaster: eventBroadcaster,
recorder: recorder,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cidrallocator_node"),
}
if allocatorParams.ServiceCIDR != nil {
ra.filterOutServiceRange(logger, allocatorParams.ServiceCIDR)
} else {
logger.Info("No Service CIDR provided. Skipping filtering out service addresses")
}
if allocatorParams.SecondaryServiceCIDR != nil {
ra.filterOutServiceRange(logger, allocatorParams.SecondaryServiceCIDR)
} else {
logger.Info("No Secondary Service CIDR provided. Skipping filtering out secondary service addresses")
}
if nodeList != nil {
for _, node := range nodeList.Items {
if len(node.Spec.PodCIDRs) == 0 {
logger.V(4).Info("Node has no CIDR, ignoring", "node", klog.KObj(&node))
continue
}
logger.V(4).Info("Node has CIDR, occupying it in CIDR map", "node", klog.KObj(&node), "podCIDRs", node.Spec.PodCIDRs)
if err := ra.occupyCIDRs(&node); err != nil {
// This will happen if:
// 1. We find garbage in the podCIDRs field. Retrying is useless.
// 2. CIDR out of range: This means a node CIDR has changed.
// This error will keep crashing controller-manager.
return nil, err
}
}
}
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
ra.queue.Add(key)
}
},
UpdateFunc: func(old, new interface{}) {
key, err := cache.MetaNamespaceKeyFunc(new)
if err == nil {
ra.queue.Add(key)
}
},
DeleteFunc: func(obj interface{}) {
// The informer cache no longer has the object, and since Node doesn't have a finalizer,
// we don't see the Update with DeletionTimestamp != 0.
// TODO: instead of executing the operation directly in the handler, build a small cache with key node.Name
// and value PodCIDRs use ReleaseCIDR on the reconcile loop so we can retry on `ReleaseCIDR` failures.
if err := ra.ReleaseCIDR(logger, obj.(*v1.Node)); err != nil {
utilruntime.HandleError(fmt.Errorf("error while processing CIDR Release: %w", err))
}
// IndexerInformer uses a delta nodeQueue, therefore for deletes we have to use this
// key function.
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
ra.queue.Add(key)
}
},
})
return ra, nil
}
func (r *rangeAllocator) Run(ctx context.Context) {
defer utilruntime.HandleCrash()
// Start event processing pipeline.
r.broadcaster.StartStructuredLogging(3)
logger := klog.FromContext(ctx)
logger.Info("Sending events to api server")
r.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: r.client.CoreV1().Events("")})
defer r.broadcaster.Shutdown()
defer r.queue.ShutDown()
logger.Info("Starting range CIDR allocator")
defer logger.Info("Shutting down range CIDR allocator")
if !cache.WaitForNamedCacheSync("cidrallocator", ctx.Done(), r.nodesSynced) {
return
}
for i := 0; i < cidrUpdateWorkers; i++ {
go wait.UntilWithContext(ctx, r.runWorker, time.Second)
}
<-ctx.Done()
}
// runWorker is a long-running function that will continually call the
// processNextWorkItem function in order to read and process a message on the
// queue.
func (r *rangeAllocator) runWorker(ctx context.Context) {
for r.processNextNodeWorkItem(ctx) {
}
}
// processNextNodeWorkItem reads a single work item off the queue and
// attempts to process it by calling syncNode.
func (r *rangeAllocator) processNextNodeWorkItem(ctx context.Context) bool {
obj, shutdown := r.queue.Get()
if shutdown {
return false
}
// We wrap this block in a func so we can defer r.queue.Done.
err := func(logger klog.Logger, obj interface{}) error {
// We call Done here so the workNodeQueue knows we have finished
// processing this item. We also must remember to call Forget if we
// do not want this work item being re-queued. For example, we do
// not call Forget if a transient error occurs, instead the item is
// put back on the queue and attempted again after a back-off
// period.
defer r.queue.Done(obj)
var key string
var ok bool
// We expect strings to come off the workNodeQueue. These are of the
// form namespace/name. We do this as the delayed nature of the
// workNodeQueue means the items in the informer cache may actually be
// more up to date than when the item was initially put onto the
// workNodeQueue.
if key, ok = obj.(string); !ok {
// As the item in the workNodeQueue is actually invalid, we call
// Forget here else we'd go into a loop of attempting to
// process a work item that is invalid.
r.queue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected string in workNodeQueue but got %#v", obj))
return nil
}
// Run syncNode, passing it the name of the
// Node resource to be synced.
if err := r.syncNode(ctx, key); err != nil {
// Put the item back on the queue to handle any transient errors.
r.queue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
}
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
r.queue.Forget(obj)
logger.Info("Successfully synced", "key", key)
return nil
}(klog.FromContext(ctx), obj)
if err != nil {
utilruntime.HandleError(err)
return true
}
return true
}
func (r *rangeAllocator) syncNode(ctx context.Context, key string) error {
logger := klog.FromContext(ctx)
startTime := time.Now()
defer func() {
logger.V(4).Info("Finished syncing Node request", "node", key, "elapsed", time.Since(startTime))
}()
node, err := r.nodeLister.Get(key)
if apierrors.IsNotFound(err) {
logger.V(3).Info("node has been deleted", "node", key)
// TODO: obtain the node object information to call ReleaseCIDR from here
// and retry if there is an error.
return nil
}
if err != nil {
return err
}
// Check the DeletionTimestamp to determine if object is under deletion.
if !node.DeletionTimestamp.IsZero() {
logger.V(3).Info("node is being deleted", "node", key)
return r.ReleaseCIDR(logger, node)
}
return r.AllocateOrOccupyCIDR(ctx, node)
}
// marks node.PodCIDRs[...] as used in allocator's tracked cidrSet
func (r *rangeAllocator) occupyCIDRs(node *v1.Node) error {
if len(node.Spec.PodCIDRs) == 0 {
return nil
}
for idx, cidr := range node.Spec.PodCIDRs {
_, podCIDR, err := netutils.ParseCIDRSloppy(cidr)
if err != nil {
return fmt.Errorf("failed to parse node %s, CIDR %s", node.Name, node.Spec.PodCIDR)
}
// The node may have a pre-allocated cidr that does not exist in our cidrs.
// This will happen if the cluster went from dualstack (multiple cidrs) to non-dualstack,
// in which case we have no way of locking it.
if idx >= len(r.cidrSets) {
return fmt.Errorf("node:%s has an allocated cidr: %v at index:%v that does not exist in cluster cidrs configuration", node.Name, cidr, idx)
}
if err := r.cidrSets[idx].Occupy(podCIDR); err != nil {
return fmt.Errorf("failed to mark cidr[%v] at idx [%v] as occupied for node: %v: %v", podCIDR, idx, node.Name, err)
}
}
return nil
}
// WARNING: If you're adding any return calls or defer any more work from this
// function you have to make sure to update nodesInProcessing properly with the
// disposition of the node when the work is done.
func (r *rangeAllocator) AllocateOrOccupyCIDR(ctx context.Context, node *v1.Node) error {
if node == nil {
return nil
}
if len(node.Spec.PodCIDRs) > 0 {
return r.occupyCIDRs(node)
}
logger := klog.FromContext(ctx)
allocatedCIDRs := make([]*net.IPNet, len(r.cidrSets))
for idx := range r.cidrSets {
podCIDR, err := r.cidrSets[idx].AllocateNext()
if err != nil {
controllerutil.RecordNodeStatusChange(logger, r.recorder, node, "CIDRNotAvailable")
return fmt.Errorf("failed to allocate cidr from cluster cidr at idx:%v: %v", idx, err)
}
allocatedCIDRs[idx] = podCIDR
}
//queue the assignment
logger.V(4).Info("Putting node with CIDR into the work queue", "node", klog.KObj(node), "CIDRs", allocatedCIDRs)
return r.updateCIDRsAllocation(ctx, node.Name, allocatedCIDRs)
}
// ReleaseCIDR marks node.podCIDRs[...] as unused in our tracked cidrSets
func (r *rangeAllocator) ReleaseCIDR(logger klog.Logger, node *v1.Node) error {
if node == nil || len(node.Spec.PodCIDRs) == 0 {
return nil
}
for idx, cidr := range node.Spec.PodCIDRs {
_, podCIDR, err := netutils.ParseCIDRSloppy(cidr)
if err != nil {
return fmt.Errorf("failed to parse CIDR %s on Node %v: %v", cidr, node.Name, err)
}
// The node may have a pre-allocated cidr that does not exist in our cidrs.
// This will happen if the cluster went from dualstack (multiple cidrs) to non-dualstack,
// in which case we have no way of locking it.
if idx >= len(r.cidrSets) {
return fmt.Errorf("node:%s has an allocated cidr: %v at index:%v that does not exist in cluster cidrs configuration", node.Name, cidr, idx)
}
logger.V(4).Info("Release CIDR for node", "CIDR", cidr, "node", klog.KObj(node))
if err = r.cidrSets[idx].Release(podCIDR); err != nil {
return fmt.Errorf("error when releasing CIDR %v: %v", cidr, err)
}
}
return nil
}
// Marks all CIDRs with subNetMaskSize that belong to serviceCIDR as used across all cidrs
// so that they won't be assignable.
func (r *rangeAllocator) filterOutServiceRange(logger klog.Logger, serviceCIDR *net.IPNet) {
// Checks if service CIDR has a nonempty intersection with cluster
// CIDR. It is the case if either clusterCIDR contains serviceCIDR with
// clusterCIDR's Mask applied (this means that clusterCIDR contains
// serviceCIDR) or vice versa (which means that serviceCIDR contains
// clusterCIDR).
for idx, cidr := range r.clusterCIDRs {
// if they don't overlap then ignore the filtering
if !cidr.Contains(serviceCIDR.IP.Mask(cidr.Mask)) && !serviceCIDR.Contains(cidr.IP.Mask(serviceCIDR.Mask)) {
continue
}
// at this point, len(cidrSet) == len(clusterCidr)
if err := r.cidrSets[idx].Occupy(serviceCIDR); err != nil {
logger.Error(err, "Error filtering out service cidr out cluster cidr", "CIDR", cidr, "index", idx, "serviceCIDR", serviceCIDR)
}
}
}
// updateCIDRsAllocation assigns CIDR to Node and sends an update to the API server.
func (r *rangeAllocator) updateCIDRsAllocation(ctx context.Context, nodeName string, allocatedCIDRs []*net.IPNet) error {
var err error
var node *v1.Node
logger := klog.FromContext(ctx)
cidrsString := ipnetToStringList(allocatedCIDRs)
node, err = r.nodeLister.Get(nodeName)
if err != nil {
logger.Error(err, "Failed while getting node for updating Node.Spec.PodCIDRs", "node", klog.KRef("", nodeName))
return err
}
// If the cidr list matches the proposed one, then we possibly already
// updated this node and just failed to ack the success.
if len(node.Spec.PodCIDRs) == len(allocatedCIDRs) {
match := true
for idx, cidr := range cidrsString {
if node.Spec.PodCIDRs[idx] != cidr {
match = false
break
}
}
if match {
logger.V(4).Info("Node already has allocated CIDR. It matches the proposed one", "node", klog.KObj(node), "CIDRs", allocatedCIDRs)
return nil
}
}
// node has cidrs, release the reserved
if len(node.Spec.PodCIDRs) != 0 {
logger.Error(nil, "Node already has a CIDR allocated. Releasing the new one", "node", klog.KObj(node), "podCIDRs", node.Spec.PodCIDRs)
for idx, cidr := range allocatedCIDRs {
if releaseErr := r.cidrSets[idx].Release(cidr); releaseErr != nil {
logger.Error(releaseErr, "Error when releasing CIDR", "index", idx, "CIDR", cidr)
}
}
return nil
}
// If we reached here, it means that the node has no CIDR currently assigned. So we set it.
for i := 0; i < cidrUpdateRetries; i++ {
if err = nodeutil.PatchNodeCIDRs(ctx, r.client, types.NodeName(node.Name), cidrsString); err == nil {
logger.Info("Set node PodCIDR", "node", klog.KObj(node), "podCIDRs", cidrsString)
return nil
}
}
// failed release back to the pool
logger.Error(err, "Failed to update node PodCIDR after multiple attempts", "node", klog.KObj(node), "podCIDRs", cidrsString)
controllerutil.RecordNodeStatusChange(logger, r.recorder, node, "CIDRAssignmentFailed")
// We accept the fact that we may leak CIDRs here. This is safer than releasing
// them in case when we don't know if request went through.
// NodeController restart will return all falsely allocated CIDRs to the pool.
if !apierrors.IsServerTimeout(err) {
logger.Error(err, "CIDR assignment for node failed. Releasing allocated CIDR", "node", klog.KObj(node))
for idx, cidr := range allocatedCIDRs {
if releaseErr := r.cidrSets[idx].Release(cidr); releaseErr != nil {
logger.Error(releaseErr, "Error releasing allocated CIDR for node", "node", klog.KObj(node))
}
}
}
return err
}
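// Worked example (values are illustrative, not from this change): with a
// cluster CIDR of 10.244.0.0/16 and a node CIDR mask size of 24, the cidrset
// above can hand out 2^(24-16) = 256 node CIDRs (10.244.0.0/24, 10.244.1.0/24,
// and so on); a dual-stack setup adds a second cidrset, e.g. 2001:db8::/64
// split into /80 node CIDRs.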


@@ -0,0 +1,148 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nodeipam
import (
"context"
"fmt"
"net"
"github.com/siderolabs/talos-cloud-controller-manager/pkg/nodeipam/ipam"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
cloudprovider "k8s.io/cloud-provider"
controllersmetrics "k8s.io/component-base/metrics/prometheus/controllers"
"k8s.io/klog/v2"
)
// Controller is the controller that manages node ipam state.
type Controller struct {
allocatorType ipam.CIDRAllocatorType
cloud cloudprovider.Interface
clusterCIDRs []*net.IPNet
serviceCIDRs []*net.IPNet
kubeClient clientset.Interface
eventBroadcaster record.EventBroadcaster
nodeLister corelisters.NodeLister
nodeInformerSynced cache.InformerSynced
cidrAllocator ipam.CIDRAllocator
}
// NewNodeIpamController returns a new node IP Address Management controller to
// sync instances from cloudprovider.
// This method returns an error if it is unable to initialize the CIDR bitmap with
// podCIDRs it has already allocated to nodes. Since we don't allow podCIDR changes
// currently, this should be handled as a fatal error.
func NewNodeIpamController(
ctx context.Context,
nodeInformer coreinformers.NodeInformer,
cloud cloudprovider.Interface,
kubeClient clientset.Interface,
clusterCIDRs []*net.IPNet,
serviceCIDRs []*net.IPNet,
nodeCIDRMaskSizes []int,
allocatorType ipam.CIDRAllocatorType,
) (*Controller, error) {
if kubeClient == nil {
return nil, fmt.Errorf("kubeClient is nil when starting Controller")
}
// Cloud CIDR allocator does not rely on clusterCIDR or nodeCIDRMaskSize for allocation.
if allocatorType != ipam.CloudAllocatorType {
if len(clusterCIDRs) == 0 {
return nil, fmt.Errorf("Controller: Must specify --cluster-cidr if --allocate-node-cidrs is set")
}
for idx, cidr := range clusterCIDRs {
mask := cidr.Mask
if maskSize, _ := mask.Size(); maskSize > nodeCIDRMaskSizes[idx] {
return nil, fmt.Errorf("Controller: Invalid --cluster-cidr, mask size of cluster CIDR must be less than or equal to --node-cidr-mask-size configured for CIDR family")
}
}
}
ic := &Controller{
cloud: cloud,
kubeClient: kubeClient,
eventBroadcaster: record.NewBroadcaster(record.WithContext(ctx)),
clusterCIDRs: clusterCIDRs,
serviceCIDRs: serviceCIDRs,
allocatorType: allocatorType,
}
// TODO: Abstract this check into a generic controller manager should run method.
var err error
var secondaryServiceCIDR *net.IPNet
if len(serviceCIDRs) > 1 {
secondaryServiceCIDR = serviceCIDRs[1]
}
allocatorParams := ipam.CIDRAllocatorParams{
ClusterCIDRs: clusterCIDRs,
ServiceCIDR: ic.serviceCIDRs[0],
SecondaryServiceCIDR: secondaryServiceCIDR,
NodeCIDRMaskSizes: nodeCIDRMaskSizes,
}
ic.cidrAllocator, err = ipam.New(ctx, kubeClient, cloud, nodeInformer, ic.allocatorType, allocatorParams)
if err != nil {
return nil, err
}
ic.nodeLister = nodeInformer.Lister()
ic.nodeInformerSynced = nodeInformer.Informer().HasSynced
return ic, nil
}
// Run starts an asynchronous loop that monitors the status of cluster nodes.
//
//nolint:wsl
func (nc *Controller) Run(ctx context.Context) {
defer utilruntime.HandleCrash()
// Start event processing pipeline.
nc.eventBroadcaster.StartStructuredLogging(3)
nc.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: nc.kubeClient.CoreV1().Events("")})
defer nc.eventBroadcaster.Shutdown()
klog.FromContext(ctx).Info("Starting ipam controller")
defer klog.FromContext(ctx).Info("Shutting down ipam controller")
if !cache.WaitForNamedCacheSync("node", ctx.Done(), nc.nodeInformerSynced) {
return
}
go nc.cidrAllocator.Run(ctx)
<-ctx.Done()
}
// RunWithMetrics is a wrapper for Run that also tracks starting and stopping of the nodeipam controller with an additional metric.
func (nc *Controller) RunWithMetrics(ctx context.Context, controllerManagerMetrics *controllersmetrics.ControllerManagerMetrics) {
controllerManagerMetrics.ControllerStarted("nodeipam")
defer controllerManagerMetrics.ControllerStopped("nodeipam")
nc.Run(ctx)
}
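// Example wiring (a sketch under assumptions, not part of this change): a
// cloud-controller-manager init func could construct and start the controller
// roughly as follows, where sharedInformers, cloud, kubeClient, the CIDR lists,
// the mask sizes and the "talos-ccm" metrics name are assumed placeholders
// coming from the surrounding manager and its flags:
//
//	ipamController, err := nodeipam.NewNodeIpamController(
//		ctx,
//		sharedInformers.Core().V1().Nodes(),
//		cloud,
//		kubeClient,
//		clusterCIDRs,  // e.g. parsed from --cluster-cidr
//		serviceCIDRs,  // e.g. parsed from --service-cluster-ip-range
//		[]int{24, 80}, // per-family node CIDR mask sizes
//		ipam.CloudAllocatorType,
//	)
//	if err != nil {
//		// handle the error according to the surrounding init func
//	}
//	go ipamController.RunWithMetrics(ctx, controllersmetrics.NewControllerManagerMetrics("talos-ccm"))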


@@ -1,92 +0,0 @@
package talos
import (
"context"
"fmt"
"github.com/cosi-project/runtime/pkg/resource"
clienttalos "github.com/siderolabs/talos/pkg/machinery/client"
"github.com/siderolabs/talos/pkg/machinery/resources/network"
"github.com/siderolabs/talos/pkg/machinery/resources/runtime"
clientkubernetes "k8s.io/client-go/kubernetes"
)
type client struct {
config *cloudConfig
talos *clienttalos.Client
kclient clientkubernetes.Interface
}
func newClient(ctx context.Context, config *cloudConfig) (*client, error) {
clientOpts := []clienttalos.OptionFunc{}
if config == nil {
return nil, fmt.Errorf("talos cloudConfig is nil")
}
clientOpts = append(clientOpts, clienttalos.WithDefaultConfig())
if len(config.Global.Endpoints) > 0 {
clientOpts = append(clientOpts, clienttalos.WithEndpoints(config.Global.Endpoints...))
}
if config.Global.ClusterName != "" {
clientOpts = append(clientOpts, clienttalos.WithCluster(config.Global.ClusterName))
}
talos, err := clienttalos.New(ctx, clientOpts...)
if err != nil {
return nil, err
}
return &client{
config: config,
talos: talos,
}, nil
}
func (c *client) refreshTalosClient(ctx context.Context) error {
if _, err := c.talos.Version(ctx); err != nil {
talos, err := newClient(ctx, c.config)
if err != nil {
return fmt.Errorf("failed to reinitialized talos client: %v", err)
}
c.talos.Close() //nolint:errcheck
c.talos = talos.talos
}
return nil
}
func (c *client) getNodeMetadata(ctx context.Context, nodeIP string) (*runtime.PlatformMetadataSpec, error) {
nodeCtx := clienttalos.WithNode(ctx, nodeIP)
res, err := c.talos.COSI.Get(nodeCtx, resource.NewMetadata(runtime.NamespaceName, runtime.PlatformMetadataType, runtime.PlatformMetadataID, resource.VersionUndefined))
if err != nil {
return nil, err
}
meta := res.Spec().(*runtime.PlatformMetadataSpec).DeepCopy()
return &meta, nil
}
func (c *client) getNodeIfaces(ctx context.Context, nodeIP string) ([]network.AddressStatusSpec, error) {
nodeCtx := clienttalos.WithNode(ctx, nodeIP)
resources, err := c.talos.COSI.List(nodeCtx, resource.NewMetadata(network.NamespaceName, network.AddressStatusType, "", resource.VersionUndefined))
if err != nil {
return nil, fmt.Errorf("error get resources: %w", err)
}
iface := []network.AddressStatusSpec{}
for _, res := range resources.Items {
iface = append(iface, res.(*network.AddressStatus).TypedSpec().DeepCopy())
}
return iface, nil
}


@@ -1,11 +0,0 @@
package talos
import (
"testing"
)
func TestGetNodeMetadata(*testing.T) {
}
func TestGetNodeIfaces(*testing.T) {
}


@@ -1,12 +1,15 @@
// Package talos does something.
// Package talos is an implementation of Interface and InstancesV2 for Talos.
package talos
import (
"context"
"fmt"
"io"
"github.com/siderolabs/talos-cloud-controller-manager/pkg/certificatesigningrequest"
"github.com/siderolabs/talos-cloud-controller-manager/pkg/talosclient"
clientkubernetes "k8s.io/client-go/kubernetes"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/klog/v2"
)
@@ -25,9 +28,10 @@ const (
ClusterNodeLifeCycleLabel = "node.cloudprovider.kubernetes.io/lifecycle"
)
type cloud struct {
cfg *cloudConfig
client *client
// Cloud is an implementation of cloudprovider interface for Talos CCM.
type Cloud struct {
client *client
instancesV2 cloudprovider.InstancesV2
csrController *certificatesigningrequest.Reconciler
@@ -35,6 +39,12 @@ type cloud struct {
stop func()
}
type client struct {
config *cloudConfig
talos *talosclient.Client
kclient clientkubernetes.Interface
}
func init() {
cloudprovider.RegisterCloudProvider(ProviderName, func(config io.Reader) (cloudprovider.Interface, error) {
cfg, err := readCloudConfig(config)
@@ -56,17 +66,32 @@ func newCloud(config *cloudConfig) (cloudprovider.Interface, error) {
instancesInterface := newInstances(client)
return &cloud{
cfg: config,
return &Cloud{
client: client,
instancesV2: instancesInterface,
}, nil
}
func newClient(ctx context.Context, config *cloudConfig) (*client, error) {
if config == nil {
return nil, fmt.Errorf("talos cloudConfig is nil")
}
talos, err := talosclient.New(ctx)
if err != nil {
return nil, err
}
return &client{
config: config,
talos: talos,
}, nil
}
// Initialize provides the cloud with a kubernetes client builder and may spawn goroutines
// to perform housekeeping or run custom controllers specific to the cloud provider.
// Any tasks started here should be cleaned up when the stop channel closes.
func (c *cloud) Initialize(clientBuilder cloudprovider.ControllerClientBuilder, stop <-chan struct{}) {
func (c *Cloud) Initialize(clientBuilder cloudprovider.ControllerClientBuilder, stop <-chan struct{}) {
c.client.kclient = clientBuilder.ClientOrDie(ServiceAccountName)
klog.InfoS("clientset initialized")
@@ -75,21 +100,15 @@ func (c *cloud) Initialize(clientBuilder cloudprovider.ControllerClientBuilder,
c.ctx = ctx
c.stop = cancel
if err := c.client.refreshTalosClient(c.ctx); err != nil {
klog.ErrorS(err, "failed to initialized talos client")
return
}
// Broadcast the upstream stop signal to all provider-level goroutines
// watching the provider's context for cancellation.
go func(provider *cloud) {
go func(provider *Cloud) {
<-stop
klog.V(3).InfoS("received cloud provider termination signal")
provider.stop()
}(c)
if c.cfg.Global.ApproveNodeCSR {
if c.client.config.Global.ApproveNodeCSR {
klog.InfoS("Started CSR Node controller")
c.csrController = certificatesigningrequest.NewCsrController(c.client.kclient, csrNodeChecks)
@@ -101,13 +120,13 @@ func (c *cloud) Initialize(clientBuilder cloudprovider.ControllerClientBuilder,
// LoadBalancer returns a balancer interface.
// Also returns true if the interface is supported, false otherwise.
func (c *cloud) LoadBalancer() (cloudprovider.LoadBalancer, bool) {
func (c *Cloud) LoadBalancer() (cloudprovider.LoadBalancer, bool) {
return nil, false
}
// Instances returns an instances interface.
// Also returns true if the interface is supported, false otherwise.
func (c *cloud) Instances() (cloudprovider.Instances, bool) {
func (c *Cloud) Instances() (cloudprovider.Instances, bool) {
return nil, false
}
@@ -115,32 +134,32 @@ func (c *cloud) Instances() (cloudprovider.Instances, bool) {
// Implementing InstancesV2 is behaviorally identical to Instances but is optimized to significantly reduce
// API calls to the cloud provider when registering and syncing nodes.
// Also returns true if the interface is supported, false otherwise.
func (c *cloud) InstancesV2() (cloudprovider.InstancesV2, bool) {
func (c *Cloud) InstancesV2() (cloudprovider.InstancesV2, bool) {
return c.instancesV2, c.instancesV2 != nil
}
// Zones returns a zones interface.
// Also returns true if the interface is supported, false otherwise.
func (c *cloud) Zones() (cloudprovider.Zones, bool) {
func (c *Cloud) Zones() (cloudprovider.Zones, bool) {
return nil, false
}
// Clusters is not implemented.
func (c *cloud) Clusters() (cloudprovider.Clusters, bool) {
func (c *Cloud) Clusters() (cloudprovider.Clusters, bool) {
return nil, false
}
// Routes is not implemented.
func (c *cloud) Routes() (cloudprovider.Routes, bool) {
func (c *Cloud) Routes() (cloudprovider.Routes, bool) {
return nil, false
}
// ProviderName returns the cloud provider ID.
func (c *cloud) ProviderName() string {
func (c *Cloud) ProviderName() string {
return ProviderName
}
// HasClusterID is not implemented.
func (c *cloud) HasClusterID() bool {
func (c *Cloud) HasClusterID() bool {
return true
}


@@ -2,8 +2,6 @@ package talos
import (
"io"
"os"
"strings"
yaml "gopkg.in/yaml.v3"
@@ -24,8 +22,6 @@ type cloudConfigGlobal struct {
ApproveNodeCSR bool `yaml:"approveNodeCSR,omitempty"`
// Talos cluster name.
ClusterName string `yaml:"clusterName,omitempty"`
// Talos API endpoints.
Endpoints []string `yaml:"endpoints,omitempty"`
// Prefer IPv6.
PreferIPv6 bool `yaml:"preferIPv6,omitempty"`
}
@@ -39,11 +35,6 @@ func readCloudConfig(config io.Reader) (cloudConfig, error) {
}
}
endpoints := os.Getenv("TALOS_ENDPOINTS")
if endpoints != "" {
cfg.Global.Endpoints = strings.Split(endpoints, ",")
}
klog.V(4).InfoS("cloudConfig", "cfg", cfg)
return cfg, nil


@@ -11,10 +11,6 @@ func TestReadCloudConfigEmpty(t *testing.T) {
t.Errorf("Should not fail when no config is provided: %s", err)
}
if len(cfg.Global.Endpoints) != 0 {
t.Errorf("incorrect endpoints: %s", cfg.Global.Endpoints)
}
if cfg.Global.PreferIPv6 {
t.Errorf("%v is not default value of preferIPv6", cfg.Global.PreferIPv6)
}
@@ -49,10 +45,6 @@ transformations:
t.Fatalf("Should succeed when a valid config is provided: %s", err)
}
if len(cfg.Global.Endpoints) != 2 {
t.Errorf("incorrect endpoints: %s", cfg.Global.Endpoints)
}
if !cfg.Global.PreferIPv6 {
t.Errorf("incorrect preferIPv6: %v", cfg.Global.PreferIPv6)
}


@@ -8,7 +8,6 @@ import (
func config() cloudConfig {
cfg := cloudConfig{}
cfg.Global.Endpoints = []string{"127.0.0.1"}
return cfg
}


@@ -158,7 +158,12 @@ func setTalosNodeLabels(c *client, meta *runtime.PlatformMetadataSpec) map[strin
labels[ClusterNodeLifeCycleLabel] = "spot"
}
if clusterName := c.talos.GetClusterName(); clusterName != "" {
clusterName := c.config.Global.ClusterName
if clusterName == "" {
clusterName = c.talos.GetClusterName()
}
if clusterName != "" {
labels[ClusterNameNodeLabel] = clusterName
}


@@ -196,7 +196,6 @@ func TestSyncNodeLabels(t *testing.T) {
cfg := cloudConfig{
Global: cloudConfigGlobal{
ClusterName: "test-cluster",
Endpoints: []string{"127.0.0.1"},
},
Transformations: []transformer.NodeTerm{
{


@@ -61,14 +61,10 @@ func (i *instances) InstanceMetadata(ctx context.Context, node *v1.Node) (*cloud
nodeIP string
)
if err = i.c.refreshTalosClient(ctx); err != nil {
return nil, fmt.Errorf("error refreshing client connection: %w", err)
}
mc := metrics.NewMetricContext(runtime.PlatformMetadataID)
for _, ip := range nodeIPs {
meta, err = i.c.getNodeMetadata(ctx, ip)
meta, err = i.c.talos.GetNodeMetadata(ctx, ip)
if mc.ObserveRequest(err) == nil {
nodeIP = ip
@@ -110,7 +106,7 @@ func (i *instances) InstanceMetadata(ctx context.Context, node *v1.Node) (*cloud
mc = metrics.NewMetricContext("addresses")
ifaces, err := i.c.getNodeIfaces(ctx, nodeIP)
ifaces, err := i.c.talos.GetNodeIfaces(ctx, nodeIP)
if mc.ObserveRequest(err) != nil {
return nil, fmt.Errorf("error getting interfaces list from the node %s: %w", node.Name, err)
}

pkg/talosclient/client.go Normal file

@@ -0,0 +1,234 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package talosclient implements the Talos API client.
package talosclient
import (
"context"
"fmt"
"net/netip"
"os"
"slices"
"strings"
"time"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/siderolabs/go-retry/retry"
talos "github.com/siderolabs/talos/pkg/machinery/client"
"github.com/siderolabs/talos/pkg/machinery/constants"
"github.com/siderolabs/talos/pkg/machinery/nethelpers"
"github.com/siderolabs/talos/pkg/machinery/resources/k8s"
"github.com/siderolabs/talos/pkg/machinery/resources/network"
"github.com/siderolabs/talos/pkg/machinery/resources/runtime"
)
// Client is a wrapper around the Talos API client.
type Client struct {
talos *talos.Client
}
// New creates a new Talos API client.
func New(ctx context.Context) (*Client, error) {
clientOpts := []talos.OptionFunc{}
clientOpts = append(clientOpts, talos.WithDefaultConfig())
endpoints := os.Getenv("TALOS_ENDPOINTS")
if endpoints != "" {
clientOpts = append(clientOpts, talos.WithEndpoints(strings.Split(endpoints, ",")...))
}
talos, err := talos.New(ctx, clientOpts...)
if err != nil {
return nil, err
}
return &Client{
talos: talos,
}, nil
}
func (c *Client) refreshTalosClient(ctx context.Context) error {
if _, err := c.talos.Version(ctx); err != nil {
talos, err := New(ctx)
if err != nil {
return fmt.Errorf("failed to reinitialized talos client: %v", err)
}
c.talos.Close() //nolint:errcheck
c.talos = talos.talos
}
return nil
}
// GetPodCIDRs returns the pod CIDRs of the cluster.
func (c *Client) GetPodCIDRs(ctx context.Context) ([]string, error) {
res, err := c.talos.COSI.Get(ctx, resource.NewMetadata(k8s.ControlPlaneNamespaceName, k8s.ControllerManagerConfigType, k8s.ControllerManagerID, resource.VersionUndefined))
if err != nil {
return nil, err
}
return res.Spec().(*k8s.ControllerManagerConfigSpec).PodCIDRs, nil
}
// GetServiceCIDRs returns the service CIDRs of the cluster.
func (c *Client) GetServiceCIDRs(ctx context.Context) ([]string, error) {
res, err := c.talos.COSI.Get(ctx, resource.NewMetadata(k8s.ControlPlaneNamespaceName, k8s.ControllerManagerConfigType, k8s.ControllerManagerID, resource.VersionUndefined))
if err != nil {
return nil, err
}
return res.Spec().(*k8s.ControllerManagerConfigSpec).ServiceCIDRs, nil
}
// GetNodeIfaces returns the network interfaces of the node.
func (c *Client) GetNodeIfaces(ctx context.Context, nodeIP string) ([]network.AddressStatusSpec, error) {
nodeCtx := talos.WithNode(ctx, nodeIP)
var resources resource.List
err := retry.Constant(10*time.Second, retry.WithUnits(100*time.Millisecond)).Retry(func() error {
var listErr error
resources, listErr = c.talos.COSI.List(nodeCtx, resource.NewMetadata(network.NamespaceName, network.AddressStatusType, "", resource.VersionUndefined))
if listErr != nil {
err := c.refreshTalosClient(ctx) //nolint:errcheck
if err != nil {
return retry.ExpectedError(err)
}
return listErr
}
return nil
})
if err != nil {
return nil, fmt.Errorf("error get resources: %w", err)
}
iface := []network.AddressStatusSpec{}
for _, res := range resources.Items {
iface = append(iface, res.(*network.AddressStatus).TypedSpec().DeepCopy())
}
return iface, nil
}
// GetNodeMetadata returns the metadata of the node.
func (c *Client) GetNodeMetadata(ctx context.Context, nodeIP string) (*runtime.PlatformMetadataSpec, error) {
nodeCtx := talos.WithNode(ctx, nodeIP)
var resources resource.Resource
err := retry.Constant(10*time.Second, retry.WithUnits(100*time.Millisecond)).Retry(func() error {
var getErr error
resources, getErr = c.talos.COSI.Get(nodeCtx, resource.NewMetadata(runtime.NamespaceName, runtime.PlatformMetadataType, runtime.PlatformMetadataID, resource.VersionUndefined))
if getErr != nil {
err := c.refreshTalosClient(ctx) //nolint:errcheck
if err != nil {
return retry.ExpectedError(err)
}
return getErr
}
return nil
})
if err != nil {
return nil, fmt.Errorf("error get resources: %w", err)
}
meta := resources.Spec().(*runtime.PlatformMetadataSpec).DeepCopy()
return &meta, nil
}
// GetClusterName returns cluster name.
func (c *Client) GetClusterName() string {
return c.talos.GetClusterName()
}
// NodeIPDiscovery returns the public IPs of the node excluding the given IPs.
func NodeIPDiscovery(nodeIPs []string, ifaces []network.AddressStatusSpec) (publicIPv4s, publicIPv6s []string) {
for _, iface := range ifaces {
if iface.LinkName == constants.KubeSpanLinkName ||
iface.LinkName == constants.SideroLinkName ||
iface.LinkName == "lo" ||
strings.HasPrefix(iface.LinkName, "dummy") {
continue
}
ip := iface.Address.Addr()
if ip.IsGlobalUnicast() && !ip.IsPrivate() {
if slices.Contains(nodeIPs, ip.String()) {
continue
}
if ip.Is6() {
// Prioritize permanent IPv6 addresses
if nethelpers.AddressFlag(iface.Flags)&nethelpers.AddressPermanent != 0 {
publicIPv6s = append([]string{ip.String()}, publicIPv6s...)
} else {
publicIPv6s = append(publicIPv6s, ip.String())
}
} else {
publicIPv4s = append(publicIPv4s, ip.String())
}
}
}
return publicIPv4s, publicIPv6s
}
// NodeCIDRDiscovery returns the public CIDRs of the node whose addresses match the given filter IPs (all public CIDRs when the filter is empty).
func NodeCIDRDiscovery(filterIPs []netip.Addr, ifaces []network.AddressStatusSpec) (publicCIDRv4s, publicCIDRv6s []string) {
for _, iface := range ifaces {
if iface.LinkName == constants.KubeSpanLinkName ||
iface.LinkName == constants.SideroLinkName ||
iface.LinkName == "lo" ||
strings.HasPrefix(iface.LinkName, "dummy") {
continue
}
ip := iface.Address.Addr()
if ip.IsGlobalUnicast() && !ip.IsPrivate() {
if len(filterIPs) == 0 || slices.Contains(filterIPs, ip) {
cidr := iface.Address.Masked().String()
if ip.Is6() {
if slices.Contains(publicCIDRv6s, cidr) {
continue
}
// Prioritize permanent IPv6 addresses
if nethelpers.AddressFlag(iface.Flags)&nethelpers.AddressPermanent != 0 {
publicCIDRv6s = append([]string{cidr}, publicCIDRv6s...)
} else {
publicCIDRv6s = append(publicCIDRv6s, cidr)
}
} else {
publicCIDRv4s = append(publicCIDRv4s, cidr)
}
}
}
}
return publicCIDRv4s, publicCIDRv6s
}
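// Usage sketch (illustrative addresses, not from this change): combined with
// GetNodeIfaces above, the discovery helpers can be used as
//
//	ifaces, _ := c.GetNodeIfaces(ctx, "2001:db8::10")
//	_, v6CIDRs := NodeCIDRDiscovery([]netip.Addr{netip.MustParseAddr("2001:db8::10")}, ifaces)
//	// v6CIDRs might be ["2001:db8::/64"], depending on the interface configuration.
//
// which is how the CloudAllocator derives the global IPv6 prefix for a node.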


@@ -0,0 +1,109 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package node provides helpers to manage node objects.
package node // copy from kubernetes/pkg/controller/util/node/controller_utils.go
import (
"fmt"
v1 "k8s.io/api/core/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
)
// CreateAddNodeHandler creates an add node handler.
// nolint:nlreturn,nilerr,wsl,stylecheck
func CreateAddNodeHandler(f func(node *v1.Node) error) func(obj interface{}) {
return func(originalObj interface{}) {
node := originalObj.(*v1.Node).DeepCopy()
if err := f(node); err != nil {
utilruntime.HandleError(fmt.Errorf("Error while processing Node Add: %w", err))
}
}
}
// CreateUpdateNodeHandler creates a node update handler. (Common to lifecycle and ipam)
// nolint:nlreturn,nilerr,wsl,stylecheck
func CreateUpdateNodeHandler(f func(oldNode, newNode *v1.Node) error) func(oldObj, newObj interface{}) {
return func(origOldObj, origNewObj interface{}) {
node := origNewObj.(*v1.Node).DeepCopy()
prevNode := origOldObj.(*v1.Node).DeepCopy()
if err := f(prevNode, node); err != nil {
utilruntime.HandleError(fmt.Errorf("Error while processing Node Add/Delete: %w", err))
}
}
}
// CreateDeleteNodeHandler creates a delete node handler. (Common to lifecycle and ipam)
// nolint:nlreturn,nilerr,wsl,stylecheck
func CreateDeleteNodeHandler(f func(node *v1.Node) error) func(obj interface{}) {
return func(originalObj interface{}) {
originalNode, isNode := originalObj.(*v1.Node)
// We can get DeletedFinalStateUnknown instead of *v1.Node here and
// we need to handle that correctly. #34692
if !isNode {
deletedState, ok := originalObj.(cache.DeletedFinalStateUnknown)
if !ok {
klog.Errorf("Received unexpected object: %v", originalObj)
return
}
originalNode, ok = deletedState.Obj.(*v1.Node)
if !ok {
klog.Errorf("DeletedFinalStateUnknown contained non-Node object: %v", deletedState.Obj)
return
}
}
node := originalNode.DeepCopy()
if err := f(node); err != nil {
utilruntime.HandleError(fmt.Errorf("Error while processing Node Add/Delete: %w", err))
}
}
}
// GetNodeCondition extracts the provided condition from the given status and returns it.
// Returns -1 and nil if the condition is not present; otherwise it returns the index of the located condition and the condition itself.
// nolint:nlreturn,nilerr,wsl
func GetNodeCondition(status *v1.NodeStatus, conditionType v1.NodeConditionType) (int, *v1.NodeCondition) {
if status == nil {
return -1, nil
}
for i := range status.Conditions {
if status.Conditions[i].Type == conditionType {
return i, &status.Conditions[i]
}
}
return -1, nil
}
// RecordNodeStatusChange records an event related to a node status change. (Common to lifecycle and ipam)
// nolint:nlreturn,nilerr,wsl
func RecordNodeStatusChange(logger klog.Logger, recorder record.EventRecorder, node *v1.Node, newStatus string) {
ref := &v1.ObjectReference{
APIVersion: "v1",
Kind: "Node",
Name: node.Name,
UID: node.UID,
Namespace: "",
}
logger.V(2).Info("Recording status change event message for node", "status", newStatus, "node", node.Name)
// TODO: This requires a transaction, either both node status is updated
// and event is recorded or neither should happen, see issue #6055.
recorder.Eventf(ref, v1.EventTypeNormal, newStatus, "Node %s status is now: %s", node.Name, newStatus)
}
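// Example (illustrative node name, not from this change): the allocators above
// call this helper with statuses such as "CIDRNotAvailable" or
// "CIDRAssignmentFailed", which surfaces as a normal Kubernetes event on the
// Node object, e.g.
//
//	Normal  CIDRNotAvailable  Node worker-1 status is now: CIDRNotAvailable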

pkg/utils/node/doc.go Normal file

@@ -0,0 +1,18 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package node was copied from staging/src/k8s.io/component-helpers/node/util/
package node

159
pkg/utils/node/node.go Normal file
View File

@@ -0,0 +1,159 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package node
import (
"context"
"encoding/json"
"fmt"
"net/netip"
"slices"
"strings"
"time"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
clientset "k8s.io/client-go/kubernetes"
cloudproviderapi "k8s.io/cloud-provider/api"
"k8s.io/klog/v2"
)
type nodeForCIDRMergePatch struct {
Spec nodeSpecForMergePatch `json:"spec"`
}
type nodeSpecForMergePatch struct {
PodCIDR string `json:"podCIDR"`
PodCIDRs []string `json:"podCIDRs,omitempty"`
}
type nodeForConditionPatch struct {
Status nodeStatusForPatch `json:"status"`
}
type nodeStatusForPatch struct {
Conditions []v1.NodeCondition `json:"conditions"`
}
// PatchNodeCIDR patches the specified node's CIDR to the given value.
// nolint:nlreturn,nilerr,wsl
func PatchNodeCIDR(c clientset.Interface, node types.NodeName, cidr string) error {
patch := nodeForCIDRMergePatch{
Spec: nodeSpecForMergePatch{
PodCIDR: cidr,
},
}
patchBytes, err := json.Marshal(&patch)
if err != nil {
return fmt.Errorf("failed to json.Marshal CIDR: %w", err)
}
if _, err := c.CoreV1().Nodes().Patch(context.TODO(), string(node), types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}); err != nil {
return fmt.Errorf("failed to patch node CIDR: %w", err)
}
return nil
}
// PatchNodeCIDRs patches the specified node's spec.podCIDR to cidrs[0] and its spec.podCIDRs to the given list.
// nolint:nlreturn,nilerr,wsl
func PatchNodeCIDRs(ctx context.Context, c clientset.Interface, node types.NodeName, cidrs []string) error {
// set the pod cidrs list and set the old pod cidr field
patch := nodeForCIDRMergePatch{
Spec: nodeSpecForMergePatch{
PodCIDR: cidrs[0],
PodCIDRs: cidrs,
},
}
patchBytes, err := json.Marshal(&patch)
if err != nil {
return fmt.Errorf("failed to json.Marshal CIDR: %v", err)
}
klog.FromContext(ctx).V(4).Info("cidrs patch bytes", "patchBytes", string(patchBytes))
if _, err := c.CoreV1().Nodes().Patch(ctx, string(node), types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}); err != nil {
return fmt.Errorf("failed to patch node CIDR: %v", err)
}
return nil
}
// SetNodeCondition updates specific node condition with patch operation.
// nolint:nlreturn,nilerr,wsl
func SetNodeCondition(c clientset.Interface, node types.NodeName, condition v1.NodeCondition) error {
generatePatch := func(condition v1.NodeCondition) ([]byte, error) {
patch := nodeForConditionPatch{
Status: nodeStatusForPatch{
Conditions: []v1.NodeCondition{
condition,
},
},
}
patchBytes, err := json.Marshal(&patch)
if err != nil {
return nil, err
}
return patchBytes, nil
}
condition.LastHeartbeatTime = metav1.NewTime(time.Now())
patch, err := generatePatch(condition)
if err != nil {
return err
}
_, err = c.CoreV1().Nodes().PatchStatus(context.TODO(), string(node), patch)
return err
}
// GetNodeIPs returns the list of node IPs.
func GetNodeIPs(node *v1.Node) ([]netip.Addr, error) {
if node == nil {
return nil, fmt.Errorf("node is nil")
}
providedIPs := []string{}
if providedIP, ok := node.ObjectMeta.Annotations[cloudproviderapi.AnnotationAlphaProvidedIPAddr]; ok {
providedIPs = strings.Split(providedIP, ",")
}
nodeIPs := []netip.Addr{}
for _, v := range node.Status.Addresses {
if v.Type != v1.NodeExternalIP && v.Type != v1.NodeInternalIP {
continue
}
ip, err := netip.ParseAddr(v.Address)
if err != nil {
return nil, fmt.Errorf("failed to parse IP address: %v", err)
}
nodeIPs = append(nodeIPs, ip)
}
for _, nodeIP := range providedIPs {
ip, err := netip.ParseAddr(nodeIP)
if err != nil {
return nodeIPs, fmt.Errorf("failed to parse IP address: %v", err)
}
if !slices.Contains(nodeIPs, ip) {
nodeIPs = append(nodeIPs, ip)
}
}
return nodeIPs, nil
}
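
// A minimal usage sketch for the helpers above (not part of this commit). The import
// alias, the assignPodCIDRs function, and the hard-coded CIDRs are illustrative
// assumptions; a real allocator derives the CIDRs from the cluster configuration.
package example

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	clientset "k8s.io/client-go/kubernetes"

	nodeutil "github.com/siderolabs/talos-cloud-controller-manager/pkg/utils/node"
)

// assignPodCIDRs patches dual-stack PodCIDRs onto a node and prints its addresses.
func assignPodCIDRs(ctx context.Context, c clientset.Interface, node *v1.Node) error {
	// Hypothetical dual-stack allocation for this node.
	cidrs := []string{"10.244.1.0/24", "fd00:10:244:1::/80"}

	// Patches both spec.podCIDR (first entry) and spec.podCIDRs.
	if err := nodeutil.PatchNodeCIDRs(ctx, c, types.NodeName(node.Name), cidrs); err != nil {
		return err
	}

	// Collects InternalIP/ExternalIP addresses plus any provided-IP annotation.
	ips, err := nodeutil.GetNodeIPs(node)
	if err != nil {
		return err
	}
	fmt.Printf("node %s addresses: %v\n", node.Name, ips)

	return nil
}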