refactor: instance metadata

Store all relevant instance information in a single instanceInfo struct.

Signed-off-by: Serge Logvinov <serge.logvinov@sinextra.dev>
Authored by Serge Logvinov on 2025-09-07 23:18:48 +07:00, committed by Serge
parent 2066aa885e
commit b77455af4d
16 changed files with 400 additions and 284 deletions


@@ -33,9 +33,9 @@ const (
var providerIDRegexp = regexp.MustCompile(`^` + ProviderName + `://([^/]*)/([^/]+)$`)
// GetProviderID returns the magic providerID for kubernetes node.
func GetProviderID(region string, vmr *proxmox.VmRef) string {
return fmt.Sprintf("%s://%s/%d", ProviderName, region, vmr.VmId())
// GetProviderIDFromID returns the magic providerID for kubernetes node.
func GetProviderIDFromID(region string, vmID int) string {
return fmt.Sprintf("%s://%s/%d", ProviderName, region, vmID)
}
// GetProviderIDFromUUID returns the magic providerID for kubernetes node.
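For orientation, a minimal sketch of how the renamed helper behaves (the region and VM ID values are illustrative):

package main

import (
	"fmt"

	provider "github.com/sergelogvinov/proxmox-cloud-controller-manager/pkg/provider"
)

func main() {
	// GetProviderIDFromID needs only the region name and the numeric VM ID,
	// no *proxmox.VmRef as the old GetProviderID did.
	fmt.Println(provider.GetProviderIDFromID("cluster-1", 100))
	// Output: proxmox://cluster-1/100
}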


@@ -20,13 +20,12 @@ import (
"fmt"
"testing"
"github.com/Telmate/proxmox-api-go/proxmox"
"github.com/stretchr/testify/assert"
provider "github.com/sergelogvinov/proxmox-cloud-controller-manager/pkg/provider"
)
func TestGetProviderID(t *testing.T) {
func TestGetProviderIDFromID(t *testing.T) {
t.Parallel()
tests := []struct {
@@ -50,12 +49,10 @@ func TestGetProviderID(t *testing.T) {
}
for _, testCase := range tests {
testCase := testCase
t.Run(fmt.Sprint(testCase.msg), func(t *testing.T) {
t.Parallel()
providerID := provider.GetProviderID(testCase.region, proxmox.NewVmRef(testCase.vmID))
providerID := provider.GetProviderIDFromID(testCase.region, testCase.vmID)
assert.Equal(t, testCase.expectedProviderID, providerID)
})
@@ -109,8 +106,6 @@ func TestGetVmID(t *testing.T) {
}
for _, testCase := range tests {
testCase := testCase
t.Run(fmt.Sprint(testCase.msg), func(t *testing.T) {
t.Parallel()
@@ -118,7 +113,7 @@ func TestGetVmID(t *testing.T) {
if testCase.expectedError != nil {
assert.NotNil(t, err)
assert.Equal(t, err.Error(), testCase.expectedError.Error())
assert.EqualError(t, err, testCase.expectedError.Error())
} else {
assert.Equal(t, testCase.expectedvmID, VMID)
}
@@ -173,8 +168,6 @@ func TestParseProviderID(t *testing.T) {
}
for _, testCase := range tests {
testCase := testCase
t.Run(fmt.Sprint(testCase.msg), func(t *testing.T) {
t.Parallel()
@@ -182,7 +175,7 @@ func TestParseProviderID(t *testing.T) {
if testCase.expectedError != nil {
assert.NotNil(t, err)
assert.Equal(t, err.Error(), testCase.expectedError.Error())
assert.EqualError(t, err, testCase.expectedError.Error())
} else {
assert.NotNil(t, vmr)
assert.Equal(t, testCase.expectedvmID, vmr.VmId())

pkg/proxmox/annotation.go (new file)

@@ -0,0 +1,22 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package proxmox
const (
// AnnotationProxmoxInstanceID is the annotation used to store the Proxmox node virtual machine ID.
AnnotationProxmoxInstanceID = Group + "/instance-id"
)
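The annotation value is the VM ID rendered as a decimal string, which getInstanceInfo (below) parses back with strconv.Atoi. A hypothetical read-back helper, not part of this commit, to make the round trip concrete:

package proxmox

import "strconv"

// instanceIDFromAnnotations shows the round trip: with Group set to
// "proxmox.sinextra.dev" the key resolves to "proxmox.sinextra.dev/instance-id"
// and the value is the decimal VM ID.
func instanceIDFromAnnotations(annotations map[string]string) (int, bool) {
	raw, ok := annotations[AnnotationProxmoxInstanceID]
	if !ok {
		return 0, false
	}

	vmID, err := strconv.Atoi(raw)
	if err != nil {
		return 0, false
	}

	return vmID, true
}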


@@ -36,17 +36,25 @@ const (
// ServiceAccountName is the service account name used in kube-system namespace.
ServiceAccountName = provider.ProviderName + "-cloud-controller-manager"
// Group name
Group = "proxmox.sinextra.dev"
)
type cloud struct {
client *pxpool.ProxmoxPool
kclient clientkubernetes.Interface
client *client
instancesV2 cloudprovider.InstancesV2
ctx context.Context //nolint:containedctx
stop func()
}
type client struct {
pxpool *pxpool.ProxmoxPool
kclient clientkubernetes.Interface
}
func init() {
cloudprovider.RegisterCloudProvider(provider.ProviderName, func(config io.Reader) (cloudprovider.Interface, error) {
cfg, err := ccmConfig.ReadCloudConfig(config)
@@ -61,7 +69,7 @@ func init() {
}
func newCloud(config *ccmConfig.ClustersConfig) (cloudprovider.Interface, error) {
client, err := pxpool.NewProxmoxPool(config.Clusters, nil)
client, err := newClient(config.Clusters)
if err != nil {
return nil, err
}
@@ -74,11 +82,22 @@ func newCloud(config *ccmConfig.ClustersConfig) (cloudprovider.Interface, error)
}, nil
}
func newClient(clusters []*pxpool.ProxmoxCluster) (*client, error) {
px, err := pxpool.NewProxmoxPool(clusters, nil)
if err != nil {
return nil, err
}
return &client{
pxpool: px,
}, nil
}
// Initialize provides the cloud with a kubernetes client builder and may spawn goroutines
// to perform housekeeping or run custom controllers specific to the cloud provider.
// Any tasks started here should be cleaned up when the stop channel closes.
func (c *cloud) Initialize(clientBuilder cloudprovider.ControllerClientBuilder, stop <-chan struct{}) {
c.kclient = clientBuilder.ClientOrDie(ServiceAccountName)
c.client.kclient = clientBuilder.ClientOrDie(ServiceAccountName)
klog.InfoS("clientset initialized")
@@ -86,7 +105,7 @@ func (c *cloud) Initialize(clientBuilder cloudprovider.ControllerClientBuilder,
c.ctx = ctx
c.stop = cancel
err := c.client.CheckClusters(ctx)
err := c.client.pxpool.CheckClusters(ctx)
if err != nil {
klog.ErrorS(err, "failed to check proxmox cluster")
}


@@ -24,13 +24,14 @@ import (
ccmConfig "github.com/sergelogvinov/proxmox-cloud-controller-manager/pkg/config"
provider "github.com/sergelogvinov/proxmox-cloud-controller-manager/pkg/provider"
"github.com/sergelogvinov/proxmox-cloud-controller-manager/pkg/proxmoxpool"
)
func TestNewCloudError(t *testing.T) {
cloud, err := newCloud(&ccmConfig.ClustersConfig{})
assert.NotNil(t, err)
assert.Nil(t, cloud)
assert.EqualError(t, err, "no Proxmox clusters found")
assert.Equal(t, proxmoxpool.ErrClustersNotFound, err)
}
func TestCloud(t *testing.T) {

pkg/proxmox/errors.go (new file)

@@ -0,0 +1,22 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package proxmox
import "github.com/pkg/errors"
// ErrKubeletExternalProvider is returned when a kubelet node does not have --cloud-provider=external argument
var ErrKubeletExternalProvider = errors.New("node does not have --cloud-provider=external argument")
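ErrKubeletExternalProvider is a plain sentinel, so callers can wrap it and still match it later; a small sketch (the wrap message is illustrative):

package main

import (
	"errors"
	"fmt"

	pkgerrors "github.com/pkg/errors"
)

var ErrKubeletExternalProvider = pkgerrors.New("node does not have --cloud-provider=external argument")

func main() {
	err := pkgerrors.Wrap(ErrKubeletExternalProvider, "inspecting node annotations")

	// pkg/errors (v0.9+) implements Unwrap, so the stdlib errors.Is still
	// recognizes the sentinel after wrapping.
	fmt.Println(errors.Is(err, ErrKubeletExternalProvider)) // true
}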


@@ -21,6 +21,7 @@ import (
"context"
"fmt"
"net"
"slices"
"sort"
"strings"
@@ -38,19 +39,17 @@ const (
noSortPriority = 0
)
func (i *instances) addresses(ctx context.Context, node *v1.Node, vmRef *proxmox.VmRef, region string) []v1.NodeAddress {
klog.V(4).InfoS("instances.addresses() called", "node", klog.KObj(node))
func (i *instances) addresses(ctx context.Context, node *v1.Node, info *instanceInfo) []v1.NodeAddress {
var (
providedIP string
ok bool
)
if providedIP, ok = node.ObjectMeta.Annotations[cloudproviderapi.AnnotationAlphaProvidedIPAddr]; !ok {
klog.InfoS(fmt.Sprintf(
klog.ErrorS(ErrKubeletExternalProvider, fmt.Sprintf(
"instances.InstanceMetadata() called: annotation %s missing from node. Was kubelet started without --cloud-provider=external or --node-ip?",
cloudproviderapi.AnnotationAlphaProvidedIPAddr),
node, klog.KRef("", node.Name))
"node", klog.KRef("", node.Name))
}
// providedIP is supposed to be a single IP but some kubelets might set a comma separated list of IPs.
@@ -78,19 +77,18 @@ func (i *instances) addresses(ctx context.Context, node *v1.Node, vmRef *proxmox
}
if i.networkOpts.Mode == providerconfig.NetworkModeDefault {
// If the network mode is 'default', we only return the provided IPs.
klog.V(4).InfoS("instances.addresses() returning provided IPs", "node", klog.KObj(node))
return addresses
}
if i.networkOpts.Mode == providerconfig.NetworkModeOnlyQemu || i.networkOpts.Mode == providerconfig.NetworkModeAuto {
newAddresses, err := i.retrieveQemuAddresses(ctx, vmRef, region)
newAddresses, err := i.retrieveQemuAddresses(ctx, info)
if err != nil {
klog.ErrorS(err, "Failed to retrieve host addresses")
} else {
addToNodeAddresses(&addresses, newAddresses...)
}
addToNodeAddresses(&addresses, newAddresses...)
}
// Remove addresses that match the ignored CIDRs
@@ -109,30 +107,34 @@ func (i *instances) addresses(ctx context.Context, node *v1.Node, vmRef *proxmox
sortNodeAddresses(addresses, i.networkOpts.SortOrder)
klog.InfoS("instances.addresses() returning addresses", "addresses", addresses, "node", klog.KObj(node))
klog.V(4).InfoS("instances.addresses() returning addresses", "addresses", addresses, "node", klog.KObj(node))
return addresses
}
// retrieveQemuAddresses retrieves the addresses from the QEMU agent
func (i *instances) retrieveQemuAddresses(ctx context.Context, vmRef *proxmox.VmRef, region string) ([]v1.NodeAddress, error) {
func (i *instances) retrieveQemuAddresses(ctx context.Context, info *instanceInfo) ([]v1.NodeAddress, error) {
var addresses []v1.NodeAddress
klog.V(4).InfoS("retrieveQemuAddresses() retrieving addresses from QEMU agent")
vmRef := proxmox.NewVmRef(info.ID)
vmRef.SetNode(info.Node)
r, err := i.getInstanceNics(ctx, vmRef, region)
nics, err := i.getInstanceNics(ctx, vmRef, info.Region)
if err != nil {
return nil, err
}
for _, nic := range r {
for _, nic := range nics {
if slices.Contains([]string{"lo", "cilium_net", "cilium_host"}, nic.Name) ||
strings.HasPrefix(nic.Name, "dummy") {
continue
}
for _, ip := range nic.IpAddresses {
i.processIP(ctx, &addresses, ip)
}
}
klog.V(4).InfoS("retrieveQemuAddresses() retrieved instance nics", "nics", r)
return addresses, nil
}
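The interface filter above is easiest to read in isolation: loopback, Cilium-managed devices, and dummy* interfaces never contribute node addresses. A standalone restatement:

package proxmox

import (
	"slices"
	"strings"
)

// skipNic mirrors the filter in retrieveQemuAddresses.
func skipNic(name string) bool {
	return slices.Contains([]string{"lo", "cilium_net", "cilium_host"}, name) ||
		strings.HasPrefix(name, "dummy")
}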
@@ -141,40 +143,40 @@ func (i *instances) processIP(_ context.Context, addresses *[]v1.NodeAddress, ip
return
}
var isIPv6 bool
if ip.To4() == nil {
if i.networkOpts.IPv6SupportDisabled {
klog.V(4).InfoS("Skipping IPv6 address due to IPv6 support being disabled", "address", ip.String())
addressType := v1.NodeInternalIP
return
}
if isIPv6 = ip.To4() == nil; isIPv6 && i.networkOpts.IPv6SupportDisabled {
klog.V(4).InfoS("Skipping IPv6 address due to IPv6 support being disabled", "address", ip.String())
return // skip IPv6 addresses if IPv6 support is disabled
if ip.IsPrivate() || ip.IsLinkLocalUnicast() {
return
}
}
ipStr := ip.String()
// Check if the address is an external CIDR
addressType := v1.NodeInternalIP
if len(i.networkOpts.ExternalCIDRs) != 0 && isAddressInCIDRList(i.networkOpts.ExternalCIDRs, ip) {
addressType = v1.NodeExternalIP
}
*addresses = append(*addresses, v1.NodeAddress{
Type: addressType,
Address: ipStr,
Address: ip.String(),
})
}
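The hunk above interleaves the old and new bodies; as best as it reads, the new processIP skips IPv6 entirely when IPv6 support is disabled and additionally drops private and link-local IPv6 addresses. A hedged restatement of that decision logic (classifyIP is my name for it, not the literal implementation; isAddressInCIDRList is the existing repo helper):

package proxmox

import (
	"net"

	v1 "k8s.io/api/core/v1"
)

// classifyIP returns the node address type for ip and whether to keep it.
func classifyIP(ip net.IP, ipv6Disabled bool, externalCIDRs []*net.IPNet) (v1.NodeAddressType, bool) {
	if ip.To4() == nil { // IPv6
		if ipv6Disabled {
			return "", false // IPv6 support disabled: skip
		}

		if ip.IsPrivate() || ip.IsLinkLocalUnicast() {
			return "", false // only global IPv6 addresses are reported
		}
	}

	if len(externalCIDRs) != 0 && isAddressInCIDRList(externalCIDRs, ip) {
		return v1.NodeExternalIP, true
	}

	return v1.NodeInternalIP, true
}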
func (i *instances) getInstanceNics(ctx context.Context, vmRef *proxmox.VmRef, region string) ([]proxmox.AgentNetworkInterface, error) {
px, err := i.c.GetProxmoxCluster(region)
result := make([]proxmox.AgentNetworkInterface, 0)
px, err := i.c.pxpool.GetProxmoxCluster(region)
if err != nil {
return result, err
}
mc := metrics.NewMetricContext("getVmInfo")
nicset, err := px.GetVmAgentNetworkInterfaces(ctx, vmRef)
nicset, err := vmRef.GetAgentInformation(ctx, px, false)
if mc.ObserveRequest(err) != nil {
return result, err
}


@@ -33,7 +33,6 @@ import (
v1 "k8s.io/api/core/v1"
cloudprovider "k8s.io/cloud-provider"
cloudproviderapi "k8s.io/cloud-provider/api"
"k8s.io/klog/v2"
)
@@ -45,15 +44,29 @@ type instanceNetops struct {
IPv6SupportDisabled bool
}
type instanceInfo struct {
ID int
UUID string
Name string
Type string
Node string
Region string
Zone string
}
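For orientation, a populated instanceInfo for a typical VM (values illustrative; on Proxmox the zone is the hypervisor node name, so Node and Zone carry the same value, as getInstanceInfo below shows):

info := &instanceInfo{
	ID:     100,
	UUID:   "8af7110c-bfa7-4c10-b83a-07fc85f558fa",
	Name:   "worker-1",
	Type:   "2VCPU-4GB",
	Node:   "pve-node-1",
	Region: "cluster-1",
	Zone:   "pve-node-1",
}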
type instances struct {
c *proxmoxpool.ProxmoxPool
c *client
provider providerconfig.Provider
networkOpts instanceNetops
}
var instanceTypeNameRegexp = regexp.MustCompile(`(^[a-zA-Z0-9_.-]+)$`)
func newInstances(client *proxmoxpool.ProxmoxPool, provider providerconfig.Provider, networkOpts providerconfig.NetworkOpts) *instances {
func newInstances(
client *client,
provider providerconfig.Provider,
networkOpts providerconfig.NetworkOpts,
) *instances {
externalIPCIDRs := ParseCIDRList(networkOpts.ExternalIPCIDRS)
if len(networkOpts.ExternalIPCIDRS) > 0 && len(externalIPCIDRs) == 0 {
klog.Warningf("Failed to parse external CIDRs: %v", networkOpts.ExternalIPCIDRS)
@@ -101,7 +114,7 @@ func (i *instances) InstanceExists(ctx context.Context, node *v1.Node) (bool, er
}
mc := metrics.NewMetricContext("getVmInfo")
if _, _, err := i.getInstance(ctx, node); mc.ObserveRequest(err) != nil {
if _, err := i.getInstanceInfo(ctx, node); mc.ObserveRequest(err) != nil {
if err == cloudprovider.InstanceNotFound {
klog.V(4).InfoS("instances.InstanceExists() instance not found", "node", klog.KObj(node), "providerID", node.Spec.ProviderID)
@@ -138,7 +151,7 @@ func (i *instances) InstanceShutdown(ctx context.Context, node *v1.Node) (bool,
return false, nil
}
px, err := i.c.GetProxmoxCluster(region)
px, err := i.c.pxpool.GetProxmoxCluster(region)
if err != nil {
klog.ErrorS(err, "instances.InstanceShutdown() failed to get Proxmox cluster", "region", region)
@@ -166,9 +179,8 @@ func (i *instances) InstanceMetadata(ctx context.Context, node *v1.Node) (*cloud
klog.V(4).InfoS("instances.InstanceMetadata() called", "node", klog.KRef("", node.Name))
var (
vmRef *proxmox.VmRef
region string
err error
info *instanceInfo
err error
)
providerID := node.Spec.ProviderID
@@ -178,83 +190,115 @@ func (i *instances) InstanceMetadata(ctx context.Context, node *v1.Node) (*cloud
return &cloudprovider.InstanceMetadata{}, nil
}
if providerID == "" && HasTaintWithEffect(node, cloudproviderapi.TaintExternalCloudProvider, "") {
uuid := node.Status.NodeInfo.SystemUUID
// if providerID == "" && HasTaintWithEffect(node, cloudproviderapi.TaintExternalCloudProvider, "") {
// }
klog.V(4).InfoS("instances.InstanceMetadata() empty providerID, trying find node", "node", klog.KObj(node), "uuid", uuid)
mc := metrics.NewMetricContext("getInstanceInfo")
mc := metrics.NewMetricContext("findVmByName")
info, err = i.getInstanceInfo(ctx, node)
if mc.ObserveRequest(err) != nil {
if err == proxmoxpool.ErrInstanceNotFound {
klog.V(4).InfoS("instances.InstanceMetadata() instance not found", "node", klog.KObj(node), "providerID", providerID)
vmRef, region, err = i.c.FindVMByNode(ctx, node)
if mc.ObserveRequest(err) != nil {
mc := metrics.NewMetricContext("findVmByUUID")
vmRef, region, err = i.c.FindVMByUUID(ctx, uuid)
if mc.ObserveRequest(err) != nil {
return nil, fmt.Errorf("instances.InstanceMetadata() - failed to find instance by name/uuid %s: %v, skipped", node.Name, err)
}
return &cloudprovider.InstanceMetadata{}, nil
}
return nil, err
}
if providerID == "" {
if i.provider == providerconfig.ProviderCapmox {
providerID = provider.GetProviderIDFromUUID(uuid)
providerID = provider.GetProviderIDFromUUID(info.UUID)
} else {
providerID = provider.GetProviderID(region, vmRef)
providerID = provider.GetProviderIDFromID(info.Region, info.ID)
}
annotations := map[string]string{
AnnotationProxmoxInstanceID: fmt.Sprintf("%d", info.ID),
}
if err := syncNodeAnnotations(ctx, i.c.kclient, node, annotations); err != nil {
klog.ErrorS(err, "error updating annotations for the node", "node", klog.KRef("", node.Name))
}
}
metadata := &cloudprovider.InstanceMetadata{
ProviderID: providerID,
NodeAddresses: i.addresses(ctx, node, info),
InstanceType: info.Type,
Zone: info.Zone,
Region: info.Region,
}
klog.V(5).InfoS("instances.InstanceMetadata()", "info", info, "metadata", metadata)
return metadata, nil
}
func (i *instances) getInstanceInfo(ctx context.Context, node *v1.Node) (*instanceInfo, error) {
klog.V(4).InfoS("instances.getInstanceInfo() called", "node", klog.KRef("", node.Name), "provider", i.provider)
var (
vmRef *proxmox.VmRef
region string
err error
)
providerID := node.Spec.ProviderID
if providerID == "" && node.Annotations[AnnotationProxmoxInstanceID] != "" {
region = node.Annotations[v1.LabelTopologyRegion]
vmID, err := strconv.Atoi(node.Annotations[AnnotationProxmoxInstanceID])
if err != nil {
return nil, fmt.Errorf("instances.getInstanceInfo() parse annotation error: %v", err)
}
if _, err := i.c.pxpool.GetProxmoxCluster(region); err == nil {
providerID = provider.GetProviderIDFromID(region, vmID)
klog.V(4).InfoS("instances.getInstanceInfo() set providerID", "node", klog.KObj(node), "providerID", providerID)
}
}
if providerID == "" {
klog.V(4).InfoS("instances.InstanceMetadata() empty providerID, omitting unmanaged node", "node", klog.KObj(node))
klog.V(4).InfoS("instances.getInstanceInfo() empty providerID, trying find node", "node", klog.KObj(node))
return &cloudprovider.InstanceMetadata{}, nil
mc := metrics.NewMetricContext("findVmByName")
vmRef, region, err = i.c.pxpool.FindVMByNode(ctx, node)
if mc.ObserveRequest(err) != nil {
mc := metrics.NewMetricContext("findVmByUUID")
vmRef, region, err = i.c.pxpool.FindVMByUUID(ctx, node.Status.NodeInfo.SystemUUID)
if mc.ObserveRequest(err) != nil {
return nil, err
}
}
if vmRef == nil {
return nil, cloudprovider.InstanceNotFound
}
providerID = provider.GetProviderIDFromID(region, vmRef.VmId())
}
if vmRef == nil {
mc := metrics.NewMetricContext("getVmInfo")
vmRef, region, err = i.getInstance(ctx, node)
if mc.ObserveRequest(err) != nil {
return nil, err
}
}
addresses := i.addresses(ctx, node, vmRef, region)
instanceType, err := i.getInstanceType(ctx, vmRef, region)
if err != nil {
instanceType = vmRef.GetVmType()
}
return &cloudprovider.InstanceMetadata{
ProviderID: providerID,
NodeAddresses: addresses,
InstanceType: instanceType,
Zone: vmRef.Node().String(),
Region: region,
}, nil
}
func (i *instances) getInstance(ctx context.Context, node *v1.Node) (*proxmox.VmRef, string, error) {
klog.V(4).InfoS("instances.getInstance() called", "node", klog.KRef("", node.Name), "provider", i.provider)
if i.provider == providerconfig.ProviderCapmox {
uuid := node.Status.NodeInfo.SystemUUID
vmRef, region, err := i.c.FindVMByUUID(ctx, uuid)
vmRef, region, err = provider.ParseProviderID(providerID)
if err != nil {
return nil, "", fmt.Errorf("instances.getInstance() error: %v", err)
if i.provider == providerconfig.ProviderDefault {
return nil, fmt.Errorf("instances.getInstanceInfo() error: %v", err)
}
vmRef, region, err = i.c.pxpool.FindVMByUUID(ctx, node.Status.NodeInfo.SystemUUID)
if err != nil {
return nil, fmt.Errorf("instances.getInstanceInfo() error: %v", err)
}
}
return vmRef, region, nil
}
vmRef, region, err := provider.ParseProviderID(node.Spec.ProviderID)
px, err := i.c.pxpool.GetProxmoxCluster(region)
if err != nil {
return nil, "", fmt.Errorf("instances.getInstance() error: %v", err)
}
px, err := i.c.GetProxmoxCluster(region)
if err != nil {
return nil, "", fmt.Errorf("instances.getInstance() error: %v", err)
return nil, fmt.Errorf("instances.getInstanceInfo() error: %v", err)
}
mc := metrics.NewMetricContext("getVmInfo")
@@ -262,51 +306,46 @@ func (i *instances) getInstance(ctx context.Context, node *v1.Node) (*proxmox.Vm
vmConfig, err := px.GetVmConfig(ctx, vmRef)
if mc.ObserveRequest(err) != nil {
if strings.Contains(err.Error(), "not found") {
return nil, "", cloudprovider.InstanceNotFound
return nil, cloudprovider.InstanceNotFound
}
return nil, "", err
return nil, err
}
if i.c.GetVMName(vmConfig) != node.Name || i.c.GetVMUUID(vmConfig) != node.Status.NodeInfo.SystemUUID {
klog.Errorf("instances.getInstance() vm.name(%s) != node.name(%s) with uuid=%s", i.c.GetVMName(vmConfig), node.Name, node.Status.NodeInfo.SystemUUID)
return nil, "", cloudprovider.InstanceNotFound
info := &instanceInfo{
ID: vmRef.VmId(),
UUID: i.c.pxpool.GetVMUUID(vmConfig),
Name: i.c.pxpool.GetVMName(vmConfig),
Node: vmRef.Node().String(),
Region: region,
Zone: vmRef.Node().String(),
}
klog.V(5).Infof("instances.getInstance() vmConfig %+v", vmConfig)
if info.UUID != node.Status.NodeInfo.SystemUUID {
klog.Errorf("instances.getInstanceInfo() node %s does not match SystemUUID=%s", info.Name, node.Status.NodeInfo.SystemUUID)
return vmRef, region, nil
}
func (i *instances) getInstanceType(ctx context.Context, vmRef *proxmox.VmRef, region string) (string, error) {
px, err := i.c.GetProxmoxCluster(region)
if err != nil {
return "", err
}
mc := metrics.NewMetricContext("getVmInfo")
vmConfig, err := px.GetVmConfig(ctx, vmRef)
if mc.ObserveRequest(err) != nil {
return "", err
}
sku := i.c.GetVMSKU(vmConfig)
if sku != "" && instanceTypeNameRegexp.MatchString(sku) {
return sku, nil
}
if vmConfig["cores"] == nil || vmConfig["memory"] == nil {
return "", fmt.Errorf("instances.getInstanceType() failed to get instance type")
}
memory, err := strconv.Atoi(vmConfig["memory"].(string)) //nolint:errcheck
if err != nil {
return "", err
}
return fmt.Sprintf("%.0fVCPU-%.0fGB",
vmConfig["cores"].(float64), //nolint:errcheck
float64(memory)/1024), nil
return nil, cloudprovider.InstanceNotFound
}
if !strings.HasPrefix(info.Name, node.Name) {
klog.Errorf("instances.getInstanceInfo() node %s does not match VM name=%s", node.Name, info.Name)
return nil, cloudprovider.InstanceNotFound
}
info.Type = i.c.pxpool.GetVMSKU(vmConfig)
if !instanceTypeNameRegexp.MatchString(info.Type) {
if vmConfig["cores"] != nil && vmConfig["memory"] != nil {
memory, err := strconv.Atoi(vmConfig["memory"].(string))
if err != nil {
return info, err
}
info.Type = fmt.Sprintf("%.0fVCPU-%.0fGB",
vmConfig["cores"].(float64), //nolint:errcheck
float64(memory)/1024)
}
}
return info, nil
}
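The fallback instance type is derived from the QEMU config, where (as the code above relies on) cores arrives as a float64 and memory as a decimal string of MiB. The same arithmetic as a runnable sketch:

package main

import (
	"fmt"
	"strconv"
)

func main() {
	vmConfig := map[string]interface{}{"cores": float64(2), "memory": "4096"}

	memory, err := strconv.Atoi(vmConfig["memory"].(string))
	if err != nil {
		panic(err)
	}

	fmt.Printf("%.0fVCPU-%.0fGB\n", vmConfig["cores"].(float64), float64(memory)/1024)
	// Output: 2VCPU-4GB
}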


@@ -17,23 +17,20 @@ limitations under the License.
package proxmox
import (
"context"
"fmt"
"net/http"
"strings"
"testing"
"github.com/Telmate/proxmox-api-go/proxmox"
"github.com/jarcoal/httpmock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
providerconfig "github.com/sergelogvinov/proxmox-cloud-controller-manager/pkg/config"
"github.com/sergelogvinov/proxmox-cloud-controller-manager/pkg/provider"
"github.com/sergelogvinov/proxmox-cloud-controller-manager/pkg/proxmoxpool"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
cloudprovider "k8s.io/cloud-provider"
cloudproviderapi "k8s.io/cloud-provider/api"
)
@@ -62,7 +59,7 @@ clusters:
ts.T().Fatalf("failed to read config: %v", err)
}
httpmock.RegisterResponder("GET", "https://127.0.0.1:8006/api2/json/cluster/resources",
httpmock.RegisterResponderWithQuery("GET", "https://127.0.0.1:8006/api2/json/cluster/resources", "type=vm",
func(_ *http.Request) (*http.Response, error) {
return httpmock.NewJsonResponse(200, map[string]interface{}{
"data": []interface{}{
@@ -87,7 +84,7 @@ clusters:
},
)
httpmock.RegisterResponder("GET", "https://127.0.0.2:8006/api2/json/cluster/resources",
httpmock.RegisterResponderWithQuery("GET", "https://127.0.0.2:8006/api2/json/cluster/resources", "type=vm",
func(_ *http.Request) (*http.Response, error) {
return httpmock.NewJsonResponse(200, map[string]interface{}{
"data": []interface{}{
@@ -172,12 +169,17 @@ clusters:
},
)
cluster, err := proxmoxpool.NewProxmoxPool(cfg.Clusters, &http.Client{})
px, err := proxmoxpool.NewProxmoxPool(cfg.Clusters, &http.Client{})
if err != nil {
ts.T().Fatalf("failed to create cluster client: %v", err)
}
ts.i = newInstances(cluster, providerconfig.ProviderDefault, providerconfig.NetworkOpts{})
client := &client{
pxpool: px,
kclient: fake.NewSimpleClientset(),
}
ts.i = newInstances(client, providerconfig.ProviderDefault, providerconfig.NetworkOpts{})
}
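Note the switch to RegisterResponderWithQuery above: each responder now only fires for requests that carry the query string type=vm. The same pattern in isolation (host and payload illustrative):

package proxmox

import "github.com/jarcoal/httpmock"

// registerEmptyVMList is a hypothetical helper mirroring the responders above:
// only GET requests with ?type=vm are matched.
func registerEmptyVMList(host string) {
	httpmock.RegisterResponderWithQuery(
		"GET", "https://"+host+":8006/api2/json/cluster/resources",
		"type=vm",
		httpmock.NewJsonResponderOrPanic(200, map[string]interface{}{
			"data": []interface{}{},
		}),
	)
}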
func (ts *ccmTestSuite) TearDownTest() {
@@ -221,7 +223,7 @@ func (ts *ccmTestSuite) TestInstanceExists() {
},
},
expected: false,
expectedError: "instances.getInstance() error: proxmox cluster cluster-3 not found",
expectedError: "instances.getInstanceInfo() error: region not found",
},
{
msg: "NodeNotExists",
@@ -306,10 +308,8 @@ func (ts *ccmTestSuite) TestInstanceExists() {
}
for _, testCase := range tests {
testCase := testCase
ts.Run(fmt.Sprint(testCase.msg), func() {
exists, err := ts.i.InstanceExists(context.Background(), testCase.node)
exists, err := ts.i.InstanceExists(ts.T().Context(), testCase.node)
if testCase.expectedError != "" {
ts.Require().Error(err)
@@ -454,10 +454,8 @@ func (ts *ccmTestSuite) TestInstanceShutdown() {
}
for _, testCase := range tests {
testCase := testCase
ts.Run(fmt.Sprint(testCase.msg), func() {
exists, err := ts.i.InstanceShutdown(context.Background(), testCase.node)
exists, err := ts.i.InstanceShutdown(ts.T().Context(), testCase.node)
if testCase.expectedError != "" {
ts.Require().Error(err)
@@ -519,7 +517,7 @@ func (ts *ccmTestSuite) TestInstanceMetadata() {
},
},
expected: &cloudprovider.InstanceMetadata{},
expectedError: "instances.getInstance() error: proxmox cluster cluster-3 not found",
expectedError: "instances.getInstanceInfo() error: region not found",
},
{
msg: "NodeNotExists",
@@ -667,10 +665,8 @@ func (ts *ccmTestSuite) TestInstanceMetadata() {
}
for _, testCase := range tests {
testCase := testCase
ts.Run(fmt.Sprint(testCase.msg), func() {
meta, err := ts.i.InstanceMetadata(context.Background(), testCase.node)
meta, err := ts.i.InstanceMetadata(ts.T().Context(), testCase.node)
if testCase.expectedError != "" {
ts.Require().Error(err)
@@ -682,100 +678,3 @@ func (ts *ccmTestSuite) TestInstanceMetadata() {
})
}
}
func TestGetProviderID(t *testing.T) {
t.Parallel()
tests := []struct {
msg string
region string
vmr *proxmox.VmRef
expected string
}{
{
msg: "empty region",
region: "",
vmr: proxmox.NewVmRef(100),
expected: "proxmox:///100",
},
{
msg: "region",
region: "cluster1",
vmr: proxmox.NewVmRef(100),
expected: "proxmox://cluster1/100",
},
}
for _, testCase := range tests {
testCase := testCase
t.Run(fmt.Sprint(testCase.msg), func(t *testing.T) {
t.Parallel()
expected := provider.GetProviderID(testCase.region, testCase.vmr)
assert.Equal(t, expected, testCase.expected)
})
}
}
func TestParseProviderID(t *testing.T) {
t.Parallel()
tests := []struct {
msg string
magic string
expectedCluster string
expectedVmr *proxmox.VmRef
expectedError error
}{
{
msg: "Empty magic string",
magic: "",
expectedError: fmt.Errorf("foreign providerID or empty \"\""),
},
{
msg: "Wrong provider",
magic: "provider://region/100",
expectedError: fmt.Errorf("foreign providerID or empty \"provider://region/100\""),
},
{
msg: "Empty region",
magic: "proxmox:///100",
expectedCluster: "",
expectedVmr: proxmox.NewVmRef(100),
},
{
msg: "Empty region",
magic: "proxmox://100",
expectedError: fmt.Errorf("providerID \"proxmox://100\" didn't match expected format \"proxmox://region/InstanceID\""),
},
{
msg: "Cluster and InstanceID",
magic: "proxmox://cluster/100",
expectedCluster: "cluster",
expectedVmr: proxmox.NewVmRef(100),
},
{
msg: "Cluster and wrong InstanceID",
magic: "proxmox://cluster/name",
expectedError: fmt.Errorf("InstanceID have to be a number, but got \"name\""),
},
}
for _, testCase := range tests {
testCase := testCase
t.Run(fmt.Sprint(testCase.msg), func(t *testing.T) {
t.Parallel()
vmr, cluster, err := provider.ParseProviderID(testCase.magic)
if testCase.expectedError != nil {
assert.Equal(t, testCase.expectedError, err)
} else {
assert.Equal(t, testCase.expectedVmr, vmr)
assert.Equal(t, testCase.expectedCluster, cluster)
}
})
}
}


@@ -17,12 +17,18 @@ limitations under the License.
package proxmox
import (
"context"
"encoding/json"
"fmt"
"net"
"strings"
"unicode"
v1 "k8s.io/api/core/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
clientkubernetes "k8s.io/client-go/kubernetes"
)
// ErrorCIDRConflict is the error message formatting string for CIDR conflicts
@@ -102,7 +108,7 @@ func ParseCIDRList(cidrList string) []*net.IPNet {
// HasTaintWithEffect checks if a node has a specific taint with the given key and effect.
// An empty effect string will match any effect for the specified key
func HasTaintWithEffect(node *v1.Node, key, effect string) bool {
func HasTaintWithEffect(node *corev1.Node, key, effect string) bool {
for _, taint := range node.Spec.Taints {
if taint.Key == key {
if effect != "" {
@@ -119,3 +125,46 @@ func HasTaintWithEffect(node *v1.Node, key, effect string) bool {
func checkIPIntersects(n1, n2 *net.IPNet) bool {
return n2.Contains(n1.IP) || n1.Contains(n2.IP)
}
func syncNodeAnnotations(ctx context.Context, kclient clientkubernetes.Interface, node *corev1.Node, nodeAnnotations map[string]string) error {
nodeAnnotationsOrig := node.ObjectMeta.Annotations
annotationsToUpdate := map[string]string{}
for k, v := range nodeAnnotations {
if r, ok := nodeAnnotationsOrig[k]; !ok || r != v {
annotationsToUpdate[k] = v
}
}
if len(annotationsToUpdate) > 0 {
oldData, err := json.Marshal(node)
if err != nil {
return fmt.Errorf("failed to marshal the existing node %#v: %w", node, err)
}
newNode := node.DeepCopy()
if newNode.Annotations == nil {
newNode.Annotations = make(map[string]string)
}
for k, v := range annotationsToUpdate {
newNode.Annotations[k] = v
}
newData, err := json.Marshal(newNode)
if err != nil {
return fmt.Errorf("failed to marshal the new node %#v: %w", newNode, err)
}
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &corev1.Node{})
if err != nil {
return fmt.Errorf("failed to create a two-way merge patch: %v", err)
}
if _, err := kclient.CoreV1().Nodes().Patch(ctx, node.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}); err != nil {
return fmt.Errorf("failed to patch the node: %v", err)
}
}
return nil
}
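syncNodeAnnotations is a no-op when the node already carries the requested values and otherwise issues a strategic-merge patch. A minimal usage sketch against a fake clientset (the node name and VM ID are illustrative):

package proxmox

import (
	"context"
	"testing"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
)

func TestSyncNodeAnnotationsSketch(t *testing.T) {
	node := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "worker-1"}}
	kclient := fake.NewSimpleClientset(node)

	annotations := map[string]string{AnnotationProxmoxInstanceID: "100"}

	// The first call patches the node; repeating it with identical values
	// finds nothing to update and returns without patching.
	if err := syncNodeAnnotations(context.Background(), kclient, node, annotations); err != nil {
		t.Fatal(err)
	}
}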


@@ -67,8 +67,6 @@ func TestParseCIDRRuleset(t *testing.T) {
}
for _, testCase := range tests {
testCase := testCase
t.Run(fmt.Sprint(testCase.msg), func(t *testing.T) {
t.Parallel()

pkg/proxmoxpool/doc.go (new file)

@@ -0,0 +1,17 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package proxmoxpool

pkg/proxmoxpool/errors.go (new file)

@@ -0,0 +1,30 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package proxmoxpool
import "github.com/pkg/errors"
var (
// ErrClustersNotFound is returned when no Proxmox clusters are configured
ErrClustersNotFound = errors.New("clusters not found")
// ErrHAGroupNotFound is returned when a ha-group is not found in the Proxmox cluster
ErrHAGroupNotFound = errors.New("ha-group not found")
// ErrRegionNotFound is returned when a region is not found in the Proxmox pool
ErrRegionNotFound = errors.New("region not found")
// ErrInstanceNotFound is returned when an instance is not found in the Proxmox cluster
ErrInstanceNotFound = errors.New("instance not found")
)


@@ -102,7 +102,7 @@ func NewProxmoxPool(config []*ProxmoxCluster, hClient *http.Client) (*ProxmoxPoo
}, nil
}
return nil, fmt.Errorf("no Proxmox clusters found")
return nil, ErrClustersNotFound
}
// CheckClusters checks if the Proxmox connection is working.
@@ -138,7 +138,34 @@ func (c *ProxmoxPool) GetProxmoxCluster(region string) (*proxmox.Client, error)
return c.clients[region], nil
}
return nil, fmt.Errorf("proxmox cluster %s not found", region)
return nil, ErrRegionNotFound
}
// GetNodeGroup returns a Proxmox node ha-group in a given region.
func (c *ProxmoxPool) GetNodeGroup(ctx context.Context, region string, node string) (string, error) {
px, err := c.GetProxmoxCluster(region)
if err != nil {
return "", err
}
haGroups, err := px.GetHAGroupList(ctx)
if err != nil {
return "", fmt.Errorf("error get ha-groups %v", err)
}
for _, g := range haGroups {
if g.Type != "group" {
continue
}
for _, n := range g.Nodes {
if node == strings.Split(n, ":")[0] {
return g.Group, nil
}
}
}
return "", ErrHAGroupNotFound
}
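GetNodeGroup splits each group member on ":" because Proxmox HA group entries may carry a priority suffix; in isolation:

package main

import (
	"fmt"
	"strings"
)

func main() {
	// Proxmox HA group members are "node" or "node:priority".
	for _, n := range []string{"pve-node-1:100", "pve-node-2"} {
		fmt.Println(strings.Split(n, ":")[0])
	}
	// Output:
	// pve-node-1
	// pve-node-2
}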
// FindVMByNode find a VM by kubernetes node resource in all Proxmox clusters.
@@ -165,7 +192,7 @@ func (c *ProxmoxPool) FindVMByNode(ctx context.Context, node *v1.Node) (*proxmox
}
}
return nil, "", fmt.Errorf("vm '%s' not found", node.Name)
return nil, "", ErrInstanceNotFound
}
// FindVMByName find a VM by name in all Proxmox clusters.
@@ -183,7 +210,7 @@ func (c *ProxmoxPool) FindVMByName(ctx context.Context, name string) (*proxmox.V
return vmr, region, nil
}
return nil, "", fmt.Errorf("vm '%s' not found", name)
return nil, "", ErrInstanceNotFound
}
// FindVMByUUID find a VM by uuid in all Proxmox clusters.
@@ -221,7 +248,7 @@ func (c *ProxmoxPool) FindVMByUUID(ctx context.Context, uuid string) (*proxmox.V
}
}
return nil, "", fmt.Errorf("vm with uuid '%s' not found", uuid)
return nil, "", ErrInstanceNotFound
}
// GetVMName returns the VM name.


@@ -110,7 +110,7 @@ func TestCheckClusters(t *testing.T) {
pxapi, err := pClient.GetProxmoxCluster("test")
assert.NotNil(t, err)
assert.Nil(t, pxapi)
assert.Equal(t, "proxmox cluster test not found", err.Error())
assert.Equal(t, pxpool.ErrRegionNotFound, err)
pxapi, err = pClient.GetProxmoxCluster("cluster-1")
assert.Nil(t, err)
@@ -166,7 +166,7 @@ func TestFindVMByNameNonExist(t *testing.T) {
assert.NotNil(t, err)
assert.Equal(t, "", cluster)
assert.Nil(t, vmr)
assert.Contains(t, err.Error(), "vm 'non-existing-vm' not found")
assert.Equal(t, pxpool.ErrInstanceNotFound, err)
}
func TestFindVMByNameExist(t *testing.T) {
@@ -235,8 +235,6 @@ func TestFindVMByNameExist(t *testing.T) {
}
for _, testCase := range tests {
testCase := testCase
t.Run(fmt.Sprint(testCase.msg), func(t *testing.T) {
vmr, cluster, err := pClient.FindVMByName(t.Context(), testCase.vmName)
@@ -249,7 +247,7 @@ func TestFindVMByNameExist(t *testing.T) {
assert.NotNil(t, err)
assert.Equal(t, "", cluster)
assert.Nil(t, vmr)
assert.Contains(t, err.Error(), "vm 'non-existing-vm' not found")
assert.Equal(t, pxpool.ErrInstanceNotFound, err)
}
})
}