Compare commits

...

24 Commits

Author SHA1 Message Date
Cédric Verstraeten
0e6a004c23 Merge pull request #259 from kerberos-io/fix/add-grace-period
feature/add-broadcasting-feature
2026-03-09 16:20:39 +01:00
Cédric Verstraeten
617f854534 Merge branch 'master' into fix/add-grace-period 2026-03-09 16:17:35 +01:00
Cédric Verstraeten
1bf8006055 Refactor WebRTC handling to use per-peer broadcasters for video and audio tracks 2026-03-09 15:12:01 +00:00
Cédric Verstraeten
ca0e426382 Add max signaling age constant and discard stale WebRTC messages 2026-03-09 14:50:00 +00:00
Cédric Verstraeten
726d0722d9 Merge pull request #258 from kerberos-io/fix/add-grace-period
fix/add-grace-period
2026-03-09 15:20:53 +01:00
Cédric Verstraeten
d8f320b040 Add disconnect grace period handling in WebRTC connection manager 2026-03-09 14:15:50 +00:00
Cédric Verstraeten
0131b87692 Merge pull request #257 from kerberos-io/security/middleware-exposure
security/middleware-exposure
2026-03-09 14:18:11 +01:00
Cédric Verstraeten
54e8198b65 Apply suggestions from code review
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-03-09 14:18:00 +01:00
Cédric Verstraeten
3bfb68f950 Update port configuration and secure routes with JWT authentication middleware 2026-03-09 12:42:05 +00:00
Cédric Verstraeten
c05e59c936 Merge pull request #255 from kerberos-io/feature/improve-mqtt-concurrency
feature/improve-mqtt-concurrency
2026-03-09 13:25:26 +01:00
Cédric Verstraeten
b42d63b668 Enhance WebRTC packet processing for improved latency handling and keyframe synchronization 2026-03-09 12:17:38 +00:00
Cédric Verstraeten
0ca007e424 Refactor session key usage in ConnectionManager and enhance candidate queuing 2026-03-09 12:09:22 +00:00
Cédric Verstraeten
229d085de7 Merge pull request #253 from kerberos-io/fix/mqtt-reconnection
fix/mqtt-reconnection
2026-03-09 12:43:42 +01:00
Cédric Verstraeten
30e2b8318d Refactor build workflow to support multi-architecture builds and enhance MQTT connection handling 2026-03-09 11:40:24 +00:00
Cédric Verstraeten
dbcf4e242c Enhance MQTT reconnection handling and improve WebRTC connection cleanup
- Enable automatic reconnection for MQTT with configurable intervals and timeouts.
- Add logging for connection loss and reconnection attempts.
- Refactor WebRTC connection cleanup to ensure proper resource management on disconnection.
- Improve event handling in ImageCanvas and Dashboard components for better performance and reliability.
2026-03-09 11:04:10 +00:00
Cédric Verstraeten
ccf4034cc8 Merge pull request #252 from kerberos-io/fix/close-mp4-after-started
fix/close-mp4-after-started
2026-03-03 15:21:12 +01:00
Cédric Verstraeten
a34836e8f4 Delay MP4 creation until the first keyframe is received to ensure valid recordings 2026-03-03 14:16:39 +00:00
Cédric Verstraeten
dd1464d1be Fix recording closure condition to ensure it only triggers after recording has started 2026-03-03 14:03:11 +00:00
Cédric Verstraeten
2c02e0aeb1 Merge pull request #250 from kerberos-io/fix/add-avc-description-fallback
fix/add-avc-description-fallback
2026-02-27 11:48:34 +01:00
cedricve
d5464362bb Add AVC descriptor fallback for SPS parse errors
When setting the AVC descriptor fails in MP4.Close(), attempt a fallback that constructs an AvcC/avc1 sample entry from available SPS/PPS NALUs. Adds github.com/Eyevinn/mp4ff/avc import and two helpers: addAVCDescriptorFallback (builds a visual sample entry, sets tkhd width/height if available, and inserts it into stsd) and buildAVCDecConfRecFromSPS (creates an avc.DecConfRec from SPS/PPS bytes by extracting profile/compat/level and filling defaults). Logs errors and warns when the fallback is used. This provides resilience against SPS parsing errors when writing the MP4 track descriptor.
2026-02-27 11:35:22 +01:00
Cédric Verstraeten
5bcefd0015 Merge pull request #249 from kerberos-io/feature/enhance-avc-hevc-ssp-nalus
feature/enhance-avc-hevc-ssp-nalus
2026-02-27 11:12:03 +01:00
cedricve
5bb9def42d Normalize and debug H264/H265 parameter sets
Replace direct sanitizeParameterSets usage with normalizeH264ParameterSets and normalizeH265ParameterSets in mp4.Close. The new functions split Annex-B blobs, strip start codes, detect NALU types (SPS/PPS for AVC; VPS/SPS/PPS for HEVC), aggregate distinct parameter sets and fall back to sanitizeParameterSets if none are found. Added splitParamSetNALUs and formatNaluDebug helpers and debug logging to output concise parameter-set summaries before setting AVC/HEVC descriptors. These changes improve handling of concatenated Annex-B parameter set blobs and make debugging parameter extraction easier.
2026-02-27 11:09:28 +01:00
Cédric Verstraeten
ff38ccbadf Merge pull request #248 from kerberos-io/fix/sanitize-parameter-sets
fix/sanitize-parameter-sets
2026-02-26 20:43:53 +01:00
cedricve
f64e899de9 Populate/sanitize NALUs and avoid empty MP4
Fill missing SPS/PPS/VPS from camera config before closing recordings and warn when parameter sets are incomplete (for both continuous and motion-detection flows). Sanitize parameter sets (remove Annex-B start codes and drop empty NALUs) before writing AVC/HEVC descriptors. Prevent creation of empty MP4 files by flushing/closing and removing files when no audio/video samples were added, and only add an audio track when audio samples exist.
2026-02-26 20:37:10 +01:00
11 changed files with 950 additions and 285 deletions

View File

@@ -7,61 +7,34 @@ env:
REPO: kerberos/agent
jobs:
build-amd64:
runs-on: ubuntu-24.04
build:
runs-on: ${{ matrix.runner }}
permissions:
contents: write
strategy:
matrix:
architecture: [amd64]
include:
- architecture: amd64
runner: ubuntu-24.04
dockerfile: Dockerfile
- architecture: arm64
runner: ubuntu-24.04-arm
dockerfile: Dockerfile.arm64
steps:
- name: Checkout
uses: actions/checkout@v3
- name: Login to DockerHub
uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Checkout
uses: actions/checkout@v3
- uses: benjlevesque/short-sha@v2.1
id: short-sha
with:
length: 7
- name: Run Build
run: |
docker build -t ${{matrix.architecture}} .
CID=$(docker create ${{matrix.architecture}})
docker cp ${CID}:/home/agent ./output-${{matrix.architecture}}
docker rm ${CID}
- name: Strip binary
run: tar -cf agent-${{matrix.architecture}}.tar -C output-${{matrix.architecture}} . && rm -rf output-${{matrix.architecture}}
- name: Upload artifact
uses: actions/upload-artifact@v4
with:
name: agent-${{matrix.architecture}}.tar
path: agent-${{matrix.architecture}}.tar
build-arm64:
runs-on: ubuntu-24.04-arm
permissions:
contents: write
strategy:
matrix:
architecture: [arm64]
steps:
- name: Login to DockerHub
uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Checkout
uses: actions/checkout@v3
- uses: benjlevesque/short-sha@v2.1
id: short-sha
with:
length: 7
- name: Run Build
run: |
docker build -t ${{matrix.architecture}} -f Dockerfile.arm64 .
docker build -t ${{ matrix.architecture }} -f ${{ matrix.dockerfile }} .
CID=$(docker create ${{matrix.architecture}})
docker cp ${CID}:/home/agent ./output-${{matrix.architecture}}
docker rm ${CID}

View File

