[kubeovn] Implement the KubeOVN plunger

This patch implements external monitoring of the Kube-OVN cluster. A new
reconciler timed to run its reconcile loop at a fixed interval execs
into the ovn-central pods and collects their cluster info. If the
members' opinions about the cluster disagree, an alert is raised. Other
issues with the distributed consensus are also highlighted.

```release-note
[kubeovn,cozystack-controller] Implement the KubeOVN plunger, an
external monitoring agent for the ovn-central cluster.
```

Signed-off-by: Timofei Larkin <lllamnyp@gmail.com>
This commit is contained in:
Timofei Larkin
2025-09-03 09:55:31 +03:00
parent c0d5e52e65
commit 382a9787f4
25 changed files with 2729 additions and 0 deletions

View File

@@ -0,0 +1,280 @@
package kubeovnplunger
import (
"bytes"
"context"
"fmt"
"io"
"strings"
"time"
"github.com/cozystack/cozystack/internal/sse"
"github.com/cozystack/cozystack/pkg/ovnstatus"
"github.com/prometheus/client_golang/prometheus"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/remotecommand"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)
var (
srv *sse.Server
)
const (
rescanInterval = 1 * time.Minute
)
// KubeOVNPlunger watches the ovn-central cluster members
type KubeOVNPlunger struct {
client.Client
Scheme *runtime.Scheme
ClientSet kubernetes.Interface
REST *rest.Config
Registry prometheus.Registerer
metrics metrics
lastLeader map[string]string
seenCIDs map[string]map[string]struct{}
}
// Reconcile runs the checks on the ovn-central members to see if their views of the cluster are consistent
func (r *KubeOVNPlunger) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
l := log.FromContext(ctx)
deploy := &appsv1.Deployment{}
if err := r.Get(ctx, req.NamespacedName, deploy); err != nil {
return ctrl.Result{}, err
}
iphints := map[string]string{}
for _, env := range deploy.Spec.Template.Spec.Containers[0].Env {
if env.Name != "NODE_IPS" {
continue
}
for _, ip := range strings.Split(env.Value, ",") {
iphints[ip] = ""
}
break
}
if len(iphints) == 0 {
l.Info("WARNING: running without IP hints, some error conditions cannot be detected")
}
pods := &corev1.PodList{}
if err := r.List(ctx, pods, client.InNamespace(req.Namespace), client.MatchingLabels(map[string]string{"app": req.Name})); err != nil {
return ctrl.Result{}, fmt.Errorf("list ovn-central pods: %w", err)
}
nbmv := make([]ovnstatus.MemberView, 0, len(pods.Items))
sbmv := make([]ovnstatus.MemberView, 0, len(pods.Items))
nbSnaps := make([]ovnstatus.HealthSnapshot, 0, len(pods.Items))
sbSnaps := make([]ovnstatus.HealthSnapshot, 0, len(pods.Items))
// TODO: get real iphints
for i := range pods.Items {
o := ovnstatus.OVNClient{}
o.ApplyDefaults()
o.Runner = func(ctx context.Context, bin string, args ...string) (string, error) {
cmd := append([]string{bin}, args...)
eo := ExecOptions{
Namespace: req.Namespace,
Pod: pods.Items[i].Name,
Container: pods.Items[i].Spec.Containers[0].Name,
Command: cmd,
}
res, err := r.ExecPod(ctx, eo)
if err != nil {
return "", err
}
return res.Stdout, nil
}
nb, sb, err1, err2 := o.HealthBoth(ctx)
if err1 != nil || err2 != nil {
l.Error(fmt.Errorf("health check failed: nb=%w, sb=%w", err1, err2), "pod", pods.Items[i].Name)
continue
}
nbSnaps = append(nbSnaps, nb)
sbSnaps = append(sbSnaps, sb)
nbmv = append(nbmv, ovnstatus.BuildMemberView(nb))
sbmv = append(sbmv, ovnstatus.BuildMemberView(sb))
}
r.recordAndPruneCIDs("nb", cidFromSnaps(nbSnaps))
r.recordAndPruneCIDs("sb", cidFromSnaps(sbSnaps))
nbmv = ovnstatus.NormalizeViews(nbmv)
sbmv = ovnstatus.NormalizeViews(sbmv)
nbecv := ovnstatus.AnalyzeConsensusWithIPHints(nbmv, &ovnstatus.Hints{ExpectedIPs: iphints})
sbecv := ovnstatus.AnalyzeConsensusWithIPHints(sbmv, &ovnstatus.Hints{ExpectedIPs: iphints})
expected := len(iphints)
r.WriteClusterMetrics("nb", nbSnaps, nbecv, expected)
r.WriteClusterMetrics("sb", sbSnaps, sbecv, expected)
r.WriteMemberMetrics("nb", nbSnaps, nbmv, nbecv)
r.WriteMemberMetrics("sb", sbSnaps, sbmv, sbecv)
srv.Publish(nbecv.PrettyString() + sbecv.PrettyString())
return ctrl.Result{}, nil
}
// SetupWithManager attaches a generic ticker to trigger a reconcile every <interval> seconds
func (r *KubeOVNPlunger) SetupWithManager(mgr ctrl.Manager, kubeOVNNamespace, appName string) error {
r.REST = rest.CopyConfig(mgr.GetConfig())
cs, err := kubernetes.NewForConfig(r.REST)
if err != nil {
return fmt.Errorf("build clientset: %w", err)
}
r.ClientSet = cs
ch := make(chan event.GenericEvent, 10)
mapFunc := func(context.Context, client.Object) []reconcile.Request {
return []reconcile.Request{{
NamespacedName: types.NamespacedName{Namespace: kubeOVNNamespace, Name: appName},
}}
}
mapper := handler.EnqueueRequestsFromMapFunc(mapFunc)
srv = sse.New(sse.Options{
Addr: ":18080",
AllowCORS: true,
})
r.initMetrics()
r.lastLeader = make(map[string]string)
r.seenCIDs = map[string]map[string]struct{}{"nb": {}, "sb": {}}
if err := ctrl.NewControllerManagedBy(mgr).
Named("kubeovnplunger").
WatchesRawSource(source.Channel(ch, mapper)).
Complete(r); err != nil {
return err
}
_ = mgr.Add(manager.RunnableFunc(func(ctx context.Context) error {
go srv.ListenAndServe()
<-ctx.Done()
_ = srv.Shutdown(context.Background())
return nil
}))
return mgr.Add(manager.RunnableFunc(func(ctx context.Context) error {
ticker := time.NewTicker(rescanInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
ch <- event.GenericEvent{
Object: &metav1.PartialObjectMetadata{
ObjectMeta: metav1.ObjectMeta{
Namespace: kubeOVNNamespace,
Name: appName,
},
},
}
}
}
}))
}
type ExecOptions struct {
Namespace string
Pod string
Container string
Command []string // e.g. []string{"sh", "-c", "echo hi"}
Stdin io.Reader // optional
TTY bool // if true, stderr is merged into stdout
Timeout time.Duration // optional overall timeout
}
type ExecResult struct {
Stdout string
Stderr string
ExitCode *int // nil if not determinable
}
// ExecPod runs a command in a pod and returns stdout/stderr/exit code.
func (r *KubeOVNPlunger) ExecPod(ctx context.Context, opts ExecOptions) (*ExecResult, error) {
if opts.Namespace == "" || opts.Pod == "" || opts.Container == "" {
return nil, fmt.Errorf("namespace, pod, and container are required")
}
req := r.ClientSet.CoreV1().RESTClient().
Post().
Resource("pods").
Namespace(opts.Namespace).
Name(opts.Pod).
SubResource("exec").
VersionedParams(&corev1.PodExecOptions{
Container: opts.Container,
Command: opts.Command,
Stdin: opts.Stdin != nil,
Stdout: true,
Stderr: !opts.TTY,
TTY: opts.TTY,
}, scheme.ParameterCodec)
exec, err := remotecommand.NewSPDYExecutor(r.REST, "POST", req.URL())
if err != nil {
return nil, fmt.Errorf("spdy executor: %w", err)
}
var stdout, stderr bytes.Buffer
streamCtx := ctx
if opts.Timeout > 0 {
var cancel context.CancelFunc
streamCtx, cancel = context.WithTimeout(ctx, opts.Timeout)
defer cancel()
}
streamErr := exec.StreamWithContext(streamCtx, remotecommand.StreamOptions{
Stdin: opts.Stdin,
Stdout: &stdout,
Stderr: &stderr,
Tty: opts.TTY,
})
res := &ExecResult{Stdout: stdout.String(), Stderr: stderr.String()}
if streamErr != nil {
// Try to surface exit code instead of treating all failures as transport errors
type exitCoder interface{ ExitStatus() int }
if ec, ok := streamErr.(exitCoder); ok {
code := ec.ExitStatus()
res.ExitCode = &code
return res, nil
}
return res, fmt.Errorf("exec stream: %w", streamErr)
}
zero := 0
res.ExitCode = &zero
return res, nil
}
func (r *KubeOVNPlunger) recordAndPruneCIDs(db, currentCID string) {
// Mark current as seen
if r.seenCIDs[db] == nil {
r.seenCIDs[db] = map[string]struct{}{}
}
if currentCID != "" {
r.seenCIDs[db][currentCID] = struct{}{}
}
// Build a set of "still active" CIDs this cycle (could be none if you failed to collect)
active := map[string]struct{}{}
if currentCID != "" {
active[currentCID] = struct{}{}
}
// Any seen CID that isn't active now is stale -> delete all its series
for cid := range r.seenCIDs[db] {
if _, ok := active[cid]; ok {
continue
}
r.deleteAllFor(db, cid)
delete(r.seenCIDs[db], cid)
}
}

