Fix EndpointSlice reconciliation (#518)

Upstream fixes:

- https://github.com/kubevirt/cloud-provider-kubevirt/pull/335
- https://github.com/kubevirt/cloud-provider-kubevirt/pull/336

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

## Release Notes

- **New Features**
  - Incremented Kubernetes chart version to 0.14.1.
- Introduced a new cloud provider controller for managing EndpointSlices
in KubeVirt, enhancing responsiveness to service changes.

- **Improvements**
- Updated Docker image tag for kubevirt-cloud-provider to use the latest
version.
- Enhanced handling of EndpointSlices for LoadBalancer services,
improving service management.

- **Bug Fixes**
- Improved error handling and logging for service retrieval and
EndpointSlice management.

- **Documentation**
- Updated version mappings in the versions map file for clarity and
tracking.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->

Signed-off-by: Andrei Kvapil <kvapss@gmail.com>
This commit is contained in:
Andrei Kvapil
2024-12-09 11:10:51 +01:00
committed by GitHub
parent d14b66cea5
commit aebf471103
7 changed files with 157 additions and 1895 deletions

View File

@@ -16,7 +16,7 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.14.0
version: 0.14.1
# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to

View File

@@ -1 +1 @@
ghcr.io/aenix-io/cozystack/kubevirt-cloud-provider:0.14.0@sha256:55b78220b60773eefb7b7d3451d7ab9fe89fb6b989e8fe2ae214aab164f00293
ghcr.io/aenix-io/cozystack/kubevirt-cloud-provider:latest@sha256:8fc186c44669c15d001d84035caae2fe4676dc8eb0bce75496cff500d36e7570

View File

@@ -3,13 +3,14 @@ FROM --platform=linux/amd64 golang:1.20.6 AS builder
RUN git clone https://github.com/kubevirt/cloud-provider-kubevirt /go/src/kubevirt.io/cloud-provider-kubevirt \
&& cd /go/src/kubevirt.io/cloud-provider-kubevirt \
&& git checkout adbd6c27468b86b020cf38490e84f124ef24ab62
&& git checkout da9e0cf
WORKDIR /go/src/kubevirt.io/cloud-provider-kubevirt
# see: https://github.com/kubevirt/cloud-provider-kubevirt/pull/291
# see: https://github.com/kubevirt/cloud-provider-kubevirt/pull/335
# see: https://github.com/kubevirt/cloud-provider-kubevirt/pull/336
ADD patches /patches
RUN git apply /patches/external-traffic-policy-local.diff
RUN git apply /patches/*.diff
RUN go get 'k8s.io/endpointslice/util@v0.28' 'k8s.io/apiserver@v0.28'
RUN go mod tidy
RUN go mod vendor

View File

@@ -0,0 +1,20 @@
diff --git a/pkg/controller/kubevirteps/kubevirteps_controller.go b/pkg/controller/kubevirteps/kubevirteps_controller.go
index a3c1aa33..95c31438 100644
--- a/pkg/controller/kubevirteps/kubevirteps_controller.go
+++ b/pkg/controller/kubevirteps/kubevirteps_controller.go
@@ -412,11 +412,11 @@ func (c *Controller) reconcileByAddressType(service *v1.Service, tenantSlices []
// Create the desired port configuration
var desiredPorts []discovery.EndpointPort
- for _, port := range service.Spec.Ports {
+ for i := range service.Spec.Ports {
desiredPorts = append(desiredPorts, discovery.EndpointPort{
- Port: &port.TargetPort.IntVal,
- Protocol: &port.Protocol,
- Name: &port.Name,
+ Port: &service.Spec.Ports[i].TargetPort.IntVal,
+ Protocol: &service.Spec.Ports[i].Protocol,
+ Name: &service.Spec.Ports[i].Name,
})
}

View File

@@ -0,0 +1,129 @@
diff --git a/pkg/controller/kubevirteps/kubevirteps_controller.go b/pkg/controller/kubevirteps/kubevirteps_controller.go
index a3c1aa33..6f6e3d32 100644
--- a/pkg/controller/kubevirteps/kubevirteps_controller.go
+++ b/pkg/controller/kubevirteps/kubevirteps_controller.go
@@ -108,32 +108,24 @@ func newRequest(reqType ReqType, obj interface{}, oldObj interface{}) *Request {
}
func (c *Controller) Init() error {
-
- // Act on events from Services on the infra cluster. These are created by the EnsureLoadBalancer function.
- // We need to watch for these events so that we can update the EndpointSlices in the infra cluster accordingly.
+ // Existing Service event handlers...
_, err := c.infraFactory.Core().V1().Services().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
- // cast obj to Service
svc := obj.(*v1.Service)
- // Only act on Services of type LoadBalancer
if svc.Spec.Type == v1.ServiceTypeLoadBalancer {
klog.Infof("Service added: %v/%v", svc.Namespace, svc.Name)
c.queue.Add(newRequest(AddReq, obj, nil))
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
- // cast obj to Service
newSvc := newObj.(*v1.Service)
- // Only act on Services of type LoadBalancer
if newSvc.Spec.Type == v1.ServiceTypeLoadBalancer {
klog.Infof("Service updated: %v/%v", newSvc.Namespace, newSvc.Name)
c.queue.Add(newRequest(UpdateReq, newObj, oldObj))
}
},
DeleteFunc: func(obj interface{}) {
- // cast obj to Service
svc := obj.(*v1.Service)
- // Only act on Services of type LoadBalancer
if svc.Spec.Type == v1.ServiceTypeLoadBalancer {
klog.Infof("Service deleted: %v/%v", svc.Namespace, svc.Name)
c.queue.Add(newRequest(DeleteReq, obj, nil))
@@ -144,7 +136,7 @@ func (c *Controller) Init() error {
return err
}
- // Monitor endpoint slices that we are interested in based on known services in the infra cluster
+ // Existing EndpointSlice event handlers in tenant cluster...
_, err = c.tenantFactory.Discovery().V1().EndpointSlices().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
eps := obj.(*discovery.EndpointSlice)
@@ -194,10 +186,80 @@ func (c *Controller) Init() error {
return err
}
- //TODO: Add informer for EndpointSlices in the infra cluster to watch for (unwanted) changes
+ // Add an informer for EndpointSlices in the infra cluster
+ _, err = c.infraFactory.Discovery().V1().EndpointSlices().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+ AddFunc: func(obj interface{}) {
+ eps := obj.(*discovery.EndpointSlice)
+ if c.managedByController(eps) {
+ svc, svcErr := c.getInfraServiceForEPS(context.TODO(), eps)
+ if svcErr != nil {
+ klog.Errorf("Failed to get infra Service for EndpointSlice %s/%s: %v", eps.Namespace, eps.Name, svcErr)
+ return
+ }
+ if svc != nil {
+ klog.Infof("Infra EndpointSlice added: %v/%v, requeuing Service: %v/%v", eps.Namespace, eps.Name, svc.Namespace, svc.Name)
+ c.queue.Add(newRequest(AddReq, svc, nil))
+ }
+ }
+ },
+ UpdateFunc: func(oldObj, newObj interface{}) {
+ eps := newObj.(*discovery.EndpointSlice)
+ if c.managedByController(eps) {
+ svc, svcErr := c.getInfraServiceForEPS(context.TODO(), eps)
+ if svcErr != nil {
+ klog.Errorf("Failed to get infra Service for EndpointSlice %s/%s: %v", eps.Namespace, eps.Name, svcErr)
+ return
+ }
+ if svc != nil {
+ klog.Infof("Infra EndpointSlice updated: %v/%v, requeuing Service: %v/%v", eps.Namespace, eps.Name, svc.Namespace, svc.Name)
+ c.queue.Add(newRequest(UpdateReq, svc, nil))
+ }
+ }
+ },
+ DeleteFunc: func(obj interface{}) {
+ eps := obj.(*discovery.EndpointSlice)
+ if c.managedByController(eps) {
+ svc, svcErr := c.getInfraServiceForEPS(context.TODO(), eps)
+ if svcErr != nil {
+ klog.Errorf("Failed to get infra Service for EndpointSlice %s/%s on delete: %v", eps.Namespace, eps.Name, svcErr)
+ return
+ }
+ if svc != nil {
+ klog.Infof("Infra EndpointSlice deleted: %v/%v, requeuing Service: %v/%v", eps.Namespace, eps.Name, svc.Namespace, svc.Name)
+ c.queue.Add(newRequest(DeleteReq, svc, nil))
+ }
+ }
+ },
+ })
+ if err != nil {
+ return err
+ }
+
return nil
}
+// getInfraServiceForEPS returns the Service in the infra cluster associated with the given EndpointSlice.
+// It does this by reading the "kubernetes.io/service-name" label from the EndpointSlice, which should correspond
+// to the Service name. If not found or if the Service doesn't exist, it returns nil.
+func (c *Controller) getInfraServiceForEPS(ctx context.Context, eps *discovery.EndpointSlice) (*v1.Service, error) {
+ svcName := eps.Labels[discovery.LabelServiceName]
+ if svcName == "" {
+ // No service name label found, can't determine infra service.
+ return nil, nil
+ }
+
+ svc, err := c.infraClient.CoreV1().Services(c.infraNamespace).Get(ctx, svcName, metav1.GetOptions{})
+ if err != nil {
+ if k8serrors.IsNotFound(err) {
+ // Service doesn't exist
+ return nil, nil
+ }
+ return nil, err
+ }
+
+ return svc, nil
+}
+
// Run starts an asynchronous loop that monitors and updates GKENetworkParamSet in the cluster.
func (c *Controller) Run(numWorkers int, stopCh <-chan struct{}, controllerManagerMetrics *controllersmetrics.ControllerManagerMetrics) {
defer utilruntime.HandleCrash()

View File

@@ -41,7 +41,8 @@ kubernetes 0.11.1 4f430a90
kubernetes 0.12.0 74649f8
kubernetes 0.12.1 28fca4e
kubernetes 0.13.0 ced8e5b9
kubernetes 0.14.0 HEAD
kubernetes 0.14.0 bfbde07c
kubernetes 0.14.1 HEAD
mysql 0.1.0 f642698
mysql 0.2.0 8b975ff0
mysql 0.3.0 5ca8823