safe websocket

This commit is contained in:
typescreep
2025-10-31 01:56:07 +03:00
parent 79ea642953
commit 8a3477241f
3 changed files with 169 additions and 61 deletions

2
package-lock.json generated
View File

@@ -11,7 +11,7 @@
"@ant-design/icons": "5.6.0",
"@monaco-editor/react": "4.6.0",
"@originjs/vite-plugin-federation": "1.3.6",
"@prorobotech/openapi-k8s-toolkit": "^0.0.1-alpha.150",
"@prorobotech/openapi-k8s-toolkit": "0.0.1-alpha.150",
"@readme/openapi-parser": "4.0.0",
"@reduxjs/toolkit": "2.2.5",
"@tanstack/react-query": "5.62.2",

View File

@@ -13,7 +13,7 @@ import React, { FC, useCallback, useEffect, useReducer, useRef, useState } from
import { theme as antdtheme, Flex, Tooltip } from 'antd'
import { ResumeCircleIcon, PauseCircleIcon, LockedIcon, UnlockedIcon } from '@prorobotech/openapi-k8s-toolkit'
import { TScrollMsg, TServerFrame } from './types'
import { eventKey } from './utils'
import { eventKey, compareRV, getRV, getMaxRV } from './utils'
import { reducer } from './reducer'
import { EventRow } from './molecules'
import { Styled } from './styled'
@@ -44,6 +44,9 @@ export const Events: FC<TEventsProps> = ({ wsUrl, pageSize = 50, height }) => {
removeIgnoredRef.current = isRemoveIgnored
}, [isRemoveIgnored])
// track latest resourceVersion we have processed
const latestRVRef = useRef<string | undefined>(undefined)
// Reducer-backed store of events
const [state, dispatch] = useReducer(reducer, { order: [], byKey: {} })
@@ -64,6 +67,14 @@ export const Events: FC<TEventsProps> = ({ wsUrl, pageSize = 50, height }) => {
const backoffRef = useRef(750) // ms; increases on failures up to a cap
const urlRef = useRef(wsUrl) // latest wsUrl (stable inside callbacks)
// Guards for unmount & reconnect timer
const mountedRef = useRef(true)
const reconnectTimerRef = useRef<number | null>(null)
const onMessageRef = useRef<(ev: MessageEvent) => void>(() => {})
const startedRef = useRef(false)
const connectingRef = useRef(false)
const haveAnchorRef = useRef(false)
// Keep urlRef in sync so connect() uses the latest wsUrl
useEffect(() => {
urlRef.current = wsUrl
@@ -90,88 +101,148 @@ export const Events: FC<TEventsProps> = ({ wsUrl, pageSize = 50, height }) => {
wsRef.current.send(JSON.stringify(msg))
}, [contToken, pageSize])
const maybeAutoScroll = useCallback(() => {
if (wantMoreRef.current && hasMore) sendScroll()
}, [hasMore, sendScroll])
// Handle all incoming frames from the server
const onMessage = useCallback((ev: MessageEvent) => {
let frame: TServerFrame | undefined
try {
frame = JSON.parse(String(ev.data))
} catch {
// Ignore malformed frames; you could surface these in UI if desired
return
}
if (!frame) return
useEffect(() => {
onMessageRef.current = (ev: MessageEvent) => {
let frame: TServerFrame | undefined
try {
frame = JSON.parse(String(ev.data)) as TServerFrame
} catch {
return
}
if (!frame) return
if (frame.type === 'INITIAL') {
// Replace current list with newest set; set pagination token
dispatch({ type: 'RESET', items: frame.items })
setContToken(frame.continue)
setHasMore(Boolean(frame.continue))
setLastError(undefined)
return
}
if (frame.type === 'INITIAL') {
dispatch({ type: 'RESET', items: frame.items })
setContToken(frame.continue)
setHasMore(Boolean(frame.continue))
setLastError(undefined)
fetchingRef.current = false
if (frame.type === 'PAGE') {
// Append older items to the end; clear fetching guard
dispatch({ type: 'APPEND_PAGE', items: frame.items })
setContToken(frame.continue)
setHasMore(Boolean(frame.continue))
fetchingRef.current = false
return
}
if (frame.type === 'PAGE_ERROR') {
// Keep live stream but surface pagination error
setLastError(frame.error || 'Failed to load next page')
fetchingRef.current = false
return
}
if (!pausedRef.current) {
if (frame.type === 'ADDED' || frame.type === 'MODIFIED') {
// Live update: insert or replace
dispatch({ type: 'UPSERT', item: frame.item })
const snapshotRV = frame.resourceVersion || getMaxRV(frame.items)
if (snapshotRV) {
latestRVRef.current = snapshotRV
haveAnchorRef.current = true // NEW: we now have a safe anchor
}
return
}
if (!removeIgnoredRef.current && frame.type === 'DELETED') {
// Live delete
dispatch({ type: 'REMOVE', key: eventKey(frame.item) })
if (frame.type === 'PAGE') {
dispatch({ type: 'APPEND_PAGE', items: frame.items })
setContToken(frame.continue)
setHasMore(Boolean(frame.continue))
fetchingRef.current = false
const batchRV = getMaxRV(frame.items)
if (batchRV && (!latestRVRef.current || compareRV(batchRV, latestRVRef.current) > 0)) {
latestRVRef.current = batchRV
}
maybeAutoScroll()
return
}
if (frame.type === 'PAGE_ERROR') {
setLastError(frame.error || 'Failed to load next page')
fetchingRef.current = false
return
}
if (frame.type === 'ADDED' || frame.type === 'MODIFIED' || frame.type === 'DELETED') {
const rv = getRV(frame.item)
if (rv && (!latestRVRef.current || compareRV(rv, latestRVRef.current) > 0)) {
latestRVRef.current = rv
}
}
if (!pausedRef.current) {
if (frame.type === 'ADDED' || frame.type === 'MODIFIED') {
dispatch({ type: 'UPSERT', item: frame.item })
return
}
if (!removeIgnoredRef.current && frame.type === 'DELETED') {
dispatch({ type: 'REMOVE', key: eventKey(frame.item) })
}
}
}
}, [maybeAutoScroll])
const buildWsUrl = useCallback((raw: string) => {
try {
const hasScheme = /^[a-z]+:/i.test(raw)
const base = window.location.origin
let u = hasScheme ? new URL(raw) : new URL(raw.startsWith('/') ? raw : `/${raw}`, base)
if (u.protocol === 'http:') u.protocol = 'ws:'
if (u.protocol === 'https:') u.protocol = 'wss:'
if (u.protocol !== 'ws:' && u.protocol !== 'wss:') {
u = new URL(u.pathname + u.search + u.hash, base)
u.protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'
}
if (haveAnchorRef.current && latestRVRef.current) {
u.searchParams.set('sinceRV', latestRVRef.current)
} else {
u.searchParams.delete('sinceRV')
}
return u.toString()
} catch {
const origin = window.location.origin.replace(/^http/, 'ws')
const prefix = raw.startsWith('/') ? '' : '/'
const rv = haveAnchorRef.current ? latestRVRef.current : undefined
const sep = raw.includes('?') ? '&' : '?'
return `${origin}${prefix}${raw}${rv ? `${sep}sinceRV=${encodeURIComponent(rv)}` : ''}`
}
}, [])
// Establish and maintain the WebSocket connection with bounded backoff
const connect = useCallback(() => {
if (!mountedRef.current) return
// Prevent duplicate opens
if (connectingRef.current) return
if (
wsRef.current &&
(wsRef.current.readyState === WebSocket.OPEN || wsRef.current.readyState === WebSocket.CONNECTING)
) {
return
}
connectingRef.current = true
setConnStatus('connecting')
setLastError(undefined)
// Accept absolute ws(s) URLs; otherwise resolve relative to current origin
const buildWsUrl = (raw: string) => {
if (/^wss?:/i.test(raw)) return raw // already absolute ws(s)
const origin = window.location.origin.replace(/^http/i, 'ws')
if (raw.startsWith('/')) return `${origin}${raw}`
return `${origin}/${raw}`
}
const ws = new WebSocket(buildWsUrl(urlRef.current))
const url = buildWsUrl(urlRef.current)
const ws = new WebSocket(url)
wsRef.current = ws
ws.addEventListener('open', () => {
if (!mountedRef.current) return
backoffRef.current = 750
fetchingRef.current = false
setConnStatus('open')
backoffRef.current = 750 // reset backoff on success
connectingRef.current = false
})
ws.addEventListener('message', onMessage)
ws.addEventListener('message', ev => onMessageRef.current(ev))
const scheduleReconnect = () => {
// Only clear if we're still looking at this instance
if (wsRef.current === ws) wsRef.current = null
setConnStatus('closed')
const wait = Math.min(backoffRef.current, 8000)
const next = Math.min(wait * 2, 12000)
connectingRef.current = false
// Bounded exponential backoff with jitter to avoid herding
const base = Math.min(backoffRef.current, 8000)
const jitter = Math.random() * 0.4 + 0.8 // 0.8x1.2x
const wait = Math.floor(base * jitter)
const next = Math.min(base * 2, 12000)
backoffRef.current = next
// Reconnect after a short delay; preserves component mount semantics
setTimeout(() => {
if (reconnectTimerRef.current) {
window.clearTimeout(reconnectTimerRef.current)
reconnectTimerRef.current = null
}
reconnectTimerRef.current = window.setTimeout(() => {
if (!mountedRef.current) return
connect()
}, wait)
}
@@ -181,13 +252,30 @@ export const Events: FC<TEventsProps> = ({ wsUrl, pageSize = 50, height }) => {
setLastError('WebSocket error')
scheduleReconnect()
})
}, [onMessage])
}, [buildWsUrl])
// Kick off initial connection on mount; clean up on unmount
useEffect(() => {
if (startedRef.current) return undefined // StrictMode double-invoke guard
startedRef.current = true
mountedRef.current = true
connect()
return () => closeWS()
}, [connect, closeWS])
return () => {
mountedRef.current = false
startedRef.current = false
if (reconnectTimerRef.current) {
window.clearTimeout(reconnectTimerRef.current)
reconnectTimerRef.current = null
}
closeWS()
wsRef.current = null
connectingRef.current = false
}
// INTENTIONALLY EMPTY DEPS do not reopen on state changes
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [])
// IntersectionObserver to trigger SCROLL when sentinel becomes visible
useEffect(() => {

View File

@@ -6,3 +6,23 @@ export const eventKey = (e: TEventsV1Event) => {
const ns = e.metadata?.namespace ?? ''
return `${ns}/${n}`
}
// Compare resourceVersions safely (string-based)
export const compareRV = (a: string, b: string): number => {
if (a.length !== b.length) return a.length > b.length ? 1 : -1
// eslint-disable-next-line no-nested-ternary
return a > b ? 1 : a < b ? -1 : 0
}
type WithRV = { metadata?: { resourceVersion?: string } }
export const getRV = (item: WithRV): string | undefined => item?.metadata?.resourceVersion
// ✅ Pure functional + no restricted syntax
export const getMaxRV = <T extends WithRV>(items: ReadonlyArray<T>): string | undefined => {
const rvs = items
.map(getRV)
.filter((v): v is string => Boolean(v))
.sort(compareRV)
return rvs.length ? rvs[rvs.length - 1] : undefined
}