diff --git a/Makefile b/Makefile index fd5fa28e..cd94d5db 100644 --- a/Makefile +++ b/Makefile @@ -18,6 +18,7 @@ build: build-deps make -C packages/system/cilium image make -C packages/system/kubeovn image make -C packages/system/kubeovn-webhook image + make -C packages/system/kubeovn-plunger image make -C packages/system/dashboard image make -C packages/system/metallb image make -C packages/system/kamaji image diff --git a/cmd/kubeovn-plunger/main.go b/cmd/kubeovn-plunger/main.go new file mode 100644 index 00000000..210405ce --- /dev/null +++ b/cmd/kubeovn-plunger/main.go @@ -0,0 +1,176 @@ +/* +Copyright 2025. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "crypto/tls" + "flag" + "os" + + // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) + // to ensure that exec-entrypoint and run can make use of them. + _ "k8s.io/client-go/plugin/pkg/client/auth" + + "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/healthz" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + "sigs.k8s.io/controller-runtime/pkg/metrics" + "sigs.k8s.io/controller-runtime/pkg/metrics/filters" + metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" + "sigs.k8s.io/controller-runtime/pkg/webhook" + + "github.com/cozystack/cozystack/internal/controller/kubeovnplunger" + // +kubebuilder:scaffold:imports +) + +var ( + scheme = runtime.NewScheme() + setupLog = ctrl.Log.WithName("setup") +) + +func init() { + utilruntime.Must(clientgoscheme.AddToScheme(scheme)) + + // +kubebuilder:scaffold:scheme +} + +func main() { + var metricsAddr string + var enableLeaderElection bool + var probeAddr string + var kubeOVNNamespace string + var ovnCentralName string + var secureMetrics bool + var enableHTTP2 bool + var disableTelemetry bool + var tlsOpts []func(*tls.Config) + flag.StringVar(&metricsAddr, "metrics-bind-address", "0", "The address the metrics endpoint binds to. "+ + "Use :8443 for HTTPS or :8080 for HTTP, or leave as 0 to disable the metrics service.") + flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.") + flag.StringVar(&kubeOVNNamespace, "kube-ovn-namespace", "cozy-kubeovn", "Namespace where kube-OVN is deployed.") + flag.StringVar(&ovnCentralName, "ovn-central-name", "ovn-central", "Ovn-central deployment name.") + flag.BoolVar(&enableLeaderElection, "leader-elect", false, + "Enable leader election for controller manager. "+ + "Enabling this will ensure there is only one active controller manager.") + flag.BoolVar(&secureMetrics, "metrics-secure", true, + "If set, the metrics endpoint is served securely via HTTPS. 
Use --metrics-secure=false to use HTTP instead.") + flag.BoolVar(&enableHTTP2, "enable-http2", false, + "If set, HTTP/2 will be enabled for the metrics and webhook servers") + flag.BoolVar(&disableTelemetry, "disable-telemetry", false, + "Disable telemetry collection") + opts := zap.Options{ + Development: false, + } + opts.BindFlags(flag.CommandLine) + flag.Parse() + + ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts))) + + // if the enable-http2 flag is false (the default), http/2 should be disabled + // due to its vulnerabilities. More specifically, disabling http/2 will + // prevent from being vulnerable to the HTTP/2 Stream Cancellation and + // Rapid Reset CVEs. For more information see: + // - https://github.com/advisories/GHSA-qppj-fm5r-hxr3 + // - https://github.com/advisories/GHSA-4374-p667-p6c8 + disableHTTP2 := func(c *tls.Config) { + setupLog.Info("disabling http/2") + c.NextProtos = []string{"http/1.1"} + } + + if !enableHTTP2 { + tlsOpts = append(tlsOpts, disableHTTP2) + } + + webhookServer := webhook.NewServer(webhook.Options{ + TLSOpts: tlsOpts, + }) + + // Metrics endpoint is enabled in 'config/default/kustomization.yaml'. The Metrics options configure the server. + // More info: + // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.19.1/pkg/metrics/server + // - https://book.kubebuilder.io/reference/metrics.html + metricsServerOptions := metricsserver.Options{ + BindAddress: metricsAddr, + SecureServing: secureMetrics, + TLSOpts: tlsOpts, + } + + if secureMetrics { + // FilterProvider is used to protect the metrics endpoint with authn/authz. + // These configurations ensure that only authorized users and service accounts + // can access the metrics endpoint. The RBAC are configured in 'config/rbac/kustomization.yaml'. More info: + // https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.19.1/pkg/metrics/filters#WithAuthenticationAndAuthorization + metricsServerOptions.FilterProvider = filters.WithAuthenticationAndAuthorization + + // TODO(user): If CertDir, CertName, and KeyName are not specified, controller-runtime will automatically + // generate self-signed certificates for the metrics server. While convenient for development and testing, + // this setup is not recommended for production. + } + + mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ + Scheme: scheme, + Metrics: metricsServerOptions, + WebhookServer: webhookServer, + HealthProbeBindAddress: probeAddr, + LeaderElection: enableLeaderElection, + LeaderElectionID: "29a0338b.cozystack.io", + // LeaderElectionReleaseOnCancel defines if the leader should step down voluntarily + // when the Manager ends. This requires the binary to immediately end when the + // Manager is stopped, otherwise, this setting is unsafe. Setting this significantly + // speeds up voluntary leader transitions as the new leader don't have to wait + // LeaseDuration time first. + // + // In the default scaffold provided, the program ends immediately after + // the manager stops, so would be fine to enable this option. However, + // if you are doing or is intended to do any operation such as perform cleanups + // after the manager stops then its usage might be unsafe. 
+ // LeaderElectionReleaseOnCancel: true, + }) + if err != nil { + setupLog.Error(err, "unable to create manager") + os.Exit(1) + } + + if err = (&kubeovnplunger.KubeOVNPlunger{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + Registry: metrics.Registry, + }).SetupWithManager(mgr, kubeOVNNamespace, ovnCentralName); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "KubeOVNPlunger") + os.Exit(1) + } + + // +kubebuilder:scaffold:builder + + if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { + setupLog.Error(err, "unable to set up health check") + os.Exit(1) + } + if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil { + setupLog.Error(err, "unable to set up ready check") + os.Exit(1) + } + + setupLog.Info("starting manager") + if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { + setupLog.Error(err, "problem running manager") + os.Exit(1) + } +} diff --git a/go.mod b/go.mod index 35f9bc60..041aa96a 100644 --- a/go.mod +++ b/go.mod @@ -59,6 +59,7 @@ require ( github.com/google/go-cmp v0.6.0 // indirect github.com/google/pprof v0.0.0-20240727154555-813a5fbdbec8 // indirect github.com/google/uuid v1.6.0 // indirect + github.com/gorilla/websocket v1.5.0 // indirect github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect github.com/imdario/mergo v0.3.6 // indirect @@ -66,9 +67,11 @@ require ( github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/mailru/easyjson v0.7.7 // indirect + github.com/moby/spdystream v0.4.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_golang v1.19.1 // indirect diff --git a/go.sum b/go.sum index c7aaa336..a1090f31 100644 --- a/go.sum +++ b/go.sum @@ -2,6 +2,8 @@ github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cq github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c= github.com/antlr4-go/antlr/v4 v4.13.0 h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8TVTI= github.com/antlr4-go/antlr/v4 v4.13.0/go.mod h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g= +github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= +github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a h1:idn718Q4B6AGu/h5Sxe66HYVdqdGu2l9Iebqhi/AEoA= github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= @@ -115,6 +117,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= +github.com/moby/spdystream v0.4.0 h1:Vy79D6mHeJJjiPdFEL2yku1kl0chZpJfZcPpb16BRl8= 
+github.com/moby/spdystream v0.4.0/go.mod h1:xBAYlnt/ay+11ShkdFKNAG7LsyK/tmNBVvVOwrfMgdI= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -122,6 +126,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f h1:y5//uYreIhSUg3J1GEMiLbxo1LJaP8RfCpH6pymGZus= +github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA= github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To= github.com/onsi/gomega v1.33.1 h1:dsYjIxxSR755MDmKVsaFQTE22ChNBcuuTWgkUDSubOk= diff --git a/internal/controller/kubeovnplunger/kubeovn_plunger.go b/internal/controller/kubeovnplunger/kubeovn_plunger.go new file mode 100644 index 00000000..5d4b3d1d --- /dev/null +++ b/internal/controller/kubeovnplunger/kubeovn_plunger.go @@ -0,0 +1,280 @@ +package kubeovnplunger + +import ( + "bytes" + "context" + "fmt" + "io" + "strings" + "time" + + "github.com/cozystack/cozystack/internal/sse" + "github.com/cozystack/cozystack/pkg/ovnstatus" + "github.com/prometheus/client_golang/prometheus" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/remotecommand" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" +) + +var ( + srv *sse.Server +) + +const ( + rescanInterval = 1 * time.Minute +) + +// KubeOVNPlunger watches the ovn-central cluster members +type KubeOVNPlunger struct { + client.Client + Scheme *runtime.Scheme + ClientSet kubernetes.Interface + REST *rest.Config + Registry prometheus.Registerer + metrics metrics + lastLeader map[string]string + seenCIDs map[string]map[string]struct{} +} + +// Reconcile runs the checks on the ovn-central members to see if their views of the cluster are consistent +func (r *KubeOVNPlunger) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + l := log.FromContext(ctx) + + deploy := &appsv1.Deployment{} + if err := r.Get(ctx, req.NamespacedName, deploy); err != nil { + return ctrl.Result{}, err + } + + iphints := map[string]string{} + for _, env := range deploy.Spec.Template.Spec.Containers[0].Env { + if env.Name != "NODE_IPS" { + continue + } + for _, ip := range strings.Split(env.Value, ",") { + iphints[ip] = "" + } + break + } + if len(iphints) 
== 0 { + l.Info("WARNING: running without IP hints, some error conditions cannot be detected") + } + pods := &corev1.PodList{} + + if err := r.List(ctx, pods, client.InNamespace(req.Namespace), client.MatchingLabels(map[string]string{"app": req.Name})); err != nil { + return ctrl.Result{}, fmt.Errorf("list ovn-central pods: %w", err) + } + + nbmv := make([]ovnstatus.MemberView, 0, len(pods.Items)) + sbmv := make([]ovnstatus.MemberView, 0, len(pods.Items)) + nbSnaps := make([]ovnstatus.HealthSnapshot, 0, len(pods.Items)) + sbSnaps := make([]ovnstatus.HealthSnapshot, 0, len(pods.Items)) + // TODO: get real iphints + for i := range pods.Items { + o := ovnstatus.OVNClient{} + o.ApplyDefaults() + o.Runner = func(ctx context.Context, bin string, args ...string) (string, error) { + cmd := append([]string{bin}, args...) + eo := ExecOptions{ + Namespace: req.Namespace, + Pod: pods.Items[i].Name, + Container: pods.Items[i].Spec.Containers[0].Name, + Command: cmd, + } + res, err := r.ExecPod(ctx, eo) + if err != nil { + return "", err + } + return res.Stdout, nil + } + nb, sb, err1, err2 := o.HealthBoth(ctx) + if err1 != nil || err2 != nil { + l.Error(fmt.Errorf("health check failed: nb=%w, sb=%w", err1, err2), "pod", pods.Items[i].Name) + continue + } + nbSnaps = append(nbSnaps, nb) + sbSnaps = append(sbSnaps, sb) + nbmv = append(nbmv, ovnstatus.BuildMemberView(nb)) + sbmv = append(sbmv, ovnstatus.BuildMemberView(sb)) + } + r.recordAndPruneCIDs("nb", cidFromSnaps(nbSnaps)) + r.recordAndPruneCIDs("sb", cidFromSnaps(sbSnaps)) + nbmv = ovnstatus.NormalizeViews(nbmv) + sbmv = ovnstatus.NormalizeViews(sbmv) + nbecv := ovnstatus.AnalyzeConsensusWithIPHints(nbmv, &ovnstatus.Hints{ExpectedIPs: iphints}) + sbecv := ovnstatus.AnalyzeConsensusWithIPHints(sbmv, &ovnstatus.Hints{ExpectedIPs: iphints}) + expected := len(iphints) + r.WriteClusterMetrics("nb", nbSnaps, nbecv, expected) + r.WriteClusterMetrics("sb", sbSnaps, sbecv, expected) + r.WriteMemberMetrics("nb", nbSnaps, nbmv, nbecv) + r.WriteMemberMetrics("sb", sbSnaps, sbmv, sbecv) + srv.Publish(nbecv.PrettyString() + sbecv.PrettyString()) + return ctrl.Result{}, nil +} + +// SetupWithManager attaches a generic ticker to trigger a reconcile every seconds +func (r *KubeOVNPlunger) SetupWithManager(mgr ctrl.Manager, kubeOVNNamespace, appName string) error { + r.REST = rest.CopyConfig(mgr.GetConfig()) + cs, err := kubernetes.NewForConfig(r.REST) + if err != nil { + return fmt.Errorf("build clientset: %w", err) + } + r.ClientSet = cs + ch := make(chan event.GenericEvent, 10) + mapFunc := func(context.Context, client.Object) []reconcile.Request { + return []reconcile.Request{{ + NamespacedName: types.NamespacedName{Namespace: kubeOVNNamespace, Name: appName}, + }} + } + mapper := handler.EnqueueRequestsFromMapFunc(mapFunc) + srv = sse.New(sse.Options{ + Addr: ":18080", + AllowCORS: true, + }) + r.initMetrics() + r.lastLeader = make(map[string]string) + r.seenCIDs = map[string]map[string]struct{}{"nb": {}, "sb": {}} + if err := ctrl.NewControllerManagedBy(mgr). + Named("kubeovnplunger"). + WatchesRawSource(source.Channel(ch, mapper)). 
+ Complete(r); err != nil { + return err + } + _ = mgr.Add(manager.RunnableFunc(func(ctx context.Context) error { + go srv.ListenAndServe() + <-ctx.Done() + _ = srv.Shutdown(context.Background()) + return nil + })) + return mgr.Add(manager.RunnableFunc(func(ctx context.Context) error { + ticker := time.NewTicker(rescanInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return nil + case <-ticker.C: + ch <- event.GenericEvent{ + Object: &metav1.PartialObjectMetadata{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: kubeOVNNamespace, + Name: appName, + }, + }, + } + } + } + })) +} + +type ExecOptions struct { + Namespace string + Pod string + Container string + Command []string // e.g. []string{"sh", "-c", "echo hi"} + Stdin io.Reader // optional + TTY bool // if true, stderr is merged into stdout + Timeout time.Duration // optional overall timeout +} + +type ExecResult struct { + Stdout string + Stderr string + ExitCode *int // nil if not determinable +} + +// ExecPod runs a command in a pod and returns stdout/stderr/exit code. +func (r *KubeOVNPlunger) ExecPod(ctx context.Context, opts ExecOptions) (*ExecResult, error) { + if opts.Namespace == "" || opts.Pod == "" || opts.Container == "" { + return nil, fmt.Errorf("namespace, pod, and container are required") + } + + req := r.ClientSet.CoreV1().RESTClient(). + Post(). + Resource("pods"). + Namespace(opts.Namespace). + Name(opts.Pod). + SubResource("exec"). + VersionedParams(&corev1.PodExecOptions{ + Container: opts.Container, + Command: opts.Command, + Stdin: opts.Stdin != nil, + Stdout: true, + Stderr: !opts.TTY, + TTY: opts.TTY, + }, scheme.ParameterCodec) + + exec, err := remotecommand.NewSPDYExecutor(r.REST, "POST", req.URL()) + if err != nil { + return nil, fmt.Errorf("spdy executor: %w", err) + } + + var stdout, stderr bytes.Buffer + streamCtx := ctx + if opts.Timeout > 0 { + var cancel context.CancelFunc + streamCtx, cancel = context.WithTimeout(ctx, opts.Timeout) + defer cancel() + } + + streamErr := exec.StreamWithContext(streamCtx, remotecommand.StreamOptions{ + Stdin: opts.Stdin, + Stdout: &stdout, + Stderr: &stderr, + Tty: opts.TTY, + }) + + res := &ExecResult{Stdout: stdout.String(), Stderr: stderr.String()} + if streamErr != nil { + // Try to surface exit code instead of treating all failures as transport errors + type exitCoder interface{ ExitStatus() int } + if ec, ok := streamErr.(exitCoder); ok { + code := ec.ExitStatus() + res.ExitCode = &code + return res, nil + } + return res, fmt.Errorf("exec stream: %w", streamErr) + } + zero := 0 + res.ExitCode = &zero + return res, nil +} + +func (r *KubeOVNPlunger) recordAndPruneCIDs(db, currentCID string) { + + // Mark current as seen + if r.seenCIDs[db] == nil { + r.seenCIDs[db] = map[string]struct{}{} + } + if currentCID != "" { + r.seenCIDs[db][currentCID] = struct{}{} + } + + // Build a set of "still active" CIDs this cycle (could be none if you failed to collect) + active := map[string]struct{}{} + if currentCID != "" { + active[currentCID] = struct{}{} + } + + // Any seen CID that isn't active now is stale -> delete all its series + for cid := range r.seenCIDs[db] { + if _, ok := active[cid]; ok { + continue + } + r.deleteAllFor(db, cid) + delete(r.seenCIDs[db], cid) + } +} diff --git a/internal/controller/kubeovnplunger/kubeovn_plunger_test.go b/internal/controller/kubeovnplunger/kubeovn_plunger_test.go new file mode 100644 index 00000000..d548c9d4 --- /dev/null +++ b/internal/controller/kubeovnplunger/kubeovn_plunger_test.go @@ -0,0 +1,34 @@ +package 
kubeovnplunger + +import ( + "context" + "testing" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/config" +) + +var testPlunger *KubeOVNPlunger + +func init() { + scheme := runtime.NewScheme() + cfg := config.GetConfigOrDie() + c, _ := client.New(cfg, client.Options{}) + cs, _ := kubernetes.NewForConfig(cfg) + testPlunger = &KubeOVNPlunger{ + Client: c, + Scheme: scheme, + ClientSet: cs, + REST: cfg, + } +} + +func TestPlungerGetsStatuses(t *testing.T) { + _, err := testPlunger.Reconcile(context.Background(), ctrl.Request{}) + if err != nil { + t.Errorf("error should be nil but it's %s", err) + } +} diff --git a/internal/controller/kubeovnplunger/metrics.go b/internal/controller/kubeovnplunger/metrics.go new file mode 100644 index 00000000..0bdd5970 --- /dev/null +++ b/internal/controller/kubeovnplunger/metrics.go @@ -0,0 +1,423 @@ +package kubeovnplunger + +import ( + "time" + + "github.com/cozystack/cozystack/pkg/ovnstatus" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +type metrics struct { + // --- Core cluster health (per DB/cid) --- + clusterQuorum *prometheus.GaugeVec // 1/0 + allAgree *prometheus.GaugeVec // 1/0 + membersExpected *prometheus.GaugeVec + membersObserved *prometheus.GaugeVec + ipsExpected *prometheus.GaugeVec + ipsObserved *prometheus.GaugeVec + excessMembers *prometheus.GaugeVec + missingMembers *prometheus.GaugeVec + unexpectedIPsCount *prometheus.GaugeVec + missingExpectedIPsCount *prometheus.GaugeVec + ipConflictsCount *prometheus.GaugeVec + sidAddrDisagreements *prometheus.GaugeVec + + // --- Consensus summary (per DB/cid) --- + consensusMajoritySize *prometheus.GaugeVec + consensusMinoritySize *prometheus.GaugeVec + consensusDiffsTotal *prometheus.GaugeVec + + // --- Detail exports (sparse, keyed by IP/SID) --- + unexpectedIPGauge *prometheus.GaugeVec // {db,cid,ip} -> 1 + missingExpectedIPGauge *prometheus.GaugeVec // {db,cid,ip} -> 1 + ipConflictGauge *prometheus.GaugeVec // {db,cid,ip} -> count(sids) + suspectStaleGauge *prometheus.GaugeVec // {db,cid,sid} -> 1 + + // --- Per-member liveness/freshness (per DB/cid/sid[/ip]) --- + memberConnected *prometheus.GaugeVec // {db,cid,sid,ip} + memberLeader *prometheus.GaugeVec // {db,cid,sid} + memberLastMsgMs *prometheus.GaugeVec // {db,cid,sid} + memberIndex *prometheus.GaugeVec // {db,cid,sid} + memberIndexGap *prometheus.GaugeVec // {db,cid,sid} + memberReporter *prometheus.GaugeVec // {db,cid,sid} + memberMissingReporter *prometheus.GaugeVec // {db,cid,sid} + + // --- Ops/housekeeping --- + leaderTransitionsTotal *prometheus.CounterVec // {db,cid} + collectErrorsTotal *prometheus.CounterVec // {db,cid} + publishEventsTotal *prometheus.CounterVec // {db,cid} + snapshotTimestampSec *prometheus.GaugeVec // {db,cid} +} + +func (r *KubeOVNPlunger) initMetrics() { + p := promauto.With(r.Registry) + + ns := "ovn" + + // --- Core cluster health --- + r.metrics.clusterQuorum = p.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: ns, Subsystem: "cluster", Name: "quorum", + Help: "1 if cluster has quorum, else 0", + }, []string{"db", "cid"}) + + r.metrics.allAgree = p.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: ns, Subsystem: "cluster", Name: "all_agree", + Help: "1 if all members report identical membership", + }, []string{"db", "cid"}) + + r.metrics.membersExpected = p.NewGaugeVec(prometheus.GaugeOpts{ 
+ Namespace: ns, Subsystem: "cluster", Name: "members_expected", + Help: "Expected cluster size (replicas)", + }, []string{"db", "cid"}) + + r.metrics.membersObserved = p.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: ns, Subsystem: "cluster", Name: "members_observed", + Help: "Observed members (distinct SIDs across views)", + }, []string{"db", "cid"}) + + r.metrics.ipsExpected = p.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: ns, Subsystem: "cluster", Name: "ips_expected", + Help: "Expected distinct member IPs (from k8s hints)", + }, []string{"db", "cid"}) + + r.metrics.ipsObserved = p.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: ns, Subsystem: "cluster", Name: "ips_observed", + Help: "Observed distinct member IPs (from OVN views)", + }, []string{"db", "cid"}) + + r.metrics.excessMembers = p.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: ns, Subsystem: "cluster", Name: "excess_members", + Help: "Members over expected (>=0)", + }, []string{"db", "cid"}) + + r.metrics.missingMembers = p.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: ns, Subsystem: "cluster", Name: "missing_members", + Help: "Members short of expected (>=0)", + }, []string{"db", "cid"}) + + r.metrics.unexpectedIPsCount = p.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: ns, Subsystem: "cluster", Name: "unexpected_ips", + Help: "Count of IPs in OVN not present in k8s expected set", + }, []string{"db", "cid"}) + + r.metrics.missingExpectedIPsCount = p.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: ns, Subsystem: "cluster", Name: "missing_expected_ips", + Help: "Count of expected IPs not found in OVN", + }, []string{"db", "cid"}) + + r.metrics.ipConflictsCount = p.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: ns, Subsystem: "cluster", Name: "ip_conflicts", + Help: "Number of IPs claimed by multiple SIDs", + }, []string{"db", "cid"}) + + r.metrics.sidAddrDisagreements = p.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: ns, Subsystem: "cluster", Name: "sid_address_disagreements", + Help: "Number of SIDs seen with >1 distinct addresses", + }, []string{"db", "cid"}) + + // --- Consensus summary --- + r.metrics.consensusMajoritySize = p.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: ns, Subsystem: "consensus", Name: "majority_size", + Help: "Majority group size (0 if none)", + }, []string{"db", "cid"}) + + r.metrics.consensusMinoritySize = p.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: ns, Subsystem: "consensus", Name: "minority_size", + Help: "Minority group size", + }, []string{"db", "cid"}) + + r.metrics.consensusDiffsTotal = p.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: ns, Subsystem: "consensus", Name: "diffs_total", + Help: "Total per-reporter differences vs truth (missing + extra + mismatches)", + }, []string{"db", "cid"}) + + // --- Detail exports (sparse) --- + r.metrics.unexpectedIPGauge = p.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: ns, Subsystem: "consensus", Name: "unexpected_ip", + Help: "Unexpected IP present in OVN; value fixed at 1", + }, []string{"db", "cid", "ip"}) + + r.metrics.missingExpectedIPGauge = p.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: ns, Subsystem: "consensus", Name: "missing_expected_ip", + Help: "Expected IP missing from OVN; value fixed at 1", + }, []string{"db", "cid", "ip"}) + + r.metrics.ipConflictGauge = p.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: ns, Subsystem: "consensus", Name: "ip_conflict", + Help: "Number of SIDs claiming the same IP for this key", + }, []string{"db", "cid", "ip"}) + + r.metrics.suspectStaleGauge = p.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: ns, 
Subsystem: "consensus", Name: "suspect_stale", + Help: "Suspected stale SID candidate for kick; value fixed at 1 (emit only when remediation is warranted)", + }, []string{"db", "cid", "sid"}) + + // --- Per-member liveness/freshness --- + r.metrics.memberConnected = p.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: ns, Subsystem: "member", Name: "connected", + Help: "1 if local server reports connected/quorum, else 0", + }, []string{"db", "cid", "sid", "ip"}) + + r.metrics.memberLeader = p.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: ns, Subsystem: "member", Name: "leader", + Help: "1 if this member is leader, else 0", + }, []string{"db", "cid", "sid"}) + + r.metrics.memberLastMsgMs = p.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: ns, Subsystem: "member", Name: "last_msg_ms", + Help: "Follower->leader 'last msg' age in ms (legacy heuristic). NaN/omit if unknown", + }, []string{"db", "cid", "sid"}) + + r.metrics.memberIndex = p.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: ns, Subsystem: "member", Name: "index", + Help: "Local Raft log index", + }, []string{"db", "cid", "sid"}) + + r.metrics.memberIndexGap = p.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: ns, Subsystem: "member", Name: "index_gap", + Help: "Leader index minus local index (>=0)", + }, []string{"db", "cid", "sid"}) + + r.metrics.memberReporter = p.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: ns, Subsystem: "member", Name: "reporter", + Help: "1 if a self-view from this SID was collected in the scrape cycle", + }, []string{"db", "cid", "sid"}) + + r.metrics.memberMissingReporter = p.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: ns, Subsystem: "member", Name: "missing_reporter", + Help: "1 if SID appears in union but produced no self-view", + }, []string{"db", "cid", "sid"}) + + // --- Ops/housekeeping --- + r.metrics.leaderTransitionsTotal = p.NewCounterVec(prometheus.CounterOpts{ + Namespace: ns, Subsystem: "ops", Name: "leader_transitions_total", + Help: "Count of observed leader SID changes", + }, []string{"db", "cid"}) + + r.metrics.collectErrorsTotal = p.NewCounterVec(prometheus.CounterOpts{ + Namespace: ns, Subsystem: "ops", Name: "collect_errors_total", + Help: "Count of errors during health collection/analysis", + }, []string{"db", "cid"}) + + r.metrics.publishEventsTotal = p.NewCounterVec(prometheus.CounterOpts{ + Namespace: ns, Subsystem: "ops", Name: "publish_events_total", + Help: "Count of SSE publish events (optional)", + }, []string{"db", "cid"}) + + r.metrics.snapshotTimestampSec = p.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: ns, Subsystem: "ops", Name: "snapshot_timestamp_seconds", + Help: "Unix timestamp of the last successful consensus snapshot", + }, []string{"db", "cid"}) +} + +func (r *KubeOVNPlunger) WriteClusterMetrics(db string, snaps []ovnstatus.HealthSnapshot, ecv ovnstatus.ExtendedConsensusResult, expectedReplicas int) { + cid := cidFromSnaps(snaps) + + // Core cluster health + r.metrics.clusterQuorum.WithLabelValues(db, cid).Set(b2f(ecv.HasMajority)) + r.metrics.allAgree.WithLabelValues(db, cid).Set(b2f(ecv.AllAgree)) + r.metrics.membersExpected.WithLabelValues(db, cid).Set(float64(expectedReplicas)) + r.metrics.membersObserved.WithLabelValues(db, cid).Set(float64(ecv.MembersCount)) + r.metrics.ipsExpected.WithLabelValues(db, cid).Set(float64(len(ecv.ConsensusResult.TruthView.Members))) // optional; or len(hints.ExpectedIPs) + r.metrics.ipsObserved.WithLabelValues(db, cid).Set(float64(ecv.DistinctIPCount)) + r.metrics.excessMembers.WithLabelValues(db, 
cid).Set(float64(ecv.ExpectedExcess)) + r.metrics.missingMembers.WithLabelValues(db, cid).Set(float64(ecv.ExpectedShortfall)) + r.metrics.unexpectedIPsCount.WithLabelValues(db, cid).Set(float64(len(ecv.UnexpectedIPs))) + r.metrics.missingExpectedIPsCount.WithLabelValues(db, cid).Set(float64(len(ecv.MissingExpectedIPs))) + r.metrics.ipConflictsCount.WithLabelValues(db, cid).Set(float64(len(ecv.IPConflicts))) + + // Count SIDs with >1 distinct addresses + disagree := 0 + for _, n := range ecv.SIDAddressDisagreements { + if n > 1 { + disagree++ + } + } + r.metrics.sidAddrDisagreements.WithLabelValues(db, cid).Set(float64(disagree)) + + // Consensus summary + r.metrics.consensusMajoritySize.WithLabelValues(db, cid).Set(float64(len(ecv.MajorityMembers))) + r.metrics.consensusMinoritySize.WithLabelValues(db, cid).Set(float64(len(ecv.MinorityMembers))) + + // Sum diffs across reporters (missing + extra + mismatches) + totalDiffs := 0 + for _, d := range ecv.Diffs { + totalDiffs += len(d.MissingSIDs) + len(d.ExtraSIDs) + len(d.AddressMismatches) + } + r.metrics.consensusDiffsTotal.WithLabelValues(db, cid).Set(float64(totalDiffs)) + + // Sparse per-key exports (reset then re-emit for this {db,cid}) + r.metrics.unexpectedIPGauge.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid}) + for _, ip := range ecv.UnexpectedIPs { + r.metrics.unexpectedIPGauge.WithLabelValues(db, cid, ip).Set(1) + } + + r.metrics.missingExpectedIPGauge.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid}) + for _, ip := range ecv.MissingExpectedIPs { + r.metrics.missingExpectedIPGauge.WithLabelValues(db, cid, ip).Set(1) + } + + r.metrics.ipConflictGauge.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid}) + for ip, sids := range ecv.IPConflicts { + r.metrics.ipConflictGauge.WithLabelValues(db, cid, ip).Set(float64(len(sids))) + } + + // Only emit suspects when remediation is warranted (e.g., TooManyMembers / unexpected IPs / conflicts) + r.metrics.suspectStaleGauge.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid}) + if ecv.TooManyMembers || len(ecv.UnexpectedIPs) > 0 || len(ecv.IPConflicts) > 0 { + for _, sid := range ecv.SuspectStaleSIDs { + r.metrics.suspectStaleGauge.WithLabelValues(db, cid, sid).Set(1) + } + } + + // Snapshot timestamp + r.metrics.snapshotTimestampSec.WithLabelValues(db, cid).Set(float64(time.Now().Unix())) +} + +func (r *KubeOVNPlunger) WriteMemberMetrics(db string, snaps []ovnstatus.HealthSnapshot, views []ovnstatus.MemberView, ecv ovnstatus.ExtendedConsensusResult) { + cid := cidFromSnaps(snaps) + + // Figure out current leader SID (prefer local view from any leader snapshot) + curLeader := "" + for _, s := range snaps { + if s.Local.Leader { + curLeader = s.Local.SID + break + } + } + // Leader transitions + key := db + "|" + cid + if prev, ok := r.lastLeader[key]; ok && prev != "" && curLeader != "" && prev != curLeader { + r.metrics.leaderTransitionsTotal.WithLabelValues(db, cid).Inc() + } + if curLeader != "" { + r.lastLeader[key] = curLeader + } + + // Build quick maps for reporter set & IP per SID (best-effort) + reporter := map[string]struct{}{} + for _, v := range views { + if v.FromSID != "" { + reporter[v.FromSID] = struct{}{} + } + } + sidToIP := map[string]string{} + for _, v := range views { + for sid, addr := range v.Members { + if sidToIP[sid] == "" && addr != "" { + sidToIP[sid] = ovnstatus.AddrToIP(addr) // expose addrToIP or wrap here + } + } + } + + // Reset member vectors for this {db,cid} (avoid stale series) + 
r.metrics.memberConnected.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid}) + r.metrics.memberLeader.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid}) + r.metrics.memberLastMsgMs.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid}) + r.metrics.memberIndex.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid}) + r.metrics.memberIndexGap.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid}) + r.metrics.memberReporter.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid}) + r.metrics.memberMissingReporter.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid}) + + // Leader index (to compute gaps) + lIdx := leaderIndex(snaps, curLeader) + + // Emit one series per snapshot (self view) + for _, s := range snaps { + sid := s.Local.SID + ip := sidToIP[sid] + if ip == "" { + ip = "unknown" + } + + r.metrics.memberConnected.WithLabelValues(db, cid, sid, ip).Set(b2f(s.Local.Connected)) + r.metrics.memberLeader.WithLabelValues(db, cid, sid).Set(b2f(s.Local.Leader)) + r.metrics.memberIndex.WithLabelValues(db, cid, sid).Set(float64(s.Local.Index)) + + if lIdx != nil && s.Local.Index >= 0 { + gap := *lIdx - s.Local.Index + if gap < 0 { + gap = 0 + } + r.metrics.memberIndexGap.WithLabelValues(db, cid, sid).Set(float64(gap)) + } + + // Reporter presence + _, isReporter := reporter[sid] + r.metrics.memberReporter.WithLabelValues(db, cid, sid).Set(b2f(isReporter)) + } + + // “Missing reporter” SIDs = union − reporters (from ecv) + reporterSet := map[string]struct{}{} + for sid := range reporter { + reporterSet[sid] = struct{}{} + } + unionSet := map[string]struct{}{} + for _, sid := range ecv.UnionMembers { + unionSet[sid] = struct{}{} + } + for sid := range unionSet { + if _, ok := reporterSet[sid]; !ok { + r.metrics.memberMissingReporter.WithLabelValues(db, cid, sid).Set(1) + } + } + + // Legacy follower freshness (if you kept LastMsgMs in servers parsing) + // We only know LastMsgMs from the Full.Servers in each snapshot; pick the freshest per SID. 
+ lastMsg := map[string]int64{} + for _, s := range snaps { + for _, srv := range s.Full.Servers { + if srv.LastMsgMs != nil { + cur, ok := lastMsg[srv.SID] + if !ok || *srv.LastMsgMs < cur { + lastMsg[srv.SID] = *srv.LastMsgMs + } + } + } + } + for sid, ms := range lastMsg { + r.metrics.memberLastMsgMs.WithLabelValues(db, cid, sid).Set(float64(ms)) + } +} + +func (r *KubeOVNPlunger) deleteAllFor(db, cid string) { + // Cluster-level vecs (db,cid) + r.metrics.clusterQuorum.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid}) + r.metrics.allAgree.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid}) + r.metrics.membersExpected.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid}) + r.metrics.membersObserved.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid}) + r.metrics.ipsExpected.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid}) + r.metrics.ipsObserved.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid}) + r.metrics.excessMembers.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid}) + r.metrics.missingMembers.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid}) + r.metrics.unexpectedIPsCount.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid}) + r.metrics.missingExpectedIPsCount.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid}) + r.metrics.ipConflictsCount.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid}) + r.metrics.sidAddrDisagreements.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid}) + + r.metrics.consensusMajoritySize.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid}) + r.metrics.consensusMinoritySize.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid}) + r.metrics.consensusDiffsTotal.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid}) + + // Sparse detail vecs (db,cid,*) + r.metrics.unexpectedIPGauge.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid}) + r.metrics.missingExpectedIPGauge.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid}) + r.metrics.ipConflictGauge.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid}) + r.metrics.suspectStaleGauge.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid}) + + // Per-member vecs (db,cid,*) + r.metrics.memberConnected.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid}) + r.metrics.memberLeader.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid}) + r.metrics.memberLastMsgMs.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid}) + r.metrics.memberIndex.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid}) + r.metrics.memberIndexGap.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid}) + r.metrics.memberReporter.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid}) + r.metrics.memberMissingReporter.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid}) + + // Ops vecs (db,cid) + r.metrics.leaderTransitionsTotal.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid}) + r.metrics.collectErrorsTotal.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid}) + r.metrics.publishEventsTotal.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid}) + r.metrics.snapshotTimestampSec.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid}) +} diff --git a/internal/controller/kubeovnplunger/util.go b/internal/controller/kubeovnplunger/util.go new file mode 100644 index 00000000..9d2d1090 --- /dev/null +++ b/internal/controller/kubeovnplunger/util.go @@ -0,0 +1,31 @@ +package kubeovnplunger + +import "github.com/cozystack/cozystack/pkg/ovnstatus" + +func b2f(b 
bool) float64 { + if b { + return 1 + } + return 0 +} + +// Pull a cluster UUID (cid) from any snapshots’ Local.CID (falls back to "") +func cidFromSnaps(snaps []ovnstatus.HealthSnapshot) string { + for _, s := range snaps { + if s.Local.CID != "" { + return s.Local.CID + } + } + return "" +} + +// Map SID -> last local index to compute gaps (optional) +func leaderIndex(snaps []ovnstatus.HealthSnapshot, leaderSID string) (idx *int64) { + for _, s := range snaps { + if s.Local.SID == leaderSID && s.Local.Index > 0 { + v := s.Local.Index + return &v + } + } + return nil +} diff --git a/internal/sse/server.go b/internal/sse/server.go new file mode 100644 index 00000000..60d983f5 --- /dev/null +++ b/internal/sse/server.go @@ -0,0 +1,293 @@ +// Package sse provides a tiny Server-Sent Events server with pluggable routes. +// No external deps; safe for quick demos and small dashboards. +package sse + +import ( + "context" + "fmt" + "html/template" + "log" + "net/http" + "strings" + "sync" + "time" +) + +// Options configures the SSE server. +type Options struct { + // Addr is the listening address, e.g. ":8080" or "127.0.0.1:0". + Addr string + + // IndexPath is the path serving a minimal live HTML page ("" to disable). + // e.g. "/" or "/status" + IndexPath string + + // StreamPath is the SSE endpoint path, e.g. "/stream". + StreamPath string + + // Title for the index page (cosmetic). + Title string + + // AllowCORS, if true, sets Access-Control-Allow-Origin: * for /stream. + AllowCORS bool + + // ClientBuf is the per-client buffered message queue size. + // If 0, defaults to 16. When full, new messages are dropped for that client. + ClientBuf int + + // Heartbeat sends a comment line every interval to keep connections alive. + // If 0, defaults to 25s. + Heartbeat time.Duration + + // Logger (optional). If nil, log.Printf is used. + Logger *log.Logger +} + +// Server is a simple SSE broadcaster. +type Server struct { + opts Options + mux *http.ServeMux + http *http.Server + + clientsMu sync.RWMutex + clients map[*client]struct{} + + // latest holds the most recent payload (sent to new clients on connect). + latestMu sync.RWMutex + latest string +} + +type client struct { + ch chan string + closeCh chan struct{} + flusher http.Flusher + w http.ResponseWriter + req *http.Request + logf func(string, ...any) + heartbeat time.Duration +} + +func New(opts Options) *Server { + if opts.ClientBuf <= 0 { + opts.ClientBuf = 16 + } + if opts.Heartbeat <= 0 { + opts.Heartbeat = 25 * time.Second + } + if opts.Addr == "" { + opts.Addr = ":8080" + } + if opts.StreamPath == "" { + opts.StreamPath = "/stream" + } + if opts.IndexPath == "" { + opts.IndexPath = "/" + } + s := &Server{ + opts: opts, + mux: http.NewServeMux(), + clients: make(map[*client]struct{}), + } + s.routes() + s.http = &http.Server{ + Addr: opts.Addr, + Handler: s.mux, + ReadHeaderTimeout: 10 * time.Second, + } + return s +} + +func (s *Server) routes() { + if s.opts.IndexPath != "" { + s.mux.HandleFunc(s.opts.IndexPath, s.handleIndex) + } + s.mux.HandleFunc(s.opts.StreamPath, s.handleStream) +} + +func (s *Server) logf(format string, args ...any) { + if s.opts.Logger != nil { + s.opts.Logger.Printf(format, args...) + } else { + log.Printf(format, args...) + } +} + +// ListenAndServe starts the HTTP server (blocking). 
+func (s *Server) ListenAndServe() error { + s.logf("sse: listening on http://%s (index=%s, stream=%s)", s.http.Addr, s.opts.IndexPath, s.opts.StreamPath) + return s.http.ListenAndServe() +} + +// Shutdown gracefully stops the server. +func (s *Server) Shutdown(ctx context.Context) error { + s.clientsMu.Lock() + for c := range s.clients { + close(c.closeCh) + } + s.clientsMu.Unlock() + return s.http.Shutdown(ctx) +} + +// Publish broadcasts a new payload to all clients and stores it as latest. +func (s *Server) Publish(payload string) { + // Store latest + s.latestMu.Lock() + s.latest = payload + s.latestMu.Unlock() + + // Broadcast + s.clientsMu.RLock() + defer s.clientsMu.RUnlock() + for c := range s.clients { + select { + case c.ch <- payload: + default: + // Drop if client is slow (buffer full) + if s.opts.Logger != nil { + s.opts.Logger.Printf("sse: dropping message to slow client %p", c) + } + } + } +} + +func (s *Server) handleIndex(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/html; charset=utf-8") + page := indexTemplate(s.opts.Title, s.opts.StreamPath) + _, _ = w.Write([]byte(page)) +} + +func (s *Server) handleStream(w http.ResponseWriter, r *http.Request) { + // Required SSE headers + if s.opts.AllowCORS { + w.Header().Set("Access-Control-Allow-Origin", "*") + } + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "streaming unsupported", http.StatusInternalServerError) + return + } + + c := &client{ + ch: make(chan string, s.opts.ClientBuf), + closeCh: make(chan struct{}), + flusher: flusher, + w: w, + req: r, + logf: s.logf, + heartbeat: s.opts.Heartbeat, + } + + // Register client + s.clientsMu.Lock() + s.clients[c] = struct{}{} + s.clientsMu.Unlock() + + // Initial comment to open the stream for some proxies + fmt.Fprintf(w, ": connected %s\n\n", time.Now().Format(time.RFC3339)) + flusher.Flush() + + // Send latest if any + s.latestMu.RLock() + latest := s.latest + s.latestMu.RUnlock() + if latest != "" { + writeSSE(w, latest) + flusher.Flush() + } + + // Start pump + go c.pump() + + // Block until client disconnects + <-r.Context().Done() + + // Unregister client + close(c.closeCh) + s.clientsMu.Lock() + delete(s.clients, c) + s.clientsMu.Unlock() +} + +func (c *client) pump() { + t := time.NewTicker(c.heartbeat) + defer t.Stop() + for { + select { + case <-c.closeCh: + return + case msg := <-c.ch: + writeSSE(c.w, msg) + c.flusher.Flush() + case <-t.C: + // heartbeat comment (keeps connections alive through proxies) + fmt.Fprint(c.w, ": hb\n\n") + c.flusher.Flush() + } + } +} + +func writeSSE(w http.ResponseWriter, msg string) { + // Split on lines; each needs its own "data:" field per the SSE spec + lines := strings.Split(strings.TrimRight(msg, "\n"), "\n") + for _, ln := range lines { + fmt.Fprintf(w, "data: %s\n", ln) + } + fmt.Fprint(w, "\n") +} + +// Minimal index page with live updates +func indexTemplate(title, streamPath string) string { + if title == "" { + title = "SSE Stream" + } + if streamPath == "" { + streamPath = "/stream" + } + const tpl = ` + + + +{{.Title}} + + + +

+<h1>{{.Title}}</h1>
+<pre id="out">Connecting…</pre>
+<script>
+// Subscribe to the SSE endpoint and render the most recent payload.
+var out = document.getElementById("out");
+var es = new EventSource("{{.Stream}}");
+es.onmessage = function (e) { out.textContent = e.data; };
+es.onerror = function () { out.textContent = "Disconnected, retrying…"; };
+</script>
+`
+	page, _ := template.New("idx").Parse(tpl)
+	var b strings.Builder
+	_ = page.Execute(&b, map[string]any{
+		"Title":  title,
+		"Stream": streamPath,
+	})
+	return b.String()
+}
diff --git a/packages/core/platform/bundles/paas-full.yaml b/packages/core/platform/bundles/paas-full.yaml
index f5b5ab53..6947a3a8 100644
--- a/packages/core/platform/bundles/paas-full.yaml
+++ b/packages/core/platform/bundles/paas-full.yaml
@@ -70,6 +70,12 @@ releases:
   privileged: true
   dependsOn: [cilium,kubeovn,cert-manager]
 
+- name: kubeovn-plunger
+  releaseName: kubeovn-plunger
+  chart: cozy-kubeovn-plunger
+  namespace: cozy-kubeovn
+  dependsOn: [cilium,kubeovn]
+
 - name: cozy-proxy
   releaseName: cozystack
   chart: cozy-cozy-proxy
diff --git a/packages/system/kubeovn-plunger/Chart.yaml b/packages/system/kubeovn-plunger/Chart.yaml
new file mode 100644
index 00000000..ce13b1fe
--- /dev/null
+++ b/packages/system/kubeovn-plunger/Chart.yaml
@@ -0,0 +1,6 @@
+apiVersion: v2
+name: cozy-kubeovn-plunger
+description: External monitoring agent for Kube-OVN ovn-central; collects cluster state and exposes metrics/alerts
+type: application
+version: 0.1.0
+appVersion: "1.0.0"
diff --git a/packages/system/kubeovn-plunger/Makefile b/packages/system/kubeovn-plunger/Makefile
new file mode 100644
index 00000000..a80c7e06
--- /dev/null
+++ b/packages/system/kubeovn-plunger/Makefile
@@ -0,0 +1,19 @@
+export NAME=kubeovn-plunger
+export NAMESPACE=cozy-kubeovn
+
+include ../../../scripts/common-envs.mk
+include ../../../scripts/package.mk
+
+image:
+	docker buildx build -f images/kubeovn-plunger/Dockerfile ../../../ \
+		--provenance false \
+		--tag $(REGISTRY)/kubeovn-plunger:$(call settag,$(TAG)) \
+		--cache-from type=registry,ref=$(REGISTRY)/kubeovn-plunger:latest \
+		--cache-to type=inline \
+		--metadata-file images/kubeovn-plunger.json \
+		--push=$(PUSH) \
+		--label "org.opencontainers.image.source=https://github.com/cozystack/cozystack" \
+		--load=$(LOAD)
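+# Pin the chart to the exact build: write the pushed image's tag@digest into values.yaml.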
+	IMAGE="$(REGISTRY)/kubeovn-plunger:$(call settag,$(TAG))@$$(yq e '."containerimage.digest"' images/kubeovn-plunger.json -o json -r)" \
+		yq -i '.image = strenv(IMAGE)' values.yaml
+	rm -f images/kubeovn-plunger.json
diff --git a/packages/system/kubeovn-plunger/images/kubeovn-plunger/Dockerfile b/packages/system/kubeovn-plunger/images/kubeovn-plunger/Dockerfile
new file mode 100644
index 00000000..a6264ae1
--- /dev/null
+++ b/packages/system/kubeovn-plunger/images/kubeovn-plunger/Dockerfile
@@ -0,0 +1,22 @@
+FROM golang:1.24-alpine AS builder
+
+ARG TARGETOS
+ARG TARGETARCH
+
+WORKDIR /workspace
+
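+# Download modules in a separate layer so source-only changes reuse the cache.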
+COPY go.mod go.sum ./
+RUN GOOS=$TARGETOS GOARCH=$TARGETARCH go mod download
+
+COPY pkg pkg/
+COPY cmd cmd/
+COPY internal internal/
+
+RUN GOOS=$TARGETOS GOARCH=$TARGETARCH CGO_ENABLED=0 go build -ldflags="-extldflags=-static" -o /kubeovn-plunger cmd/kubeovn-plunger/main.go
+
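+# Final stage is scratch: ship only the static binary plus CA certificates for outbound TLS.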
+FROM scratch
+
+COPY --from=builder /kubeovn-plunger /kubeovn-plunger
+COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt
+
+ENTRYPOINT ["/kubeovn-plunger"]
diff --git a/packages/system/kubeovn-plunger/templates/deployment.yaml b/packages/system/kubeovn-plunger/templates/deployment.yaml
new file mode 100644
index 00000000..dd1430d2
--- /dev/null
+++ b/packages/system/kubeovn-plunger/templates/deployment.yaml
@@ -0,0 +1,35 @@
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: kube-ovn-plunger
+  labels:
+    app.kubernetes.io/name: kube-ovn-plunger
+    app.kubernetes.io/instance: {{ .Release.Name }}
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      app.kubernetes.io/name: kube-ovn-plunger
+      app.kubernetes.io/instance: {{ .Release.Name }}
+  template:
+    metadata:
+      labels:
+        app.kubernetes.io/name: kube-ovn-plunger
+        app.kubernetes.io/instance: {{ .Release.Name }}
+    spec:
+      serviceAccountName: kube-ovn-plunger
+      containers:
+      - name: kube-ovn-plunger
+        image: "{{ .Values.image }}"
+        args:
+        {{- if .Values.debug }}
+        - --zap-log-level=debug
+        {{- else }}
+        - --zap-log-level=info
+        {{- end }}
+        - --metrics-bind-address=:8080
+        - --metrics-secure=false
+        - --kube-ovn-namespace={{ .Release.Namespace }}
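+        # The metrics flags above serve plain-HTTP metrics on :8080, matching the
+        # "metrics" containerPort and the VMPodScrape shipped with this chart.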
+        ports:
+        - name: metrics
+          containerPort: 8080
diff --git a/packages/system/kubeovn-plunger/templates/podscrape.yaml b/packages/system/kubeovn-plunger/templates/podscrape.yaml
new file mode 100644
index 00000000..e23b8dcc
--- /dev/null
+++ b/packages/system/kubeovn-plunger/templates/podscrape.yaml
@@ -0,0 +1,11 @@
+apiVersion: operator.victoriametrics.com/v1beta1
+kind: VMPodScrape
+metadata:
+  name: kube-ovn-plunger
+spec:
+  podMetricsEndpoints:
+    - port: metrics
+  selector:
+    matchLabels:
+      app.kubernetes.io/name: kube-ovn-plunger
+      app.kubernetes.io/instance: {{ .Release.Name }}
diff --git a/packages/system/kubeovn-plunger/templates/rbac.yaml b/packages/system/kubeovn-plunger/templates/rbac.yaml
new file mode 100644
index 00000000..a0c5527b
--- /dev/null
+++ b/packages/system/kubeovn-plunger/templates/rbac.yaml
@@ -0,0 +1,39 @@
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: Role
+metadata:
+  name: kube-ovn-plunger
+rules:
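+# pods/exec (create) is required because the controller runs OVN status commands
+# inside the ovn-central pods via the exec subresource.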
+- apiGroups:
+  - ""
+  resources:
+  - pods
+  - pods/exec
+  verbs:
+  - get
+  - list
+  - watch
+  - create
+- apiGroups:
+  - apps
+  resources:
+  - deployments
+  verbs:
+  - get
+  - list
+  - watch
+  resourceNames:
+  - {{ .Values.ovnCentralName }}
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: RoleBinding
+metadata:
+  name: kube-ovn-plunger
+roleRef:
+  apiGroup: rbac.authorization.k8s.io
+  kind: Role
+  name: kube-ovn-plunger
+subjects:
+- kind: ServiceAccount
+  name: kube-ovn-plunger
+  namespace: {{ .Release.Namespace }}
diff --git a/packages/system/kubeovn-plunger/templates/service.yaml b/packages/system/kubeovn-plunger/templates/service.yaml
new file mode 100644
index 00000000..f2b9dffc
--- /dev/null
+++ b/packages/system/kubeovn-plunger/templates/service.yaml
@@ -0,0 +1,17 @@
+apiVersion: v1
+kind: Service
+metadata:
+  name: kube-ovn-plunger
+  labels:
+    app.kubernetes.io/name: kube-ovn-plunger
+    app.kubernetes.io/instance: {{ .Release.Name }}
+spec:
+  type: ClusterIP
+  ports:
+    - port: 8080
+      targetPort: metrics
+      protocol: TCP
+      name: metrics
+  selector:
+    app.kubernetes.io/name: kube-ovn-plunger
+    app.kubernetes.io/instance: {{ .Release.Name }}
diff --git a/packages/system/kubeovn-plunger/templates/serviceaccount.yaml b/packages/system/kubeovn-plunger/templates/serviceaccount.yaml
new file mode 100644
index 00000000..76b3b228
--- /dev/null
+++ b/packages/system/kubeovn-plunger/templates/serviceaccount.yaml
@@ -0,0 +1,8 @@
+apiVersion: v1
+automountServiceAccountToken: true
+kind: ServiceAccount
+metadata:
+  name: kube-ovn-plunger
+  labels:
+    app.kubernetes.io/name: kube-ovn-plunger
+    app.kubernetes.io/instance: {{ .Release.Name }}
diff --git a/packages/system/kubeovn-plunger/values.yaml b/packages/system/kubeovn-plunger/values.yaml
new file mode 100644
index 00000000..42135285
--- /dev/null
+++ b/packages/system/kubeovn-plunger/values.yaml
@@ -0,0 +1,4 @@
+portSecurity: true
+routes: ""
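+# image is rewritten to the freshly built tag@digest by the package Makefile.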
+image: ghcr.io/cozystack/cozystack/kubeovn-plunger:latest@sha256:a3733b86b3c60fa73cb6749e69d6399736f1ab875ec5fc7887caa8b73aa8b0b2
+ovnCentralName: ovn-central
diff --git a/packages/system/monitoring-agents/alerts/kubeovn-plunger.yaml b/packages/system/monitoring-agents/alerts/kubeovn-plunger.yaml
new file mode 100644
index 00000000..92517a3f
--- /dev/null
+++ b/packages/system/monitoring-agents/alerts/kubeovn-plunger.yaml
@@ -0,0 +1,56 @@
+apiVersion: operator.victoriametrics.com/v1beta1
+kind: VMRule
+metadata:
+  name: alerts-kubeovn-plunger
+spec:
+  groups:
+  - name: kubeovn-plunger
+    params: {}
+    rules:
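+    # All ovn_* metrics referenced below are exported by kubeovn-plunger
+    # (see internal/controller/kubeovnplunger/metrics.go).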
+    - alert: OVNMemberNotConnected
+      expr: ovn_member_connected == 0
+      for: 2m
+      labels: { severity: warning }
+      annotations:
+        summary: "OVN {{ $labels.db }} member not connected"
+        description: "Member {{ $labels.sid }} (ip={{ $labels.ip }}) reports no cluster connectivity."
+    
+    - alert: OVNFollowerStale
+      expr: ovn_member_last_msg_ms > 10000
+      for: 1m
+      labels: { severity: warning }
+      annotations:
+        summary: "OVN {{ $labels.db }} follower stale"
+        description: "Follower {{ $labels.sid }} last heard from the leader {{ $value }} ms ago (threshold 10s)."
+    
+    - alert: OVNMemberLagging
+      expr: ovn_member_index_gap > 1000
+      for: 2m
+      labels: { severity: warning }
+      annotations:
+        summary: "OVN {{ $labels.db }} member lagging"
+        description: "Member {{ $labels.sid }} is {{ $value }} Raft log entries behind the leader."
+    
+    - alert: OVNMemberMissingSelfReporter
+      expr: ovn_member_missing_reporter == 1
+      for: 10m
+      labels: { severity: warning }
+      annotations:
+        summary: "OVN {{ $labels.db }} member not reporting"
+        description: "SID {{ $labels.sid }} appears in DB but produced no self-view for ≥10m."
+    
+    - alert: OVNConsensusSplitView
+      expr: ovn_cluster_all_agree == 0 and on (db, cid) ovn_cluster_quorum == 1
+      for: 5m
+      labels: { severity: warning }
+      annotations:
+        summary: "OVN {{ $labels.db }} inconsistent views"
+        description: "Majority exists but not all members agree. Investigate minority nodes."
+    
+    - alert: OVNSuspectStale
+      expr: sum by (db,cid) (ovn_consensus_suspect_stale) > 0
+      for: 2m
+      labels: { severity: warning }
+      annotations:
+        summary: "OVN {{ $labels.db }} stale member(s) suspected"
+        description: "One or more members are suspected stale and are candidates for removal from the cluster membership."
diff --git a/packages/system/monitoring-agents/templates/kube-ovn-plunger-scrape.yaml b/packages/system/monitoring-agents/templates/kube-ovn-plunger-scrape.yaml
new file mode 100644
index 00000000..9c7253eb
--- /dev/null
+++ b/packages/system/monitoring-agents/templates/kube-ovn-plunger-scrape.yaml
@@ -0,0 +1,42 @@
+---
+apiVersion: v1
+kind: Service
+metadata:
+  name: coredns
+  namespace: kube-system
+  labels:
+    app: coredns
+spec:
+  clusterIP: None
+  ports:
+    - name: http-metrics
+      port: 9153
+      protocol: TCP
+      targetPort: 9153
+  selector:
+    k8s-app: kube-dns
+---
+apiVersion: operator.victoriametrics.com/v1beta1
+kind: VMServiceScrape
+metadata:
+  name: coredns
+  namespace: cozy-monitoring
+spec:
+  selector:
+    matchLabels:
+      app: coredns
+  namespaceSelector:
+    matchNames:
+      - "kube-system"
+  endpoints:
+  - bearerTokenFile: /var/run/secrets/kubernetes.io/serviceaccount/token
+    port: http-metrics
+    relabelConfigs:
+    - action: labeldrop
+      regex: (endpoint|namespace|pod|container)
+    - replacement: kube-dns
+      targetLabel: job
+    - sourceLabels: [__meta_kubernetes_pod_node_name]
+      targetLabel: node
+    - targetLabel: tier
+      replacement: cluster
diff --git a/pkg/ovnstatus/normalize.go b/pkg/ovnstatus/normalize.go
new file mode 100644
index 00000000..2ed999a4
--- /dev/null
+++ b/pkg/ovnstatus/normalize.go
@@ -0,0 +1,115 @@
+package ovnstatus
+
+import "strings"
+
+// ----- SID normalization (handles legacy "b007" style SIDs) -----
+
+// sidCanon pairs a full SID with its canonical form (lowercase, hyphens
+// stripped) used for unique-prefix matching.
+type sidCanon struct{ raw, canon string }
+
+// NormalizeViews expands truncated SIDs in each MemberView's Members map,
+// using IP->fullSID mappings learned from reporters and a unique-prefix fallback.
+func NormalizeViews(views []MemberView) []MemberView {
+	// 1) Learn IP -> fullSID from reporters (self entries)
+	ipToFull := map[string]string{}
+	fullSIDs := map[string]struct{}{}
+
+	for _, v := range views {
+		if v.FromSID != "" {
+			fullSIDs[v.FromSID] = struct{}{}
+		}
+		if v.FromAddress != "" {
+			ip := AddrToIP(v.FromAddress)
+			if ip != "" && v.FromSID != "" {
+				ipToFull[ip] = v.FromSID
+			}
+		}
+	}
+
+	// Build a slice for prefix-matching fallback (hyphenless, lowercase)
+	var known []sidCanon
+	for fsid := range fullSIDs {
+		known = append(known, sidCanon{
+			raw:   fsid,
+			canon: canonizeSID(fsid),
+		})
+	}
+
+	// 2) Normalize each view's Members by replacing short SIDs with full SIDs
+	out := make([]MemberView, 0, len(views))
+	for _, v := range views {
+		mv := MemberView{
+			FromSID:     normalizeOneSID(v.FromSID, v.FromAddress, ipToFull, known),
+			FromAddress: v.FromAddress,
+			Members:     make(map[string]string, len(v.Members)),
+		}
+		for sid, addr := range v.Members {
+			full := normalizeOneSIDWithAddr(sid, addr, ipToFull, known)
+			// If two short SIDs expand to the same full SID, the last write wins
+			// (the later entry's address overwrites the earlier one).
+			mv.Members[full] = addr
+		}
+		out = append(out, mv)
+	}
+	return out
+}
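+
+// Illustrative usage (a sketch, not part of the original change): a short SID
+// "b007" reported by one member is expanded to the full SID learned from the
+// reporter that lives at the same IP. SIDs and addresses below are hypothetical.
+//
+//	views := []MemberView{
+//		{FromSID: "e40d1234-aaaa-bbbb-cccc-ddddeeeeffff",
+//			FromAddress: "ssl:[192.168.100.12]:6643",
+//			Members:     map[string]string{"b007": "ssl:[192.168.100.14]:6643"}},
+//		{FromSID: "b0075678-1111-2222-3333-444455556666",
+//			FromAddress: "ssl:[192.168.100.14]:6643",
+//			Members:     map[string]string{"e40d": "ssl:[192.168.100.12]:6643"}},
+//	}
+//	norm := NormalizeViews(views)
+//	// norm[0].Members is now keyed by "b0075678-1111-2222-3333-444455556666",
+//	// learned from the reporter at 192.168.100.14.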
+
+func normalizeOneSIDWithAddr(sid, addr string, ipToFull map[string]string, known []sidCanon) string {
+	// If it's already full-ish, return as-is
+	if looksFullSID(sid) {
+		return sid
+	}
+	// First try IP mapping
+	if ip := AddrToIP(addr); ip != "" {
+		if fsid, ok := ipToFull[ip]; ok {
+			return fsid
+		}
+	}
+	// Fallback: unique prefix match against known full SIDs (hyphens ignored)
+	return expandByUniquePrefix(sid, known)
+}
+
+func normalizeOneSID(sid, selfAddr string, ipToFull map[string]string, known []sidCanon) string {
+	if looksFullSID(sid) {
+		return sid
+	}
+	if ip := AddrToIP(selfAddr); ip != "" {
+		if fsid, ok := ipToFull[ip]; ok {
+			return fsid
+		}
+	}
+	return expandByUniquePrefix(sid, known)
+}
+
+func looksFullSID(s string) bool {
+	// Heuristic: a v4 UUID with hyphens is 36 chars.
+	// Some builds may print full without hyphens (32). Treat >= 32 hex-ish as "full".
+	cs := canonizeSID(s)
+	return len(cs) >= 32
+}
+
+func canonizeSID(s string) string {
+	// lower + drop hyphens for prefix comparisons
+	s = strings.ToLower(s)
+	return strings.ReplaceAll(s, "-", "")
+}
+
+func expandByUniquePrefix(short string, known []sidCanon) string {
+	p := canonizeSID(short)
+	if p == "" {
+		return short
+	}
+	matches := make([]string, 0, 2)
+	for _, k := range known {
+		if strings.HasPrefix(k.canon, p) {
+			matches = append(matches, k.raw)
+			if len(matches) > 1 {
+				break
+			}
+		}
+	}
+	if len(matches) == 1 {
+		return matches[0]
+	}
+	// ambiguous or none → leave as-is (will still be visible in diagnostics)
+	return short
+}
diff --git a/pkg/ovnstatus/ovncluster.go b/pkg/ovnstatus/ovncluster.go
new file mode 100644
index 00000000..9cab7d82
--- /dev/null
+++ b/pkg/ovnstatus/ovncluster.go
@@ -0,0 +1,604 @@
+package ovnstatus
+
+import (
+	"fmt"
+	"sort"
+	"strings"
+)
+
+// ---- Public API ------------------------------------------------------------
+
+// MemberView is a normalized membership view (from one member's perspective).
+type MemberView struct {
+	FromSID     string            // the reporter's SID (hs.Local.SID)
+	FromAddress string            // best-effort: address of self from Servers (if present)
+	Members     map[string]string // SID -> Address (as reported by this member)
+}
+
+// ViewDiff is the difference between one view and a chosen "truth" view.
+type ViewDiff struct {
+	MissingSIDs       []string             // SIDs absent in this view but present in truth
+	ExtraSIDs         []string             // SIDs present in this view but absent in truth
+	AddressMismatches map[string][2]string // SID -> [truthAddr, thisAddr] when both have SID but addresses differ
+}
+
+// ConsensusResult summarizes cluster agreement across views.
+type ConsensusResult struct {
+	AllAgree        bool                // true if all views are identical
+	HasMajority     bool                // true if some view is held by >= quorum
+	QuorumSize      int                 // floor(n/2)+1
+	MajorityKey     string              // canonical key of the majority view (if any)
+	MajorityMembers []string            // SIDs of reporters in the majority
+	MinorityMembers []string            // SIDs of reporters not in the majority
+	TruthView       MemberView          // the majority's view if HasMajority; otherwise the largest group's view, kept only as a diff reference
+	Diffs           map[string]ViewDiff // per-reporter diffs vs TruthView (authoritative only if HasMajority)
+}
+
+// BuildMemberView extracts a normalized view for one snapshot.
+// It uses hs.Full.Servers as the authoritative list this reporter sees.
+func BuildMemberView(hs HealthSnapshot) MemberView {
+	mv := MemberView{
+		FromSID: hs.Local.SID,
+		Members: make(map[string]string, len(hs.Full.Servers)),
+	}
+
+	// Fill Members map and try to capture self address.
+	for _, s := range hs.Full.Servers {
+		if s.SID == "" || s.Address == "" {
+			continue
+		}
+		mv.Members[s.SID] = s.Address
+		if s.Self {
+			mv.FromAddress = s.Address
+		}
+	}
+	return mv
+}
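+
+// Illustrative usage (a sketch): turning a snapshot into a view. The snapshot
+// literal below is hypothetical.
+//
+//	hs := HealthSnapshot{
+//		DB:    DBSouthbound,
+//		Local: ServerLocalView{SID: "sid-a", Connected: true},
+//		Full: ClusterStatus{Servers: []ClusterServer{
+//			{SID: "sid-a", Address: "ssl:[192.168.100.12]:6643", Self: true},
+//			{SID: "sid-b", Address: "ssl:[192.168.100.11]:6643"},
+//		}},
+//	}
+//	mv := BuildMemberView(hs)
+//	// mv.FromSID == "sid-a", mv.FromAddress == "ssl:[192.168.100.12]:6643",
+//	// and mv.Members maps both SIDs to their addresses.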
+
+// AnalyzeConsensus checks agreement across a slice of views for one cluster.
+// It answers:
+//  1. do all views agree exactly?
+//  2. if not, is there a majority agreement?
+//  3. who’s in the minority, and how does each minority view differ?
+func AnalyzeConsensus(views []MemberView) ConsensusResult {
+	n := len(views)
+	cr := ConsensusResult{
+		QuorumSize: (n / 2) + 1,
+		Diffs:      make(map[string]ViewDiff, n),
+	}
+	if n == 0 {
+		return cr
+	}
+
+	// Fingerprint each view's Members map; group reporters by fingerprint key.
+	type group struct {
+		key   string
+		views []MemberView
+	}
+	groupsByKey := map[string]*group{}
+
+	for _, v := range views {
+		key := fingerprintMembers(v.Members)
+		g, ok := groupsByKey[key]
+		if !ok {
+			g = &group{key: key}
+			groupsByKey[key] = g
+		}
+		g.views = append(g.views, v)
+	}
+
+	// If only one unique fingerprint → everyone agrees.
+	if len(groupsByKey) == 1 {
+		for _, g := range groupsByKey {
+			cr.AllAgree = true
+			cr.HasMajority = true
+			cr.MajorityKey = g.key
+			cr.TruthView = g.views[0] // any member in this group shares the same map
+			for _, v := range g.views {
+				cr.MajorityMembers = append(cr.MajorityMembers, v.FromSID)
+				cr.Diffs[v.FromSID] = ViewDiff{} // empty
+			}
+			return cr
+		}
+	}
+
+	// Pick the largest group as a candidate majority.
+	var maxG *group
+	for _, g := range groupsByKey {
+		if maxG == nil || len(g.views) > len(maxG.views) {
+			maxG = g
+		}
+	}
+	if maxG != nil && len(maxG.views) >= cr.QuorumSize {
+		cr.HasMajority = true
+		cr.MajorityKey = maxG.key
+		cr.TruthView = maxG.views[0] // canonical truth view
+		for _, v := range maxG.views {
+			cr.MajorityMembers = append(cr.MajorityMembers, v.FromSID)
+			cr.Diffs[v.FromSID] = ViewDiff{} // empty
+		}
+		// Minority: everyone not in the majority group
+		majoritySet := map[string]struct{}{}
+		for _, v := range maxG.views {
+			majoritySet[v.FromSID] = struct{}{}
+		}
+		for _, v := range views {
+			if _, ok := majoritySet[v.FromSID]; !ok {
+				cr.MinorityMembers = append(cr.MinorityMembers, v.FromSID)
+				cr.Diffs[v.FromSID] = diffViews(cr.TruthView.Members, v.Members)
+			}
+		}
+		return cr
+	}
+
+	// No majority -> pick the largest group as "reference" for diffs (optional).
+	// We'll still fill Diffs vs that reference to aid debugging.
+	if maxG != nil {
+		cr.TruthView = maxG.views[0]
+		for _, v := range views {
+			cr.Diffs[v.FromSID] = diffViews(cr.TruthView.Members, v.Members)
+		}
+		// Populate members lists (no majority)
+		for _, v := range maxG.views {
+			cr.MajorityMembers = append(cr.MajorityMembers, v.FromSID)
+		}
+		for _, v := range views {
+			found := false
+			for _, m := range cr.MajorityMembers {
+				if m == v.FromSID {
+					found = true
+					break
+				}
+			}
+			if !found {
+				cr.MinorityMembers = append(cr.MinorityMembers, v.FromSID)
+			}
+		}
+	}
+	return cr
+}
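+
+// Illustrative usage (a sketch): two reporters agree, a third still lists a
+// departed member. All SIDs and addresses are hypothetical.
+//
+//	agreed := map[string]string{
+//		"a": "tcp:10.0.0.1:6641", "b": "tcp:10.0.0.2:6641", "c": "tcp:10.0.0.3:6641",
+//	}
+//	stale := map[string]string{
+//		"a": "tcp:10.0.0.1:6641", "b": "tcp:10.0.0.2:6641",
+//		"c": "tcp:10.0.0.3:6641", "d": "tcp:10.0.0.9:6641",
+//	}
+//	cr := AnalyzeConsensus([]MemberView{
+//		{FromSID: "a", Members: agreed},
+//		{FromSID: "b", Members: agreed},
+//		{FromSID: "c", Members: stale},
+//	})
+//	// cr.HasMajority == true (quorum 2 of 3), cr.MinorityMembers == []string{"c"},
+//	// and cr.Diffs["c"].ExtraSIDs == []string{"d"}.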
+
+// ---- Internals -------------------------------------------------------------
+
+func fingerprintMembers(m map[string]string) string {
+	// Produce a stable "SID=Addr" joined string.
+	if len(m) == 0 {
+		return ""
+	}
+	keys := make([]string, 0, len(m))
+	for sid := range m {
+		keys = append(keys, sid)
+	}
+	sort.Strings(keys)
+	parts := make([]string, 0, len(keys))
+	for _, sid := range keys {
+		parts = append(parts, sid+"="+m[sid])
+	}
+	return strings.Join(parts, "|")
+}
+
+func diffViews(truth, other map[string]string) ViewDiff {
+	var d ViewDiff
+	d.AddressMismatches = make(map[string][2]string)
+
+	// Build sets
+	truthKeys := make([]string, 0, len(truth))
+	otherKeys := make([]string, 0, len(other))
+	for k := range truth {
+		truthKeys = append(truthKeys, k)
+	}
+	for k := range other {
+		otherKeys = append(otherKeys, k)
+	}
+	sort.Strings(truthKeys)
+	sort.Strings(otherKeys)
+
+	// Missing & mismatches
+	for _, sid := range truthKeys {
+		tAddr := truth[sid]
+		oAddr, ok := other[sid]
+		if !ok {
+			d.MissingSIDs = append(d.MissingSIDs, sid)
+			continue
+		}
+		if tAddr != oAddr {
+			d.AddressMismatches[sid] = [2]string{tAddr, oAddr}
+		}
+	}
+	// Extra
+	for _, sid := range otherKeys {
+		if _, ok := truth[sid]; !ok {
+			d.ExtraSIDs = append(d.ExtraSIDs, sid)
+		}
+	}
+	return d
+}
+
+// ---- Pretty helpers (optional) --------------------------------------------
+
+func (cr ConsensusResult) String() string {
+	var b strings.Builder
+	fmt.Fprintf(&b, "AllAgree=%v, HasMajority=%v (quorum=%d)\n", cr.AllAgree, cr.HasMajority, cr.QuorumSize)
+	if cr.HasMajority {
+		fmt.Fprintf(&b, "MajorityMembers: %v\n", cr.MajorityMembers)
+		if len(cr.MinorityMembers) > 0 {
+			fmt.Fprintf(&b, "MinorityMembers: %v\n", cr.MinorityMembers)
+		}
+	}
+	for sid, d := range cr.Diffs {
+		if len(d.MissingSIDs) == 0 && len(d.ExtraSIDs) == 0 && len(d.AddressMismatches) == 0 {
+			continue
+		}
+		fmt.Fprintf(&b, "- %s diffs:\n", sid)
+		if len(d.MissingSIDs) > 0 {
+			fmt.Fprintf(&b, "  missing: %v\n", d.MissingSIDs)
+		}
+		if len(d.ExtraSIDs) > 0 {
+			fmt.Fprintf(&b, "  extra:   %v\n", d.ExtraSIDs)
+		}
+		if len(d.AddressMismatches) > 0 {
+			fmt.Fprintf(&b, "  addr mismatches:\n")
+			for k, v := range d.AddressMismatches {
+				fmt.Fprintf(&b, "    %s: truth=%s this=%s\n", k, v[0], v[1])
+			}
+		}
+	}
+	return b.String()
+}
+
+// Hints carries expectations about the cluster from outside OVN (e.g., from Kubernetes).
+type Hints struct {
+	// ExpectedReplicas, if >0, is the intended cluster size; if 0 and ExpectedIPs provided,
+	// we derive ExpectedReplicas = len(ExpectedIPs).
+	ExpectedReplicas int
+
+	// ExpectedIPs is the set of node IPs you expect to participate (unique per member).
+	// Optional label can be a pod/node name for reporting (empty string is fine).
+	ExpectedIPs map[string]string // ip -> label
+}
+
+// ExtendedConsensusResult augments ConsensusResult with IP-centric signals.
+type ExtendedConsensusResult struct {
+	ConsensusResult
+
+	// Union across all views (what anyone reported).
+	UnionMembers []string // SIDs (sorted)
+	UnionIPs     []string // IPs  (sorted)
+
+	// Reporters (SIDs that produced a HealthSnapshot / self-view).
+	Reporters []string // SIDs (sorted)
+
+	// Members that appear in UnionMembers but for which we have no reporter snapshot.
+	MissingReporters []string // SIDs (sorted)
+
+	// IPs seen in union but NOT in hints.ExpectedIPs (if provided).
+	UnexpectedIPs []string // sorted
+
+	// Expected IPs that did NOT appear anywhere in union.
+	MissingExpectedIPs []string // sorted
+
+	// Size checks; MembersCount is distinct SIDs; DistinctIPCount is distinct IPs.
+	MembersCount      int
+	DistinctIPCount   int
+	TooManyMembers    bool // MembersCount > ExpectedReplicas
+	TooFewMembers     bool // MembersCount < ExpectedReplicas
+	ExpectedShortfall int  // ExpectedReplicas - MembersCount (>=0)
+	ExpectedExcess    int  // MembersCount - ExpectedReplicas (>=0)
+
+	// IPConflicts: an IP mapped to multiple SIDs (shouldn’t happen if identity is clean).
+	IPConflicts map[string][]string // ip -> []sids
+
+	// SIDAddressDisagreements: number of distinct addresses observed for a SID.
+	SIDAddressDisagreements map[string]int // sid -> count(address variants)
+
+	// Suspect stale SIDs: candidates to kick (heuristic, IP-focused).
+	// Ranked by: (1) IP not expected, (2) not self-reporting, (3) lowest reference count.
+	SuspectStaleSIDs []string // sorted by suspicion
+}
+
+// AddrToIP extracts the host/IP from strings like:
+//
+//	"tcp:10.0.0.1:6641", "ssl:[192.168.100.12]:6643", "tcp:[fe80::1]:6641"
+func AddrToIP(addr string) string {
+	a := strings.TrimSpace(addr)
+	// Strip scheme prefix
+	if i := strings.Index(a, ":"); i != -1 && (strings.HasPrefix(a, "tcp:") || strings.HasPrefix(a, "ssl:")) {
+		a = a[i+1:]
+	}
+	// If bracketed IPv6: [fe80::1]:6641
+	if strings.HasPrefix(a, "[") {
+		if j := strings.Index(a, "]"); j != -1 {
+			return a[1:j]
+		}
+	}
+	// IPv4 or unbracketed IPv6 with :port → split last colon safely
+	if i := strings.LastIndex(a, ":"); i != -1 {
+		return a[:i]
+	}
+	return a // fallback
+}
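+
+// Worked examples (derived from the rules above):
+//
+//	AddrToIP("tcp:10.0.0.1:6641")         // "10.0.0.1"
+//	AddrToIP("ssl:[192.168.100.12]:6643") // "192.168.100.12"
+//	AddrToIP("tcp:[fe80::1]:6641")        // "fe80::1"
+//	AddrToIP("10.0.0.2:6641")             // "10.0.0.2" (no scheme prefix)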
+
+func setKeys(m map[string]struct{}) []string {
+	out := make([]string, 0, len(m))
+	for k := range m {
+		out = append(out, k)
+	}
+	sort.Strings(out)
+	return out
+}
+func setDiff(a, b map[string]struct{}) []string {
+	out := []string{}
+	for k := range a {
+		if _, ok := b[k]; !ok {
+			out = append(out, k)
+		}
+	}
+	sort.Strings(out)
+	return out
+}
+
+// AnalyzeConsensusWithIPHints extends AnalyzeConsensus with external hints,
+// matching members by expected IP addresses rather than by expected SIDs.
+func AnalyzeConsensusWithIPHints(views []MemberView, hints *Hints) ExtendedConsensusResult {
+	base := AnalyzeConsensus(views) // keeps majority/minority, per-view diffs (SID->addr)
+
+	// Build unions and stats
+	unionSID := map[string]struct{}{}
+	unionIP := map[string]struct{}{}
+	reporterSID := map[string]struct{}{}
+	refCountSID := map[string]int{}                     // how many times a SID is referenced across all views
+	addrVariantsSID := map[string]map[string]struct{}{} // SID -> set(address strings)
+	ipToSIDs := map[string]map[string]struct{}{}        // ip -> set(SID)
+
+	for _, v := range views {
+		if v.FromSID != "" {
+			reporterSID[v.FromSID] = struct{}{}
+		}
+		for sid, addr := range v.Members {
+			if sid == "" || addr == "" {
+				continue
+			}
+			unionSID[sid] = struct{}{}
+			refCountSID[sid]++
+			// address canon
+			if _, ok := addrVariantsSID[sid]; !ok {
+				addrVariantsSID[sid] = map[string]struct{}{}
+			}
+			addrVariantsSID[sid][addr] = struct{}{}
+			// IP canon
+			ip := AddrToIP(addr)
+			if ip != "" {
+				unionIP[ip] = struct{}{}
+				if _, ok := ipToSIDs[ip]; !ok {
+					ipToSIDs[ip] = map[string]struct{}{}
+				}
+				ipToSIDs[ip][sid] = struct{}{}
+			}
+		}
+	}
+
+	// Prepare hint set for IPs
+	var expectedIPsSet map[string]struct{}
+	expectedReplicas := 0
+	if hints != nil {
+		if len(hints.ExpectedIPs) > 0 {
+			expectedIPsSet = make(map[string]struct{}, len(hints.ExpectedIPs))
+			for ip := range hints.ExpectedIPs {
+				expectedIPsSet[ip] = struct{}{}
+			}
+			expectedReplicas = len(hints.ExpectedIPs)
+		}
+		if hints.ExpectedReplicas > 0 {
+			expectedReplicas = hints.ExpectedReplicas
+		}
+	}
+
+	unionSIDs := setKeys(unionSID)
+	unionIPs := setKeys(unionIP)
+	reporters := setKeys(reporterSID)
+	missingReporters := setDiff(unionSID, reporterSID) // SIDs seen but no self-view
+
+	// IP-based unexpected / missing vs hints
+	var unexpectedIPs, missingExpectedIPs []string
+	if expectedIPsSet != nil {
+		unexpectedIPs = setDiff(unionIP, expectedIPsSet)
+		missingExpectedIPs = setDiff(expectedIPsSet, unionIP)
+	}
+
+	// Size checks (by SIDs)
+	membersCount := len(unionSID)
+	distinctIPCount := len(unionIP)
+	tooMany, tooFew := false, false
+	shortfall, excess := 0, 0
+	if expectedReplicas > 0 {
+		if membersCount > expectedReplicas {
+			tooMany = true
+			excess = membersCount - expectedReplicas
+		} else if membersCount < expectedReplicas {
+			tooFew = true
+			shortfall = expectedReplicas - membersCount
+		}
+	}
+
+	// IP conflicts: same IP claimed under multiple SIDs
+	ipConflicts := map[string][]string{}
+	for ip, sids := range ipToSIDs {
+		if len(sids) > 1 {
+			ipConflicts[ip] = setKeys(sids)
+		}
+	}
+
+	// SID address disagreements: how many distinct addresses per SID
+	sidAddrDisagree := map[string]int{}
+	for sid, addrs := range addrVariantsSID {
+		sidAddrDisagree[sid] = len(addrs)
+	}
+
+	// --- Suspect stale SIDs -------------------------------------------------
+	//
+	// Only produce suspects when there is evidence of staleness:
+	// - too many members (over expected replicas), or
+	// - unexpected IPs exist, or
+	// - IP conflicts exist.
+	// Then rank by (unexpected IP) > (not self-reporting) > (low reference count)
+	// and trim to the number we actually need to remove (ExpectedExcess).
+	produceSuspects := tooMany || len(unexpectedIPs) > 0 || len(ipConflicts) > 0
+
+	suspectList := []string{}
+	if produceSuspects {
+		suspectScore := map[string]int{}
+		for sid := range unionSID {
+			score := 0
+
+			// Representative IP for this SID (pick lexicographically smallest addr -> ip)
+			var sidIP string
+			if av := addrVariantsSID[sid]; len(av) > 0 {
+				addrs := setKeys(av)
+				sort.Strings(addrs)
+				sidIP = AddrToIP(addrs[0])
+			}
+
+			// Strongest signal: IP not expected
+			if expectedIPsSet != nil && sidIP != "" {
+				if _, ok := expectedIPsSet[sidIP]; !ok {
+					score += 1000
+				}
+			}
+			// Not self-reporting is suspicious (but not fatal by itself)
+			if _, ok := reporterSID[sid]; !ok {
+				score += 100
+			}
+			// Fewer references → a bit more suspicious
+			score += 10 - min(refCountSID[sid], 10)
+
+			suspectScore[sid] = score
+		}
+
+		suspectList = make([]string, 0, len(suspectScore))
+		for sid := range suspectScore {
+			suspectList = append(suspectList, sid)
+		}
+		sort.Slice(suspectList, func(i, j int) bool {
+			if suspectScore[suspectList[i]] != suspectScore[suspectList[j]] {
+				return suspectScore[suspectList[i]] > suspectScore[suspectList[j]]
+			}
+			return suspectList[i] < suspectList[j]
+		})
+
+		// Trim to just what we need to remediate if we’re over capacity.
+		if tooMany && excess > 0 && len(suspectList) > excess {
+			suspectList = suspectList[:excess]
+		}
+	}
+
+	return ExtendedConsensusResult{
+		ConsensusResult:         base,
+		UnionMembers:            unionSIDs,
+		UnionIPs:                unionIPs,
+		Reporters:               reporters,
+		MissingReporters:        missingReporters,
+		UnexpectedIPs:           unexpectedIPs,
+		MissingExpectedIPs:      missingExpectedIPs,
+		MembersCount:            membersCount,
+		DistinctIPCount:         distinctIPCount,
+		TooManyMembers:          tooMany,
+		TooFewMembers:           tooFew,
+		ExpectedShortfall:       shortfall,
+		ExpectedExcess:          excess,
+		IPConflicts:             ipConflicts,
+		SIDAddressDisagreements: sidAddrDisagree,
+		SuspectStaleSIDs:        suspectList,
+	}
+}
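+
+// Illustrative usage (a sketch): feeding expected node IPs, e.g. gathered from
+// Kubernetes, as hints. The IPs, labels, and the views variable are hypothetical.
+//
+//	hints := &Hints{ExpectedIPs: map[string]string{
+//		"192.168.100.11": "ovn-central-0",
+//		"192.168.100.12": "ovn-central-1",
+//		"192.168.100.14": "ovn-central-2",
+//	}}
+//	res := AnalyzeConsensusWithIPHints(NormalizeViews(views), hints)
+//	if res.TooManyMembers {
+//		// res.SuspectStaleSIDs holds at most ExpectedExcess ranked candidates to kick.
+//	}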
+
+func min(a, b int) int {
+	if a < b {
+		return a
+	}
+	return b
+}
+
+// PrettyString renders a human-friendly multi-line summary of ExtendedConsensusResult.
+// It combines consensus status with IP/SID hints.
+func (res ExtendedConsensusResult) PrettyString() string {
+	var b strings.Builder
+
+	fmt.Fprintf(&b, "Consensus summary:\n")
+	fmt.Fprintf(&b, "  AllAgree: %v\n", res.AllAgree)
+	fmt.Fprintf(&b, "  HasMajority: %v (quorum=%d)\n", res.HasMajority, res.QuorumSize)
+	fmt.Fprintf(&b, "  MembersCount: %d (distinct IPs=%d)\n", res.MembersCount, res.DistinctIPCount)
+
+	if res.TooManyMembers {
+		fmt.Fprintf(&b, "  ⚠ Too many members: expected %d, found %d (excess=%d)\n",
+			res.MembersCount-res.ExpectedExcess, res.MembersCount, res.ExpectedExcess)
+	}
+	if res.TooFewMembers {
+		fmt.Fprintf(&b, "  ⚠ Too few members: expected %d, found %d (shortfall=%d)\n",
+			res.MembersCount+res.ExpectedShortfall, res.MembersCount, res.ExpectedShortfall)
+	}
+
+	if len(res.MajorityMembers) > 0 {
+		fmt.Fprintf(&b, "  MajorityMembers (SIDs): %v\n", res.MajorityMembers)
+	}
+	if len(res.MinorityMembers) > 0 {
+		fmt.Fprintf(&b, "  MinorityMembers (SIDs): %v\n", res.MinorityMembers)
+	}
+
+	if len(res.UnionIPs) > 0 {
+		fmt.Fprintf(&b, "  Union IPs: %v\n", res.UnionIPs)
+	}
+	if len(res.Reporters) > 0 {
+		fmt.Fprintf(&b, "  Reporters (self-SIDs): %v\n", res.Reporters)
+	}
+	if len(res.MissingReporters) > 0 {
+		fmt.Fprintf(&b, "  ⚠ MissingReporters (no self-view): %v\n", res.MissingReporters)
+	}
+
+	if len(res.UnexpectedIPs) > 0 {
+		fmt.Fprintf(&b, "  ⚠ UnexpectedIPs: %v\n", res.UnexpectedIPs)
+	}
+	if len(res.MissingExpectedIPs) > 0 {
+		fmt.Fprintf(&b, "  ⚠ MissingExpectedIPs: %v\n", res.MissingExpectedIPs)
+	}
+
+	if len(res.IPConflicts) > 0 {
+		fmt.Fprintf(&b, "  ⚠ IP conflicts:\n")
+		for ip, sids := range res.IPConflicts {
+			fmt.Fprintf(&b, "    %s claimed by %v\n", ip, sids)
+		}
+	}
+
+	if len(res.SIDAddressDisagreements) > 0 {
+		fmt.Fprintf(&b, "  SID address disagreements:\n")
+		for sid, n := range res.SIDAddressDisagreements {
+			if n > 1 {
+				fmt.Fprintf(&b, "    %s has %d distinct addresses\n", sid, n)
+			}
+		}
+	}
+
+	if len(res.SuspectStaleSIDs) > 0 {
+		fmt.Fprintf(&b, "  ⚠ SuspectStaleSIDs (ranked): %v\n", res.SuspectStaleSIDs)
+	}
+
+	// Per-reporter diffs vs truth
+	if len(res.Diffs) > 0 && res.HasMajority {
+		fmt.Fprintf(&b, "  Diffs vs truth view:\n")
+		for sid, d := range res.Diffs {
+			if len(d.MissingSIDs) == 0 && len(d.ExtraSIDs) == 0 && len(d.AddressMismatches) == 0 {
+				continue
+			}
+			fmt.Fprintf(&b, "    %s:\n", sid)
+			if len(d.MissingSIDs) > 0 {
+				fmt.Fprintf(&b, "      missing SIDs: %v\n", d.MissingSIDs)
+			}
+			if len(d.ExtraSIDs) > 0 {
+				fmt.Fprintf(&b, "      extra SIDs:   %v\n", d.ExtraSIDs)
+			}
+			for k, v := range d.AddressMismatches {
+				fmt.Fprintf(&b, "      addr mismatch for %s: truth=%s this=%s\n", k, v[0], v[1])
+			}
+		}
+	}
+
+	return b.String()
+}
diff --git a/pkg/ovnstatus/ovnstatus.go b/pkg/ovnstatus/ovnstatus.go
new file mode 100644
index 00000000..b5fdf0e0
--- /dev/null
+++ b/pkg/ovnstatus/ovnstatus.go
@@ -0,0 +1,458 @@
+// Package ovnstatus provides an OVNClient that returns structured NB/SB health.
+// It queries the local _Server database as JSON and parses the text output of
+// "cluster/status" for the "Servers" block.
+package ovnstatus
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"os/exec"
+	"regexp"
+	"strconv"
+	"strings"
+	"time"
+	"unicode"
+	"unicode/utf8"
+)
+
+/************** Public API **************/
+
+// DB is the logical DB name in ovsdb-server.
+type DB string
+
+const (
+	DBNorthbound DB = "OVN_Northbound"
+	DBSouthbound DB = "OVN_Southbound"
+)
+
+// RunnerFunc allows dependency-injecting the command runner.
+type RunnerFunc func(ctx context.Context, bin string, args ...string) (string, error)
+
+// OVNClient holds config + runner and exposes health methods.
+type OVNClient struct {
+	// Paths to local control sockets.
+	NBCTLPath string // e.g., /var/run/ovn/ovnnb_db.ctl
+	SBCTLPath string // e.g., /var/run/ovn/ovnsb_db.ctl
+	NBDBSock  string // tcp:127.0.0.1:6641, unix:/var/run/ovn/ovnnb_db.sock, etc
+	SBDBSock  string // tcp:127.0.0.1:6642, unix:/var/run/ovn/ovnsb_db.sock, etc
+
+	// TLS for ovsdb-client (used for _Server queries). ovn-appctl uses ctl socket, no TLS needed.
+	UseSSL bool
+	Key    string
+	Cert   string
+	CACert string
+
+	FreshLastMsgThreshold time.Duration
+	// Optional expected replica count for stale-member checks.
+	ExpectedReplicas int
+
+	// Runner is the pluggable command runner. If nil, a default runner is used.
+	Runner RunnerFunc
+}
+
+func (o *OVNClient) ApplyDefaults() {
+	if o.NBCTLPath == "" {
+		o.NBCTLPath = "/var/run/ovn/ovnnb_db.ctl"
+	}
+	if o.SBCTLPath == "" {
+		o.SBCTLPath = "/var/run/ovn/ovnsb_db.ctl"
+	}
+	if o.NBDBSock == "" {
+		o.NBDBSock = "unix:/var/run/ovn/ovnnb_db.sock"
+	}
+	if o.SBDBSock == "" {
+		o.SBDBSock = "unix:/var/run/ovn/ovnsb_db.sock"
+	}
+	if o.ExpectedReplicas == 0 {
+		o.ExpectedReplicas = 3
+	}
+	if o.FreshLastMsgThreshold == 0 {
+		o.FreshLastMsgThreshold = 10 * time.Second
+	}
+}
+
+// ServerLocalView is what the local ovsdb-server reports via _Server.Database.
+type ServerLocalView struct {
+	Leader    bool   `json:"leader"`
+	Connected bool   `json:"connected"`
+	CID       string `json:"cid"` // cluster UUID
+	SID       string `json:"sid"` // this server UUID
+	Index     int64  `json:"index"`
+}
+
+// ClusterStatus is a structured view of cluster/status.
+type ClusterStatus struct {
+	Name      string          `json:"name,omitempty"`
+	Role      string          `json:"role,omitempty"` // leader/follower (local)
+	Term      int64           `json:"term,omitempty"`
+	Index     int64           `json:"index,omitempty"`
+	Connected bool            `json:"connected,omitempty"`
+	Servers   []ClusterServer `json:"servers,omitempty"`
+}
+
+// ClusterServer is an entry in the Servers list.
+type ClusterServer struct {
+	SID        string `json:"sid,omitempty"`
+	Address    string `json:"address,omitempty"`
+	Role       string `json:"role,omitempty"`
+	Self       bool   `json:"self,omitempty"`
+	Connected  bool   `json:"connected,omitempty"`
+	LastMsgMs  *int64 `json:"lastMsgMs,omitempty"`
+	NextIndex  *int64 `json:"nextIndex,omitempty"`  // Raft next_index, when reported
+	MatchIndex *int64 `json:"matchIndex,omitempty"` // Raft match_index, when reported
+}
+
+// HealthSnapshot bundles both sources for easy checks.
+type HealthSnapshot struct {
+	DB    DB
+	Local ServerLocalView
+	Full  ClusterStatus
+}
+
+// StaleMemberCount returns how many configured servers exceed the expected replica count.
+func (hs HealthSnapshot) StaleMemberCount(expectedReplicas int) int {
+	n := len(hs.Full.Servers)
+	if n <= expectedReplicas {
+		return 0
+	}
+	return n - expectedReplicas
+}
+
+// HasQuorum returns whether the local server believes it has a majority.
+func (hs HealthSnapshot) HasQuorum() bool { return hs.Local.Connected }
+
+// IsLeader reports local leadership (per-DB).
+func (hs HealthSnapshot) IsLeader() bool { return hs.Local.Leader }
+
+// HealthNB returns a health snapshot for OVN_Northbound.
+func (c *OVNClient) HealthNB(ctx context.Context) (HealthSnapshot, error) {
+	return c.health(ctx, DBNorthbound, c.NBCTLPath)
+}
+
+// HealthSB returns a health snapshot for OVN_Southbound.
+func (c *OVNClient) HealthSB(ctx context.Context) (HealthSnapshot, error) {
+	return c.health(ctx, DBSouthbound, c.SBCTLPath)
+}
+
+// HealthBoth returns snapshots for both NB and SB.
+func (c *OVNClient) HealthBoth(ctx context.Context) (nb HealthSnapshot, sb HealthSnapshot, err1, err2 error) {
+	nb, err1 = c.HealthNB(ctx)
+	sb, err2 = c.HealthSB(ctx)
+	return nb, sb, err1, err2
+}
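+
+// Illustrative usage (a sketch): a minimal probe that relies on ApplyDefaults
+// for socket paths; error handling is abbreviated.
+//
+//	c := &OVNClient{ExpectedReplicas: 3}
+//	c.ApplyDefaults()
+//	nb, sb, errNB, errSB := c.HealthBoth(context.Background())
+//	if errNB == nil && nb.HasQuorum() {
+//		// the local NB server believes it is part of a majority
+//	}
+//	if errSB == nil && sb.IsLeader() {
+//		// the local server currently leads the SB Raft group
+//	}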
+
+/************** Implementation **************/
+
+func (c *OVNClient) health(ctx context.Context, db DB, ctlPath string) (HealthSnapshot, error) {
+	if ctlPath == "" {
+		return HealthSnapshot{}, fmt.Errorf("missing ctlPath for %s", db)
+	}
+	local, err := c.getLocalServerView(ctx, db)
+	if err != nil {
+		return HealthSnapshot{}, err
+	}
+	full, err := c.getClusterStatus(ctx, db, ctlPath)
+	if err != nil {
+		// Return at least the local view.
+		return HealthSnapshot{DB: db, Local: local}, err
+	}
+	// Optional cosmetic: sort Servers for stable output (self first, then by SID).
+	/*
+		sort.SliceStable(full.Servers, func(i, j int) bool {
+			if full.Servers[i].Self != full.Servers[j].Self {
+				return full.Servers[i].Self
+			}
+			return full.Servers[i].SID < full.Servers[j].SID
+		})
+	*/
+	return HealthSnapshot{DB: db, Local: local, Full: full}, nil
+}
+
+type ovsdbQueryResp struct {
+	Rows []struct {
+		Leader    bool     `json:"leader"`
+		Connected bool     `json:"connected"`
+		CID       []string `json:"cid"`
+		SID       []string `json:"sid"`
+		Index     int64    `json:"index"`
+	} `json:"rows"`
+}
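+
+// The struct above mirrors an ovsdb-client reply roughly of the form (one
+// result object per operation; values abbreviated here):
+//
+//	[{"rows":[{"leader":true,"connected":true,
+//	           "cid":["uuid","8f1d..."],"sid":["uuid","e40d..."],"index":20968}]}]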
+
+func (c *OVNClient) getLocalServerView(ctx context.Context, db DB) (ServerLocalView, error) {
+	addr := ""
+	switch db {
+	case DBNorthbound:
+		addr = c.NBDBSock
+	case DBSouthbound:
+		addr = c.SBDBSock
+	default:
+		return ServerLocalView{}, fmt.Errorf("unexpected value %s for ovn db, expected values %s, %s", db, DBNorthbound, DBSouthbound)
+	}
+
+	query := fmt.Sprintf(
+		`["_Server",{"op":"select","table":"Database","where":[["name","==","%s"]],"columns":["leader","connected","cid","sid","index"]}]`,
+		db,
+	)
+
+	args := []string{"query", addr, query}
+	if c.UseSSL {
+		args = []string{
+			"-p", c.Key, "-c", c.Cert, "-C", c.CACert,
+			"query", addr, query,
+		}
+	}
+
+	out, err := c.run(ctx, "ovsdb-client", args...)
+	if err != nil {
+		return ServerLocalView{}, fmt.Errorf("ovsdb-client query failed: %w (out: %s)", err, out)
+	}
+
+	var resp []ovsdbQueryResp
+	if err := json.Unmarshal([]byte(out), &resp); err != nil {
+		return ServerLocalView{}, fmt.Errorf("parse _Server.Database JSON: %w", err)
+	}
+	if len(resp) == 0 || len(resp[0].Rows) == 0 {
+		return ServerLocalView{}, errors.New("empty _Server.Database response")
+	}
+	row := resp[0].Rows[0]
+	uuidOf := func(arr []string) (string, bool) {
+		if len(arr) == 2 && arr[0] == "uuid" && arr[1] != "" {
+			return arr[1], true
+		}
+		return "", false
+	}
+	cid, okCID := uuidOf(row.CID)
+	sid, okSID := uuidOf(row.SID)
+	if !okCID || !okSID {
+		return ServerLocalView{}, fmt.Errorf("unexpected _Server.Database uuid encoding: cid=%v sid=%v", row.CID, row.SID)
+	}
+	return ServerLocalView{
+		Leader:    row.Leader,
+		Connected: row.Connected,
+		CID:       cid,
+		SID:       sid,
+		Index:     row.Index,
+	}, nil
+}
+
+func (c *OVNClient) getClusterStatus(ctx context.Context, db DB, ctlPath string) (ClusterStatus, error) {
+	out, err := c.run(ctx, "ovn-appctl", "-t", ctlPath, "cluster/status", string(db))
+	if err != nil {
+		return ClusterStatus{}, fmt.Errorf("cluster/status failed: %w (out: %s)", err, out)
+	}
+	return parseServersFromTextWithThreshold(out, c.FreshLastMsgThreshold), nil
+}
+
+func (c *OVNClient) run(ctx context.Context, bin string, args ...string) (string, error) {
+	runner := c.Runner
+	if runner == nil {
+		runner = defaultRunner
+	}
+	return runner(ctx, bin, args...)
+}
+
+/************** Default runner **************/
+
+func defaultRunner(ctx context.Context, bin string, args ...string) (string, error) {
+	// Reasonable default timeout; caller can supply a context with its own deadline.
+	if _, ok := ctx.Deadline(); !ok {
+		var cancel context.CancelFunc
+		ctx, cancel = context.WithTimeout(ctx, 5*time.Second)
+		defer cancel()
+	}
+	cmd := exec.CommandContext(ctx, bin, args...)
+	var stdout, stderr bytes.Buffer
+	cmd.Stdout = &stdout
+	cmd.Stderr = &stderr
+	err := cmd.Run()
+	out := strings.TrimSpace(stdout.String())
+	if err != nil {
+		if out == "" {
+			out = strings.TrimSpace(stderr.String())
+		}
+		return out, err
+	}
+	return out, nil
+}
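+
+// Illustrative usage (a sketch): injecting a fake Runner in tests so that no
+// real ovn-appctl/ovsdb-client binaries are required. The canned outputs are
+// hypothetical.
+//
+//	c := &OVNClient{Runner: func(ctx context.Context, bin string, args ...string) (string, error) {
+//		if bin == "ovn-appctl" {
+//			return "Servers:\n    e40d (e40d at ssl:[192.168.100.12]:6643) (self)\n", nil
+//		}
+//		return `[{"rows":[{"leader":true,"connected":true,"cid":["uuid","c"],"sid":["uuid","s"],"index":1}]}]`, nil
+//	}}
+//	c.ApplyDefaults()
+//	nb, err := c.HealthNB(context.Background())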
+
+/************** Helpers **************/
+
+func parseClusterStatusJSON(out string) (ClusterStatus, bool) {
+	var cs ClusterStatus
+	if json.Unmarshal([]byte(out), &cs) == nil && len(cs.Servers) > 0 {
+		return cs, true
+	}
+	var wrap struct {
+		Data ClusterStatus `json:"data"`
+	}
+	if json.Unmarshal([]byte(out), &wrap) == nil && len(wrap.Data.Servers) > 0 {
+		return wrap.Data, true
+	}
+	return ClusterStatus{}, false
+}
+
+func portOf(db DB) string {
+	switch db {
+	case DBNorthbound:
+		return "6641"
+	case DBSouthbound:
+		return "6642"
+	default:
+		return "0"
+	}
+}
+
+/************** Minimal text fallback for "Servers" **************/
+
+// Accepts variants like:
+//
+//	Servers:
+//	  77f0 (self) at tcp:10.0.0.1:6641 (leader)
+//	  9a3b at tcp:10.0.0.2:6641 (follower)
+//	  1c2d at ssl:10.0.0.3:6641 (backup)
+//	  4e5f at tcp:10.0.0.4:6641 (disconnected)
+var (
+	reServersHeader = regexp.MustCompile(`(?m)^\s*Servers:\s*$`)
+	reServerModern  = regexp.MustCompile(`^\s*([0-9a-fA-F-]+)\s*(\((?:self)\))?\s*at\s*([^\s]+)\s*\(([^)]+)\)`)
+	reServerLegacy  = regexp.MustCompile(
+		`^\s*` +
+			`([0-9a-fA-F-]+)\s*` + // 1: primary SID
+			`\(\s*([0-9a-fA-F-]+)\s+at\s+([^)]+)\)\s*` + // 2: inner SID, 3: address (may include [ip]:port)
+			`(?:\((self)\)\s*)?` + // 4: optional "self"
+			`(?:next_index=(\d+)\s+match_index=(\d+)\s*)?` + // 5: next_index, 6: match_index
+			`(?:last msg\s+(\d+)\s+ms\s+ago)?\s*$`, // 7: last msg ms
+	)
+)
+
+func parseServersFromTextWithThreshold(text string, freshThreshold time.Duration) ClusterStatus {
+	if freshThreshold <= 0 {
+		freshThreshold = 10 * time.Second
+	}
+	freshMs := int64(freshThreshold / time.Millisecond)
+
+	cs := ClusterStatus{}
+	section := extractServersBlock(text)
+	for _, ln := range strings.Split(section, "\n") {
+		ln = strings.TrimRight(ln, "\r")
+		if ln == "" {
+			continue
+		}
+
+		// 1) Modern format
+		if m := reServerModern.FindStringSubmatch(ln); len(m) > 0 {
+			role := strings.ToLower(strings.TrimSpace(m[4]))
+			cs.Servers = append(cs.Servers, ClusterServer{
+				SID:       m[1],
+				Self:      strings.Contains(m[2], "self"),
+				Address:   strings.TrimSpace(m[3]),
+				Role:      role,
+				Connected: !strings.Contains(role, "disconn"),
+			})
+			continue
+		}
+
+		// 2) Legacy format (with optional indices and last-msg)
+		if m := reServerLegacy.FindStringSubmatch(ln); len(m) > 0 {
+			var (
+				nextIdxPtr, matchIdxPtr, lastMsgPtr *int64
+			)
+			if m[5] != "" {
+				if v, err := strconv.ParseInt(m[5], 10, 64); err == nil {
+					nextIdxPtr = &v
+				}
+			}
+			if m[6] != "" {
+				if v, err := strconv.ParseInt(m[6], 10, 64); err == nil {
+					matchIdxPtr = &v
+				}
+			}
+			if m[7] != "" {
+				if v, err := strconv.ParseInt(m[7], 10, 64); err == nil {
+					lastMsgPtr = &v
+				}
+			}
+
+			s := ClusterServer{
+				SID:        m[1],
+				Self:       m[4] == "self",
+				Address:    strings.TrimSpace(m[3]),
+				NextIndex:  nextIdxPtr,
+				MatchIndex: matchIdxPtr,
+				LastMsgMs:  lastMsgPtr,
+				// Role unknown in this legacy format; leave empty.
+			}
+
+			// Connected heuristic:
+			switch {
+			case lastMsgPtr != nil:
+				s.Connected = *lastMsgPtr <= freshMs
+			case s.Self:
+				s.Connected = true
+			case nextIdxPtr != nil || matchIdxPtr != nil:
+				// Seeing replication indices implies active exchange recently.
+				s.Connected = true
+			default:
+				s.Connected = false
+			}
+
+			cs.Servers = append(cs.Servers, s)
+			continue
+		}
+
+		// Unknown line → ignore
+	}
+	return cs
+}
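+
+// For example, a legacy-format line such as
+//
+//	b007 (b007 at ssl:[192.168.100.14]:6643) last msg 817 ms ago
+//
+// produces a ClusterServer with SID "b007", Address "ssl:[192.168.100.14]:6643",
+// LastMsgMs = 817 and, with the default 10s threshold, Connected = true.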
+
+func extractServersBlock(text string) string {
+	idx := reServersHeader.FindStringIndex(text)
+	if idx == nil {
+		return ""
+	}
+	rest := text[idx[1]:]
+
+	var b strings.Builder
+	lines := strings.Split(rest, "\n")
+	sawAny := false
+
+	for _, ln := range lines {
+		// Normalize line endings and look at indentation
+		ln = strings.TrimRight(ln, "\r") // handle CRLF
+		trimmed := strings.TrimSpace(ln)
+
+		// Blank line terminates the section *after* we've started collecting
+		if trimmed == "" {
+			if sawAny {
+				break
+			}
+			continue
+		}
+
+		// Does the line belong to the Servers block?
+		if startsWithUnicodeSpace(ln) || strings.HasPrefix(strings.TrimLeftFunc(ln, unicode.IsSpace), "-") {
+			b.WriteString(ln)
+			b.WriteByte('\n')
+			sawAny = true
+			continue
+		}
+
+		// First non-indented, non-blank line after we've started → end of block.
+		if sawAny {
+			break
+		}
+		// If we haven't started yet and this line isn't indented, keep scanning
+		// (defensive; normally the very next line after "Servers:" is indented).
+	}
+
+	return b.String()
+}
+
+func startsWithUnicodeSpace(s string) bool {
+	if s == "" {
+		return false
+	}
+	r, _ := utf8.DecodeRuneInString(s)
+	return unicode.IsSpace(r) // catches ' ', '\t', '\r', etc.
+}
diff --git a/pkg/ovnstatus/ovnstatus_test.go b/pkg/ovnstatus/ovnstatus_test.go
new file mode 100644
index 00000000..8dde66b3
--- /dev/null
+++ b/pkg/ovnstatus/ovnstatus_test.go
@@ -0,0 +1,40 @@
+package ovnstatus
+
+import (
+	"fmt"
+	"testing"
+	"time"
+)
+
+var testStdout = `` +
+	`Last Election started 259684608 ms ago, reason: leadership_transfer
+Last Election won: 259684604 ms ago
+Election timer: 5000
+Log: [20946, 20968]
+Entries not yet committed: 0
+Entries not yet applied: 0
+Connections: ->7bdb ->b007 <-7bdb <-b007
+Disconnections: 34130
+Servers:
+    e40d (e40d at ssl:[192.168.100.12]:6643) (self)
+    7bdb (7bdb at ssl:[192.168.100.11]:6643) last msg 425139 ms ago
+    b007 (b007 at ssl:[192.168.100.14]:6643) last msg 817 ms ago
+`
+var expectedServersBlock = `` +
+	`    e40d (e40d at ssl:[192.168.100.12]:6643) (self)
+    7bdb (7bdb at ssl:[192.168.100.11]:6643) last msg 425139 ms ago
+    b007 (b007 at ssl:[192.168.100.14]:6643) last msg 817 ms ago
+`
+
+func TestExtractServersBlock(t *testing.T) {
+	if actual := extractServersBlock(testStdout); actual != expectedServersBlock {
+		fmt.Println([]byte(actual))
+		fmt.Println([]byte(expectedServersBlock))
+		t.Errorf("error extracting servers block from following string:\n%s\nexpected:\n%s\ngot:\n%s\n", testStdout, expectedServersBlock, actual)
+	}
+}
+
+func TestParseServersBlock(t *testing.T) {
+	cs := parseServersFromTextWithThreshold(testStdout, 10*time.Second)
+	if len(cs.Servers) != 3 {
+		t.Fatalf("expected 3 servers, got %d: %+v", len(cs.Servers), cs)
+	}
+	if !cs.Servers[0].Self || cs.Servers[1].Connected || !cs.Servers[2].Connected {
+		t.Errorf("unexpected self/connected flags: %+v", cs.Servers)
+	}
+}
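+
+// Illustrative extra coverage (a sketch added for clarity): AddrToIP extraction
+// for the address formats the parser accepts.
+func TestAddrToIP(t *testing.T) {
+	cases := map[string]string{
+		"tcp:10.0.0.1:6641":         "10.0.0.1",
+		"ssl:[192.168.100.12]:6643": "192.168.100.12",
+		"tcp:[fe80::1]:6641":        "fe80::1",
+		"10.0.0.2:6641":             "10.0.0.2",
+	}
+	for in, want := range cases {
+		if got := AddrToIP(in); got != want {
+			t.Errorf("AddrToIP(%q) = %q, want %q", in, got, want)
+		}
+	}
+}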