[kubernetes] Fix race-condition between two KubeVirt CCMs working with the services with identifical names (#722)

This PR fixes race condition when you have two clusters with two
services with simillar names, two eps-controllers might continiusly
conflict between each other.

Upstream issue
https://github.com/kubevirt/cloud-provider-kubevirt/pull/341

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

- **New Features**
- Introduced multi-cluster support with a new configuration option to
ensure that services and deployments are correctly scoped across
different environments.

- **Bug Fixes**
- Refined endpoint management by improving service port handling and
eliminating duplicate entries, resulting in more consistent and reliable
service routing.

- **Chores**
- Updated the image versioning strategy to a dynamic tag, streamlining
deployments and simplifying future upgrades.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->

Signed-off-by: Andrei Kvapil <kvapss@gmail.com>
This commit is contained in:
Andrei Kvapil
2025-04-01 18:47:01 +02:00
committed by GitHub
parent 116196b4d4
commit 0403f30cd6
6 changed files with 1309 additions and 134 deletions

View File

@@ -1 +1 @@
ghcr.io/cozystack/cozystack/kubevirt-cloud-provider:0.15.2@sha256:5e054eae6274963b6e84f87bf3330c94325103c6407b08bfb1189da721333b5c
ghcr.io/cozystack/cozystack/kubevirt-cloud-provider:latest@sha256:a2739ab1a400664207c96ca60d88004ce244dcbb81204a32a517ff59d0061459

View File

@@ -3,12 +3,11 @@ FROM --platform=linux/amd64 golang:1.20.6 AS builder
RUN git clone https://github.com/kubevirt/cloud-provider-kubevirt /go/src/kubevirt.io/cloud-provider-kubevirt \
&& cd /go/src/kubevirt.io/cloud-provider-kubevirt \
&& git checkout da9e0cf
&& git checkout 443a1fe
WORKDIR /go/src/kubevirt.io/cloud-provider-kubevirt
# see: https://github.com/kubevirt/cloud-provider-kubevirt/pull/335
# see: https://github.com/kubevirt/cloud-provider-kubevirt/pull/336
ADD patches /patches
RUN git apply /patches/*.diff
RUN go get 'k8s.io/endpointslice/util@v0.28' 'k8s.io/apiserver@v0.28'

View File

@@ -1,8 +1,8 @@
diff --git a/pkg/controller/kubevirteps/kubevirteps_controller.go b/pkg/controller/kubevirteps/kubevirteps_controller.go
index a3c1aa33..95c31438 100644
index 6f6e3d32..53388eb8 100644
--- a/pkg/controller/kubevirteps/kubevirteps_controller.go
+++ b/pkg/controller/kubevirteps/kubevirteps_controller.go
@@ -412,11 +412,11 @@ func (c *Controller) reconcileByAddressType(service *v1.Service, tenantSlices []
@@ -474,11 +474,11 @@ func (c *Controller) reconcileByAddressType(service *v1.Service, tenantSlices []
// Create the desired port configuration
var desiredPorts []discovery.EndpointPort
@@ -18,3 +18,102 @@ index a3c1aa33..95c31438 100644
})
}
diff --git a/pkg/controller/kubevirteps/kubevirteps_controller_test.go b/pkg/controller/kubevirteps/kubevirteps_controller_test.go
index 1fb86e25..5326faa4 100644
--- a/pkg/controller/kubevirteps/kubevirteps_controller_test.go
+++ b/pkg/controller/kubevirteps/kubevirteps_controller_test.go
@@ -13,6 +13,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/intstr"
+ "k8s.io/apimachinery/pkg/util/sets"
dfake "k8s.io/client-go/dynamic/fake"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/testing"
@@ -643,6 +644,86 @@ var _ = g.Describe("KubevirtEPSController", g.Ordered, func() {
})
+ g.It("Should correctly handle multiple unique ports in EndpointSlice", func() {
+ // Create a VMI in the infra cluster
+ createAndAssertVMI("worker-0-test", "ip-10-32-5-13", "123.45.67.89")
+
+ // Create an EndpointSlice in the tenant cluster
+ createAndAssertTenantSlice("test-epslice", "tenant-service-name", discoveryv1.AddressTypeIPv4,
+ *createPort("http", 80, v1.ProtocolTCP),
+ []discoveryv1.Endpoint{*createEndpoint("123.45.67.89", "worker-0-test", true, true, false)})
+
+ // Define several unique ports for the Service
+ servicePorts := []v1.ServicePort{
+ {
+ Name: "client",
+ Protocol: v1.ProtocolTCP,
+ Port: 10001,
+ TargetPort: intstr.FromInt(30396),
+ NodePort: 30396,
+ AppProtocol: nil,
+ },
+ {
+ Name: "dashboard",
+ Protocol: v1.ProtocolTCP,
+ Port: 8265,
+ TargetPort: intstr.FromInt(31003),
+ NodePort: 31003,
+ AppProtocol: nil,
+ },
+ {
+ Name: "metrics",
+ Protocol: v1.ProtocolTCP,
+ Port: 8080,
+ TargetPort: intstr.FromInt(30452),
+ NodePort: 30452,
+ AppProtocol: nil,
+ },
+ }
+
+ // Create a Service with the first port
+ createAndAssertInfraServiceLB("infra-multiport-service", "tenant-service-name", "test-cluster",
+ servicePorts[0],
+ v1.ServiceExternalTrafficPolicyLocal)
+
+ // Update the Service by adding the remaining ports
+ svc, err := testVals.infraClient.CoreV1().Services(infraNamespace).Get(context.TODO(), "infra-multiport-service", metav1.GetOptions{})
+ Expect(err).To(BeNil())
+
+ svc.Spec.Ports = servicePorts
+
+ _, err = testVals.infraClient.CoreV1().Services(infraNamespace).Update(context.TODO(), svc, metav1.UpdateOptions{})
+ Expect(err).To(BeNil())
+
+ var epsListMultiPort *discoveryv1.EndpointSliceList
+
+ // Verify that the EndpointSlice is created with correct unique ports
+ Eventually(func() (bool, error) {
+ epsListMultiPort, err = testVals.infraClient.DiscoveryV1().EndpointSlices(infraNamespace).List(context.TODO(), metav1.ListOptions{})
+ if len(epsListMultiPort.Items) != 1 {
+ return false, err
+ }
+
+ createdSlice := epsListMultiPort.Items[0]
+ expectedPortNames := []string{"client", "dashboard", "metrics"}
+ foundPortNames := []string{}
+
+ for _, port := range createdSlice.Ports {
+ if port.Name != nil {
+ foundPortNames = append(foundPortNames, *port.Name)
+ }
+ }
+
+ // Verify that all expected ports are present and without duplicates
+ if len(foundPortNames) != len(expectedPortNames) {
+ return false, err
+ }
+
+ portSet := sets.NewString(foundPortNames...)
+ expectedPortSet := sets.NewString(expectedPortNames...)
+ return portSet.Equal(expectedPortSet), err
+ }).Should(BeTrue(), "EndpointSlice should contain all unique ports from the Service without duplicates")
+
g.It("Should reconcile after infra EndpointSlice deletion and restore it", func() {
// Create a VMI in the infra cluster
// This ensures that when tenant EndpointSlice is created, it can be reconciled properly

View File

@@ -1,129 +0,0 @@
diff --git a/pkg/controller/kubevirteps/kubevirteps_controller.go b/pkg/controller/kubevirteps/kubevirteps_controller.go
index a3c1aa33..6f6e3d32 100644
--- a/pkg/controller/kubevirteps/kubevirteps_controller.go
+++ b/pkg/controller/kubevirteps/kubevirteps_controller.go
@@ -108,32 +108,24 @@ func newRequest(reqType ReqType, obj interface{}, oldObj interface{}) *Request {
}
func (c *Controller) Init() error {
-
- // Act on events from Services on the infra cluster. These are created by the EnsureLoadBalancer function.
- // We need to watch for these events so that we can update the EndpointSlices in the infra cluster accordingly.
+ // Existing Service event handlers...
_, err := c.infraFactory.Core().V1().Services().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
- // cast obj to Service
svc := obj.(*v1.Service)
- // Only act on Services of type LoadBalancer
if svc.Spec.Type == v1.ServiceTypeLoadBalancer {
klog.Infof("Service added: %v/%v", svc.Namespace, svc.Name)
c.queue.Add(newRequest(AddReq, obj, nil))
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
- // cast obj to Service
newSvc := newObj.(*v1.Service)
- // Only act on Services of type LoadBalancer
if newSvc.Spec.Type == v1.ServiceTypeLoadBalancer {
klog.Infof("Service updated: %v/%v", newSvc.Namespace, newSvc.Name)
c.queue.Add(newRequest(UpdateReq, newObj, oldObj))
}
},
DeleteFunc: func(obj interface{}) {
- // cast obj to Service
svc := obj.(*v1.Service)
- // Only act on Services of type LoadBalancer
if svc.Spec.Type == v1.ServiceTypeLoadBalancer {
klog.Infof("Service deleted: %v/%v", svc.Namespace, svc.Name)
c.queue.Add(newRequest(DeleteReq, obj, nil))
@@ -144,7 +136,7 @@ func (c *Controller) Init() error {
return err
}
- // Monitor endpoint slices that we are interested in based on known services in the infra cluster
+ // Existing EndpointSlice event handlers in tenant cluster...
_, err = c.tenantFactory.Discovery().V1().EndpointSlices().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
eps := obj.(*discovery.EndpointSlice)
@@ -194,10 +186,80 @@ func (c *Controller) Init() error {
return err
}
- //TODO: Add informer for EndpointSlices in the infra cluster to watch for (unwanted) changes
+ // Add an informer for EndpointSlices in the infra cluster
+ _, err = c.infraFactory.Discovery().V1().EndpointSlices().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+ AddFunc: func(obj interface{}) {
+ eps := obj.(*discovery.EndpointSlice)
+ if c.managedByController(eps) {
+ svc, svcErr := c.getInfraServiceForEPS(context.TODO(), eps)
+ if svcErr != nil {
+ klog.Errorf("Failed to get infra Service for EndpointSlice %s/%s: %v", eps.Namespace, eps.Name, svcErr)
+ return
+ }
+ if svc != nil {
+ klog.Infof("Infra EndpointSlice added: %v/%v, requeuing Service: %v/%v", eps.Namespace, eps.Name, svc.Namespace, svc.Name)
+ c.queue.Add(newRequest(AddReq, svc, nil))
+ }
+ }
+ },
+ UpdateFunc: func(oldObj, newObj interface{}) {
+ eps := newObj.(*discovery.EndpointSlice)
+ if c.managedByController(eps) {
+ svc, svcErr := c.getInfraServiceForEPS(context.TODO(), eps)
+ if svcErr != nil {
+ klog.Errorf("Failed to get infra Service for EndpointSlice %s/%s: %v", eps.Namespace, eps.Name, svcErr)
+ return
+ }
+ if svc != nil {
+ klog.Infof("Infra EndpointSlice updated: %v/%v, requeuing Service: %v/%v", eps.Namespace, eps.Name, svc.Namespace, svc.Name)
+ c.queue.Add(newRequest(UpdateReq, svc, nil))
+ }
+ }
+ },
+ DeleteFunc: func(obj interface{}) {
+ eps := obj.(*discovery.EndpointSlice)
+ if c.managedByController(eps) {
+ svc, svcErr := c.getInfraServiceForEPS(context.TODO(), eps)
+ if svcErr != nil {
+ klog.Errorf("Failed to get infra Service for EndpointSlice %s/%s on delete: %v", eps.Namespace, eps.Name, svcErr)
+ return
+ }
+ if svc != nil {
+ klog.Infof("Infra EndpointSlice deleted: %v/%v, requeuing Service: %v/%v", eps.Namespace, eps.Name, svc.Namespace, svc.Name)
+ c.queue.Add(newRequest(DeleteReq, svc, nil))
+ }
+ }
+ },
+ })
+ if err != nil {
+ return err
+ }
+
return nil
}
+// getInfraServiceForEPS returns the Service in the infra cluster associated with the given EndpointSlice.
+// It does this by reading the "kubernetes.io/service-name" label from the EndpointSlice, which should correspond
+// to the Service name. If not found or if the Service doesn't exist, it returns nil.
+func (c *Controller) getInfraServiceForEPS(ctx context.Context, eps *discovery.EndpointSlice) (*v1.Service, error) {
+ svcName := eps.Labels[discovery.LabelServiceName]
+ if svcName == "" {
+ // No service name label found, can't determine infra service.
+ return nil, nil
+ }
+
+ svc, err := c.infraClient.CoreV1().Services(c.infraNamespace).Get(ctx, svcName, metav1.GetOptions{})
+ if err != nil {
+ if k8serrors.IsNotFound(err) {
+ // Service doesn't exist
+ return nil, nil
+ }
+ return nil, err
+ }
+
+ return svc, nil
+}
+
// Run starts an asynchronous loop that monitors and updates GKENetworkParamSet in the cluster.
func (c *Controller) Run(numWorkers int, stopCh <-chan struct{}, controllerManagerMetrics *controllersmetrics.ControllerManagerMetrics) {
defer utilruntime.HandleCrash()

View File

@@ -0,0 +1,437 @@
diff --git a/pkg/controller/kubevirteps/kubevirteps_controller.go b/pkg/controller/kubevirteps/kubevirteps_controller.go
index 53388eb8..1ca419b9 100644
--- a/pkg/controller/kubevirteps/kubevirteps_controller.go
+++ b/pkg/controller/kubevirteps/kubevirteps_controller.go
@@ -382,74 +382,115 @@ func (c *Controller) reconcile(ctx context.Context, r *Request) error {
return errors.New("could not cast object to service")
}
+ /*
+ Skip if the given Service is not labeled with the keys that indicate
+ it was created/managed by this controller (i.e., not a LoadBalancer
+ that we handle).
+ */
if service.Labels[kubevirt.TenantServiceNameLabelKey] == "" ||
service.Labels[kubevirt.TenantServiceNamespaceLabelKey] == "" ||
service.Labels[kubevirt.TenantClusterNameLabelKey] == "" {
klog.Infof("This LoadBalancer Service: %s is not managed by the %s. Skipping.", service.Name, ControllerName)
return nil
}
+
klog.Infof("Reconciling: %v", service.Name)
+ /*
+ 1) Check if Service in the infra cluster is actually present.
+ If it's not found, mark it as 'deleted' so that we don't create new slices.
+ */
serviceDeleted := false
- svc, err := c.infraFactory.Core().V1().Services().Lister().Services(c.infraNamespace).Get(service.Name)
+ infraSvc, err := c.infraFactory.Core().V1().Services().Lister().Services(c.infraNamespace).Get(service.Name)
if err != nil {
- klog.Infof("Service %s in namespace %s is deleted.", service.Name, service.Namespace)
+ // The Service is not present in the infra lister => treat as deleted
+ klog.Infof("Service %s in namespace %s is deleted (or not found).", service.Name, service.Namespace)
serviceDeleted = true
} else {
- service = svc
+ // Use the actual object from the lister, so we have the latest state
+ service = infraSvc
}
+ /*
+ 2) Get all existing EndpointSlices in the infra cluster that belong to this LB Service.
+ We'll decide which of them should be updated or deleted.
+ */
infraExistingEpSlices, err := c.getInfraEPSFromInfraService(ctx, service)
if err != nil {
return err
}
- // At this point we have the current state of the 3 main objects we are interested in:
- // 1. The Service in the infra cluster, the one created by the KubevirtCloudController.
- // 2. The EndpointSlices in the tenant cluster, created for the tenant cluster's Service.
- // 3. The EndpointSlices in the infra cluster, managed by this controller.
-
slicesToDelete := []*discovery.EndpointSlice{}
slicesByAddressType := make(map[discovery.AddressType][]*discovery.EndpointSlice)
+ // For example, if the service is single-stack IPv4 => only AddressTypeIPv4
+ // or if dual-stack => IPv4 and IPv6, etc.
serviceSupportedAddressesTypes := getAddressTypesForService(service)
- // If the services switched to a different address type, we need to delete the old ones, because it's immutable.
- // If the services switched to a different externalTrafficPolicy, we need to delete the old ones.
+
+ /*
+ 3) Determine which slices to delete, and which to pass on to the normal
+ "reconcileByAddressType" logic.
+
+ - If 'serviceDeleted' is true OR service.Spec.Selector != nil, we remove them.
+ - Also, if the slice's address type is unsupported by the Service, we remove it.
+ */
for _, eps := range infraExistingEpSlices {
- if service.Spec.Selector != nil || serviceDeleted {
- klog.Infof("Added for deletion EndpointSlice %s in namespace %s because it has a selector", eps.Name, eps.Namespace)
- // to be sure we don't delete any slice that is not managed by us
+ // If service is deleted or has a non-nil selector => remove slices
+ if serviceDeleted || service.Spec.Selector != nil {
+ /*
+ Only remove if it is clearly labeled as managed by us:
+ we do not want to accidentally remove slices that are not
+ created by this controller.
+ */
if c.managedByController(eps) {
+ klog.Infof("Added for deletion EndpointSlice %s in namespace %s because service is deleted or has a selector",
+ eps.Name, eps.Namespace)
slicesToDelete = append(slicesToDelete, eps)
}
continue
}
+
+ // If the Service does not support this slice's AddressType => remove
if !serviceSupportedAddressesTypes.Has(eps.AddressType) {
- klog.Infof("Added for deletion EndpointSlice %s in namespace %s because it has an unsupported address type: %v", eps.Name, eps.Namespace, eps.AddressType)
+ klog.Infof("Added for deletion EndpointSlice %s in namespace %s because it has an unsupported address type: %v",
+ eps.Name, eps.Namespace, eps.AddressType)
slicesToDelete = append(slicesToDelete, eps)
continue
}
+
+ /*
+ Otherwise, this slice is potentially still valid for the given AddressType,
+ we'll send it to reconcileByAddressType for final merging and updates.
+ */
slicesByAddressType[eps.AddressType] = append(slicesByAddressType[eps.AddressType], eps)
}
- if !serviceDeleted {
- // Get tenant's endpoint slices for this service
+ /*
+ 4) If the Service was NOT deleted and has NO selector (i.e., it's a "no-selector" LB Service),
+ we proceed to handle creation and updates. That means:
+ - Gather Tenant's EndpointSlices
+ - Reconcile them by each AddressType
+ */
+ if !serviceDeleted && service.Spec.Selector == nil {
tenantEpSlices, err := c.getTenantEPSFromInfraService(ctx, service)
if err != nil {
return err
}
- // Reconcile the EndpointSlices for each address type e.g. ipv4, ipv6
+ // For each addressType (ipv4, ipv6, etc.) reconcile the infra slices
for addressType := range serviceSupportedAddressesTypes {
existingSlices := slicesByAddressType[addressType]
- err := c.reconcileByAddressType(service, tenantEpSlices, existingSlices, addressType)
- if err != nil {
+ if err := c.reconcileByAddressType(service, tenantEpSlices, existingSlices, addressType); err != nil {
return err
}
}
}
- // Delete the EndpointSlices that are no longer needed
+ /*
+ 5) Perform the actual deletion of all slices we flagged.
+ In many cases (serviceDeleted or .Spec.Selector != nil),
+ we end up with only "delete" actions and no new slice creation.
+ */
for _, eps := range slicesToDelete {
err := c.infraClient.DiscoveryV1().EndpointSlices(eps.Namespace).Delete(context.TODO(), eps.Name, metav1.DeleteOptions{})
if err != nil {
@@ -588,55 +629,114 @@ func ownedBy(endpointSlice *discovery.EndpointSlice, svc *v1.Service) bool {
return false
}
-func (c *Controller) finalize(service *v1.Service, slicesToCreate []*discovery.EndpointSlice, slicesToUpdate []*discovery.EndpointSlice, slicesToDelete []*discovery.EndpointSlice) error {
- // If there are slices to delete and slices to create, make them as update
- for i := 0; i < len(slicesToDelete); {
+func (c *Controller) finalize(
+ service *v1.Service,
+ slicesToCreate []*discovery.EndpointSlice,
+ slicesToUpdate []*discovery.EndpointSlice,
+ slicesToDelete []*discovery.EndpointSlice,
+) error {
+ /*
+ We try to turn a "delete + create" pair into a single "update" operation
+ if the original slice (slicesToDelete[i]) has the same address type as
+ the first slice in slicesToCreate, and is owned by the same Service.
+
+ However, we must re-check the lengths of slicesToDelete and slicesToCreate
+ within the loop to avoid an out-of-bounds index in slicesToCreate.
+ */
+
+ i := 0
+ for i < len(slicesToDelete) {
+ // If there is nothing to create, break early
if len(slicesToCreate) == 0 {
break
}
- if slicesToDelete[i].AddressType == slicesToCreate[0].AddressType && ownedBy(slicesToDelete[i], service) {
- slicesToCreate[0].Name = slicesToDelete[i].Name
+
+ sd := slicesToDelete[i]
+ sc := slicesToCreate[0] // We can safely do this now, because len(slicesToCreate) > 0
+
+ // If the address type matches, and the slice is owned by the same Service,
+ // then instead of deleting sd and creating sc, we'll transform it into an update:
+ // we rename sc with sd's name, remove sd from the delete list, remove sc from the create list,
+ // and add sc to the update list.
+ if sd.AddressType == sc.AddressType && ownedBy(sd, service) {
+ sliceToUpdate := sc
+ sliceToUpdate.Name = sd.Name
+
+ // Remove the first element from slicesToCreate
slicesToCreate = slicesToCreate[1:]
- slicesToUpdate = append(slicesToUpdate, slicesToCreate[0])
+
+ // Remove the slice from slicesToDelete
slicesToDelete = append(slicesToDelete[:i], slicesToDelete[i+1:]...)
+
+ // Now add the renamed slice to the list of slices we want to update
+ slicesToUpdate = append(slicesToUpdate, sliceToUpdate)
+
+ /*
+ Do not increment i here, because we've just removed an element from
+ slicesToDelete. The next slice to examine is now at the same index i.
+ */
} else {
+ // If they don't match, move on to the next slice in slicesToDelete.
i++
}
}
- // Create the new slices if service is not marked for deletion
+ /*
+ If the Service is not being deleted, create all remaining slices in slicesToCreate.
+ (If the Service has a DeletionTimestamp, it means it is going away, so we do not
+ want to create new EndpointSlices.)
+ */
if service.DeletionTimestamp == nil {
for _, slice := range slicesToCreate {
- createdSlice, err := c.infraClient.DiscoveryV1().EndpointSlices(slice.Namespace).Create(context.TODO(), slice, metav1.CreateOptions{})
+ createdSlice, err := c.infraClient.DiscoveryV1().EndpointSlices(slice.Namespace).Create(
+ context.TODO(),
+ slice,
+ metav1.CreateOptions{},
+ )
if err != nil {
- klog.Errorf("Failed to create EndpointSlice %s in namespace %s: %v", slice.Name, slice.Namespace, err)
+ klog.Errorf("Failed to create EndpointSlice %s in namespace %s: %v",
+ slice.Name, slice.Namespace, err)
+ // If the namespace is terminating, it's safe to ignore the error.
if k8serrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
- return nil
+ continue
}
return err
}
- klog.Infof("Created EndpointSlice %s in namespace %s", createdSlice.Name, createdSlice.Namespace)
+ klog.Infof("Created EndpointSlice %s in namespace %s",
+ createdSlice.Name, createdSlice.Namespace)
}
}
- // Update slices
+ // Update slices that are in the slicesToUpdate list.
for _, slice := range slicesToUpdate {
- _, err := c.infraClient.DiscoveryV1().EndpointSlices(slice.Namespace).Update(context.TODO(), slice, metav1.UpdateOptions{})
+ _, err := c.infraClient.DiscoveryV1().EndpointSlices(slice.Namespace).Update(
+ context.TODO(),
+ slice,
+ metav1.UpdateOptions{},
+ )
if err != nil {
- klog.Errorf("Failed to update EndpointSlice %s in namespace %s: %v", slice.Name, slice.Namespace, err)
+ klog.Errorf("Failed to update EndpointSlice %s in namespace %s: %v",
+ slice.Name, slice.Namespace, err)
return err
}
- klog.Infof("Updated EndpointSlice %s in namespace %s", slice.Name, slice.Namespace)
+ klog.Infof("Updated EndpointSlice %s in namespace %s",
+ slice.Name, slice.Namespace)
}
- // Delete slices
+ // Finally, delete slices that are in slicesToDelete and are no longer needed.
for _, slice := range slicesToDelete {
- err := c.infraClient.DiscoveryV1().EndpointSlices(slice.Namespace).Delete(context.TODO(), slice.Name, metav1.DeleteOptions{})
+ err := c.infraClient.DiscoveryV1().EndpointSlices(slice.Namespace).Delete(
+ context.TODO(),
+ slice.Name,
+ metav1.DeleteOptions{},
+ )
if err != nil {
- klog.Errorf("Failed to delete EndpointSlice %s in namespace %s: %v", slice.Name, slice.Namespace, err)
+ klog.Errorf("Failed to delete EndpointSlice %s in namespace %s: %v",
+ slice.Name, slice.Namespace, err)
return err
}
- klog.Infof("Deleted EndpointSlice %s in namespace %s", slice.Name, slice.Namespace)
+ klog.Infof("Deleted EndpointSlice %s in namespace %s",
+ slice.Name, slice.Namespace)
}
return nil
diff --git a/pkg/controller/kubevirteps/kubevirteps_controller_test.go b/pkg/controller/kubevirteps/kubevirteps_controller_test.go
index 5326faa4..c3167911 100644
--- a/pkg/controller/kubevirteps/kubevirteps_controller_test.go
+++ b/pkg/controller/kubevirteps/kubevirteps_controller_test.go
@@ -653,51 +653,43 @@ var _ = g.Describe("KubevirtEPSController", g.Ordered, func() {
*createPort("http", 80, v1.ProtocolTCP),
[]discoveryv1.Endpoint{*createEndpoint("123.45.67.89", "worker-0-test", true, true, false)})
- // Define several unique ports for the Service
+ // Define multiple ports for the Service
servicePorts := []v1.ServicePort{
{
- Name: "client",
- Protocol: v1.ProtocolTCP,
- Port: 10001,
- TargetPort: intstr.FromInt(30396),
- NodePort: 30396,
- AppProtocol: nil,
+ Name: "client",
+ Protocol: v1.ProtocolTCP,
+ Port: 10001,
+ TargetPort: intstr.FromInt(30396),
+ NodePort: 30396,
},
{
- Name: "dashboard",
- Protocol: v1.ProtocolTCP,
- Port: 8265,
- TargetPort: intstr.FromInt(31003),
- NodePort: 31003,
- AppProtocol: nil,
+ Name: "dashboard",
+ Protocol: v1.ProtocolTCP,
+ Port: 8265,
+ TargetPort: intstr.FromInt(31003),
+ NodePort: 31003,
},
{
- Name: "metrics",
- Protocol: v1.ProtocolTCP,
- Port: 8080,
- TargetPort: intstr.FromInt(30452),
- NodePort: 30452,
- AppProtocol: nil,
+ Name: "metrics",
+ Protocol: v1.ProtocolTCP,
+ Port: 8080,
+ TargetPort: intstr.FromInt(30452),
+ NodePort: 30452,
},
}
- // Create a Service with the first port
createAndAssertInfraServiceLB("infra-multiport-service", "tenant-service-name", "test-cluster",
- servicePorts[0],
- v1.ServiceExternalTrafficPolicyLocal)
+ servicePorts[0], v1.ServiceExternalTrafficPolicyLocal)
- // Update the Service by adding the remaining ports
svc, err := testVals.infraClient.CoreV1().Services(infraNamespace).Get(context.TODO(), "infra-multiport-service", metav1.GetOptions{})
Expect(err).To(BeNil())
svc.Spec.Ports = servicePorts
-
_, err = testVals.infraClient.CoreV1().Services(infraNamespace).Update(context.TODO(), svc, metav1.UpdateOptions{})
Expect(err).To(BeNil())
var epsListMultiPort *discoveryv1.EndpointSliceList
- // Verify that the EndpointSlice is created with correct unique ports
Eventually(func() (bool, error) {
epsListMultiPort, err = testVals.infraClient.DiscoveryV1().EndpointSlices(infraNamespace).List(context.TODO(), metav1.ListOptions{})
if len(epsListMultiPort.Items) != 1 {
@@ -714,7 +706,6 @@ var _ = g.Describe("KubevirtEPSController", g.Ordered, func() {
}
}
- // Verify that all expected ports are present and without duplicates
if len(foundPortNames) != len(expectedPortNames) {
return false, err
}
@@ -767,5 +758,81 @@ var _ = g.Describe("KubevirtEPSController", g.Ordered, func() {
return false, err
}).Should(BeTrue(), "EndpointSlice in infra cluster should be recreated by the controller after deletion")
})
+
+ g.It("Should remove EndpointSlices and not recreate them when a previously no-selector Service obtains a selector", func() {
+ testVals.infraClient.Fake.PrependReactor("create", "endpointslices", func(action testing.Action) (bool, runtime.Object, error) {
+ createAction := action.(testing.CreateAction)
+ slice := createAction.GetObject().(*discoveryv1.EndpointSlice)
+ if slice.Name == "" && slice.GenerateName != "" {
+ slice.Name = slice.GenerateName + "-fake001"
+ }
+ return false, slice, nil
+ })
+
+ createAndAssertVMI("worker-0-test", "ip-10-32-5-13", "123.45.67.89")
+
+ createAndAssertTenantSlice("test-epslice", "tenant-service-name", discoveryv1.AddressTypeIPv4,
+ *createPort("http", 80, v1.ProtocolTCP),
+ []discoveryv1.Endpoint{
+ *createEndpoint("123.45.67.89", "worker-0-test", true, true, false),
+ },
+ )
+
+ noSelectorSvcName := "svc-without-selector"
+ svc := &v1.Service{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: noSelectorSvcName,
+ Namespace: infraNamespace,
+ Labels: map[string]string{
+ kubevirt.TenantServiceNameLabelKey: "tenant-service-name",
+ kubevirt.TenantServiceNamespaceLabelKey: tenantNamespace,
+ kubevirt.TenantClusterNameLabelKey: "test-cluster",
+ },
+ },
+ Spec: v1.ServiceSpec{
+ Ports: []v1.ServicePort{
+ {
+ Name: "web",
+ Port: 80,
+ NodePort: 31900,
+ Protocol: v1.ProtocolTCP,
+ TargetPort: intstr.IntOrString{IntVal: 30390},
+ },
+ },
+ Type: v1.ServiceTypeLoadBalancer,
+ ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyLocal,
+ },
+ }
+
+ _, err := testVals.infraClient.CoreV1().Services(infraNamespace).Create(context.TODO(), svc, metav1.CreateOptions{})
+ Expect(err).To(BeNil())
+
+ Eventually(func() (bool, error) {
+ epsList, err := testVals.infraClient.DiscoveryV1().EndpointSlices(infraNamespace).
+ List(context.TODO(), metav1.ListOptions{})
+ if err != nil {
+ return false, err
+ }
+ return len(epsList.Items) == 1, nil
+ }).Should(BeTrue(), "Controller should create an EndpointSlice in infra cluster for the no-selector LB service")
+
+ svcWithSelector, err := testVals.infraClient.CoreV1().Services(infraNamespace).Get(
+ context.TODO(), noSelectorSvcName, metav1.GetOptions{})
+ Expect(err).To(BeNil())
+
+ svcWithSelector.Spec.Selector = map[string]string{"app": "test-value"}
+ _, err = testVals.infraClient.CoreV1().Services(infraNamespace).
+ Update(context.TODO(), svcWithSelector, metav1.UpdateOptions{})
+ Expect(err).To(BeNil())
+
+ Eventually(func() (bool, error) {
+ epsList, err := testVals.infraClient.DiscoveryV1().EndpointSlices(infraNamespace).
+ List(context.TODO(), metav1.ListOptions{})
+ if err != nil {
+ return false, err
+ }
+ return len(epsList.Items) == 0, nil
+ }).Should(BeTrue(), "All EndpointSlices should be removed after Service acquires a selector (no new slices created)")
+ })
})
})

View File

@@ -0,0 +1,769 @@
diff --git a/cmd/kubevirt-cloud-controller-manager/kubevirteps.go b/cmd/kubevirt-cloud-controller-manager/kubevirteps.go
index 74166b5d..4e744f8d 100644
--- a/cmd/kubevirt-cloud-controller-manager/kubevirteps.go
+++ b/cmd/kubevirt-cloud-controller-manager/kubevirteps.go
@@ -101,7 +101,18 @@ func startKubevirtCloudController(
klog.Infof("Setting up kubevirtEPSController")
- kubevirtEPSController := kubevirteps.NewKubevirtEPSController(tenantClient, infraClient, infraDynamic, kubevirtCloud.Namespace())
+ clusterName := ccmConfig.ComponentConfig.KubeCloudShared.ClusterName
+ if clusterName == "" {
+ klog.Fatalf("Required flag --cluster-name is missing")
+ }
+
+ kubevirtEPSController := kubevirteps.NewKubevirtEPSController(
+ tenantClient,
+ infraClient,
+ infraDynamic,
+ kubevirtCloud.Namespace(),
+ clusterName,
+ )
klog.Infof("Initializing kubevirtEPSController")
diff --git a/pkg/controller/kubevirteps/kubevirteps_controller.go b/pkg/controller/kubevirteps/kubevirteps_controller.go
index 1ca419b9..b56882c1 100644
--- a/pkg/controller/kubevirteps/kubevirteps_controller.go
+++ b/pkg/controller/kubevirteps/kubevirteps_controller.go
@@ -54,10 +54,10 @@ type Controller struct {
infraDynamic dynamic.Interface
infraFactory informers.SharedInformerFactory
- infraNamespace string
- queue workqueue.RateLimitingInterface
- maxRetries int
-
+ infraNamespace string
+ clusterName string
+ queue workqueue.RateLimitingInterface
+ maxRetries int
maxEndPointsPerSlice int
}
@@ -65,8 +65,9 @@ func NewKubevirtEPSController(
tenantClient kubernetes.Interface,
infraClient kubernetes.Interface,
infraDynamic dynamic.Interface,
- infraNamespace string) *Controller {
-
+ infraNamespace string,
+ clusterName string,
+) *Controller {
tenantFactory := informers.NewSharedInformerFactory(tenantClient, 0)
infraFactory := informers.NewSharedInformerFactoryWithOptions(infraClient, 0, informers.WithNamespace(infraNamespace))
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
@@ -79,6 +80,7 @@ func NewKubevirtEPSController(
infraDynamic: infraDynamic,
infraFactory: infraFactory,
infraNamespace: infraNamespace,
+ clusterName: clusterName,
queue: queue,
maxRetries: 25,
maxEndPointsPerSlice: 100,
@@ -320,22 +322,30 @@ func (c *Controller) processNextItem(ctx context.Context) bool {
// getInfraServiceFromTenantEPS returns the Service in the infra cluster that is associated with the given tenant endpoint slice.
func (c *Controller) getInfraServiceFromTenantEPS(ctx context.Context, slice *discovery.EndpointSlice) (*v1.Service, error) {
- infraServices, err := c.infraClient.CoreV1().Services(c.infraNamespace).List(ctx,
- metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s,%s=%s", kubevirt.TenantServiceNameLabelKey, slice.Labels["kubernetes.io/service-name"],
- kubevirt.TenantServiceNamespaceLabelKey, slice.Namespace)})
+ tenantServiceName := slice.Labels[discovery.LabelServiceName]
+ tenantServiceNamespace := slice.Namespace
+
+ labelSelector := fmt.Sprintf(
+ "%s=%s,%s=%s,%s=%s",
+ kubevirt.TenantServiceNameLabelKey, tenantServiceName,
+ kubevirt.TenantServiceNamespaceLabelKey, tenantServiceNamespace,
+ kubevirt.TenantClusterNameLabelKey, c.clusterName,
+ )
+
+ svcList, err := c.infraClient.CoreV1().Services(c.infraNamespace).List(ctx, metav1.ListOptions{
+ LabelSelector: labelSelector,
+ })
if err != nil {
- klog.Errorf("Failed to get Service in Infra for EndpointSlice %s in namespace %s: %v", slice.Name, slice.Namespace, err)
+ klog.Errorf("Failed to get Service in Infra for EndpointSlice %s in namespace %s: %v", slice.Name, tenantServiceNamespace, err)
return nil, err
}
- if len(infraServices.Items) > 1 {
- // This should never be possible, only one service should exist for a given tenant endpoint slice
- klog.Errorf("Multiple services found for tenant endpoint slice %s in namespace %s", slice.Name, slice.Namespace)
+ if len(svcList.Items) > 1 {
+ klog.Errorf("Multiple services found for tenant endpoint slice %s in namespace %s", slice.Name, tenantServiceNamespace)
return nil, errors.New("multiple services found for tenant endpoint slice")
}
- if len(infraServices.Items) == 1 {
- return &infraServices.Items[0], nil
+ if len(svcList.Items) == 1 {
+ return &svcList.Items[0], nil
}
- // No service found, possible if service is deleted.
return nil, nil
}
@@ -363,16 +373,27 @@ func (c *Controller) getTenantEPSFromInfraService(ctx context.Context, svc *v1.S
// getInfraEPSFromInfraService returns the EndpointSlices in the infra cluster that are associated with the given infra service.
func (c *Controller) getInfraEPSFromInfraService(ctx context.Context, svc *v1.Service) ([]*discovery.EndpointSlice, error) {
var infraEPSSlices []*discovery.EndpointSlice
- klog.Infof("Searching for endpoints on infra cluster for service %s in namespace %s.", svc.Name, svc.Namespace)
- result, err := c.infraClient.DiscoveryV1().EndpointSlices(svc.Namespace).List(ctx,
- metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discovery.LabelServiceName, svc.Name)})
+
+ klog.Infof("Searching for EndpointSlices in infra cluster for service %s/%s", svc.Namespace, svc.Name)
+
+ labelSelector := fmt.Sprintf(
+ "%s=%s,%s=%s",
+ discovery.LabelServiceName, svc.Name,
+ kubevirt.TenantClusterNameLabelKey, c.clusterName,
+ )
+
+ result, err := c.infraClient.DiscoveryV1().EndpointSlices(svc.Namespace).List(ctx, metav1.ListOptions{
+ LabelSelector: labelSelector,
+ })
if err != nil {
klog.Errorf("Failed to get EndpointSlices for Service %s in namespace %s: %v", svc.Name, svc.Namespace, err)
return nil, err
}
+
for _, eps := range result.Items {
infraEPSSlices = append(infraEPSSlices, &eps)
}
+
return infraEPSSlices, nil
}
@@ -382,15 +403,17 @@ func (c *Controller) reconcile(ctx context.Context, r *Request) error {
return errors.New("could not cast object to service")
}
- /*
- Skip if the given Service is not labeled with the keys that indicate
- it was created/managed by this controller (i.e., not a LoadBalancer
- that we handle).
- */
+ // Skip services not managed by this controller (missing required labels)
if service.Labels[kubevirt.TenantServiceNameLabelKey] == "" ||
service.Labels[kubevirt.TenantServiceNamespaceLabelKey] == "" ||
service.Labels[kubevirt.TenantClusterNameLabelKey] == "" {
- klog.Infof("This LoadBalancer Service: %s is not managed by the %s. Skipping.", service.Name, ControllerName)
+ klog.Infof("Service %s is not managed by this controller. Skipping.", service.Name)
+ return nil
+ }
+
+ // Skip services for other clusters
+ if service.Labels[kubevirt.TenantClusterNameLabelKey] != c.clusterName {
+ klog.Infof("Skipping Service %s: cluster label %q doesn't match our clusterName %q", service.Name, service.Labels[kubevirt.TenantClusterNameLabelKey], c.clusterName)
return nil
}
diff --git a/pkg/controller/kubevirteps/kubevirteps_controller_test.go b/pkg/controller/kubevirteps/kubevirteps_controller_test.go
index c3167911..7525aaa5 100644
--- a/pkg/controller/kubevirteps/kubevirteps_controller_test.go
+++ b/pkg/controller/kubevirteps/kubevirteps_controller_test.go
@@ -120,7 +120,8 @@ func createTenantEPSlice(
Name: name,
Namespace: tenantNamespace,
Labels: map[string]string{
- discoveryv1.LabelServiceName: labelServiceName,
+ discoveryv1.LabelServiceName: labelServiceName,
+ kubevirt.TenantClusterNameLabelKey: "test-cluster",
},
},
AddressType: addressType,
@@ -150,7 +151,7 @@ func createAndAssertVMI(node, nodeName, ip string) {
func createAndAssertTenantSlice(name, labelServiceName string, addressType discoveryv1.AddressType, port discoveryv1.EndpointPort, endpoints []discoveryv1.Endpoint) {
epSlice := createTenantEPSlice(name, labelServiceName, addressType, port, endpoints)
_, _ = testVals.tenantClient.DiscoveryV1().EndpointSlices(tenantNamespace).Create(context.TODO(), epSlice, metav1.CreateOptions{})
- // Check if tenant Endpointslice is created
+ // Check if tenant EndpointSlice is created
Eventually(func() (bool, error) {
eps, err := testVals.tenantClient.DiscoveryV1().EndpointSlices(tenantNamespace).Get(context.TODO(), name, metav1.GetOptions{})
if err == nil || eps != nil {
@@ -190,7 +191,8 @@ func setupTestKubevirtEPSController() *testKubevirtEPSController {
}: "VirtualMachineInstanceList",
})
- controller := NewKubevirtEPSController(tenantClient, infraClient, infraDynamic, "test")
+ // Pass the cluster name ("test-cluster") as an argument.
+ controller := NewKubevirtEPSController(tenantClient, infraClient, infraDynamic, "test", "test-cluster")
err := controller.Init()
if err != nil {
@@ -262,16 +264,16 @@ var _ = g.Describe("KubevirtEPSController", g.Ordered, func() {
stop()
})
- g.It("Should reconcile a new Endpointslice on the infra cluster", func() {
+ g.It("Should reconcile a new EndpointSlice on the infra cluster", func() {
// Create VMI in infra cluster
createAndAssertVMI("worker-0-test", "ip-10-32-5-13", "123.45.67.89")
- // Create Endpoinslices in tenant cluster
+ // Create EndpointSlice in tenant cluster
createAndAssertTenantSlice("test-epslice", "tenant-service-name", discoveryv1.AddressTypeIPv4,
*createPort("http", 80, v1.ProtocolTCP),
[]discoveryv1.Endpoint{*createEndpoint("123.45.67.89", "worker-0-test", true, true, false)})
- // Create service in infra cluster
+ // Create Service in infra cluster
createAndAssertInfraServiceLB("infra-service-name", "tenant-service-name", "test-cluster",
v1.ServicePort{Name: "web", Port: 80, NodePort: 31900, Protocol: v1.ProtocolTCP, TargetPort: intstr.IntOrString{IntVal: 30390}},
v1.ServiceExternalTrafficPolicyLocal)
@@ -281,17 +283,13 @@ var _ = g.Describe("KubevirtEPSController", g.Ordered, func() {
// Check if the controller creates the EndpointSlice in the infra cluster
Eventually(func() (bool, error) {
epsList, err = testVals.infraClient.DiscoveryV1().EndpointSlices(infraNamespace).List(context.TODO(), metav1.ListOptions{})
- if len(epsList.Items) == 1 {
- return true, err
- } else {
- return false, err
- }
+ return len(epsList.Items) == 1, err
}).Should(BeTrue(), "EndpointSlice in infra cluster should be created by the controller reconciler")
Expect(*epsList.Items[0].Endpoints[0].NodeName).To(Equal("ip-10-32-5-13"))
})
- g.It("Should update the Endpointslice when a tenant Endpointslice is updated", func() {
+ g.It("Should update the EndpointSlice when a tenant EndpointSlice is updated", func() {
ipAddr1 := "123.45.67.11"
ipAddr2 := "123.99.99.99"
@@ -299,12 +297,12 @@ var _ = g.Describe("KubevirtEPSController", g.Ordered, func() {
createAndAssertVMI("worker-0-test", "ip-10-32-5-13", ipAddr1)
createAndAssertVMI("worker-1-test", "ip-10-32-5-15", ipAddr2)
- // Create Endpoinslices in tenant cluster
+ // Create EndpointSlice in tenant cluster
createAndAssertTenantSlice("test-epslice", "tenant-service-name", discoveryv1.AddressTypeIPv4,
*createPort("http", 80, v1.ProtocolTCP),
[]discoveryv1.Endpoint{*createEndpoint(ipAddr1, "worker-0-test", true, true, false)})
- // Create service in infra cluster
+ // Create Service in infra cluster
createAndAssertInfraServiceLB("infra-service-name", "tenant-service-name", "test-cluster",
v1.ServicePort{Name: "web", Port: 80, NodePort: 31900, Protocol: v1.ProtocolTCP, TargetPort: intstr.IntOrString{IntVal: 30390}},
v1.ServiceExternalTrafficPolicyLocal)
@@ -316,12 +314,11 @@ var _ = g.Describe("KubevirtEPSController", g.Ordered, func() {
len(epsList.Items[0].Endpoints) == 1 &&
*epsList.Items[0].Endpoints[0].NodeName == "ip-10-32-5-13" {
return true, err
- } else {
- return false, err
}
+ return false, err
}).Should(BeTrue(), "EndpointSlice in infra cluster should be created by the controller reconciler")
- // Update the tenant Endpointslice
+ // Update the tenant EndpointSlice
epSlice := createTenantEPSlice("test-epslice", "tenant-service-name", discoveryv1.AddressTypeIPv4,
*createPort("http", 80, v1.ProtocolTCP),
[]discoveryv1.Endpoint{
@@ -331,119 +328,89 @@ var _ = g.Describe("KubevirtEPSController", g.Ordered, func() {
_, err := testVals.tenantClient.DiscoveryV1().EndpointSlices(tenantNamespace).Update(context.TODO(), epSlice, metav1.UpdateOptions{})
Expect(err).To(BeNil())
- // Check if tenant Endpointslice is updated
+ // Check if tenant EndpointSlice is updated
Eventually(func() (bool, error) {
epsList, err := testVals.tenantClient.DiscoveryV1().EndpointSlices(tenantNamespace).List(context.TODO(), metav1.ListOptions{})
- if len(epsList.Items) == 1 && len(epsList.Items[0].Endpoints) == 2 {
- return true, err
- } else {
- return false, err
- }
+ return len(epsList.Items) == 1 && len(epsList.Items[0].Endpoints) == 2, err
}).Should(BeTrue(), "EndpointSlice in tenant cluster should be updated")
// Check if the controller updates the EndpointSlice in the infra cluster
Eventually(func() (bool, error) {
epsList, err := testVals.infraClient.DiscoveryV1().EndpointSlices(infraNamespace).List(context.TODO(), metav1.ListOptions{})
- if len(epsList.Items) == 1 && len(epsList.Items[0].Endpoints) == 2 {
- return true, err
- } else {
- return false, err
- }
+ return len(epsList.Items) == 1 && len(epsList.Items[0].Endpoints) == 2, err
}).Should(BeTrue(), "EndpointSlice in infra cluster should be updated by the controller reconciler")
})
- g.It("Should update the Endpointslice when the infra Service external traffic policy changes.", func() {
+ g.It("Should update the EndpointSlice when the infra Service external traffic policy changes", func() {
// Create VMI in infra cluster
createAndAssertVMI("worker-0-test", "ip-10-32-5-13", "123.45.67.89")
- // Create Endpoinslices in tenant cluster
+ // Create EndpointSlice in tenant cluster
createAndAssertTenantSlice("test-epslice", "tenant-service-name", discoveryv1.AddressTypeIPv4,
*createPort("http", 80, v1.ProtocolTCP),
[]discoveryv1.Endpoint{*createEndpoint("123.45.67.89", "worker-0-test", true, true, false)})
- // Create service in infra cluster
+ // Create Service in infra cluster
createAndAssertInfraServiceLB("infra-service-name", "tenant-service-name", "test-cluster",
v1.ServicePort{Name: "web", Port: 80, NodePort: 31900, Protocol: v1.ProtocolTCP, TargetPort: intstr.IntOrString{IntVal: 30390}},
v1.ServiceExternalTrafficPolicyLocal)
var epsList *discoveryv1.EndpointSliceList
var err error
- // Check if the controller creates the EndpointSlice in the infra cluster
Eventually(func() (bool, error) {
epsList, err = testVals.infraClient.DiscoveryV1().EndpointSlices(infraNamespace).List(context.TODO(), metav1.ListOptions{})
- if len(epsList.Items) == 1 {
- return true, err
- } else {
- return false, err
- }
+ return len(epsList.Items) == 1, err
}).Should(BeTrue(), "EndpointSlice in infra cluster should be created by the controller reconciler")
-
Expect(*epsList.Items[0].Endpoints[0].NodeName).To(Equal("ip-10-32-5-13"))
- // Update the service's external traffic policy to Cluster
+ // Update the Service's external traffic policy to Cluster
svc := createInfraServiceLB("infra-service-name", "tenant-service-name", "test-cluster",
v1.ServicePort{Name: "web", Port: 80, NodePort: 31900, Protocol: v1.ProtocolTCP, TargetPort: intstr.IntOrString{IntVal: 30390}},
v1.ServiceExternalTrafficPolicyCluster)
-
_, err = testVals.infraClient.CoreV1().Services(infraNamespace).Update(context.TODO(), svc, metav1.UpdateOptions{})
Expect(err).To(BeNil())
Eventually(func() (bool, error) {
epsList, err = testVals.infraClient.DiscoveryV1().EndpointSlices(infraNamespace).List(context.TODO(), metav1.ListOptions{})
- if len(epsList.Items) == 0 {
- return true, err
- } else {
- return false, err
- }
+ return len(epsList.Items) == 0, err
}).Should(BeTrue(), "EndpointSlice in infra cluster should be deleted by the controller reconciler")
- // Update the service's external traffic policy to Local
+ // Update the Service's external traffic policy back to Local
svc = createInfraServiceLB("infra-service-name", "tenant-service-name", "test-cluster",
v1.ServicePort{Name: "web", Port: 80, NodePort: 31900, Protocol: v1.ProtocolTCP, TargetPort: intstr.IntOrString{IntVal: 30390}},
v1.ServiceExternalTrafficPolicyLocal)
-
_, err = testVals.infraClient.CoreV1().Services(infraNamespace).Update(context.TODO(), svc, metav1.UpdateOptions{})
Expect(err).To(BeNil())
Eventually(func() (bool, error) {
epsList, err = testVals.infraClient.DiscoveryV1().EndpointSlices(infraNamespace).List(context.TODO(), metav1.ListOptions{})
- if len(epsList.Items) == 1 {
- return true, err
- } else {
- return false, err
- }
- }).Should(BeTrue(), "EndpointSlice in infra cluster should be created by the controller reconciler")
+ return len(epsList.Items) == 1, err
+ }).Should(BeTrue(), "EndpointSlice in infra cluster should be recreated by the controller reconciler")
})
- g.It("Should update the Endpointslice when the infra Service labels are updated.", func() {
+ g.It("Should update the EndpointSlice when the infra Service labels are updated", func() {
// Create VMI in infra cluster
createAndAssertVMI("worker-0-test", "ip-10-32-5-13", "123.45.67.89")
- // Create Endpoinslices in tenant cluster
+ // Create EndpointSlice in tenant cluster
createAndAssertTenantSlice("test-epslice", "tenant-service-name", discoveryv1.AddressTypeIPv4,
*createPort("http", 80, v1.ProtocolTCP),
[]discoveryv1.Endpoint{*createEndpoint("123.45.67.89", "worker-0-test", true, true, false)})
- // Create service in infra cluster
+ // Create Service in infra cluster
createAndAssertInfraServiceLB("infra-service-name", "tenant-service-name", "test-cluster",
v1.ServicePort{Name: "web", Port: 80, NodePort: 31900, Protocol: v1.ProtocolTCP, TargetPort: intstr.IntOrString{IntVal: 30390}},
v1.ServiceExternalTrafficPolicyLocal)
var epsList *discoveryv1.EndpointSliceList
var err error
- // Check if the controller creates the EndpointSlice in the infra cluster
Eventually(func() (bool, error) {
epsList, err = testVals.infraClient.DiscoveryV1().EndpointSlices(infraNamespace).List(context.TODO(), metav1.ListOptions{})
- if len(epsList.Items) == 1 {
- return true, err
- } else {
- return false, err
- }
+ return len(epsList.Items) == 1, err
}).Should(BeTrue(), "EndpointSlice in infra cluster should be created by the controller reconciler")
-
Expect(*epsList.Items[0].Endpoints[0].NodeName).To(Equal("ip-10-32-5-13"))
- // Update the service's labels
+ // Update the Service's labels
svc := createInfraServiceLB("infra-service-name", "tenant-service-name", "test-cluster",
v1.ServicePort{Name: "web", Port: 80, NodePort: 31900, Protocol: v1.ProtocolTCP, TargetPort: intstr.IntOrString{IntVal: 30390}},
v1.ServiceExternalTrafficPolicyLocal)
@@ -460,17 +427,14 @@ var _ = g.Describe("KubevirtEPSController", g.Ordered, func() {
return true, err
}
return false, err
- } else {
- return false, err
}
+ return false, err
}).Should(BeTrue(), "EndpointSlice in infra cluster should have the two added labels")
- // Update the service's external traffic policy to Cluster
svc = createInfraServiceLB("infra-service-name", "tenant-service-name", "test-cluster",
v1.ServicePort{Name: "web", Port: 80, NodePort: 31900, Protocol: v1.ProtocolTCP, TargetPort: intstr.IntOrString{IntVal: 30390}},
v1.ServiceExternalTrafficPolicyLocal)
svc.Labels["test-label"] = "test-value"
-
_, err = testVals.infraClient.CoreV1().Services(infraNamespace).Update(context.TODO(), svc, metav1.UpdateOptions{})
Expect(err).To(BeNil())
@@ -481,29 +445,22 @@ var _ = g.Describe("KubevirtEPSController", g.Ordered, func() {
return true, err
}
return false, err
- } else {
- return false, err
}
+ return false, err
}).Should(BeTrue(), "EndpointSlice in infra cluster still has the two added labels")
})
- g.It("Should update the Endpointslice when the infra Service port is updated.", func() {
- // Create VMI in infra cluster
+ g.It("Should update the EndpointSlice when the infra Service port is updated", func() {
createAndAssertVMI("worker-0-test", "ip-10-32-5-13", "123.45.67.89")
-
- // Create Endpoinslices in tenant cluster
createAndAssertTenantSlice("test-epslice", "tenant-service-name", discoveryv1.AddressTypeIPv4,
*createPort("http", 80, v1.ProtocolTCP),
[]discoveryv1.Endpoint{*createEndpoint("123.45.67.89", "worker-0-test", true, true, false)})
-
- // Create service in infra cluster
createAndAssertInfraServiceLB("infra-service-name", "tenant-service-name", "test-cluster",
v1.ServicePort{Name: "web", Port: 80, NodePort: 31900, Protocol: v1.ProtocolTCP, TargetPort: intstr.IntOrString{IntVal: 30390}},
v1.ServiceExternalTrafficPolicyLocal)
var epsList *discoveryv1.EndpointSliceList
var err error
- // Check if the controller creates the EndpointSlice in the infra cluster
Eventually(func() (bool, error) {
epsList, err = testVals.infraClient.DiscoveryV1().EndpointSlices(infraNamespace).List(context.TODO(), metav1.ListOptions{})
if len(epsList.Items) == 1 {
@@ -511,18 +468,14 @@ var _ = g.Describe("KubevirtEPSController", g.Ordered, func() {
return true, err
}
return false, err
- } else {
- return false, err
}
+ return false, err
}).Should(BeTrue(), "EndpointSlice in infra cluster should be created by the controller reconciler")
-
Expect(*epsList.Items[0].Endpoints[0].NodeName).To(Equal("ip-10-32-5-13"))
- // Update the service's port
svc := createInfraServiceLB("infra-service-name", "tenant-service-name", "test-cluster",
v1.ServicePort{Name: "web", Port: 80, NodePort: 31900, Protocol: v1.ProtocolTCP, TargetPort: intstr.IntOrString{IntVal: 30440}},
v1.ServiceExternalTrafficPolicyLocal)
-
_, err = testVals.infraClient.CoreV1().Services(infraNamespace).Update(context.TODO(), svc, metav1.UpdateOptions{})
Expect(err).To(BeNil())
@@ -533,29 +486,22 @@ var _ = g.Describe("KubevirtEPSController", g.Ordered, func() {
return true, err
}
return false, err
- } else {
- return false, err
}
- }).Should(BeTrue(), "EndpointSlice in infra cluster should have the two added labels")
+ return false, err
+ }).Should(BeTrue(), "EndpointSlice in infra cluster should reflect updated port")
})
- g.It("Should delete the Endpointslice when the Service in infra is deleted", func() {
- // Create VMI in infra cluster
+ g.It("Should delete the EndpointSlice when the Service in infra is deleted", func() {
createAndAssertVMI("worker-0-test", "ip-10-32-5-13", "123.45.67.89")
-
- // Create Endpoinslices in tenant cluster
createAndAssertTenantSlice("test-epslice", "tenant-service-name", discoveryv1.AddressTypeIPv4,
*createPort("http", 80, v1.ProtocolTCP),
[]discoveryv1.Endpoint{*createEndpoint("123.45.67.89", "worker-0-test", true, true, false)})
-
- // Create service in infra cluster
createAndAssertInfraServiceLB("infra-service-name", "tenant-service-name", "test-cluster",
v1.ServicePort{Name: "web", Port: 80, NodePort: 31900, Protocol: v1.ProtocolTCP, TargetPort: intstr.IntOrString{IntVal: 30390}},
v1.ServiceExternalTrafficPolicyLocal)
var epsList *discoveryv1.EndpointSliceList
var err error
- // Check if the controller creates the EndpointSlice in the infra cluster
Eventually(func() (bool, error) {
epsList, err = testVals.infraClient.DiscoveryV1().EndpointSlices(infraNamespace).List(context.TODO(), metav1.ListOptions{})
if len(epsList.Items) == 1 {
@@ -563,72 +509,47 @@ var _ = g.Describe("KubevirtEPSController", g.Ordered, func() {
return true, err
}
return false, err
- } else {
- return false, err
}
- }).Should(BeTrue(), "EndpointSlice in infra cluster should be created by the controller reconciler")
-
+ return false, err
+ }).Should(BeTrue(), "EndpointSlice should be created by the controller")
Expect(*epsList.Items[0].Endpoints[0].NodeName).To(Equal("ip-10-32-5-13"))
- // Delete the service
err = testVals.infraClient.CoreV1().Services(infraNamespace).Delete(context.TODO(), "infra-service-name", metav1.DeleteOptions{})
Expect(err).To(BeNil())
Eventually(func() (bool, error) {
epsList, err = testVals.infraClient.DiscoveryV1().EndpointSlices(infraNamespace).List(context.TODO(), metav1.ListOptions{})
- if len(epsList.Items) == 0 {
- return true, err
- } else {
- return false, err
- }
- }).Should(BeTrue(), "EndpointSlice in infra cluster should be deleted.")
+ return len(epsList.Items) == 0, err
+ }).Should(BeTrue(), "EndpointSlice should be deleted when the Service is removed")
})
- g.It("Should not update the Endpointslice on the infra cluster because VMI is not present", func() {
- // Create VMI in infra cluster
+ g.It("Should not update the EndpointSlice on the infra cluster because VMI is not present", func() {
createAndAssertVMI("worker-0-test", "ip-10-32-5-13", "123.45.67.89")
-
- // Create Endpoinslices in tenant cluster
createAndAssertTenantSlice("test-epslice", "tenant-service-name", discoveryv1.AddressTypeIPv4,
*createPort("http", 80, v1.ProtocolTCP),
[]discoveryv1.Endpoint{*createEndpoint("123.45.67.89", "worker-0-test", true, true, false)})
-
- // Create service in infra cluster
createAndAssertInfraServiceLB("infra-service-name", "tenant-service-name", "test-cluster",
v1.ServicePort{Name: "web", Port: 80, NodePort: 31900, Protocol: v1.ProtocolTCP, TargetPort: intstr.IntOrString{IntVal: 30390}}, v1.ServiceExternalTrafficPolicyLocal)
- // Check if the controller creates the EndpointSlice in the infra cluster
Eventually(func() (bool, error) {
epsList, err := testVals.infraClient.DiscoveryV1().EndpointSlices(infraNamespace).List(context.TODO(), metav1.ListOptions{})
- if len(epsList.Items) == 1 {
- return true, err
- } else {
- return false, err
- }
- }).Should(BeTrue(), "EndpointSlice in infra cluster should be created by the controller reconciler")
+ return len(epsList.Items) == 1, err
+ }).Should(BeTrue(), "EndpointSlice should be created by the controller")
- //
epSlice := createTenantEPSlice("test-epslice", "tenant-service-name", discoveryv1.AddressTypeIPv4,
*createPort("http", 80, v1.ProtocolTCP),
[]discoveryv1.Endpoint{
*createEndpoint("123.45.67.89", "worker-0-test", true, true, false),
*createEndpoint("112.34.56.78", "worker-1-test", true, true, false),
})
-
_, err := testVals.tenantClient.DiscoveryV1().EndpointSlices(tenantNamespace).Update(context.TODO(), epSlice, metav1.UpdateOptions{})
Expect(err).To(BeNil())
- // Check if tenant Endpointslice is updated
Eventually(func() (bool, error) {
epsList, err := testVals.tenantClient.DiscoveryV1().EndpointSlices(tenantNamespace).List(context.TODO(), metav1.ListOptions{})
- if len(epsList.Items) == 1 && len(epsList.Items[0].Endpoints) == 2 {
- return true, err
- } else {
- return false, err
- }
+ return len(epsList.Items) == 1 && len(epsList.Items[0].Endpoints) == 2, err
}).Should(BeTrue(), "EndpointSlice in tenant cluster should be updated")
- //Expect call to the infraDynamic.Get to return the VMI
Eventually(func() (bool, error) {
for _, action := range testVals.infraDynamic.Actions() {
if action.Matches("get", "virtualmachineinstances") &&
@@ -641,19 +562,14 @@ var _ = g.Describe("KubevirtEPSController", g.Ordered, func() {
}
return false, nil
}).Should(BeTrue(), "Expect call to the infraDynamic.Get to return the VMI")
-
})
g.It("Should correctly handle multiple unique ports in EndpointSlice", func() {
- // Create a VMI in the infra cluster
createAndAssertVMI("worker-0-test", "ip-10-32-5-13", "123.45.67.89")
-
- // Create an EndpointSlice in the tenant cluster
createAndAssertTenantSlice("test-epslice", "tenant-service-name", discoveryv1.AddressTypeIPv4,
*createPort("http", 80, v1.ProtocolTCP),
[]discoveryv1.Endpoint{*createEndpoint("123.45.67.89", "worker-0-test", true, true, false)})
- // Define multiple ports for the Service
servicePorts := []v1.ServicePort{
{
Name: "client",
@@ -689,7 +605,6 @@ var _ = g.Describe("KubevirtEPSController", g.Ordered, func() {
Expect(err).To(BeNil())
var epsListMultiPort *discoveryv1.EndpointSliceList
-
Eventually(func() (bool, error) {
epsListMultiPort, err = testVals.infraClient.DiscoveryV1().EndpointSlices(infraNamespace).List(context.TODO(), metav1.ListOptions{})
if len(epsListMultiPort.Items) != 1 {
@@ -698,34 +613,26 @@ var _ = g.Describe("KubevirtEPSController", g.Ordered, func() {
createdSlice := epsListMultiPort.Items[0]
expectedPortNames := []string{"client", "dashboard", "metrics"}
- foundPortNames := []string{}
-
+ var foundPortNames []string
for _, port := range createdSlice.Ports {
if port.Name != nil {
foundPortNames = append(foundPortNames, *port.Name)
}
}
-
if len(foundPortNames) != len(expectedPortNames) {
return false, err
}
-
portSet := sets.NewString(foundPortNames...)
expectedPortSet := sets.NewString(expectedPortNames...)
return portSet.Equal(expectedPortSet), err
}).Should(BeTrue(), "EndpointSlice should contain all unique ports from the Service without duplicates")
+ })
g.It("Should reconcile after infra EndpointSlice deletion and restore it", func() {
- // Create a VMI in the infra cluster
- // This ensures that when tenant EndpointSlice is created, it can be reconciled properly
createAndAssertVMI("worker-0-test", "ip-10-32-5-13", "123.45.67.89")
-
- // Create an EndpointSlice in the tenant cluster representing the desired state
createAndAssertTenantSlice("test-epslice-infra", "tenant-service-name", discoveryv1.AddressTypeIPv4,
*createPort("http", 80, v1.ProtocolTCP),
[]discoveryv1.Endpoint{*createEndpoint("123.45.67.89", "worker-0-test", true, true, false)})
-
- // Create a Service in the infra cluster that should trigger the creation of an EndpointSlice in the infra cluster
createAndAssertInfraServiceLB("infra-service-restore", "tenant-service-name", "test-cluster",
v1.ServicePort{Name: "web", Port: 80, NodePort: 31900, Protocol: v1.ProtocolTCP, TargetPort: intstr.IntOrString{IntVal: 30390}},
v1.ServiceExternalTrafficPolicyLocal)
@@ -733,106 +640,38 @@ var _ = g.Describe("KubevirtEPSController", g.Ordered, func() {
var epsList *discoveryv1.EndpointSliceList
var err error
- // Wait until the infra EndpointSlice is created by the controller
Eventually(func() (bool, error) {
epsList, err = testVals.infraClient.DiscoveryV1().EndpointSlices(infraNamespace).List(context.TODO(), metav1.ListOptions{})
- if len(epsList.Items) == 1 {
- return true, err
- } else {
- return false, err
- }
+ return len(epsList.Items) == 1, err
}).Should(BeTrue(), "Infra EndpointSlice should be created by the controller")
- // Now, simulate an external deletion of the EndpointSlice in the infra cluster
err = testVals.infraClient.DiscoveryV1().EndpointSlices(infraNamespace).Delete(context.TODO(), epsList.Items[0].Name, metav1.DeleteOptions{})
Expect(err).To(BeNil(), "Deleting infra EndpointSlice should succeed")
- // The controller, now watching infra EndpointSlices, should detect the removal
- // and trigger a reconcile to restore it.
Eventually(func() (bool, error) {
epsList, err = testVals.infraClient.DiscoveryV1().EndpointSlices(infraNamespace).List(context.TODO(), metav1.ListOptions{})
- // After some time, we expect exactly one EndpointSlice to be recreated.
- if err == nil && len(epsList.Items) == 1 {
- return true, nil
- }
- return false, err
- }).Should(BeTrue(), "EndpointSlice in infra cluster should be recreated by the controller after deletion")
+ return len(epsList.Items) == 1, err
+ }).Should(BeTrue(), "EndpointSlice should be recreated after deletion")
})
- g.It("Should remove EndpointSlices and not recreate them when a previously no-selector Service obtains a selector", func() {
- testVals.infraClient.Fake.PrependReactor("create", "endpointslices", func(action testing.Action) (bool, runtime.Object, error) {
- createAction := action.(testing.CreateAction)
- slice := createAction.GetObject().(*discoveryv1.EndpointSlice)
- if slice.Name == "" && slice.GenerateName != "" {
- slice.Name = slice.GenerateName + "-fake001"
- }
- return false, slice, nil
- })
-
- createAndAssertVMI("worker-0-test", "ip-10-32-5-13", "123.45.67.89")
-
- createAndAssertTenantSlice("test-epslice", "tenant-service-name", discoveryv1.AddressTypeIPv4,
- *createPort("http", 80, v1.ProtocolTCP),
- []discoveryv1.Endpoint{
- *createEndpoint("123.45.67.89", "worker-0-test", true, true, false),
- },
- )
-
- noSelectorSvcName := "svc-without-selector"
- svc := &v1.Service{
- ObjectMeta: metav1.ObjectMeta{
- Name: noSelectorSvcName,
- Namespace: infraNamespace,
- Labels: map[string]string{
- kubevirt.TenantServiceNameLabelKey: "tenant-service-name",
- kubevirt.TenantServiceNamespaceLabelKey: tenantNamespace,
- kubevirt.TenantClusterNameLabelKey: "test-cluster",
- },
- },
- Spec: v1.ServiceSpec{
- Ports: []v1.ServicePort{
- {
- Name: "web",
- Port: 80,
- NodePort: 31900,
- Protocol: v1.ProtocolTCP,
- TargetPort: intstr.IntOrString{IntVal: 30390},
- },
- },
- Type: v1.ServiceTypeLoadBalancer,
- ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyLocal,
- },
- }
-
+ // New test: verify that Services with a different cluster label are ignored
+ g.It("Should ignore Services from a different cluster", func() {
+ // Create a Service with cluster label "other-cluster"
+ svc := createInfraServiceLB("infra-service-conflict", "tenant-service-name", "other-cluster",
+ v1.ServicePort{Name: "web", Port: 80, NodePort: 31900, Protocol: v1.ProtocolTCP, TargetPort: intstr.IntOrString{IntVal: 30390}},
+ v1.ServiceExternalTrafficPolicyLocal)
_, err := testVals.infraClient.CoreV1().Services(infraNamespace).Create(context.TODO(), svc, metav1.CreateOptions{})
Expect(err).To(BeNil())
+ // The controller should ignore this Service, so no EndpointSlice should be created.
Eventually(func() (bool, error) {
- epsList, err := testVals.infraClient.DiscoveryV1().EndpointSlices(infraNamespace).
- List(context.TODO(), metav1.ListOptions{})
- if err != nil {
- return false, err
- }
- return len(epsList.Items) == 1, nil
- }).Should(BeTrue(), "Controller should create an EndpointSlice in infra cluster for the no-selector LB service")
-
- svcWithSelector, err := testVals.infraClient.CoreV1().Services(infraNamespace).Get(
- context.TODO(), noSelectorSvcName, metav1.GetOptions{})
- Expect(err).To(BeNil())
-
- svcWithSelector.Spec.Selector = map[string]string{"app": "test-value"}
- _, err = testVals.infraClient.CoreV1().Services(infraNamespace).
- Update(context.TODO(), svcWithSelector, metav1.UpdateOptions{})
- Expect(err).To(BeNil())
-
- Eventually(func() (bool, error) {
- epsList, err := testVals.infraClient.DiscoveryV1().EndpointSlices(infraNamespace).
- List(context.TODO(), metav1.ListOptions{})
+ epsList, err := testVals.infraClient.DiscoveryV1().EndpointSlices(infraNamespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return false, err
}
+ // Expect zero slices since cluster label does not match "test-cluster"
return len(epsList.Items) == 0, nil
- }).Should(BeTrue(), "All EndpointSlices should be removed after Service acquires a selector (no new slices created)")
+ }).Should(BeTrue(), "Services with a different cluster label should be ignored")
})
})
})