View File

@@ -0,0 +1,34 @@
package kubeovnplunger
import (
"context"
"testing"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/config"
)
var testPlunger *KubeOVNPlunger
func init() {
scheme := runtime.NewScheme()
cfg := config.GetConfigOrDie()
c, _ := client.New(cfg, client.Options{})
cs, _ := kubernetes.NewForConfig(cfg)
testPlunger = &KubeOVNPlunger{
Client: c,
Scheme: scheme,
ClientSet: cs,
REST: cfg,
}
}
func TestPlungerGetsStatuses(t *testing.T) {
_, err := testPlunger.Reconcile(context.Background(), ctrl.Request{})
if err != nil {
t.Errorf("error should be nil but it's %s", err)
}
}

View File

@@ -0,0 +1,423 @@
package kubeovnplunger
import (
"time"
"github.com/cozystack/cozystack/pkg/ovnstatus"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
type metrics struct {
// --- Core cluster health (per DB/cid) ---
clusterQuorum *prometheus.GaugeVec // 1/0
allAgree *prometheus.GaugeVec // 1/0
membersExpected *prometheus.GaugeVec
membersObserved *prometheus.GaugeVec
ipsExpected *prometheus.GaugeVec
ipsObserved *prometheus.GaugeVec
excessMembers *prometheus.GaugeVec
missingMembers *prometheus.GaugeVec
unexpectedIPsCount *prometheus.GaugeVec
missingExpectedIPsCount *prometheus.GaugeVec
ipConflictsCount *prometheus.GaugeVec
sidAddrDisagreements *prometheus.GaugeVec
// --- Consensus summary (per DB/cid) ---
consensusMajoritySize *prometheus.GaugeVec
consensusMinoritySize *prometheus.GaugeVec
consensusDiffsTotal *prometheus.GaugeVec
// --- Detail exports (sparse, keyed by IP/SID) ---
unexpectedIPGauge *prometheus.GaugeVec // {db,cid,ip} -> 1
missingExpectedIPGauge *prometheus.GaugeVec // {db,cid,ip} -> 1
ipConflictGauge *prometheus.GaugeVec // {db,cid,ip} -> count(sids)
suspectStaleGauge *prometheus.GaugeVec // {db,cid,sid} -> 1
// --- Per-member liveness/freshness (per DB/cid/sid[/ip]) ---
memberConnected *prometheus.GaugeVec // {db,cid,sid,ip}
memberLeader *prometheus.GaugeVec // {db,cid,sid}
memberLastMsgMs *prometheus.GaugeVec // {db,cid,sid}
memberIndex *prometheus.GaugeVec // {db,cid,sid}
memberIndexGap *prometheus.GaugeVec // {db,cid,sid}
memberReporter *prometheus.GaugeVec // {db,cid,sid}
memberMissingReporter *prometheus.GaugeVec // {db,cid,sid}
// --- Ops/housekeeping ---
leaderTransitionsTotal *prometheus.CounterVec // {db,cid}
collectErrorsTotal *prometheus.CounterVec // {db,cid}
publishEventsTotal *prometheus.CounterVec // {db,cid}
snapshotTimestampSec *prometheus.GaugeVec // {db,cid}
}
func (r *KubeOVNPlunger) initMetrics() {
p := promauto.With(r.Registry)
ns := "ovn"
// --- Core cluster health ---
r.metrics.clusterQuorum = p.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns, Subsystem: "cluster", Name: "quorum",
Help: "1 if cluster has quorum, else 0",
}, []string{"db", "cid"})
r.metrics.allAgree = p.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns, Subsystem: "cluster", Name: "all_agree",
Help: "1 if all members report identical membership",
}, []string{"db", "cid"})
r.metrics.membersExpected = p.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns, Subsystem: "cluster", Name: "members_expected",
Help: "Expected cluster size (replicas)",
}, []string{"db", "cid"})
r.metrics.membersObserved = p.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns, Subsystem: "cluster", Name: "members_observed",
Help: "Observed members (distinct SIDs across views)",
}, []string{"db", "cid"})
r.metrics.ipsExpected = p.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns, Subsystem: "cluster", Name: "ips_expected",
Help: "Expected distinct member IPs (from k8s hints)",
}, []string{"db", "cid"})
r.metrics.ipsObserved = p.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns, Subsystem: "cluster", Name: "ips_observed",
Help: "Observed distinct member IPs (from OVN views)",
}, []string{"db", "cid"})
r.metrics.excessMembers = p.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns, Subsystem: "cluster", Name: "excess_members",
Help: "Members over expected (>=0)",
}, []string{"db", "cid"})
r.metrics.missingMembers = p.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns, Subsystem: "cluster", Name: "missing_members",
Help: "Members short of expected (>=0)",
}, []string{"db", "cid"})
r.metrics.unexpectedIPsCount = p.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns, Subsystem: "cluster", Name: "unexpected_ips",
Help: "Count of IPs in OVN not present in k8s expected set",
}, []string{"db", "cid"})
r.metrics.missingExpectedIPsCount = p.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns, Subsystem: "cluster", Name: "missing_expected_ips",
Help: "Count of expected IPs not found in OVN",
}, []string{"db", "cid"})
r.metrics.ipConflictsCount = p.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns, Subsystem: "cluster", Name: "ip_conflicts",
Help: "Number of IPs claimed by multiple SIDs",
}, []string{"db", "cid"})
r.metrics.sidAddrDisagreements = p.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns, Subsystem: "cluster", Name: "sid_address_disagreements",
Help: "Number of SIDs seen with >1 distinct addresses",
}, []string{"db", "cid"})
// --- Consensus summary ---
r.metrics.consensusMajoritySize = p.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns, Subsystem: "consensus", Name: "majority_size",
Help: "Majority group size (0 if none)",
}, []string{"db", "cid"})
r.metrics.consensusMinoritySize = p.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns, Subsystem: "consensus", Name: "minority_size",
Help: "Minority group size",
}, []string{"db", "cid"})
r.metrics.consensusDiffsTotal = p.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns, Subsystem: "consensus", Name: "diffs_total",
Help: "Total per-reporter differences vs truth (missing + extra + mismatches)",
}, []string{"db", "cid"})
// --- Detail exports (sparse) ---
r.metrics.unexpectedIPGauge = p.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns, Subsystem: "consensus", Name: "unexpected_ip",
Help: "Unexpected IP present in OVN; value fixed at 1",
}, []string{"db", "cid", "ip"})
r.metrics.missingExpectedIPGauge = p.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns, Subsystem: "consensus", Name: "missing_expected_ip",
Help: "Expected IP missing from OVN; value fixed at 1",
}, []string{"db", "cid", "ip"})
r.metrics.ipConflictGauge = p.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns, Subsystem: "consensus", Name: "ip_conflict",
Help: "Number of SIDs claiming the same IP for this key",
}, []string{"db", "cid", "ip"})
r.metrics.suspectStaleGauge = p.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns, Subsystem: "consensus", Name: "suspect_stale",
Help: "Suspected stale SID candidate for kick; value fixed at 1 (emit only when remediation is warranted)",
}, []string{"db", "cid", "sid"})
// --- Per-member liveness/freshness ---
r.metrics.memberConnected = p.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns, Subsystem: "member", Name: "connected",
Help: "1 if local server reports connected/quorum, else 0",
}, []string{"db", "cid", "sid", "ip"})
r.metrics.memberLeader = p.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns, Subsystem: "member", Name: "leader",
Help: "1 if this member is leader, else 0",
}, []string{"db", "cid", "sid"})
r.metrics.memberLastMsgMs = p.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns, Subsystem: "member", Name: "last_msg_ms",
Help: "Follower->leader 'last msg' age in ms (legacy heuristic). NaN/omit if unknown",
}, []string{"db", "cid", "sid"})
r.metrics.memberIndex = p.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns, Subsystem: "member", Name: "index",
Help: "Local Raft log index",
}, []string{"db", "cid", "sid"})
r.metrics.memberIndexGap = p.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns, Subsystem: "member", Name: "index_gap",
Help: "Leader index minus local index (>=0)",
}, []string{"db", "cid", "sid"})
r.metrics.memberReporter = p.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns, Subsystem: "member", Name: "reporter",
Help: "1 if a self-view from this SID was collected in the scrape cycle",
}, []string{"db", "cid", "sid"})
r.metrics.memberMissingReporter = p.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns, Subsystem: "member", Name: "missing_reporter",
Help: "1 if SID appears in union but produced no self-view",
}, []string{"db", "cid", "sid"})
// --- Ops/housekeeping ---
r.metrics.leaderTransitionsTotal = p.NewCounterVec(prometheus.CounterOpts{
Namespace: ns, Subsystem: "ops", Name: "leader_transitions_total",
Help: "Count of observed leader SID changes",
}, []string{"db", "cid"})
r.metrics.collectErrorsTotal = p.NewCounterVec(prometheus.CounterOpts{
Namespace: ns, Subsystem: "ops", Name: "collect_errors_total",
Help: "Count of errors during health collection/analysis",
}, []string{"db", "cid"})
r.metrics.publishEventsTotal = p.NewCounterVec(prometheus.CounterOpts{
Namespace: ns, Subsystem: "ops", Name: "publish_events_total",
Help: "Count of SSE publish events (optional)",
}, []string{"db", "cid"})
r.metrics.snapshotTimestampSec = p.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns, Subsystem: "ops", Name: "snapshot_timestamp_seconds",
Help: "Unix timestamp of the last successful consensus snapshot",
}, []string{"db", "cid"})
}
func (r *KubeOVNPlunger) WriteClusterMetrics(db string, snaps []ovnstatus.HealthSnapshot, ecv ovnstatus.ExtendedConsensusResult, expectedReplicas int) {
cid := cidFromSnaps(snaps)
// Core cluster health
r.metrics.clusterQuorum.WithLabelValues(db, cid).Set(b2f(ecv.HasMajority))
r.metrics.allAgree.WithLabelValues(db, cid).Set(b2f(ecv.AllAgree))
r.metrics.membersExpected.WithLabelValues(db, cid).Set(float64(expectedReplicas))
r.metrics.membersObserved.WithLabelValues(db, cid).Set(float64(ecv.MembersCount))
r.metrics.ipsExpected.WithLabelValues(db, cid).Set(float64(len(ecv.ConsensusResult.TruthView.Members))) // optional; or len(hints.ExpectedIPs)
r.metrics.ipsObserved.WithLabelValues(db, cid).Set(float64(ecv.DistinctIPCount))
r.metrics.excessMembers.WithLabelValues(db, cid).Set(float64(ecv.ExpectedExcess))
r.metrics.missingMembers.WithLabelValues(db, cid).Set(float64(ecv.ExpectedShortfall))
r.metrics.unexpectedIPsCount.WithLabelValues(db, cid).Set(float64(len(ecv.UnexpectedIPs)))
r.metrics.missingExpectedIPsCount.WithLabelValues(db, cid).Set(float64(len(ecv.MissingExpectedIPs)))
r.metrics.ipConflictsCount.WithLabelValues(db, cid).Set(float64(len(ecv.IPConflicts)))
// Count SIDs with >1 distinct addresses
disagree := 0
for _, n := range ecv.SIDAddressDisagreements {
if n > 1 {
disagree++
}
}
r.metrics.sidAddrDisagreements.WithLabelValues(db, cid).Set(float64(disagree))
// Consensus summary
r.metrics.consensusMajoritySize.WithLabelValues(db, cid).Set(float64(len(ecv.MajorityMembers)))
r.metrics.consensusMinoritySize.WithLabelValues(db, cid).Set(float64(len(ecv.MinorityMembers)))
// Sum diffs across reporters (missing + extra + mismatches)
totalDiffs := 0
for _, d := range ecv.Diffs {
totalDiffs += len(d.MissingSIDs) + len(d.ExtraSIDs) + len(d.AddressMismatches)
}
r.metrics.consensusDiffsTotal.WithLabelValues(db, cid).Set(float64(totalDiffs))
// Sparse per-key exports (reset then re-emit for this {db,cid})
r.metrics.unexpectedIPGauge.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid})
for _, ip := range ecv.UnexpectedIPs {
r.metrics.unexpectedIPGauge.WithLabelValues(db, cid, ip).Set(1)
}
r.metrics.missingExpectedIPGauge.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid})
for _, ip := range ecv.MissingExpectedIPs {
r.metrics.missingExpectedIPGauge.WithLabelValues(db, cid, ip).Set(1)
}
r.metrics.ipConflictGauge.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid})
for ip, sids := range ecv.IPConflicts {
r.metrics.ipConflictGauge.WithLabelValues(db, cid, ip).Set(float64(len(sids)))
}
// Only emit suspects when remediation is warranted (e.g., TooManyMembers / unexpected IPs / conflicts)
r.metrics.suspectStaleGauge.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid})
if ecv.TooManyMembers || len(ecv.UnexpectedIPs) > 0 || len(ecv.IPConflicts) > 0 {
for _, sid := range ecv.SuspectStaleSIDs {
r.metrics.suspectStaleGauge.WithLabelValues(db, cid, sid).Set(1)
}
}
// Snapshot timestamp
r.metrics.snapshotTimestampSec.WithLabelValues(db, cid).Set(float64(time.Now().Unix()))
}
func (r *KubeOVNPlunger) WriteMemberMetrics(db string, snaps []ovnstatus.HealthSnapshot, views []ovnstatus.MemberView, ecv ovnstatus.ExtendedConsensusResult) {
cid := cidFromSnaps(snaps)
// Figure out current leader SID (prefer local view from any leader snapshot)
curLeader := ""
for _, s := range snaps {
if s.Local.Leader {
curLeader = s.Local.SID
break
}
}
// Leader transitions
key := db + "|" + cid
if prev, ok := r.lastLeader[key]; ok && prev != "" && curLeader != "" && prev != curLeader {
r.metrics.leaderTransitionsTotal.WithLabelValues(db, cid).Inc()
}
if curLeader != "" {
r.lastLeader[key] = curLeader
}
// Build quick maps for reporter set & IP per SID (best-effort)
reporter := map[string]struct{}{}
for _, v := range views {
if v.FromSID != "" {
reporter[v.FromSID] = struct{}{}
}
}
sidToIP := map[string]string{}
for _, v := range views {
for sid, addr := range v.Members {
if sidToIP[sid] == "" && addr != "" {
sidToIP[sid] = ovnstatus.AddrToIP(addr) // expose addrToIP or wrap here
}
}
}
// Reset member vectors for this {db,cid} (avoid stale series)
r.metrics.memberConnected.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid})
r.metrics.memberLeader.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid})
r.metrics.memberLastMsgMs.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid})
r.metrics.memberIndex.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid})
r.metrics.memberIndexGap.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid})
r.metrics.memberReporter.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid})
r.metrics.memberMissingReporter.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid})
// Leader index (to compute gaps)
lIdx := leaderIndex(snaps, curLeader)
// Emit one series per snapshot (self view)
for _, s := range snaps {
sid := s.Local.SID
ip := sidToIP[sid]
if ip == "" {
ip = "unknown"
}
r.metrics.memberConnected.WithLabelValues(db, cid, sid, ip).Set(b2f(s.Local.Connected))
r.metrics.memberLeader.WithLabelValues(db, cid, sid).Set(b2f(s.Local.Leader))
r.metrics.memberIndex.WithLabelValues(db, cid, sid).Set(float64(s.Local.Index))
if lIdx != nil && s.Local.Index >= 0 {
gap := *lIdx - s.Local.Index
if gap < 0 {
gap = 0
}
r.metrics.memberIndexGap.WithLabelValues(db, cid, sid).Set(float64(gap))
}
// Reporter presence
_, isReporter := reporter[sid]
r.metrics.memberReporter.WithLabelValues(db, cid, sid).Set(b2f(isReporter))
}
// “Missing reporter” SIDs = union reporters (from ecv)
reporterSet := map[string]struct{}{}
for sid := range reporter {
reporterSet[sid] = struct{}{}
}
unionSet := map[string]struct{}{}
for _, sid := range ecv.UnionMembers {
unionSet[sid] = struct{}{}
}
for sid := range unionSet {
if _, ok := reporterSet[sid]; !ok {
r.metrics.memberMissingReporter.WithLabelValues(db, cid, sid).Set(1)
}
}
// Legacy follower freshness (if you kept LastMsgMs in servers parsing)
// We only know LastMsgMs from the Full.Servers in each snapshot; pick the freshest per SID.
lastMsg := map[string]int64{}
for _, s := range snaps {
for _, srv := range s.Full.Servers {
if srv.LastMsgMs != nil {
cur, ok := lastMsg[srv.SID]
if !ok || *srv.LastMsgMs < cur {
lastMsg[srv.SID] = *srv.LastMsgMs
}
}
}
}
for sid, ms := range lastMsg {
r.metrics.memberLastMsgMs.WithLabelValues(db, cid, sid).Set(float64(ms))
}
}
func (r *KubeOVNPlunger) deleteAllFor(db, cid string) {
// Cluster-level vecs (db,cid)
r.metrics.clusterQuorum.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid})
r.metrics.allAgree.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid})
r.metrics.membersExpected.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid})
r.metrics.membersObserved.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid})
r.metrics.ipsExpected.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid})
r.metrics.ipsObserved.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid})
r.metrics.excessMembers.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid})
r.metrics.missingMembers.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid})
r.metrics.unexpectedIPsCount.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid})
r.metrics.missingExpectedIPsCount.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid})
r.metrics.ipConflictsCount.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid})
r.metrics.sidAddrDisagreements.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid})
r.metrics.consensusMajoritySize.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid})
r.metrics.consensusMinoritySize.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid})
r.metrics.consensusDiffsTotal.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid})
// Sparse detail vecs (db,cid,*)
r.metrics.unexpectedIPGauge.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid})
r.metrics.missingExpectedIPGauge.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid})
r.metrics.ipConflictGauge.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid})
r.metrics.suspectStaleGauge.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid})
// Per-member vecs (db,cid,*)
r.metrics.memberConnected.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid})
r.metrics.memberLeader.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid})
r.metrics.memberLastMsgMs.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid})
r.metrics.memberIndex.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid})
r.metrics.memberIndexGap.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid})
r.metrics.memberReporter.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid})
r.metrics.memberMissingReporter.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid})
// Ops vecs (db,cid)
r.metrics.leaderTransitionsTotal.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid})
r.metrics.collectErrorsTotal.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid})
r.metrics.publishEventsTotal.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid})
r.metrics.snapshotTimestampSec.DeletePartialMatch(prometheus.Labels{"db": db, "cid": cid})
}