@@ -159,6 +159,19 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
}
// Close mp4
if len(mp4Video.SPSNALUs) == 0 && len(configuration.Config.Capture.IPCamera.SPSNALUs) > 0 {
mp4Video.SPSNALUs = configuration.Config.Capture.IPCamera.SPSNALUs
}
if len(mp4Video.PPSNALUs) == 0 && len(configuration.Config.Capture.IPCamera.PPSNALUs) > 0 {
mp4Video.PPSNALUs = configuration.Config.Capture.IPCamera.PPSNALUs
}
if len(mp4Video.VPSNALUs) == 0 && len(configuration.Config.Capture.IPCamera.VPSNALUs) > 0 {
mp4Video.VPSNALUs = configuration.Config.Capture.IPCamera.VPSNALUs
}
if (videoCodec == "H264" && (len(mp4Video.SPSNALUs) == 0 || len(mp4Video.PPSNALUs) == 0)) ||
(videoCodec == "H265" && (len(mp4Video.VPSNALUs) == 0 || len(mp4Video.SPSNALUs) == 0 || len(mp4Video.PPSNALUs) == 0)) {
log.Log.Warning("capture.main.HandleRecordStream(continuous): closing MP4 without full parameter sets, moov may be incomplete")
}
mp4Video.Close(&config)
log.Log.Info("capture.main.HandleRecordStream(continuous): recording finished: file save: " + name)
@@ -505,21 +518,8 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
if len(spsNALUS) == 0 || len(ppsNALUS) == 0 {
log.Log.Warning("capture.main.HandleRecordStream(motiondetection): missing SPS/PPS at recording start")
}
// Create a video file, and set the dimensions.
mp4Video := video.NewMP4(fullName, spsNALUS, ppsNALUS, vpsNALUS, configuration.Config.Capture.MaxLengthRecording)
mp4Video.SetWidth(width)
mp4Video.SetHeight(height)
if videoCodec == "H264" {
videoTrack = mp4Video.AddVideoTrack("H264")
} else if videoCodec == "H265" {
videoTrack = mp4Video.AddVideoTrack("H265")
}
if audioCodec == "AAC" {
audioTrack = mp4Video.AddAudioTrack("AAC")
} else if audioCodec == "PCM_MULAW" {
log.Log.Debug("capture.main.HandleRecordStream(continuous): no AAC audio codec detected, skipping audio track.")
}
// Create the MP4 only once the first keyframe arrives.
var mp4Video *video.MP4
for cursorError == nil {
@@ -538,7 +538,7 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
default:
}
if (motionTimestamp+postRecording-now < 0 || now-startRecording > maxRecordingPeriod-500) && nextPkt.IsKeyFrame {
if start && (motionTimestamp+postRecording-now < 0 || now-startRecording > maxRecordingPeriod-500) && nextPkt.IsKeyFrame {
log.Log.Info("capture.main.HandleRecordStream(motiondetection): timestamp+postRecording-now < 0 - " + strconv.FormatInt(motionTimestamp+postRecording-now, 10) + " < 0")
log.Log.Info("capture.main.HandleRecordStream(motiondetection): now-startRecording > maxRecordingPeriod-500 - " + strconv.FormatInt(now-startRecording, 10) + " > " + strconv.FormatInt(maxRecordingPeriod-500, 10))
log.Log.Info("capture.main.HandleRecordStream(motiondetection): closing recording (timestamp: " + strconv.FormatInt(motionTimestamp, 10) + ", postRecording: " + strconv.FormatInt(postRecording, 10) + ", now: " + strconv.FormatInt(now, 10) + ", startRecording: " + strconv.FormatInt(startRecording, 10) + ", maxRecordingPeriod: " + strconv.FormatInt(maxRecordingPeriod, 10))
@@ -548,20 +548,44 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
// We start the recording if we have a keyframe and the last duration is 0 or less than the current packet time.
// It could be start we start from the beginning of the recording.
log.Log.Debug("capture.main.HandleRecordStream(motiondetection): write frames")
log.Log.Debug("capture.main.HandleRecordStream(motiondetection): recording started on keyframe")
// Align duration timers with the first keyframe.
startRecording = pkt.CurrentTime
// Create a video file, and set the dimensions.
mp4Video = video.NewMP4(fullName, spsNALUS, ppsNALUS, vpsNALUS, configuration.Config.Capture.MaxLengthRecording)
mp4Video.SetWidth(width)
mp4Video.SetHeight(height)
if videoCodec == "H264" {
videoTrack = mp4Video.AddVideoTrack("H264")
} else if videoCodec == "H265" {
videoTrack = mp4Video.AddVideoTrack("H265")
}
if audioCodec == "AAC" {
audioTrack = mp4Video.AddAudioTrack("AAC")
} else if audioCodec == "PCM_MULAW" {
log.Log.Debug("capture.main.HandleRecordStream(continuous): no AAC audio codec detected, skipping audio track.")
}
start = true
}
if start {
pts := convertPTS(pkt.TimeLegacy)
if pkt.IsVideo {
log.Log.Debug("capture.main.HandleRecordStream(motiondetection): add video sample")
if err := mp4Video.AddSampleToTrack(videoTrack, pkt.IsKeyFrame, pkt.Data, pts); err != nil {
log.Log.Error("capture.main.HandleRecordStream(motiondetection): " + err.Error())
if mp4Video != nil {
if err := mp4Video.AddSampleToTrack(videoTrack, pkt.IsKeyFrame, pkt.Data, pts); err != nil {
log.Log.Error("capture.main.HandleRecordStream(motiondetection): " + err.Error())
}
}
} else if pkt.IsAudio {
log.Log.Debug("capture.main.HandleRecordStream(motiondetection): add audio sample")
if pkt.Codec == "AAC" {
if err := mp4Video.AddSampleToTrack(audioTrack, pkt.IsKeyFrame, pkt.Data, pts); err != nil {
log.Log.Error("capture.main.HandleRecordStream(motiondetection): " + err.Error())
if mp4Video != nil {
if err := mp4Video.AddSampleToTrack(audioTrack, pkt.IsKeyFrame, pkt.Data, pts); err != nil {
log.Log.Error("capture.main.HandleRecordStream(motiondetection): " + err.Error())
}
}
} else if pkt.Codec == "PCM_MULAW" {
// TODO: transcode to AAC, some work to do..
@@ -579,7 +603,25 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
// This is used to determine if we need to start a new recording.
lastRecordingTime = pkt.CurrentTime
if mp4Video == nil {
log.Log.Warning("capture.main.HandleRecordStream(motiondetection): recording closed without keyframe; no MP4 created")
continue
}
// This will close the recording and write the last packet.
if len(mp4Video.SPSNALUs) == 0 && len(configuration.Config.Capture.IPCamera.SPSNALUs) > 0 {
mp4Video.SPSNALUs = configuration.Config.Capture.IPCamera.SPSNALUs
}
if len(mp4Video.PPSNALUs) == 0 && len(configuration.Config.Capture.IPCamera.PPSNALUs) > 0 {
mp4Video.PPSNALUs = configuration.Config.Capture.IPCamera.PPSNALUs
}
if len(mp4Video.VPSNALUs) == 0 && len(configuration.Config.Capture.IPCamera.VPSNALUs) > 0 {
mp4Video.VPSNALUs = configuration.Config.Capture.IPCamera.VPSNALUs
}
if (videoCodec == "H264" && (len(mp4Video.SPSNALUs) == 0 || len(mp4Video.PPSNALUs) == 0)) ||
(videoCodec == "H265" && (len(mp4Video.VPSNALUs) == 0 || len(mp4Video.SPSNALUs) == 0 || len(mp4Video.PPSNALUs) == 0)) {
log.Log.Warning("capture.main.HandleRecordStream(motiondetection): closing MP4 without full parameter sets, moov may be incomplete")
}
mp4Video.Close(&config)
log.Log.Info("capture.main.HandleRecordStream(motiondetection): file save: " + name)

View File

@@ -800,17 +800,19 @@ func HandleLiveStreamHD(livestreamCursor *packets.QueueCursor, configuration *mo
// Check if we need to enable the live stream
if config.Capture.Liveview != "false" {
// Should create a track here.
// Create per-peer broadcasters instead of shared tracks.
// Each viewer gets its own track with independent, non-blocking writes
// so a slow/congested peer cannot stall the others.
streams, _ := rtspClient.GetStreams()
videoTrack := webrtc.NewVideoTrack(streams)
audioTrack := webrtc.NewAudioTrack(streams)
videoBroadcaster := webrtc.NewVideoBroadcaster(streams)
audioBroadcaster := webrtc.NewAudioBroadcaster(streams)
if videoTrack == nil && audioTrack == nil {
log.Log.Error("cloud.HandleLiveStreamHD(): failed to create both video and audio tracks")
if videoBroadcaster == nil && audioBroadcaster == nil {
log.Log.Error("cloud.HandleLiveStreamHD(): failed to create both video and audio broadcasters")
return
}
go webrtc.WriteToTrack(livestreamCursor, configuration, communication, mqttClient, videoTrack, audioTrack, rtspClient)
go webrtc.WriteToTrack(livestreamCursor, configuration, communication, mqttClient, videoBroadcaster, audioBroadcaster, rtspClient)
if config.Capture.ForwardWebRTC == "true" {
@@ -818,7 +820,7 @@ func HandleLiveStreamHD(livestreamCursor *packets.QueueCursor, configuration *mo
log.Log.Info("cloud.HandleLiveStreamHD(): Waiting for peer connections.")
for handshake := range communication.HandleLiveHDHandshake {
log.Log.Info("cloud.HandleLiveStreamHD(): setting up a peer connection.")
go webrtc.InitializeWebRTCConnection(configuration, communication, mqttClient, videoTrack, audioTrack, handshake)
go webrtc.InitializeWebRTCConnection(configuration, communication, mqttClient, videoBroadcaster, audioBroadcaster, handshake)
}
}

View File

@@ -20,96 +20,93 @@ func AddRoutes(r *gin.Engine, authMiddleware *jwt.GinJWTMiddleware, configDirect
// This is legacy should be removed in future! Now everything
// lives under the /api prefix.
r.GET("/config", func(c *gin.Context) {
r.GET("/config", authMiddleware.MiddlewareFunc(), func(c *gin.Context) {
components.GetConfig(c, captureDevice, configuration, communication)
})
// This is legacy should be removed in future! Now everything
// lives under the /api prefix.
r.POST("/config", func(c *gin.Context) {
r.POST("/config", authMiddleware.MiddlewareFunc(), func(c *gin.Context) {
components.UpdateConfig(c, configDirectory, configuration, communication)
})
api := r.Group("/api")
{
// Public endpoints (no authentication required)
api.POST("/login", authMiddleware.LoginHandler)
api.GET("/dashboard", func(c *gin.Context) {
components.GetDashboard(c, configDirectory, configuration, communication)
})
api.POST("/latest-events", func(c *gin.Context) {
components.GetLatestEvents(c, configDirectory, configuration, communication)
})
api.GET("/days", func(c *gin.Context) {
components.GetDays(c, configDirectory, configuration, communication)
})
api.GET("/config", func(c *gin.Context) {
components.GetConfig(c, captureDevice, configuration, communication)
})
api.POST("/config", func(c *gin.Context) {
components.UpdateConfig(c, configDirectory, configuration, communication)
})
// Will verify the hub settings.
api.POST("/hub/verify", func(c *gin.Context) {
cloud.VerifyHub(c)
})
// Will verify the persistence settings.
api.POST("/persistence/verify", func(c *gin.Context) {
cloud.VerifyPersistence(c, configDirectory)
})
// Will verify the secondary persistence settings.
api.POST("/persistence/secondary/verify", func(c *gin.Context) {
cloud.VerifySecondaryPersistence(c, configDirectory)
})
// Camera specific methods. Doesn't require any authorization.
// These are available for anyone, but require the agent, to reach
// the camera.
api.POST("/camera/restart", func(c *gin.Context) {
components.RestartAgent(c, communication)
})
api.POST("/camera/stop", func(c *gin.Context) {
components.StopAgent(c, communication)
})
api.POST("/camera/record", func(c *gin.Context) {
components.MakeRecording(c, communication)
})
api.GET("/camera/snapshot/jpeg", func(c *gin.Context) {
components.GetSnapshotRaw(c, captureDevice, configuration, communication)
})
api.GET("/camera/snapshot/base64", func(c *gin.Context) {
components.GetSnapshotBase64(c, captureDevice, configuration, communication)
})
// Onvif specific methods. Doesn't require any authorization.
// Will verify the current onvif settings.
api.POST("/camera/onvif/verify", onvif.VerifyOnvifConnection)
api.POST("/camera/onvif/login", LoginToOnvif)
api.POST("/camera/onvif/capabilities", GetOnvifCapabilities)
api.POST("/camera/onvif/presets", GetOnvifPresets)
api.POST("/camera/onvif/gotopreset", GoToOnvifPreset)
api.POST("/camera/onvif/pantilt", DoOnvifPanTilt)
api.POST("/camera/onvif/zoom", DoOnvifZoom)
api.POST("/camera/onvif/inputs", DoGetDigitalInputs)
api.POST("/camera/onvif/outputs", DoGetRelayOutputs)
api.POST("/camera/onvif/outputs/:output", DoTriggerRelayOutput)
api.POST("/camera/verify/:streamType", capture.VerifyCamera)
// Secured endpoints..
// Apply JWT authentication middleware.
// All routes registered below this line require a valid JWT token.
api.Use(authMiddleware.MiddlewareFunc())
{
api.GET("/dashboard", func(c *gin.Context) {
components.GetDashboard(c, configDirectory, configuration, communication)
})
api.POST("/latest-events", func(c *gin.Context) {
components.GetLatestEvents(c, configDirectory, configuration, communication)
})
api.GET("/days", func(c *gin.Context) {
components.GetDays(c, configDirectory, configuration, communication)
})
api.GET("/config", func(c *gin.Context) {
components.GetConfig(c, captureDevice, configuration, communication)
})
api.POST("/config", func(c *gin.Context) {
components.UpdateConfig(c, configDirectory, configuration, communication)
})
// Will verify the hub settings.
api.POST("/hub/verify", func(c *gin.Context) {
cloud.VerifyHub(c)
})
// Will verify the persistence settings.
api.POST("/persistence/verify", func(c *gin.Context) {
cloud.VerifyPersistence(c, configDirectory)
})
// Will verify the secondary persistence settings.
api.POST("/persistence/secondary/verify", func(c *gin.Context) {
cloud.VerifySecondaryPersistence(c, configDirectory)
})
// Camera specific methods.
api.POST("/camera/restart", func(c *gin.Context) {
components.RestartAgent(c, communication)
})
api.POST("/camera/stop", func(c *gin.Context) {
components.StopAgent(c, communication)
})
api.POST("/camera/record", func(c *gin.Context) {
components.MakeRecording(c, communication)
})
api.GET("/camera/snapshot/jpeg", func(c *gin.Context) {
components.GetSnapshotRaw(c, captureDevice, configuration, communication)
})
api.GET("/camera/snapshot/base64", func(c *gin.Context) {
components.GetSnapshotBase64(c, captureDevice, configuration, communication)
})
// Onvif specific methods.
api.POST("/camera/onvif/verify", onvif.VerifyOnvifConnection)
api.POST("/camera/onvif/login", LoginToOnvif)
api.POST("/camera/onvif/capabilities", GetOnvifCapabilities)
api.POST("/camera/onvif/presets", GetOnvifPresets)
api.POST("/camera/onvif/gotopreset", GoToOnvifPreset)
api.POST("/camera/onvif/pantilt", DoOnvifPanTilt)
api.POST("/camera/onvif/zoom", DoOnvifZoom)
api.POST("/camera/onvif/inputs", DoGetDigitalInputs)
api.POST("/camera/onvif/outputs", DoGetRelayOutputs)
api.POST("/camera/onvif/outputs/:output", DoTriggerRelayOutput)
api.POST("/camera/verify/:streamType", capture.VerifyCamera)
}
}
return api

View File

@@ -90,10 +90,31 @@ func ConfigureMQTT(configDirectory string, configuration *models.Configuration,
// Some extra options to make sure the connection behaves
// properly. More information here: github.com/eclipse/paho.mqtt.golang.
opts.SetCleanSession(true)
opts.SetCleanSession(false)
opts.SetResumeSubs(true)
opts.SetStore(mqtt.NewMemoryStore())
opts.SetConnectRetry(true)
//opts.SetAutoReconnect(true)
opts.SetAutoReconnect(true)
opts.SetConnectRetryInterval(5 * time.Second)
opts.SetMaxReconnectInterval(1 * time.Minute)
opts.SetKeepAlive(30 * time.Second)
opts.SetPingTimeout(10 * time.Second)
opts.SetWriteTimeout(10 * time.Second)
opts.SetOrderMatters(false)
opts.SetConnectTimeout(30 * time.Second)
opts.SetConnectionLostHandler(func(client mqtt.Client, err error) {
if err != nil {
log.Log.Error("routers.mqtt.main.ConfigureMQTT(): MQTT connection lost: " + err.Error())
} else {
log.Log.Error("routers.mqtt.main.ConfigureMQTT(): MQTT connection lost")
}
})
opts.SetReconnectingHandler(func(client mqtt.Client, options *mqtt.ClientOptions) {
log.Log.Warning("routers.mqtt.main.ConfigureMQTT(): reconnecting to MQTT broker")
})
opts.SetOnConnectHandler(func(c mqtt.Client) {
log.Log.Info("routers.mqtt.main.ConfigureMQTT(): MQTT session is online")
})
hubKey := ""
// This is the old way ;)
@@ -133,10 +154,14 @@ func ConfigureMQTT(configDirectory string, configuration *models.Configuration,
}
}
mqc := mqtt.NewClient(opts)
if token := mqc.Connect(); token.WaitTimeout(3 * time.Second) {
if token := mqc.Connect(); token.WaitTimeout(30 * time.Second) {
if token.Error() != nil {
log.Log.Error("routers.mqtt.main.ConfigureMQTT(): unable to establish mqtt broker connection, error was: " + token.Error().Error())
} else {
log.Log.Info("routers.mqtt.main.ConfigureMQTT(): initial MQTT connection established")
}
} else {
log.Log.Error("routers.mqtt.main.ConfigureMQTT(): timed out while establishing mqtt broker connection")
}
return mqc
}
@@ -144,12 +169,18 @@ func ConfigureMQTT(configDirectory string, configuration *models.Configuration,
return nil
}
// maxSignalingAge is the maximum age of a WebRTC signaling message (request-hd-stream,
// receive-hd-candidates) before it is considered stale and discarded. With CleanSession=false
// the MQTT broker may replay queued messages from previous sessions; this prevents the agent
// from setting up peer connections for viewers that are no longer waiting.
const maxSignalingAge = 30 * time.Second
func MQTTListenerHandler(mqttClient mqtt.Client, hubKey string, configDirectory string, configuration *models.Configuration, communication *models.Communication) {
if hubKey == "" {
log.Log.Info("routers.mqtt.main.MQTTListenerHandler(): no hub key provided, not subscribing to kerberos/hub/{hubkey}")
} else {
agentListener := fmt.Sprintf("kerberos/agent/%s", hubKey)
mqttClient.Subscribe(agentListener, 1, func(c mqtt.Client, msg mqtt.Message) {
token := mqttClient.Subscribe(agentListener, 1, func(c mqtt.Client, msg mqtt.Message) {
// Decode the message, we are expecting following format.
// {
@@ -249,6 +280,18 @@ func MQTTListenerHandler(mqttClient mqtt.Client, hubKey string, configDirectory
// We'll find out which message we received, and act accordingly.
log.Log.Info("routers.mqtt.main.MQTTListenerHandler(): received message with action: " + payload.Action)
// For time-sensitive WebRTC signaling messages, discard stale ones that may
// have been queued by the broker while CleanSession=false.
if payload.Action == "request-hd-stream" || payload.Action == "receive-hd-candidates" {
messageAge := time.Since(time.Unix(message.Timestamp, 0))
if messageAge > maxSignalingAge {
log.Log.Info("routers.mqtt.main.MQTTListenerHandler(): discarding stale " + payload.Action +
" message (age: " + messageAge.Round(time.Second).String() + ")")
return
}
}
switch payload.Action {
case "record":
go HandleRecording(mqttClient, hubKey, payload, configuration, communication)
@@ -276,6 +319,16 @@ func MQTTListenerHandler(mqttClient mqtt.Client, hubKey string, configDirectory
}
})
if token.WaitTimeout(10 * time.Second) {
if token.Error() != nil {
log.Log.Error("routers.mqtt.main.MQTTListenerHandler(): failed to subscribe to " + agentListener + ": " + token.Error().Error())
} else {
log.Log.Info("routers.mqtt.main.MQTTListenerHandler(): subscribed to " + agentListener)
}
} else {
log.Log.Error("routers.mqtt.main.MQTTListenerHandler(): timed out while subscribing to " + agentListener)
}
}
}

View File

@@ -13,6 +13,7 @@ import (
"strings"
"time"
"github.com/Eyevinn/mp4ff/avc"
mp4ff "github.com/Eyevinn/mp4ff/mp4"
"github.com/kerberos-io/agent/machinery/src/encryption"
"github.com/kerberos-io/agent/machinery/src/log"
@@ -431,7 +432,12 @@ func (mp4 *MP4) Close(config *models.Config) {
mp4.TotalKeyframesReceived, mp4.TotalKeyframesWritten, mp4.SegmentCount, mp4.FragmentKeyframeCount))
if mp4.VideoTotalDuration == 0 && mp4.AudioTotalDuration == 0 {
log.Log.Error("mp4.Close(): no video or audio samples added, cannot create MP4 file")
log.Log.Error("mp4.Close(): no video or audio samples added, removing empty MP4 file")
mp4.Writer.Flush()
_ = mp4.FileWriter.Sync()
_ = mp4.FileWriter.Close()
_ = os.Remove(mp4.FileName)
return
}
// Add final pending samples before closing
@@ -554,9 +560,16 @@ func (mp4 *MP4) Close(config *models.Config) {
case "H264", "AVC1":
init.AddEmptyTrack(videoTimescale, "video", "und")
includePS := true
err := init.Moov.Traks[0].SetAVCDescriptor("avc1", mp4.SPSNALUs, mp4.PPSNALUs, includePS)
spsNALUs, ppsNALUs := normalizeH264ParameterSets(mp4.SPSNALUs, mp4.PPSNALUs)
log.Log.Debug("mp4.Close(): AVC parameter sets: SPS=" + formatNaluDebug(spsNALUs) + ", PPS=" + formatNaluDebug(ppsNALUs))
err := init.Moov.Traks[0].SetAVCDescriptor("avc1", spsNALUs, ppsNALUs, includePS)
if err != nil {
log.Log.Error("mp4.Close(): error setting AVC descriptor: " + err.Error())
if fallbackErr := addAVCDescriptorFallback(init.Moov.Traks[0], spsNALUs, ppsNALUs, uint16(mp4.width), uint16(mp4.height)); fallbackErr != nil {
log.Log.Error("mp4.Close(): error setting AVC descriptor fallback: " + fallbackErr.Error())
} else {
log.Log.Warning("mp4.Close(): AVC descriptor fallback used due to SPS parse error")
}
}
init.Moov.Traks[0].Tkhd.Duration = actualVideoDuration
init.Moov.Traks[0].Tkhd.Width = mp4ff.Fixed32(uint32(mp4.width) << 16)
@@ -573,7 +586,9 @@ func (mp4 *MP4) Close(config *models.Config) {
case "H265", "HVC1":
init.AddEmptyTrack(videoTimescale, "video", "und")
includePS := true
err := init.Moov.Traks[0].SetHEVCDescriptor("hvc1", mp4.VPSNALUs, mp4.SPSNALUs, mp4.PPSNALUs, [][]byte{}, includePS)
vpsNALUs, spsNALUs, ppsNALUs := normalizeH265ParameterSets(mp4.VPSNALUs, mp4.SPSNALUs, mp4.PPSNALUs)
log.Log.Debug("mp4.Close(): HEVC parameter sets: VPS=" + formatNaluDebug(vpsNALUs) + ", SPS=" + formatNaluDebug(spsNALUs) + ", PPS=" + formatNaluDebug(ppsNALUs))
err := init.Moov.Traks[0].SetHEVCDescriptor("hvc1", vpsNALUs, spsNALUs, ppsNALUs, [][]byte{}, includePS)
if err != nil {
log.Log.Error("mp4.Close(): error setting HEVC descriptor: " + err.Error())
}
@@ -589,8 +604,8 @@ func (mp4 *MP4) Close(config *models.Config) {
init.Moov.Traks[0].Mdia.Mdhd.ModificationTime = macTime
}
// Try adding audio track if available
if mp4.AudioTrackName == "AAC" || mp4.AudioTrackName == "MP4A" {
// Try adding audio track if available and samples were recorded.
if (mp4.AudioTrackName == "AAC" || mp4.AudioTrackName == "MP4A") && mp4.AudioTotalDuration > 0 {
// Add an audio track to the moov box
init.AddEmptyTrack(audioTimescale, "audio", "und")
@@ -828,6 +843,172 @@ func removeAnnexBStartCode(nalu []byte) []byte {
return nalu
}
// sanitizeParameterSets removes Annex B start codes and drops empty NALUs.
func sanitizeParameterSets(nalus [][]byte) [][]byte {
if len(nalus) == 0 {
return nalus
}
clean := make([][]byte, 0, len(nalus))
for _, nalu := range nalus {
trimmed := removeAnnexBStartCode(nalu)
if len(trimmed) == 0 {
continue
}
clean = append(clean, trimmed)
}
return clean
}
// normalizeH264ParameterSets splits Annex B blobs and extracts SPS/PPS NALUs.
func normalizeH264ParameterSets(spsIn [][]byte, ppsIn [][]byte) ([][]byte, [][]byte) {
all := make([][]byte, 0, len(spsIn)+len(ppsIn))
all = append(all, spsIn...)
all = append(all, ppsIn...)
var spsOut [][]byte
var ppsOut [][]byte
for _, blob := range all {
for _, nalu := range splitParamSetNALUs(blob) {
nalu = removeAnnexBStartCode(nalu)
if len(nalu) == 0 {
continue
}
typ := nalu[0] & 0x1F
switch typ {
case 7:
spsOut = append(spsOut, nalu)
case 8:
ppsOut = append(ppsOut, nalu)
}
}
}
if len(spsOut) == 0 {
spsOut = sanitizeParameterSets(spsIn)
}
if len(ppsOut) == 0 {
ppsOut = sanitizeParameterSets(ppsIn)
}
return spsOut, ppsOut
}
// normalizeH265ParameterSets splits Annex B blobs and extracts VPS/SPS/PPS NALUs.
func normalizeH265ParameterSets(vpsIn [][]byte, spsIn [][]byte, ppsIn [][]byte) ([][]byte, [][]byte, [][]byte) {
all := make([][]byte, 0, len(vpsIn)+len(spsIn)+len(ppsIn))
all = append(all, vpsIn...)
all = append(all, spsIn...)
all = append(all, ppsIn...)
var vpsOut [][]byte
var spsOut [][]byte
var ppsOut [][]byte
for _, blob := range all {
for _, nalu := range splitParamSetNALUs(blob) {
nalu = removeAnnexBStartCode(nalu)
if len(nalu) == 0 {
continue
}
typ := (nalu[0] >> 1) & 0x3F
switch typ {
case 32:
vpsOut = append(vpsOut, nalu)
case 33:
spsOut = append(spsOut, nalu)
case 34:
ppsOut = append(ppsOut, nalu)
}
}
}
if len(vpsOut) == 0 {
vpsOut = sanitizeParameterSets(vpsIn)
}
if len(spsOut) == 0 {
spsOut = sanitizeParameterSets(spsIn)
}
if len(ppsOut) == 0 {
ppsOut = sanitizeParameterSets(ppsIn)
}
return vpsOut, spsOut, ppsOut
}
// splitParamSetNALUs splits Annex B parameter set blobs; raw NALUs are returned as-is.
func splitParamSetNALUs(blob []byte) [][]byte {
if len(blob) == 0 {
return nil
}
if findStartCode(blob, 0) >= 0 {
return splitNALUs(blob)
}
return [][]byte{blob}
}
func formatNaluDebug(nalus [][]byte) string {
if len(nalus) == 0 {
return "none"
}
parts := make([]string, 0, len(nalus))
for _, nalu := range nalus {
if len(nalu) == 0 {
parts = append(parts, "len=0")
continue
}
max := 8
if len(nalu) < max {
max = len(nalu)
}
parts = append(parts, fmt.Sprintf("len=%d head=%x", len(nalu), nalu[:max]))
}
return strings.Join(parts, "; ")
}
func addAVCDescriptorFallback(trak *mp4ff.TrakBox, spsNALUs, ppsNALUs [][]byte, width, height uint16) error {
if trak == nil || trak.Mdia == nil || trak.Mdia.Minf == nil || trak.Mdia.Minf.Stbl == nil || trak.Mdia.Minf.Stbl.Stsd == nil {
return fmt.Errorf("missing trak stsd")
}
if len(spsNALUs) == 0 {
return fmt.Errorf("no SPS NALU available")
}
decConfRec, err := buildAVCDecConfRecFromSPS(spsNALUs, ppsNALUs)
if err != nil {
return err
}
if width == 0 && trak.Tkhd != nil {
width = uint16(uint32(trak.Tkhd.Width) >> 16)
}
if height == 0 && trak.Tkhd != nil {
height = uint16(uint32(trak.Tkhd.Height) >> 16)
}
if width > 0 && height > 0 && trak.Tkhd != nil {
trak.Tkhd.Width = mp4ff.Fixed32(uint32(width) << 16)
trak.Tkhd.Height = mp4ff.Fixed32(uint32(height) << 16)
}
avcC := &mp4ff.AvcCBox{DecConfRec: *decConfRec}
avcx := mp4ff.CreateVisualSampleEntryBox("avc1", width, height, avcC)
trak.Mdia.Minf.Stbl.Stsd.AddChild(avcx)
return nil
}
func buildAVCDecConfRecFromSPS(spsNALUs, ppsNALUs [][]byte) (*avc.DecConfRec, error) {
if len(spsNALUs) == 0 {
return nil, fmt.Errorf("no SPS NALU available")
}
sps := spsNALUs[0]
if len(sps) < 4 {
return nil, fmt.Errorf("SPS too short: len=%d", len(sps))
}
// SPS NALU: byte 0 is NAL header, next 3 bytes are profile/compat/level.
dec := &avc.DecConfRec{
AVCProfileIndication: sps[1],
ProfileCompatibility: sps[2],
AVCLevelIndication: sps[3],
SPSnalus: spsNALUs,
PPSnalus: ppsNALUs,
ChromaFormat: 1,
BitDepthLumaMinus1: 0,
BitDepthChromaMinus1: 0,
NumSPSExt: 0,
NoTrailingInfo: true,
}
return dec, nil
}
// splitNALUs splits Annex B data into raw NAL units without start codes.
func splitNALUs(data []byte) [][]byte {
var nalus [][]byte

View File

@@ -0,0 +1,137 @@
package webrtc
import (
"io"
"sync"
"github.com/kerberos-io/agent/machinery/src/log"
pionWebRTC "github.com/pion/webrtc/v4"
pionMedia "github.com/pion/webrtc/v4/pkg/media"
)
const (
// peerSampleBuffer controls how many samples can be buffered per peer before
// dropping. Keeps slow peers from blocking the broadcaster.
peerSampleBuffer = 60
)
// peerTrack is a per-peer track with its own non-blocking sample channel.
type peerTrack struct {
track *pionWebRTC.TrackLocalStaticSample
samples chan pionMedia.Sample
done chan struct{}
}
// TrackBroadcaster fans out media samples to multiple peer-specific tracks
// without blocking. Each peer gets its own TrackLocalStaticSample and a
// goroutine that drains samples independently, so a slow/congested peer
// cannot stall the others.
type TrackBroadcaster struct {
mu sync.RWMutex
peers map[string]*peerTrack
mimeType string
id string
streamID string
}
// NewTrackBroadcaster creates a new broadcaster for either video or audio.
func NewTrackBroadcaster(mimeType string, id string, streamID string) *TrackBroadcaster {
return &TrackBroadcaster{
peers: make(map[string]*peerTrack),
mimeType: mimeType,
id: id,
streamID: streamID,
}
}
// AddPeer creates a new per-peer track and starts a writer goroutine.
// Returns the track to be added to the PeerConnection via AddTrack().
func (b *TrackBroadcaster) AddPeer(sessionKey string) (*pionWebRTC.TrackLocalStaticSample, error) {
track, err := pionWebRTC.NewTrackLocalStaticSample(
pionWebRTC.RTPCodecCapability{MimeType: b.mimeType},
b.id,
b.streamID,
)
if err != nil {
return nil, err
}
pt := &peerTrack{
track: track,
samples: make(chan pionMedia.Sample, peerSampleBuffer),
done: make(chan struct{}),
}
b.mu.Lock()
b.peers[sessionKey] = pt
b.mu.Unlock()
// Per-peer writer goroutine — drains samples independently.
go func() {
defer close(pt.done)
for sample := range pt.samples {
if err := pt.track.WriteSample(sample); err != nil {
if err == io.ErrClosedPipe {
return
}
log.Log.Error("webrtc.broadcaster.peerWriter(): error writing sample for " + sessionKey + ": " + err.Error())
}
}
}()
log.Log.Info("webrtc.broadcaster.AddPeer(): added peer track for " + sessionKey)
return track, nil
}
// RemovePeer stops the writer goroutine and removes the peer.
func (b *TrackBroadcaster) RemovePeer(sessionKey string) {
b.mu.Lock()
pt, exists := b.peers[sessionKey]
if exists {
delete(b.peers, sessionKey)
}
b.mu.Unlock()
if exists {
close(pt.samples)
<-pt.done // wait for writer goroutine to finish
log.Log.Info("webrtc.broadcaster.RemovePeer(): removed peer track for " + sessionKey)
}
}
// WriteSample fans out a sample to all connected peers without blocking.
// If a peer's buffer is full (slow consumer), the sample is dropped for
// that peer only — other peers are unaffected.
func (b *TrackBroadcaster) WriteSample(sample pionMedia.Sample) {
b.mu.RLock()
defer b.mu.RUnlock()
for sessionKey, pt := range b.peers {
select {
case pt.samples <- sample:
default:
log.Log.Warning("webrtc.broadcaster.WriteSample(): dropping sample for slow peer " + sessionKey)
}
}
}
// PeerCount returns the current number of connected peers.
func (b *TrackBroadcaster) PeerCount() int {
b.mu.RLock()
defer b.mu.RUnlock()
return len(b.peers)
}
// Close removes all peers and stops all writer goroutines.
func (b *TrackBroadcaster) Close() {
b.mu.Lock()
keys := make([]string, 0, len(b.peers))
for k := range b.peers {
keys = append(keys, k)
}
b.mu.Unlock()
for _, key := range keys {
b.RemovePeer(key)
}
}

View File

@@ -29,8 +29,10 @@ const (
rtcpBufferSize = 1500
// Timeouts and intervals
keepAliveTimeout = 15 * time.Second
defaultTimeout = 10 * time.Second
keepAliveTimeout = 15 * time.Second
defaultTimeout = 10 * time.Second
maxLivePacketAge = 1500 * time.Millisecond
disconnectGracePeriod = 5 * time.Second
// Track identifiers
trackStreamID = "kerberos-stream"
@@ -46,10 +48,16 @@ type ConnectionManager struct {
// peerConnectionWrapper wraps a peer connection with additional metadata
type peerConnectionWrapper struct {
conn *pionWebRTC.PeerConnection
cancelCtx context.CancelFunc
done chan struct{}
closeOnce sync.Once
conn *pionWebRTC.PeerConnection
cancelCtx context.CancelFunc
done chan struct{}
closeOnce sync.Once
connected atomic.Bool
disconnectMu sync.Mutex
disconnectTimer *time.Timer
sessionKey string
videoBroadcaster *TrackBroadcaster
audioBroadcaster *TrackBroadcaster
}
var globalConnectionManager = NewConnectionManager()
@@ -88,22 +96,41 @@ func (cm *ConnectionManager) CloseCandidateChannel(sessionKey string) {
}
// AddPeerConnection adds a peer connection to the manager
func (cm *ConnectionManager) AddPeerConnection(sessionID string, wrapper *peerConnectionWrapper) {
func (cm *ConnectionManager) AddPeerConnection(sessionKey string, wrapper *peerConnectionWrapper) {
cm.mu.Lock()
defer cm.mu.Unlock()
cm.peerConnections[sessionID] = wrapper
cm.peerConnections[sessionKey] = wrapper
}
// RemovePeerConnection removes a peer connection from the manager
func (cm *ConnectionManager) RemovePeerConnection(sessionID string) {
func (cm *ConnectionManager) RemovePeerConnection(sessionKey string) {
cm.mu.Lock()
defer cm.mu.Unlock()
if wrapper, exists := cm.peerConnections[sessionID]; exists {
if wrapper, exists := cm.peerConnections[sessionKey]; exists {
if wrapper.cancelCtx != nil {
wrapper.cancelCtx()
}
delete(cm.peerConnections, sessionID)
delete(cm.peerConnections, sessionKey)
}
}
// QueueCandidate safely queues a candidate for a session without racing with channel closure.
func (cm *ConnectionManager) QueueCandidate(sessionKey string, candidate string) bool {
cm.mu.Lock()
defer cm.mu.Unlock()
ch, exists := cm.candidateChannels[sessionKey]
if !exists {
ch = make(chan string, candidateChannelBuffer)
cm.candidateChannels[sessionKey] = ch
}
select {
case ch <- candidate:
return true
default:
return false
}
}
@@ -122,6 +149,35 @@ func (cm *ConnectionManager) DecrementPeerCount() int64 {
return atomic.AddInt64(&cm.peerConnectionCount, -1)
}
func cleanupPeerConnection(sessionKey string, wrapper *peerConnectionWrapper) {
wrapper.closeOnce.Do(func() {
if wrapper.connected.Swap(false) {
count := globalConnectionManager.DecrementPeerCount()
log.Log.Info("webrtc.main.cleanupPeerConnection(): Peer disconnected. Active peers: " + strconv.FormatInt(count, 10))
}
// Remove per-peer tracks from broadcasters so the fan-out stops
// writing to this peer immediately.
if wrapper.videoBroadcaster != nil {
wrapper.videoBroadcaster.RemovePeer(sessionKey)
}
if wrapper.audioBroadcaster != nil {
wrapper.audioBroadcaster.RemovePeer(sessionKey)
}
globalConnectionManager.CloseCandidateChannel(sessionKey)
if wrapper.conn != nil {
if err := wrapper.conn.Close(); err != nil {
log.Log.Error("webrtc.main.cleanupPeerConnection(): error closing peer connection: " + err.Error())
}
}
globalConnectionManager.RemovePeerConnection(sessionKey)
close(wrapper.done)
})
}
type WebRTC struct {
Name string
StunServers []string
@@ -161,16 +217,27 @@ func (w WebRTC) CreateOffer(sd []byte) pionWebRTC.SessionDescription {
}
func RegisterCandidates(key string, candidate models.ReceiveHDCandidatesPayload) {
ch := globalConnectionManager.GetOrCreateCandidateChannel(key)
log.Log.Info("webrtc.main.RegisterCandidates(): " + candidate.Candidate)
select {
case ch <- candidate.Candidate:
default:
if !globalConnectionManager.QueueCandidate(key, candidate.Candidate) {
log.Log.Info("webrtc.main.RegisterCandidates(): channel is full, dropping candidate")
}
}
func decodeICECandidate(candidate string) (pionWebRTC.ICECandidateInit, error) {
if candidate == "" {
return pionWebRTC.ICECandidateInit{}, io.EOF
}
var candidateInit pionWebRTC.ICECandidateInit
if err := json.Unmarshal([]byte(candidate), &candidateInit); err == nil {
if candidateInit.Candidate != "" {
return candidateInit, nil
}
}
return pionWebRTC.ICECandidateInit{Candidate: candidate}, nil
}
func RegisterDefaultInterceptors(mediaEngine *pionWebRTC.MediaEngine, interceptorRegistry *interceptor.Registry) error {
if err := pionWebRTC.ConfigureNack(mediaEngine, interceptorRegistry); err != nil {
return err
@@ -184,7 +251,7 @@ func RegisterDefaultInterceptors(mediaEngine *pionWebRTC.MediaEngine, intercepto
return nil
}
func InitializeWebRTCConnection(configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, videoTrack *pionWebRTC.TrackLocalStaticSample, audioTrack *pionWebRTC.TrackLocalStaticSample, handshake models.RequestHDStreamPayload) {
func InitializeWebRTCConnection(configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, videoBroadcaster *TrackBroadcaster, audioBroadcaster *TrackBroadcaster, handshake models.RequestHDStreamPayload) {
config := configuration.Config
deviceKey := config.Key
@@ -264,16 +331,27 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
// Create context for this connection
ctx, cancel := context.WithCancel(context.Background())
wrapper := &peerConnectionWrapper{
conn: peerConnection,
cancelCtx: cancel,
done: make(chan struct{}),
conn: peerConnection,
cancelCtx: cancel,
done: make(chan struct{}),
sessionKey: sessionKey,
videoBroadcaster: videoBroadcaster,
audioBroadcaster: audioBroadcaster,
}
// Create a per-peer video track from the broadcaster so writes
// to this peer are independent and non-blocking.
var videoSender *pionWebRTC.RTPSender = nil
if videoTrack != nil {
if videoSender, err = peerConnection.AddTrack(videoTrack); err != nil {
if videoBroadcaster != nil {
peerVideoTrack, trackErr := videoBroadcaster.AddPeer(sessionKey)
if trackErr != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error creating per-peer video track: " + trackErr.Error())
cleanupPeerConnection(sessionKey, wrapper)
return
}
if videoSender, err = peerConnection.AddTrack(peerVideoTrack); err != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error adding video track: " + err.Error())
cancel()
cleanupPeerConnection(sessionKey, wrapper)
return
}
} else {
@@ -302,11 +380,18 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
}()
}
// Create a per-peer audio track from the broadcaster.
var audioSender *pionWebRTC.RTPSender = nil
if audioTrack != nil {
if audioSender, err = peerConnection.AddTrack(audioTrack); err != nil {
if audioBroadcaster != nil {
peerAudioTrack, trackErr := audioBroadcaster.AddPeer(sessionKey)
if trackErr != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error creating per-peer audio track: " + trackErr.Error())
cleanupPeerConnection(sessionKey, wrapper)
return
}
if audioSender, err = peerConnection.AddTrack(peerAudioTrack); err != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error adding audio track: " + err.Error())
cancel()
cleanupPeerConnection(sessionKey, wrapper)
return
}
} else {
@@ -335,32 +420,65 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
}()
}
// Log ICE connection state changes for diagnostics
peerConnection.OnICEConnectionStateChange(func(iceState pionWebRTC.ICEConnectionState) {
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): ICE connection state changed to: " + iceState.String() +
" (session: " + handshake.SessionID + ")")
})
peerConnection.OnConnectionStateChange(func(connectionState pionWebRTC.PeerConnectionState) {
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): connection state changed to: " + connectionState.String())
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): connection state changed to: " + connectionState.String() +
" (session: " + handshake.SessionID + ")")
switch connectionState {
case pionWebRTC.PeerConnectionStateDisconnected, pionWebRTC.PeerConnectionStateClosed:
wrapper.closeOnce.Do(func() {
count := globalConnectionManager.DecrementPeerCount()
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Peer disconnected. Active peers: " + string(rune(count)))
// Clean up resources
globalConnectionManager.CloseCandidateChannel(sessionKey)
if err := peerConnection.Close(); err != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error closing peer connection: " + err.Error())
}
globalConnectionManager.RemovePeerConnection(handshake.SessionID)
close(wrapper.done)
})
case pionWebRTC.PeerConnectionStateConnected:
count := globalConnectionManager.IncrementPeerCount()
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Peer connected. Active peers: " + string(rune(count)))
case pionWebRTC.PeerConnectionStateDisconnected:
// Disconnected is a transient state that can recover.
// Start a grace period timer; if we don't recover, then cleanup.
wrapper.disconnectMu.Lock()
if wrapper.disconnectTimer == nil {
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): peer disconnected, waiting " +
disconnectGracePeriod.String() + " for recovery (session: " + handshake.SessionID + ")")
wrapper.disconnectTimer = time.AfterFunc(disconnectGracePeriod, func() {
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): disconnect grace period expired, closing connection (session: " + handshake.SessionID + ")")
cleanupPeerConnection(sessionKey, wrapper)
})
}
wrapper.disconnectMu.Unlock()
case pionWebRTC.PeerConnectionStateFailed:
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): ICE connection failed")
// Stop any pending disconnect timer
wrapper.disconnectMu.Lock()
if wrapper.disconnectTimer != nil {
wrapper.disconnectTimer.Stop()
wrapper.disconnectTimer = nil
}
wrapper.disconnectMu.Unlock()
cleanupPeerConnection(sessionKey, wrapper)
case pionWebRTC.PeerConnectionStateClosed:
// Stop any pending disconnect timer
wrapper.disconnectMu.Lock()
if wrapper.disconnectTimer != nil {
wrapper.disconnectTimer.Stop()
wrapper.disconnectTimer = nil
}
wrapper.disconnectMu.Unlock()
cleanupPeerConnection(sessionKey, wrapper)
case pionWebRTC.PeerConnectionStateConnected:
// Cancel any pending disconnect timer — connection recovered
wrapper.disconnectMu.Lock()
if wrapper.disconnectTimer != nil {
wrapper.disconnectTimer.Stop()
wrapper.disconnectTimer = nil
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): connection recovered from disconnected state (session: " + handshake.SessionID + ")")
}
wrapper.disconnectMu.Unlock()
if wrapper.connected.CompareAndSwap(false, true) {
count := globalConnectionManager.IncrementPeerCount()
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Peer connected. Active peers: " + strconv.FormatInt(count, 10))
}
}
})
@@ -379,27 +497,21 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
return
}
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Received candidate from channel: " + candidate)
if candidateErr := peerConnection.AddICECandidate(pionWebRTC.ICECandidateInit{Candidate: candidate}); candidateErr != nil {
candidateInit, decodeErr := decodeICECandidate(candidate)
if decodeErr != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error decoding candidate: " + decodeErr.Error())
continue
}
if candidateErr := peerConnection.AddICECandidate(candidateInit); candidateErr != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error adding candidate: " + candidateErr.Error())
}
}
}
}()
offer := w.CreateOffer(sd)
if err = peerConnection.SetRemoteDescription(offer); err != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): something went wrong while setting remote description: " + err.Error())
}
answer, err := peerConnection.CreateAnswer(nil)
if err != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): something went wrong while creating answer: " + err.Error())
} else if err = peerConnection.SetLocalDescription(answer); err != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): something went wrong while setting local description: " + err.Error())
}
// When an ICE candidate is available send to the other peer using the signaling server (MQTT).
// The other peer will add this candidate by calling AddICECandidate
// The other peer will add this candidate by calling AddICECandidate.
// This handler must be registered before setting the local description, otherwise early candidates can be missed.
var hasRelayCandidates bool
peerConnection.OnICECandidate(func(candidate *pionWebRTC.ICECandidate) {
@@ -444,8 +556,6 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
candateBinary, err := json.Marshal(candidateJSON)
if err == nil {
valueMap["candidate"] = string(candateBinary)
// SDP is not needed to be send..
//valueMap["sdp"] = []byte(base64.StdEncoding.EncodeToString([]byte(answer.SDP)))
valueMap["session_id"] = handshake.SessionID
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): sending " + candidateType + " candidate to hub")
} else {
@@ -469,38 +579,81 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
}
})
offer := w.CreateOffer(sd)
if err = peerConnection.SetRemoteDescription(offer); err != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): something went wrong while setting remote description: " + err.Error())
cleanupPeerConnection(sessionKey, wrapper)
return
}
answer, err := peerConnection.CreateAnswer(nil)
if err != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): something went wrong while creating answer: " + err.Error())
cleanupPeerConnection(sessionKey, wrapper)
return
} else if err = peerConnection.SetLocalDescription(answer); err != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): something went wrong while setting local description: " + err.Error())
cleanupPeerConnection(sessionKey, wrapper)
return
}
// Store peer connection in manager
globalConnectionManager.AddPeerConnection(handshake.SessionID, wrapper)
globalConnectionManager.AddPeerConnection(sessionKey, wrapper)
// Create a config map
valueMap := make(map[string]interface{})
valueMap["sdp"] = []byte(base64.StdEncoding.EncodeToString([]byte(answer.SDP)))
valueMap["session_id"] = handshake.SessionID
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Send SDP answer")
// We'll send the candidate to the hub
message := models.Message{
Payload: models.Payload{
Action: "receive-hd-answer",
DeviceId: configuration.Config.Key,
Value: valueMap,
},
}
payload, err := models.PackageMQTTMessage(configuration, message)
if err == nil {
// Create a config map
valueMap := make(map[string]interface{})
valueMap["sdp"] = []byte(base64.StdEncoding.EncodeToString([]byte(answer.SDP)))
valueMap["session_id"] = handshake.SessionID
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Send SDP answer")
// We'll send the candidate to the hub
message := models.Message{
Payload: models.Payload{
Action: "receive-hd-answer",
DeviceId: configuration.Config.Key,
Value: valueMap,
},
}
payload, err := models.PackageMQTTMessage(configuration, message)
if err == nil {
token := mqttClient.Publish("kerberos/hub/"+hubKey, 2, false, payload)
token.Wait()
} else {
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): while packaging mqtt message: " + err.Error())
}
token := mqttClient.Publish("kerberos/hub/"+hubKey, 2, false, payload)
token.Wait()
} else {
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): while packaging mqtt message: " + err.Error())
}
}
} else {
log.Log.Error("Initializwebrtc.main.InitializeWebRTCConnection()eWebRTCConnection: NewPeerConnection failed: " + err.Error())
globalConnectionManager.CloseCandidateChannel(sessionKey)
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): failed to decode remote session description: " + err.Error())
}
}
func NewVideoBroadcaster(streams []packets.Stream) *TrackBroadcaster {
// Verify H264 is available (same check as NewVideoTrack)
for _, s := range streams {
if s.Name == "H264" {
return NewTrackBroadcaster(pionWebRTC.MimeTypeH264, "video", trackStreamID)
}
}
log.Log.Error("webrtc.main.NewVideoBroadcaster(): no H264 stream found")
return nil
}
func NewAudioBroadcaster(streams []packets.Stream) *TrackBroadcaster {
for _, s := range streams {
switch s.Name {
case "OPUS":
return NewTrackBroadcaster(pionWebRTC.MimeTypeOpus, "audio", trackStreamID)
case "PCM_MULAW":
return NewTrackBroadcaster(pionWebRTC.MimeTypePCMU, "audio", trackStreamID)
case "PCM_ALAW":
return NewTrackBroadcaster(pionWebRTC.MimeTypePCMA, "audio", trackStreamID)
}
}
log.Log.Error("webrtc.main.NewAudioBroadcaster(): no supported audio codec found")
return nil
}
func NewVideoTrack(streams []packets.Stream) *pionWebRTC.TrackLocalStaticSample {
mimeType := pionWebRTC.MimeTypeH264
outboundVideoTrack, err := pionWebRTC.NewTrackLocalStaticSample(pionWebRTC.RTPCodecCapability{MimeType: mimeType}, "video", trackStreamID)
@@ -539,6 +692,7 @@ type streamState struct {
lastKeepAlive int64
peerCount int64
start bool
catchingUp bool
receivedKeyFrame bool
lastAudioSample *pionMedia.Sample
lastVideoSample *pionMedia.Sample
@@ -613,23 +767,54 @@ func updateStreamState(communication *models.Communication, state *streamState)
}
// writeFinalSamples writes any remaining buffered samples
func writeFinalSamples(state *streamState, videoTrack, audioTrack *pionWebRTC.TrackLocalStaticSample) {
if state.lastVideoSample != nil && videoTrack != nil {
if err := videoTrack.WriteSample(*state.lastVideoSample); err != nil && err != io.ErrClosedPipe {
log.Log.Error("webrtc.main.writeFinalSamples(): error writing final video sample: " + err.Error())
}
func writeFinalSamples(state *streamState, videoBroadcaster, audioBroadcaster *TrackBroadcaster) {
if state.lastVideoSample != nil && videoBroadcaster != nil {
videoBroadcaster.WriteSample(*state.lastVideoSample)
}
if state.lastAudioSample != nil && audioTrack != nil {
if err := audioTrack.WriteSample(*state.lastAudioSample); err != nil && err != io.ErrClosedPipe {
log.Log.Error("webrtc.main.writeFinalSamples(): error writing final audio sample: " + err.Error())
}
if state.lastAudioSample != nil && audioBroadcaster != nil {
audioBroadcaster.WriteSample(*state.lastAudioSample)
}
}
// processVideoPacket processes a video packet and writes samples to the track
func processVideoPacket(pkt packets.Packet, state *streamState, videoTrack *pionWebRTC.TrackLocalStaticSample, config models.Config) {
if videoTrack == nil {
func sampleTimestamp(pkt packets.Packet) uint32 {
if pkt.TimeLegacy > 0 {
return uint32(pkt.TimeLegacy.Milliseconds())
}
if pkt.Time > 0 {
return uint32(pkt.Time)
}
return 0
}
func sampleDuration(current packets.Packet, previousTimestamp uint32, fallback time.Duration) time.Duration {
if current.TimeLegacy > 0 {
currentDurationMs := current.TimeLegacy.Milliseconds()
previousDurationMs := int64(previousTimestamp)
if currentDurationMs > previousDurationMs {
duration := time.Duration(currentDurationMs-previousDurationMs) * time.Millisecond
if duration > 0 {
return duration
}
}
}
currentTimestamp := sampleTimestamp(current)
if currentTimestamp > previousTimestamp {
duration := time.Duration(currentTimestamp-previousTimestamp) * time.Millisecond
if duration > 0 {
return duration
}
}
return fallback
}
// processVideoPacket processes a video packet and writes samples to the broadcaster
func processVideoPacket(pkt packets.Packet, state *streamState, videoBroadcaster *TrackBroadcaster, config models.Config) {
if videoBroadcaster == nil {
return
}
@@ -642,7 +827,7 @@ func processVideoPacket(pkt packets.Packet, state *streamState, videoTrack *pion
return
}
sample := pionMedia.Sample{Data: pkt.Data, PacketTimestamp: uint32(pkt.Time)}
sample := pionMedia.Sample{Data: pkt.Data, PacketTimestamp: sampleTimestamp(pkt)}
if config.Capture.ForwardWebRTC == "true" {
// Remote forwarding not yet implemented
@@ -651,20 +836,16 @@ func processVideoPacket(pkt packets.Packet, state *streamState, videoTrack *pion
}
if state.lastVideoSample != nil {
duration := sample.PacketTimestamp - state.lastVideoSample.PacketTimestamp
state.lastVideoSample.Duration = time.Duration(duration) * time.Millisecond
if err := videoTrack.WriteSample(*state.lastVideoSample); err != nil && err != io.ErrClosedPipe {
log.Log.Error("webrtc.main.processVideoPacket(): error writing video sample: " + err.Error())
}
state.lastVideoSample.Duration = sampleDuration(pkt, state.lastVideoSample.PacketTimestamp, 33*time.Millisecond)
videoBroadcaster.WriteSample(*state.lastVideoSample)
}
state.lastVideoSample = &sample
}
// processAudioPacket processes an audio packet and writes samples to the track
func processAudioPacket(pkt packets.Packet, state *streamState, audioTrack *pionWebRTC.TrackLocalStaticSample, hasAAC bool) {
if audioTrack == nil {
// processAudioPacket processes an audio packet and writes samples to the broadcaster
func processAudioPacket(pkt packets.Packet, state *streamState, audioBroadcaster *TrackBroadcaster, hasAAC bool) {
if audioBroadcaster == nil {
return
}
@@ -674,27 +855,32 @@ func processAudioPacket(pkt packets.Packet, state *streamState, audioTrack *pion
return
}
sample := pionMedia.Sample{Data: pkt.Data, PacketTimestamp: uint32(pkt.Time)}
sample := pionMedia.Sample{Data: pkt.Data, PacketTimestamp: sampleTimestamp(pkt)}
if state.lastAudioSample != nil {
duration := sample.PacketTimestamp - state.lastAudioSample.PacketTimestamp
state.lastAudioSample.Duration = time.Duration(duration) * time.Millisecond
if err := audioTrack.WriteSample(*state.lastAudioSample); err != nil && err != io.ErrClosedPipe {
log.Log.Error("webrtc.main.processAudioPacket(): error writing audio sample: " + err.Error())
}
state.lastAudioSample.Duration = sampleDuration(pkt, state.lastAudioSample.PacketTimestamp, 20*time.Millisecond)
audioBroadcaster.WriteSample(*state.lastAudioSample)
}
state.lastAudioSample = &sample
}
func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, videoTrack *pionWebRTC.TrackLocalStaticSample, audioTrack *pionWebRTC.TrackLocalStaticSample, rtspClient capture.RTSPClient) {
func shouldDropPacketForLatency(pkt packets.Packet) bool {
if pkt.CurrentTime == 0 {
return false
}
age := time.Since(time.UnixMilli(pkt.CurrentTime))
return age > maxLivePacketAge
}
func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, videoBroadcaster *TrackBroadcaster, audioBroadcaster *TrackBroadcaster, rtspClient capture.RTSPClient) {
config := configuration.Config
// Check if at least one track is available
if videoTrack == nil && audioTrack == nil {
log.Log.Error("webrtc.main.WriteToTrack(): both video and audio tracks are nil, cannot proceed")
// Check if at least one broadcaster is available
if videoBroadcaster == nil && audioBroadcaster == nil {
log.Log.Error("webrtc.main.WriteToTrack(): both video and audio broadcasters are nil, cannot proceed")
return
}
@@ -717,7 +903,7 @@ func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.C
}
defer func() {
writeFinalSamples(state, videoTrack, audioTrack)
writeFinalSamples(state, videoBroadcaster, audioBroadcaster)
log.Log.Info("webrtc.main.WriteToTrack(): stopped writing to track")
}()
@@ -747,6 +933,31 @@ func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.C
continue
}
// Keep live WebRTC close to realtime.
// If audio+video load makes this consumer fall behind, skip old packets and
// wait for a recent keyframe before resuming video.
if shouldDropPacketForLatency(pkt) {
if !state.catchingUp {
log.Log.Warning("webrtc.main.WriteToTrack(): stream is lagging behind, dropping old packets until the next recent keyframe")
}
state.catchingUp = true
state.start = false
state.receivedKeyFrame = false
state.lastAudioSample = nil
state.lastVideoSample = nil
continue
}
if state.catchingUp {
if !(pkt.IsVideo && pkt.IsKeyFrame) {
continue
}
state.catchingUp = false
state.start = false
state.receivedKeyFrame = false
log.Log.Info("webrtc.main.WriteToTrack(): caught up with live stream at a recent keyframe")
}
// Wait for first keyframe before processing
if !state.receivedKeyFrame {
if pkt.IsKeyFrame {
@@ -758,9 +969,9 @@ func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.C
// Process video or audio packets
if pkt.IsVideo {
processVideoPacket(pkt, state, videoTrack, config)
processVideoPacket(pkt, state, videoBroadcaster, config)
} else if pkt.IsAudio {
processAudioPacket(pkt, state, audioTrack, codecs.hasAAC)
processAudioPacket(pkt, state, audioBroadcaster, codecs.hasAAC)
}
}
}

View File

@@ -7,6 +7,7 @@ import './ImageCanvas.css';
class ImageCanvas extends React.Component {
componentDidMount() {
this.isUnmounted = false;
this.width = 0;
this.height = 0;
@@ -58,6 +59,9 @@ class ImageCanvas extends React.Component {
const { image } = this.props;
this.loadImage(image, (img) => {
if (this.isUnmounted || !this.editor) {
return;
}
if (this.width !== img.width || this.height !== img.height) {
this.width = img.width;
this.height = img.height;
@@ -71,6 +75,9 @@ class ImageCanvas extends React.Component {
componentDidUpdate() {
const { image } = this.props;
this.loadImage(image, (img) => {
if (this.isUnmounted || !this.editor) {
return;
}
if (this.width !== img.width || this.height !== img.height) {
this.width = img.width;
this.height = img.height;
@@ -82,11 +89,57 @@ class ImageCanvas extends React.Component {
});
}
componentWillUnmount() {
this.isUnmounted = true;
if (this.pendingImage) {
this.pendingImage.onload = null;
this.pendingImage.src = '';
this.pendingImage = null;
}
if (this.editor) {
this.editor.onSelectionEnd = null;
this.editor.onRegionMoveEnd = null;
this.editor.onRegionDelete = null;
if (this.editor.RM) {
this.editor.RM.deleteAllRegions();
}
if (typeof this.editor.dispose === 'function') {
this.editor.dispose();
} else if (typeof this.editor.destroy === 'function') {
this.editor.destroy();
}
this.editor = null;
}
if (this.toolbarContainer) {
this.toolbarContainer.innerHTML = '';
this.toolbarContainer = null;
}
if (this.editorContainer) {
this.editorContainer.innerHTML = '';
this.editorContainer = null;
}
}
loadData = (image) => {
if (!this.editor) {
return;
}
const w = image.width;
const h = image.height;
this.editor.addContentSource(image).then(() => {
if (this.isUnmounted || !this.editor) {
return;
}
// Add exisiting polygons
this.editor.RM.deleteAllRegions();
const { polygons } = this.props;
@@ -152,11 +205,19 @@ class ImageCanvas extends React.Component {
// eslint-disable-next-line class-methods-use-this
loadImage = (path, onready) => {
if (this.pendingImage) {
this.pendingImage.onload = null;
}
const image = new Image();
image.src = path;
image.addEventListener('load', (e) => {
this.pendingImage = image;
image.onload = (e) => {
if (this.pendingImage === image) {
this.pendingImage = null;
}
onready(e.target);
});
};
image.src = path;
};
// eslint-disable-next-line class-methods-use-this

View File

@@ -38,16 +38,14 @@ class Dashboard extends React.Component {
initialised: false,
};
this.initialiseLiveview = this.initialiseLiveview.bind(this);
this.handleLiveviewLoad = this.handleLiveviewLoad.bind(this);
}
componentDidMount() {
const liveview = document.getElementsByClassName('videocard-video');
if (liveview && liveview.length > 0) {
liveview[0].addEventListener('load', () => {
this.setState({
liveviewLoaded: true,
});
});
[this.liveviewElement] = liveview;
this.liveviewElement.addEventListener('load', this.handleLiveviewLoad);
}
this.initialiseLiveview();
}
@@ -57,13 +55,14 @@ class Dashboard extends React.Component {
}
componentWillUnmount() {
const liveview = document.getElementsByClassName('videocard-video');
if (liveview && liveview.length > 0) {
liveview[0].remove();
if (this.liveviewElement) {
this.liveviewElement.removeEventListener('load', this.handleLiveviewLoad);
this.liveviewElement = null;
}
if (this.requestStreamSubscription) {
this.requestStreamSubscription.unsubscribe();
this.requestStreamSubscription = null;
}
const { dispatchSend } = this.props;
const message = {
@@ -72,6 +71,12 @@ class Dashboard extends React.Component {
dispatchSend(message);
}
handleLiveviewLoad() {
this.setState({
liveviewLoaded: true,
});
}
handleClose() {
this.setState({
open: false,

View File

@@ -159,7 +159,10 @@ class Settings extends React.Component {
componentWillUnmount() {
document.removeEventListener('keydown', this.escFunction, false);
clearInterval(this.interval);
if (this.requestStreamSubscription) {
this.requestStreamSubscription.unsubscribe();
this.requestStreamSubscription = null;
}
const { dispatchSend } = this.props;
const message = {