View File

@@ -0,0 +1,31 @@
package kubeovnplunger
import "github.com/cozystack/cozystack/pkg/ovnstatus"
func b2f(b bool) float64 {
if b {
return 1
}
return 0
}
// Pull a cluster UUID (cid) from any snapshots Local.CID (falls back to "")
func cidFromSnaps(snaps []ovnstatus.HealthSnapshot) string {
for _, s := range snaps {
if s.Local.CID != "" {
return s.Local.CID
}
}
return ""
}
// Map SID -> last local index to compute gaps (optional)
func leaderIndex(snaps []ovnstatus.HealthSnapshot, leaderSID string) (idx *int64) {
for _, s := range snaps {
if s.Local.SID == leaderSID && s.Local.Index > 0 {
v := s.Local.Index
return &v
}
}
return nil
}

293
internal/sse/server.go Normal file
View File

@@ -0,0 +1,293 @@
// Package sse provides a tiny Server-Sent Events server with pluggable routes.
// No external deps; safe for quick demos and small dashboards.
package sse
import (
"context"
"fmt"
"html/template"
"log"
"net/http"
"strings"
"sync"
"time"
)
// Options configures the SSE server.
type Options struct {
// Addr is the listening address, e.g. ":8080" or "127.0.0.1:0".
Addr string
// IndexPath is the path serving a minimal live HTML page ("" to disable).
// e.g. "/" or "/status"
IndexPath string
// StreamPath is the SSE endpoint path, e.g. "/stream".
StreamPath string
// Title for the index page (cosmetic).
Title string
// AllowCORS, if true, sets Access-Control-Allow-Origin: * for /stream.
AllowCORS bool
// ClientBuf is the per-client buffered message queue size.
// If 0, defaults to 16. When full, new messages are dropped for that client.
ClientBuf int
// Heartbeat sends a comment line every interval to keep connections alive.
// If 0, defaults to 25s.
Heartbeat time.Duration
// Logger (optional). If nil, log.Printf is used.
Logger *log.Logger
}
// Server is a simple SSE broadcaster.
type Server struct {
opts Options
mux *http.ServeMux
http *http.Server
clientsMu sync.RWMutex
clients map[*client]struct{}
// latest holds the most recent payload (sent to new clients on connect).
latestMu sync.RWMutex
latest string
}
type client struct {
ch chan string
closeCh chan struct{}
flusher http.Flusher
w http.ResponseWriter
req *http.Request
logf func(string, ...any)
heartbeat time.Duration
}
func New(opts Options) *Server {
if opts.ClientBuf <= 0 {
opts.ClientBuf = 16
}
if opts.Heartbeat <= 0 {
opts.Heartbeat = 25 * time.Second
}
if opts.Addr == "" {
opts.Addr = ":8080"
}
if opts.StreamPath == "" {
opts.StreamPath = "/stream"
}
if opts.IndexPath == "" {
opts.IndexPath = "/"
}
s := &Server{
opts: opts,
mux: http.NewServeMux(),
clients: make(map[*client]struct{}),
}
s.routes()
s.http = &http.Server{
Addr: opts.Addr,
Handler: s.mux,
ReadHeaderTimeout: 10 * time.Second,
}
return s
}
func (s *Server) routes() {
if s.opts.IndexPath != "" {
s.mux.HandleFunc(s.opts.IndexPath, s.handleIndex)
}
s.mux.HandleFunc(s.opts.StreamPath, s.handleStream)
}
func (s *Server) logf(format string, args ...any) {
if s.opts.Logger != nil {
s.opts.Logger.Printf(format, args...)
} else {
log.Printf(format, args...)
}
}
// ListenAndServe starts the HTTP server (blocking).
func (s *Server) ListenAndServe() error {
s.logf("sse: listening on http://%s (index=%s, stream=%s)", s.http.Addr, s.opts.IndexPath, s.opts.StreamPath)
return s.http.ListenAndServe()
}
// Shutdown gracefully stops the server.
func (s *Server) Shutdown(ctx context.Context) error {
s.clientsMu.Lock()
for c := range s.clients {
close(c.closeCh)
}
s.clientsMu.Unlock()
return s.http.Shutdown(ctx)
}
// Publish broadcasts a new payload to all clients and stores it as latest.
func (s *Server) Publish(payload string) {
// Store latest
s.latestMu.Lock()
s.latest = payload
s.latestMu.Unlock()
// Broadcast
s.clientsMu.RLock()
defer s.clientsMu.RUnlock()
for c := range s.clients {
select {
case c.ch <- payload:
default:
// Drop if client is slow (buffer full)
if s.opts.Logger != nil {
s.opts.Logger.Printf("sse: dropping message to slow client %p", c)
}
}
}
}
func (s *Server) handleIndex(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/html; charset=utf-8")
page := indexTemplate(s.opts.Title, s.opts.StreamPath)
_, _ = w.Write([]byte(page))
}
func (s *Server) handleStream(w http.ResponseWriter, r *http.Request) {
// Required SSE headers
if s.opts.AllowCORS {
w.Header().Set("Access-Control-Allow-Origin", "*")
}
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "streaming unsupported", http.StatusInternalServerError)
return
}
c := &client{
ch: make(chan string, s.opts.ClientBuf),
closeCh: make(chan struct{}),
flusher: flusher,
w: w,
req: r,
logf: s.logf,
heartbeat: s.opts.Heartbeat,
}
// Register client
s.clientsMu.Lock()
s.clients[c] = struct{}{}
s.clientsMu.Unlock()
// Initial comment to open the stream for some proxies
fmt.Fprintf(w, ": connected %s\n\n", time.Now().Format(time.RFC3339))
flusher.Flush()
// Send latest if any
s.latestMu.RLock()
latest := s.latest
s.latestMu.RUnlock()
if latest != "" {
writeSSE(w, latest)
flusher.Flush()
}
// Start pump
go c.pump()
// Block until client disconnects
<-r.Context().Done()
// Unregister client
close(c.closeCh)
s.clientsMu.Lock()
delete(s.clients, c)
s.clientsMu.Unlock()
}
func (c *client) pump() {
t := time.NewTicker(c.heartbeat)
defer t.Stop()
for {
select {
case <-c.closeCh:
return
case msg := <-c.ch:
writeSSE(c.w, msg)
c.flusher.Flush()
case <-t.C:
// heartbeat comment (keeps connections alive through proxies)
fmt.Fprint(c.w, ": hb\n\n")
c.flusher.Flush()
}
}
}
func writeSSE(w http.ResponseWriter, msg string) {
// Split on lines; each needs its own "data:" field per the SSE spec
lines := strings.Split(strings.TrimRight(msg, "\n"), "\n")
for _, ln := range lines {
fmt.Fprintf(w, "data: %s\n", ln)
}
fmt.Fprint(w, "\n")
}
// Minimal index page with live updates
func indexTemplate(title, streamPath string) string {
if title == "" {
title = "SSE Stream"
}
if streamPath == "" {
streamPath = "/stream"
}
const tpl = `<!doctype html>
<html>
<head>
<meta charset="utf-8" />
<title>{{.Title}}</title>
<style>
body { font-family: system-ui, sans-serif; margin: 2rem; }
pre { background:#111; color:#eee; padding:1rem; border-radius:12px; white-space:pre-wrap;}
.status { margin-bottom: 1rem; }
</style>
</head>
<body>
<h1>{{.Title}}</h1>
<div class="status">Connecting…</div>
<pre id="out"></pre>
<script>
const statusEl = document.querySelector('.status');
const out = document.getElementById('out');
const es = new EventSource('{{.Stream}}');
es.onmessage = (e) => {
// Replace content with the latest full snapshot
if (e.data === "") return;
// We accumulate until a blank 'data:' terminator; simpler approach: reset on first line.
// For this demo, server always sends full content in one event, so just overwrite.
out.textContent = (out._acc ?? "") + e.data + "\n";
};
es.addEventListener('open', () => { statusEl.textContent = "Connected"; out._acc = ""; });
es.addEventListener('error', () => { statusEl.textContent = "Disconnected (browser will retry)…"; out._acc = ""; });
// Optional: keep the latest only per message
es.onmessage = (e) => {
out.textContent = e.data + "\n";
statusEl.textContent = "Connected";
};
</script>
</body>
</html>`
page, _ := template.New("idx").Parse(tpl)
var b strings.Builder
_ = page.Execute(&b, map[string]any{
"Title": title,
"Stream": streamPath,
})
return b.String()
}