Compare commits

..

36 Commits

Author SHA1 Message Date
Cédric Verstraeten
8657765e5d Merge pull request #262 from kerberos-io/feature/concurrency-webrtc
feature/concurrency-webrtc
2026-03-09 21:37:04 +01:00
Cédric Verstraeten
76a136abc9 Add trailing commas to fallback calls
Add trailing commas to the arguments passed to fallbackToSDLiveview in ui/src/pages/Dashboard/Dashboard.jsx. This is a non-functional formatting change applied to the WebRTC initialization, ICE candidate handling, and connection-state fallback calls to align with the project's code style/formatter.
2026-03-09 21:34:17 +01:00
Cédric Verstraeten
5475b79459 Remove extraneous trailing commas in Dashboard
Clean up trailing commas and minor formatting in ui/src/pages/Dashboard/Dashboard.jsx. Adjusts object/argument commas and formatting around WebRTC message handling, peer connection setup, error handling, and SD liveview fallback callbacks to avoid potential syntax/lint issues.
2026-03-09 21:32:10 +01:00
Cédric Verstraeten
2ad768780f Format Dashboard.jsx: add trailing commas
Apply consistent formatting to ui/src/pages/Dashboard/Dashboard.jsx by adding trailing commas in object literals, function call argument lists, and callbacks (primarily around WebRTC handling and error messages). This is a non-functional style change to match the project's code style (e.g., Prettier/ESLint) and should not affect runtime behavior.
2026-03-09 21:29:30 +01:00
Cédric Verstraeten
f64b5fb65b Replace uuidv4 with local createUUID
Remove the uuidv4 import and introduce a local createUUID helper that uses window.crypto.randomUUID when available and falls back to a v4-style generator. Update webrtcClientId and webrtcSessionId to use createUUID(), removing the external dependency while preserving UUID generation for WebRTC session/client IDs.
2026-03-09 21:27:01 +01:00
Cédric Verstraeten
bb773316a2 Add trailing commas and tidy Media.scss
Add trailing commas to multi-line function calls and RTCPeerConnection instantiation in Dashboard.jsx for consistent formatting. In Media.scss remove an extra blank line and relocate the .media-filters__field:first-child rule to consolidate related styles. Purely stylistic/organizational changes with no intended behavior change.
2026-03-09 21:23:16 +01:00
Cédric Verstraeten
fc6fa9d425 Format Media.jsx and add newline
Reformat code in ui/src/pages/Media/Media.jsx for readability: wrap long argument lists (getTimestampFromInput, buildEventFilter) and reflow the appliedFilter ternary onto multiple lines. Also add the missing trailing newline to ui/public/locales/en/translation.json. No functional changes.
2026-03-09 21:18:52 +01:00
Cédric Verstraeten
aa183ee0fb Enable MQTT persistent sessions and resume subs
Switch MQTT client to persistent sessions by setting CleanSession to false, enabling ResumeSubs and using an in-memory store. Previously CleanSession was true and resume/store were commented out, which could drop subscriptions on reconnect; these changes ensure subscriptions are preserved across reconnects and re-subscribed from memory.
2026-03-09 21:12:22 +01:00
Cédric Verstraeten
730b1b2a40 Add WebRTC liveview signaling and UI fallback
Introduce structured WebRTC handshake signaling and client-side fallbacks. Changes:

- machinery: replace HandleLiveHDHandshake channel to carry LiveHDHandshake (payload + signaling callbacks) and expose active WebRTC reader count in dashboard data.
- routers: MQTT and WebSocket handlers now send/receive LiveHDHandshake structs; websocket supports stream-hd and webrtc-candidate messages and uses callback-based signaling to reply over the WS connection.
- webrtc: add helper functions to send MQTT or callback answers/candidates, adapt InitializeWebRTCConnection to the new handshake type, and expose GetActivePeerConnectionCount.
- utils: minor GetMediaFormatted filtering fix and unit test for timestamp range behavior.
- ui: Dashboard gains native WebRTC liveview with fallback to SD image stream, shows active listener count, and handles signaling/candidates; Media page adds datetime range filters, infinite-scroll append behavior, and styles; reducer/action updates to support appending events; package.json scripts disable ESLint plugin during start/build/test.

These changes enable browser-based HD liveviews with dual signaling paths (websocket callbacks or MQTT), improve media filtering, and provide graceful fallback to SD streaming when WebRTC fails.
2026-03-09 21:10:18 +01:00
Cédric Verstraeten
4efc80fecb Enhance WebRTC signaling robustness
Increase HD handshake channel buffer and harden signaling flow: enlarge HandleLiveHDHandshake buffer from 10 to 100 and add a nil-check to drop and log requests when the channel is not initialized. Add publishSignalingMessageAsync to publish MQTT messages with timeout and error logging, and replace blocking Publish().Wait() calls for ICE candidates and SDP answers with the async publisher. Reintroduce the remote-candidate processor goroutine after remote description handling to avoid AddICECandidate races. These changes reduce blocking, improve error handling, and make WebRTC/MQTT signaling more resilient.
2026-03-09 20:05:00 +01:00
Cédric Verstraeten
4fbee60e9f Merge pull request #261 from kerberos-io/feature/add-webrtc-aac-transcoder
feature/add-webrtc-aac-transcoder
2026-03-09 17:46:17 +01:00
Cédric Verstraeten
d6c25df280 Add missing imports for strconv and strings in AAC transcoder stub 2026-03-09 16:42:42 +00:00
Cédric Verstraeten
72a2d28e1e Update aac_transcoder_stub.go 2026-03-09 17:41:54 +01:00
Cédric Verstraeten
eb0972084f Implement AAC transcoding for WebRTC using FFmpeg; update Dockerfiles and launch configuration 2026-03-09 16:34:52 +00:00
Cédric Verstraeten
41a1d221fc Merge pull request #260 from kerberos-io/fix/set-clean-state
fix/set-clean-state
2026-03-09 16:56:36 +01:00
Cédric Verstraeten
eaacc93d2f Set MQTT clean session to true and disable resume subscriptions 2026-03-09 15:50:40 +00:00
Cédric Verstraeten
0e6a004c23 Merge pull request #259 from kerberos-io/fix/add-grace-period
feature/add-broadcasting-feature
2026-03-09 16:20:39 +01:00
Cédric Verstraeten
617f854534 Merge branch 'master' into fix/add-grace-period 2026-03-09 16:17:35 +01:00
Cédric Verstraeten
1bf8006055 Refactor WebRTC handling to use per-peer broadcasters for video and audio tracks 2026-03-09 15:12:01 +00:00
Cédric Verstraeten
ca0e426382 Add max signaling age constant and discard stale WebRTC messages 2026-03-09 14:50:00 +00:00
Cédric Verstraeten
726d0722d9 Merge pull request #258 from kerberos-io/fix/add-grace-period
fix/add-grace-period
2026-03-09 15:20:53 +01:00
Cédric Verstraeten
d8f320b040 Add disconnect grace period handling in WebRTC connection manager 2026-03-09 14:15:50 +00:00
Cédric Verstraeten
0131b87692 Merge pull request #257 from kerberos-io/security/middleware-exposure
security/middleware-exposure
2026-03-09 14:18:11 +01:00
Cédric Verstraeten
54e8198b65 Apply suggestions from code review
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-03-09 14:18:00 +01:00
Cédric Verstraeten
3bfb68f950 Update port configuration and secure routes with JWT authentication middleware 2026-03-09 12:42:05 +00:00
Cédric Verstraeten
c05e59c936 Merge pull request #255 from kerberos-io/feature/improve-mqtt-concurrency
feature/improve-mqtt-concurrency
2026-03-09 13:25:26 +01:00
Cédric Verstraeten
b42d63b668 Enhance WebRTC packet processing for improved latency handling and keyframe synchronization 2026-03-09 12:17:38 +00:00
Cédric Verstraeten
0ca007e424 Refactor session key usage in ConnectionManager and enhance candidate queuing 2026-03-09 12:09:22 +00:00
Cédric Verstraeten
229d085de7 Merge pull request #253 from kerberos-io/fix/mqtt-reconnection
fix/mqtt-reconnection
2026-03-09 12:43:42 +01:00
Cédric Verstraeten
30e2b8318d Refactor build workflow to support multi-architecture builds and enhance MQTT connection handling 2026-03-09 11:40:24 +00:00
Cédric Verstraeten
dbcf4e242c Enhance MQTT reconnection handling and improve WebRTC connection cleanup
- Enable automatic reconnection for MQTT with configurable intervals and timeouts.
- Add logging for connection loss and reconnection attempts.
- Refactor WebRTC connection cleanup to ensure proper resource management on disconnection.
- Improve event handling in ImageCanvas and Dashboard components for better performance and reliability.
2026-03-09 11:04:10 +00:00
Cédric Verstraeten
ccf4034cc8 Merge pull request #252 from kerberos-io/fix/close-mp4-after-started
fix/close-mp4-after-started
2026-03-03 15:21:12 +01:00
Cédric Verstraeten
a34836e8f4 Delay MP4 creation until the first keyframe is received to ensure valid recordings 2026-03-03 14:16:39 +00:00
Cédric Verstraeten
dd1464d1be Fix recording closure condition to ensure it only triggers after recording has started 2026-03-03 14:03:11 +00:00
Cédric Verstraeten
2c02e0aeb1 Merge pull request #250 from kerberos-io/fix/add-avc-description-fallback
fix/add-avc-description-fallback
2026-02-27 11:48:34 +01:00
cedricve
d5464362bb Add AVC descriptor fallback for SPS parse errors
When setting the AVC descriptor fails in MP4.Close(), attempt a fallback that constructs an AvcC/avc1 sample entry from available SPS/PPS NALUs. Adds github.com/Eyevinn/mp4ff/avc import and two helpers: addAVCDescriptorFallback (builds a visual sample entry, sets tkhd width/height if available, and inserts it into stsd) and buildAVCDecConfRecFromSPS (creates an avc.DecConfRec from SPS/PPS bytes by extracting profile/compat/level and filling defaults). Logs errors and warns when the fallback is used. This provides resilience against SPS parsing errors when writing the MP4 track descriptor.
2026-02-27 11:35:22 +01:00
26 changed files with 2352 additions and 412 deletions

View File

@@ -7,61 +7,34 @@ env:
REPO: kerberos/agent
jobs:
build-amd64:
runs-on: ubuntu-24.04
build:
runs-on: ${{ matrix.runner }}
permissions:
contents: write
strategy:
matrix:
architecture: [amd64]
include:
- architecture: amd64
runner: ubuntu-24.04
dockerfile: Dockerfile
- architecture: arm64
runner: ubuntu-24.04-arm
dockerfile: Dockerfile.arm64
steps:
- name: Checkout
uses: actions/checkout@v3
- name: Login to DockerHub
uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Checkout
uses: actions/checkout@v3
- uses: benjlevesque/short-sha@v2.1
id: short-sha
with:
length: 7
- name: Run Build
run: |
docker build -t ${{matrix.architecture}} .
CID=$(docker create ${{matrix.architecture}})
docker cp ${CID}:/home/agent ./output-${{matrix.architecture}}
docker rm ${CID}
- name: Strip binary
run: tar -cf agent-${{matrix.architecture}}.tar -C output-${{matrix.architecture}} . && rm -rf output-${{matrix.architecture}}
- name: Upload artifact
uses: actions/upload-artifact@v4
with:
name: agent-${{matrix.architecture}}.tar
path: agent-${{matrix.architecture}}.tar
build-arm64:
runs-on: ubuntu-24.04-arm
permissions:
contents: write
strategy:
matrix:
architecture: [arm64]
steps:
- name: Login to DockerHub
uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Checkout
uses: actions/checkout@v3
- uses: benjlevesque/short-sha@v2.1
id: short-sha
with:
length: 7
- name: Run Build
run: |
docker build -t ${{matrix.architecture}} -f Dockerfile.arm64 .
docker build -t ${{ matrix.architecture }} -f ${{ matrix.dockerfile }} .
CID=$(docker create ${{matrix.architecture}})
docker cp ${CID}:/home/agent ./output-${{matrix.architecture}}
docker rm ${CID}

View File

@@ -95,7 +95,7 @@ RUN addgroup -S kerberosio && adduser -S agent -G kerberosio && addgroup agent v
COPY --chown=0:0 --from=build-machinery /dist /
COPY --chown=0:0 --from=build-ui /dist /
RUN apk update && apk add ca-certificates curl libstdc++ libc6-compat --no-cache && rm -rf /var/cache/apk/*
RUN apk update && apk add ca-certificates curl ffmpeg libstdc++ libc6-compat --no-cache && rm -rf /var/cache/apk/*
##################
# Try running agent

View File

@@ -95,7 +95,7 @@ RUN addgroup -S kerberosio && adduser -S agent -G kerberosio && addgroup agent v
COPY --chown=0:0 --from=build-machinery /dist /
COPY --chown=0:0 --from=build-ui /dist /
RUN apk update && apk add ca-certificates curl libstdc++ libc6-compat --no-cache && rm -rf /var/cache/apk/*
RUN apk update && apk add ca-certificates curl ffmpeg libstdc++ libc6-compat --no-cache && rm -rf /var/cache/apk/*
##################
# Try running agent

View File

@@ -518,21 +518,8 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
if len(spsNALUS) == 0 || len(ppsNALUS) == 0 {
log.Log.Warning("capture.main.HandleRecordStream(motiondetection): missing SPS/PPS at recording start")
}
// Create a video file, and set the dimensions.
mp4Video := video.NewMP4(fullName, spsNALUS, ppsNALUS, vpsNALUS, configuration.Config.Capture.MaxLengthRecording)
mp4Video.SetWidth(width)
mp4Video.SetHeight(height)
if videoCodec == "H264" {
videoTrack = mp4Video.AddVideoTrack("H264")
} else if videoCodec == "H265" {
videoTrack = mp4Video.AddVideoTrack("H265")
}
if audioCodec == "AAC" {
audioTrack = mp4Video.AddAudioTrack("AAC")
} else if audioCodec == "PCM_MULAW" {
log.Log.Debug("capture.main.HandleRecordStream(continuous): no AAC audio codec detected, skipping audio track.")
}
// Create the MP4 only once the first keyframe arrives.
var mp4Video *video.MP4
for cursorError == nil {
@@ -551,7 +538,7 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
default:
}
if (motionTimestamp+postRecording-now < 0 || now-startRecording > maxRecordingPeriod-500) && nextPkt.IsKeyFrame {
if start && (motionTimestamp+postRecording-now < 0 || now-startRecording > maxRecordingPeriod-500) && nextPkt.IsKeyFrame {
log.Log.Info("capture.main.HandleRecordStream(motiondetection): timestamp+postRecording-now < 0 - " + strconv.FormatInt(motionTimestamp+postRecording-now, 10) + " < 0")
log.Log.Info("capture.main.HandleRecordStream(motiondetection): now-startRecording > maxRecordingPeriod-500 - " + strconv.FormatInt(now-startRecording, 10) + " > " + strconv.FormatInt(maxRecordingPeriod-500, 10))
log.Log.Info("capture.main.HandleRecordStream(motiondetection): closing recording (timestamp: " + strconv.FormatInt(motionTimestamp, 10) + ", postRecording: " + strconv.FormatInt(postRecording, 10) + ", now: " + strconv.FormatInt(now, 10) + ", startRecording: " + strconv.FormatInt(startRecording, 10) + ", maxRecordingPeriod: " + strconv.FormatInt(maxRecordingPeriod, 10))
@@ -561,20 +548,44 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
// We start the recording if we have a keyframe and the last duration is 0 or less than the current packet time.
// It could be start we start from the beginning of the recording.
log.Log.Debug("capture.main.HandleRecordStream(motiondetection): write frames")
log.Log.Debug("capture.main.HandleRecordStream(motiondetection): recording started on keyframe")
// Align duration timers with the first keyframe.
startRecording = pkt.CurrentTime
// Create a video file, and set the dimensions.
mp4Video = video.NewMP4(fullName, spsNALUS, ppsNALUS, vpsNALUS, configuration.Config.Capture.MaxLengthRecording)
mp4Video.SetWidth(width)
mp4Video.SetHeight(height)
if videoCodec == "H264" {
videoTrack = mp4Video.AddVideoTrack("H264")
} else if videoCodec == "H265" {
videoTrack = mp4Video.AddVideoTrack("H265")
}
if audioCodec == "AAC" {
audioTrack = mp4Video.AddAudioTrack("AAC")
} else if audioCodec == "PCM_MULAW" {
log.Log.Debug("capture.main.HandleRecordStream(continuous): no AAC audio codec detected, skipping audio track.")
}
start = true
}
if start {
pts := convertPTS(pkt.TimeLegacy)
if pkt.IsVideo {
log.Log.Debug("capture.main.HandleRecordStream(motiondetection): add video sample")
if err := mp4Video.AddSampleToTrack(videoTrack, pkt.IsKeyFrame, pkt.Data, pts); err != nil {
log.Log.Error("capture.main.HandleRecordStream(motiondetection): " + err.Error())
if mp4Video != nil {
if err := mp4Video.AddSampleToTrack(videoTrack, pkt.IsKeyFrame, pkt.Data, pts); err != nil {
log.Log.Error("capture.main.HandleRecordStream(motiondetection): " + err.Error())
}
}
} else if pkt.IsAudio {
log.Log.Debug("capture.main.HandleRecordStream(motiondetection): add audio sample")
if pkt.Codec == "AAC" {
if err := mp4Video.AddSampleToTrack(audioTrack, pkt.IsKeyFrame, pkt.Data, pts); err != nil {
log.Log.Error("capture.main.HandleRecordStream(motiondetection): " + err.Error())
if mp4Video != nil {
if err := mp4Video.AddSampleToTrack(audioTrack, pkt.IsKeyFrame, pkt.Data, pts); err != nil {
log.Log.Error("capture.main.HandleRecordStream(motiondetection): " + err.Error())
}
}
} else if pkt.Codec == "PCM_MULAW" {
// TODO: transcode to AAC, some work to do..
@@ -592,6 +603,11 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
// This is used to determine if we need to start a new recording.
lastRecordingTime = pkt.CurrentTime
if mp4Video == nil {
log.Log.Warning("capture.main.HandleRecordStream(motiondetection): recording closed without keyframe; no MP4 created")
continue
}
// This will close the recording and write the last packet.
if len(mp4Video.SPSNALUs) == 0 && len(configuration.Config.Capture.IPCamera.SPSNALUs) > 0 {
mp4Video.SPSNALUs = configuration.Config.Capture.IPCamera.SPSNALUs

View File

@@ -800,17 +800,19 @@ func HandleLiveStreamHD(livestreamCursor *packets.QueueCursor, configuration *mo
// Check if we need to enable the live stream
if config.Capture.Liveview != "false" {
// Should create a track here.
// Create per-peer broadcasters instead of shared tracks.
// Each viewer gets its own track with independent, non-blocking writes
// so a slow/congested peer cannot stall the others.
streams, _ := rtspClient.GetStreams()
videoTrack := webrtc.NewVideoTrack(streams)
audioTrack := webrtc.NewAudioTrack(streams)
videoBroadcaster := webrtc.NewVideoBroadcaster(streams)
audioBroadcaster := webrtc.NewAudioBroadcaster(streams)
if videoTrack == nil && audioTrack == nil {
log.Log.Error("cloud.HandleLiveStreamHD(): failed to create both video and audio tracks")
if videoBroadcaster == nil && audioBroadcaster == nil {
log.Log.Error("cloud.HandleLiveStreamHD(): failed to create both video and audio broadcasters")
return
}
go webrtc.WriteToTrack(livestreamCursor, configuration, communication, mqttClient, videoTrack, audioTrack, rtspClient)
go webrtc.WriteToTrack(livestreamCursor, configuration, communication, mqttClient, videoBroadcaster, audioBroadcaster, rtspClient)
if config.Capture.ForwardWebRTC == "true" {
@@ -818,7 +820,7 @@ func HandleLiveStreamHD(livestreamCursor *packets.QueueCursor, configuration *mo
log.Log.Info("cloud.HandleLiveStreamHD(): Waiting for peer connections.")
for handshake := range communication.HandleLiveHDHandshake {
log.Log.Info("cloud.HandleLiveStreamHD(): setting up a peer connection.")
go webrtc.InitializeWebRTCConnection(configuration, communication, mqttClient, videoTrack, audioTrack, handshake)
go webrtc.InitializeWebRTCConnection(configuration, communication, mqttClient, videoBroadcaster, audioBroadcaster, handshake)
}
}

View File

@@ -21,6 +21,7 @@ import (
"github.com/kerberos-io/agent/machinery/src/packets"
routers "github.com/kerberos-io/agent/machinery/src/routers/mqtt"
"github.com/kerberos-io/agent/machinery/src/utils"
"github.com/kerberos-io/agent/machinery/src/webrtc"
"github.com/tevino/abool"
)
@@ -303,7 +304,7 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu
}
// Handle livestream HD (high resolution over WEBRTC)
communication.HandleLiveHDHandshake = make(chan models.RequestHDStreamPayload, 10)
communication.HandleLiveHDHandshake = make(chan models.LiveHDHandshake, 100)
if subStreamEnabled {
livestreamHDCursor := subQueue.Latest()
go cloud.HandleLiveStreamHD(livestreamHDCursor, configuration, communication, mqttClient, rtspSubClient)
@@ -552,6 +553,11 @@ func GetDashboard(c *gin.Context, configDirectory string, configuration *models.
// The total number of recordings stored in the directory.
recordingDirectory := configDirectory + "/data/recordings"
numberOfRecordings := utils.NumberOfMP4sInDirectory(recordingDirectory)
activeWebRTCReaders := webrtc.GetActivePeerConnectionCount()
pendingWebRTCHandshakes := 0
if communication.HandleLiveHDHandshake != nil {
pendingWebRTCHandshakes = len(communication.HandleLiveHDHandshake)
}
// All days stored in this agent.
days := []string{}
@@ -574,6 +580,8 @@ func GetDashboard(c *gin.Context, configDirectory string, configuration *models.
"cameraOnline": cameraIsOnline,
"cloudOnline": cloudIsOnline,
"numberOfRecordings": numberOfRecordings,
"webrtcReaders": activeWebRTCReaders,
"webrtcPending": pendingWebRTCHandshakes,
"days": days,
"latestEvents": latestEvents,
})

View File

@@ -8,6 +8,17 @@ import (
"github.com/tevino/abool"
)
type LiveHDSignalingCallbacks struct {
SendAnswer func(sessionID string, sdp string) error
SendCandidate func(sessionID string, candidate string) error
SendError func(sessionID string, message string) error
}
type LiveHDHandshake struct {
Payload RequestHDStreamPayload
Signaling *LiveHDSignalingCallbacks
}
// The communication struct that is managing
// all the communication between the different goroutines.
type Communication struct {
@@ -27,7 +38,7 @@ type Communication struct {
HandleHeartBeat chan string
HandleLiveSD chan int64
HandleLiveHDKeepalive chan string
HandleLiveHDHandshake chan RequestHDStreamPayload
HandleLiveHDHandshake chan LiveHDHandshake
HandleLiveHDPeers chan string
HandleONVIF chan OnvifAction
IsConfiguring *abool.AtomicBool

View File

@@ -20,96 +20,93 @@ func AddRoutes(r *gin.Engine, authMiddleware *jwt.GinJWTMiddleware, configDirect
// This is legacy should be removed in future! Now everything
// lives under the /api prefix.
r.GET("/config", func(c *gin.Context) {
r.GET("/config", authMiddleware.MiddlewareFunc(), func(c *gin.Context) {
components.GetConfig(c, captureDevice, configuration, communication)
})
// This is legacy should be removed in future! Now everything
// lives under the /api prefix.
r.POST("/config", func(c *gin.Context) {
r.POST("/config", authMiddleware.MiddlewareFunc(), func(c *gin.Context) {
components.UpdateConfig(c, configDirectory, configuration, communication)
})
api := r.Group("/api")
{
// Public endpoints (no authentication required)
api.POST("/login", authMiddleware.LoginHandler)
api.GET("/dashboard", func(c *gin.Context) {
components.GetDashboard(c, configDirectory, configuration, communication)
})
api.POST("/latest-events", func(c *gin.Context) {
components.GetLatestEvents(c, configDirectory, configuration, communication)
})
api.GET("/days", func(c *gin.Context) {
components.GetDays(c, configDirectory, configuration, communication)
})
api.GET("/config", func(c *gin.Context) {
components.GetConfig(c, captureDevice, configuration, communication)
})
api.POST("/config", func(c *gin.Context) {
components.UpdateConfig(c, configDirectory, configuration, communication)
})
// Will verify the hub settings.
api.POST("/hub/verify", func(c *gin.Context) {
cloud.VerifyHub(c)
})
// Will verify the persistence settings.
api.POST("/persistence/verify", func(c *gin.Context) {
cloud.VerifyPersistence(c, configDirectory)
})
// Will verify the secondary persistence settings.
api.POST("/persistence/secondary/verify", func(c *gin.Context) {
cloud.VerifySecondaryPersistence(c, configDirectory)
})
// Camera specific methods. Doesn't require any authorization.
// These are available for anyone, but require the agent, to reach
// the camera.
api.POST("/camera/restart", func(c *gin.Context) {
components.RestartAgent(c, communication)
})
api.POST("/camera/stop", func(c *gin.Context) {
components.StopAgent(c, communication)
})
api.POST("/camera/record", func(c *gin.Context) {
components.MakeRecording(c, communication)
})
api.GET("/camera/snapshot/jpeg", func(c *gin.Context) {
components.GetSnapshotRaw(c, captureDevice, configuration, communication)
})
api.GET("/camera/snapshot/base64", func(c *gin.Context) {
components.GetSnapshotBase64(c, captureDevice, configuration, communication)
})
// Onvif specific methods. Doesn't require any authorization.
// Will verify the current onvif settings.
api.POST("/camera/onvif/verify", onvif.VerifyOnvifConnection)
api.POST("/camera/onvif/login", LoginToOnvif)
api.POST("/camera/onvif/capabilities", GetOnvifCapabilities)
api.POST("/camera/onvif/presets", GetOnvifPresets)
api.POST("/camera/onvif/gotopreset", GoToOnvifPreset)
api.POST("/camera/onvif/pantilt", DoOnvifPanTilt)
api.POST("/camera/onvif/zoom", DoOnvifZoom)
api.POST("/camera/onvif/inputs", DoGetDigitalInputs)
api.POST("/camera/onvif/outputs", DoGetRelayOutputs)
api.POST("/camera/onvif/outputs/:output", DoTriggerRelayOutput)
api.POST("/camera/verify/:streamType", capture.VerifyCamera)
// Secured endpoints..
// Apply JWT authentication middleware.
// All routes registered below this line require a valid JWT token.
api.Use(authMiddleware.MiddlewareFunc())
{
api.GET("/dashboard", func(c *gin.Context) {
components.GetDashboard(c, configDirectory, configuration, communication)
})
api.POST("/latest-events", func(c *gin.Context) {
components.GetLatestEvents(c, configDirectory, configuration, communication)
})
api.GET("/days", func(c *gin.Context) {
components.GetDays(c, configDirectory, configuration, communication)
})
api.GET("/config", func(c *gin.Context) {
components.GetConfig(c, captureDevice, configuration, communication)
})
api.POST("/config", func(c *gin.Context) {
components.UpdateConfig(c, configDirectory, configuration, communication)
})
// Will verify the hub settings.
api.POST("/hub/verify", func(c *gin.Context) {
cloud.VerifyHub(c)
})
// Will verify the persistence settings.
api.POST("/persistence/verify", func(c *gin.Context) {
cloud.VerifyPersistence(c, configDirectory)
})
// Will verify the secondary persistence settings.
api.POST("/persistence/secondary/verify", func(c *gin.Context) {
cloud.VerifySecondaryPersistence(c, configDirectory)
})
// Camera specific methods.
api.POST("/camera/restart", func(c *gin.Context) {
components.RestartAgent(c, communication)
})
api.POST("/camera/stop", func(c *gin.Context) {
components.StopAgent(c, communication)
})
api.POST("/camera/record", func(c *gin.Context) {
components.MakeRecording(c, communication)
})
api.GET("/camera/snapshot/jpeg", func(c *gin.Context) {
components.GetSnapshotRaw(c, captureDevice, configuration, communication)
})
api.GET("/camera/snapshot/base64", func(c *gin.Context) {
components.GetSnapshotBase64(c, captureDevice, configuration, communication)
})
// Onvif specific methods.
api.POST("/camera/onvif/verify", onvif.VerifyOnvifConnection)
api.POST("/camera/onvif/login", LoginToOnvif)
api.POST("/camera/onvif/capabilities", GetOnvifCapabilities)
api.POST("/camera/onvif/presets", GetOnvifPresets)
api.POST("/camera/onvif/gotopreset", GoToOnvifPreset)
api.POST("/camera/onvif/pantilt", DoOnvifPanTilt)
api.POST("/camera/onvif/zoom", DoOnvifZoom)
api.POST("/camera/onvif/inputs", DoGetDigitalInputs)
api.POST("/camera/onvif/outputs", DoGetRelayOutputs)
api.POST("/camera/onvif/outputs/:output", DoTriggerRelayOutput)
api.POST("/camera/verify/:streamType", capture.VerifyCamera)
}
}
return api

View File

@@ -90,10 +90,32 @@ func ConfigureMQTT(configDirectory string, configuration *models.Configuration,
// Some extra options to make sure the connection behaves
// properly. More information here: github.com/eclipse/paho.mqtt.golang.
opts.SetCleanSession(true)
//opts.SetCleanSession(true)
opts.SetCleanSession(false)
opts.SetResumeSubs(true)
opts.SetStore(mqtt.NewMemoryStore())
opts.SetConnectRetry(true)
//opts.SetAutoReconnect(true)
opts.SetAutoReconnect(true)
opts.SetConnectRetryInterval(5 * time.Second)
opts.SetMaxReconnectInterval(1 * time.Minute)
opts.SetKeepAlive(30 * time.Second)
opts.SetPingTimeout(10 * time.Second)
opts.SetWriteTimeout(10 * time.Second)
opts.SetOrderMatters(false)
opts.SetConnectTimeout(30 * time.Second)
opts.SetConnectionLostHandler(func(client mqtt.Client, err error) {
if err != nil {
log.Log.Error("routers.mqtt.main.ConfigureMQTT(): MQTT connection lost: " + err.Error())
} else {
log.Log.Error("routers.mqtt.main.ConfigureMQTT(): MQTT connection lost")
}
})
opts.SetReconnectingHandler(func(client mqtt.Client, options *mqtt.ClientOptions) {
log.Log.Warning("routers.mqtt.main.ConfigureMQTT(): reconnecting to MQTT broker")
})
opts.SetOnConnectHandler(func(c mqtt.Client) {
log.Log.Info("routers.mqtt.main.ConfigureMQTT(): MQTT session is online")
})
hubKey := ""
// This is the old way ;)
@@ -133,10 +155,14 @@ func ConfigureMQTT(configDirectory string, configuration *models.Configuration,
}
}
mqc := mqtt.NewClient(opts)
if token := mqc.Connect(); token.WaitTimeout(3 * time.Second) {
if token := mqc.Connect(); token.WaitTimeout(30 * time.Second) {
if token.Error() != nil {
log.Log.Error("routers.mqtt.main.ConfigureMQTT(): unable to establish mqtt broker connection, error was: " + token.Error().Error())
} else {
log.Log.Info("routers.mqtt.main.ConfigureMQTT(): initial MQTT connection established")
}
} else {
log.Log.Error("routers.mqtt.main.ConfigureMQTT(): timed out while establishing mqtt broker connection")
}
return mqc
}
@@ -144,12 +170,18 @@ func ConfigureMQTT(configDirectory string, configuration *models.Configuration,
return nil
}
// maxSignalingAge is the maximum age of a WebRTC signaling message (request-hd-stream,
// receive-hd-candidates) before it is considered stale and discarded. With CleanSession=false
// the MQTT broker may replay queued messages from previous sessions; this prevents the agent
// from setting up peer connections for viewers that are no longer waiting.
const maxSignalingAge = 30 * time.Second
func MQTTListenerHandler(mqttClient mqtt.Client, hubKey string, configDirectory string, configuration *models.Configuration, communication *models.Communication) {
if hubKey == "" {
log.Log.Info("routers.mqtt.main.MQTTListenerHandler(): no hub key provided, not subscribing to kerberos/hub/{hubkey}")
} else {
agentListener := fmt.Sprintf("kerberos/agent/%s", hubKey)
mqttClient.Subscribe(agentListener, 1, func(c mqtt.Client, msg mqtt.Message) {
token := mqttClient.Subscribe(agentListener, 1, func(c mqtt.Client, msg mqtt.Message) {
// Decode the message, we are expecting following format.
// {
@@ -249,6 +281,18 @@ func MQTTListenerHandler(mqttClient mqtt.Client, hubKey string, configDirectory
// We'll find out which message we received, and act accordingly.
log.Log.Info("routers.mqtt.main.MQTTListenerHandler(): received message with action: " + payload.Action)
// For time-sensitive WebRTC signaling messages, discard stale ones that may
// have been queued by the broker while CleanSession=false.
if payload.Action == "request-hd-stream" || payload.Action == "receive-hd-candidates" {
messageAge := time.Since(time.Unix(message.Timestamp, 0))
if messageAge > maxSignalingAge {
log.Log.Info("routers.mqtt.main.MQTTListenerHandler(): discarding stale " + payload.Action +
" message (age: " + messageAge.Round(time.Second).String() + ")")
return
}
}
switch payload.Action {
case "record":
go HandleRecording(mqttClient, hubKey, payload, configuration, communication)
@@ -276,6 +320,16 @@ func MQTTListenerHandler(mqttClient mqtt.Client, hubKey string, configDirectory
}
})
if token.WaitTimeout(10 * time.Second) {
if token.Error() != nil {
log.Log.Error("routers.mqtt.main.MQTTListenerHandler(): failed to subscribe to " + agentListener + ": " + token.Error().Error())
} else {
log.Log.Info("routers.mqtt.main.MQTTListenerHandler(): subscribed to " + agentListener)
}
} else {
log.Log.Error("routers.mqtt.main.MQTTListenerHandler(): timed out while subscribing to " + agentListener)
}
}
}
@@ -484,9 +538,13 @@ func HandleRequestHDStream(mqttClient mqtt.Client, hubKey string, payload models
if communication.CameraConnected {
// Set the Hub key, so we can send back the answer.
requestHDStreamPayload.HubKey = hubKey
select {
case communication.HandleLiveHDHandshake <- requestHDStreamPayload:
default:
if communication.HandleLiveHDHandshake == nil {
log.Log.Error("routers.mqtt.main.HandleRequestHDStream(): handshake channel is nil, dropping request")
return
}
communication.HandleLiveHDHandshake <- models.LiveHDHandshake{
Payload: requestHDStreamPayload,
}
log.Log.Info("routers.mqtt.main.HandleRequestHDStream(): received request to setup webrtc.")
} else {

View File

@@ -6,6 +6,7 @@ import (
"image"
"net/http"
"sync"
"time"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
@@ -14,6 +15,7 @@ import (
"github.com/kerberos-io/agent/machinery/src/models"
"github.com/kerberos-io/agent/machinery/src/packets"
"github.com/kerberos-io/agent/machinery/src/utils"
"github.com/kerberos-io/agent/machinery/src/webrtc"
)
type Message struct {
@@ -28,6 +30,23 @@ type Connection struct {
Cancels map[string]context.CancelFunc
}
func writeWebRTCError(connection *Connection, clientID string, sessionID string, errorMessage string) {
if connection == nil {
return
}
if err := connection.WriteJson(Message{
ClientID: clientID,
MessageType: "webrtc-error",
Message: map[string]string{
"session_id": sessionID,
"message": errorMessage,
},
}); err != nil {
log.Log.Error("routers.websocket.main.writeWebRTCError(): " + err.Error())
}
}
// Concurrency handling - sending messages
func (c *Connection) WriteJson(message Message) error {
c.mu.Lock()
@@ -115,6 +134,82 @@ func WebsocketHandler(c *gin.Context, configuration *models.Configuration, commu
go ForwardSDStream(ctx, clientID, sockets[clientID], configuration, communication, captureDevice)
}
}
case "stream-hd":
sessionID := message.Message["session_id"]
sessionDescription := message.Message["sdp"]
if sessionID == "" || sessionDescription == "" {
writeWebRTCError(sockets[clientID], clientID, sessionID, "missing session_id or sdp")
break
}
if !communication.CameraConnected {
writeWebRTCError(sockets[clientID], clientID, sessionID, "camera is not connected")
break
}
if communication.HandleLiveHDHandshake == nil {
writeWebRTCError(sockets[clientID], clientID, sessionID, "webrtc liveview is not available")
break
}
handshake := models.LiveHDHandshake{
Payload: models.RequestHDStreamPayload{
Timestamp: time.Now().Unix(),
SessionID: sessionID,
SessionDescription: sessionDescription,
},
Signaling: &models.LiveHDSignalingCallbacks{
SendAnswer: func(callbackSessionID string, sdp string) error {
return sockets[clientID].WriteJson(Message{
ClientID: clientID,
MessageType: "webrtc-answer",
Message: map[string]string{
"session_id": callbackSessionID,
"sdp": sdp,
},
})
},
SendCandidate: func(callbackSessionID string, candidate string) error {
return sockets[clientID].WriteJson(Message{
ClientID: clientID,
MessageType: "webrtc-candidate",
Message: map[string]string{
"session_id": callbackSessionID,
"candidate": candidate,
},
})
},
SendError: func(callbackSessionID string, errorMessage string) error {
writeWebRTCError(sockets[clientID], clientID, callbackSessionID, errorMessage)
return nil
},
},
}
communication.HandleLiveHDHandshake <- handshake
case "webrtc-candidate":
sessionID := message.Message["session_id"]
candidate := message.Message["candidate"]
if sessionID == "" || candidate == "" {
writeWebRTCError(sockets[clientID], clientID, sessionID, "missing session_id or candidate")
break
}
if !communication.CameraConnected {
writeWebRTCError(sockets[clientID], clientID, sessionID, "camera is not connected")
break
}
key := configuration.Config.Key + "/" + sessionID
go webrtc.RegisterCandidates(key, models.ReceiveHDCandidatesPayload{
Timestamp: time.Now().Unix(),
SessionID: sessionID,
Candidate: candidate,
})
}
err = conn.ReadJSON(&message)

View File

@@ -27,7 +27,8 @@ import (
// VERSION is the agent version. It defaults to "0.0.0" for local dev builds
// and is overridden at build time via:
// go build -ldflags "-X github.com/kerberos-io/agent/machinery/src/utils.VERSION=v1.2.3"
//
// go build -ldflags "-X github.com/kerberos-io/agent/machinery/src/utils.VERSION=v1.2.3"
var VERSION = "0.0.0"
const letterBytes = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"
@@ -198,6 +199,13 @@ func GetMediaFormatted(files []os.FileInfo, recordingDirectory string, configura
timestampInt, err := strconv.ParseInt(timestamp, 10, 64)
if err == nil {
if eventFilter.TimestampOffsetStart > 0 {
// TimestampOffsetStart represents the newest lower bound to include.
if timestampInt < eventFilter.TimestampOffsetStart {
continue
}
}
// If we have an offset we will check if we should skip or not
if eventFilter.TimestampOffsetEnd > 0 {
// Medias are sorted from new to older. TimestampOffsetEnd holds the oldest

View File

@@ -0,0 +1,54 @@
package utils
import (
"os"
"testing"
"time"
"github.com/kerberos-io/agent/machinery/src/models"
)
type stubFileInfo struct {
name string
}
func (s stubFileInfo) Name() string { return s.name }
func (s stubFileInfo) Size() int64 { return 0 }
func (s stubFileInfo) Mode() os.FileMode { return 0 }
func (s stubFileInfo) ModTime() time.Time { return time.Unix(0, 0) }
func (s stubFileInfo) IsDir() bool { return false }
func (s stubFileInfo) Sys() interface{} { return nil }
func TestGetMediaFormattedHonorsTimestampRange(t *testing.T) {
configuration := &models.Configuration{}
configuration.Config.Timezone = "UTC"
configuration.Config.Name = "Front Door"
configuration.Config.Key = "camera-1"
files := []os.FileInfo{
stubFileInfo{name: "1700000200_6_7_8_9_10.mp4"},
stubFileInfo{name: "1700000100_6_7_8_9_10.mp4"},
stubFileInfo{name: "1700000000_6_7_8_9_10.mp4"},
}
media := GetMediaFormatted(files, "/tmp/recordings", configuration, models.EventFilter{
TimestampOffsetStart: 1700000050,
TimestampOffsetEnd: 1700000200,
NumberOfElements: 10,
})
if len(media) != 1 {
t.Fatalf("expected 1 media item in time range, got %d", len(media))
}
if media[0].Timestamp != "1700000100" {
t.Fatalf("expected timestamp 1700000100, got %s", media[0].Timestamp)
}
if media[0].CameraName != "Front Door" {
t.Fatalf("expected camera name to be preserved, got %s", media[0].CameraName)
}
if media[0].CameraKey != "camera-1" {
t.Fatalf("expected camera key to be preserved, got %s", media[0].CameraKey)
}
}

View File

@@ -13,6 +13,7 @@ import (
"strings"
"time"
"github.com/Eyevinn/mp4ff/avc"
mp4ff "github.com/Eyevinn/mp4ff/mp4"
"github.com/kerberos-io/agent/machinery/src/encryption"
"github.com/kerberos-io/agent/machinery/src/log"
@@ -564,6 +565,11 @@ func (mp4 *MP4) Close(config *models.Config) {
err := init.Moov.Traks[0].SetAVCDescriptor("avc1", spsNALUs, ppsNALUs, includePS)
if err != nil {
log.Log.Error("mp4.Close(): error setting AVC descriptor: " + err.Error())
if fallbackErr := addAVCDescriptorFallback(init.Moov.Traks[0], spsNALUs, ppsNALUs, uint16(mp4.width), uint16(mp4.height)); fallbackErr != nil {
log.Log.Error("mp4.Close(): error setting AVC descriptor fallback: " + fallbackErr.Error())
} else {
log.Log.Warning("mp4.Close(): AVC descriptor fallback used due to SPS parse error")
}
}
init.Moov.Traks[0].Tkhd.Duration = actualVideoDuration
init.Moov.Traks[0].Tkhd.Width = mp4ff.Fixed32(uint32(mp4.width) << 16)
@@ -952,6 +958,57 @@ func formatNaluDebug(nalus [][]byte) string {
return strings.Join(parts, "; ")
}
func addAVCDescriptorFallback(trak *mp4ff.TrakBox, spsNALUs, ppsNALUs [][]byte, width, height uint16) error {
if trak == nil || trak.Mdia == nil || trak.Mdia.Minf == nil || trak.Mdia.Minf.Stbl == nil || trak.Mdia.Minf.Stbl.Stsd == nil {
return fmt.Errorf("missing trak stsd")
}
if len(spsNALUs) == 0 {
return fmt.Errorf("no SPS NALU available")
}
decConfRec, err := buildAVCDecConfRecFromSPS(spsNALUs, ppsNALUs)
if err != nil {
return err
}
if width == 0 && trak.Tkhd != nil {
width = uint16(uint32(trak.Tkhd.Width) >> 16)
}
if height == 0 && trak.Tkhd != nil {
height = uint16(uint32(trak.Tkhd.Height) >> 16)
}
if width > 0 && height > 0 && trak.Tkhd != nil {
trak.Tkhd.Width = mp4ff.Fixed32(uint32(width) << 16)
trak.Tkhd.Height = mp4ff.Fixed32(uint32(height) << 16)
}
avcC := &mp4ff.AvcCBox{DecConfRec: *decConfRec}
avcx := mp4ff.CreateVisualSampleEntryBox("avc1", width, height, avcC)
trak.Mdia.Minf.Stbl.Stsd.AddChild(avcx)
return nil
}
func buildAVCDecConfRecFromSPS(spsNALUs, ppsNALUs [][]byte) (*avc.DecConfRec, error) {
if len(spsNALUs) == 0 {
return nil, fmt.Errorf("no SPS NALU available")
}
sps := spsNALUs[0]
if len(sps) < 4 {
return nil, fmt.Errorf("SPS too short: len=%d", len(sps))
}
// SPS NALU: byte 0 is NAL header, next 3 bytes are profile/compat/level.
dec := &avc.DecConfRec{
AVCProfileIndication: sps[1],
ProfileCompatibility: sps[2],
AVCLevelIndication: sps[3],
SPSnalus: spsNALUs,
PPSnalus: ppsNALUs,
ChromaFormat: 1,
BitDepthLumaMinus1: 0,
BitDepthChromaMinus1: 0,
NumSPSExt: 0,
NoTrailingInfo: true,
}
return dec, nil
}
// splitNALUs splits Annex B data into raw NAL units without start codes.
func splitNALUs(data []byte) [][]byte {
var nalus [][]byte

View File

@@ -0,0 +1,270 @@
// AAC to G.711 µ-law transcoder using FFmpeg (libavcodec + libswresample).
// Build with: go build -tags ffmpeg ...
//
// Requires: libavcodec-dev, libavutil-dev, libswresample-dev (FFmpeg ≥ 5.x)
// and an AAC decoder compiled into the FFmpeg build (usually the default).
//
//go:build ffmpeg
package webrtc
/*
#cgo pkg-config: libavcodec libavutil libswresample
#cgo CFLAGS: -Wno-deprecated-declarations
#include <libavcodec/avcodec.h>
#include <libavutil/channel_layout.h>
#include <libavutil/frame.h>
#include <libavutil/mem.h>
#include <libavutil/opt.h>
#include <libswresample/swresample.h>
#include <stdlib.h>
#include <string.h>
// ── Transcoder handle ───────────────────────────────────────────────────
typedef struct {
AVCodecContext *codec_ctx;
AVCodecParserContext *parser;
SwrContext *swr_ctx;
AVFrame *frame;
AVPacket *pkt;
int swr_initialized;
int in_sample_rate;
int in_channels;
} aac_transcoder_t;
// ── Create / Destroy ────────────────────────────────────────────────────
static aac_transcoder_t* aac_transcoder_create(void) {
const AVCodec *codec = avcodec_find_decoder(AV_CODEC_ID_AAC);
if (!codec) return NULL;
aac_transcoder_t *t = (aac_transcoder_t*)calloc(1, sizeof(aac_transcoder_t));
if (!t) return NULL;
t->codec_ctx = avcodec_alloc_context3(codec);
if (!t->codec_ctx) { free(t); return NULL; }
if (avcodec_open2(t->codec_ctx, codec, NULL) < 0) {
avcodec_free_context(&t->codec_ctx);
free(t);
return NULL;
}
t->parser = av_parser_init(AV_CODEC_ID_AAC);
if (!t->parser) {
avcodec_free_context(&t->codec_ctx);
free(t);
return NULL;
}
t->frame = av_frame_alloc();
t->pkt = av_packet_alloc();
if (!t->frame || !t->pkt) {
if (t->frame) av_frame_free(&t->frame);
if (t->pkt) av_packet_free(&t->pkt);
av_parser_close(t->parser);
avcodec_free_context(&t->codec_ctx);
free(t);
return NULL;
}
return t;
}
static void aac_transcoder_destroy(aac_transcoder_t *t) {
if (!t) return;
if (t->swr_ctx) swr_free(&t->swr_ctx);
if (t->frame) av_frame_free(&t->frame);
if (t->pkt) av_packet_free(&t->pkt);
if (t->parser) av_parser_close(t->parser);
if (t->codec_ctx) avcodec_free_context(&t->codec_ctx);
free(t);
}
// ── Lazy resampler init (called after the first decoded frame) ──────────
static int aac_init_swr(aac_transcoder_t *t) {
int64_t in_ch_layout = (int64_t)t->codec_ctx->channel_layout;
if (in_ch_layout == 0)
in_ch_layout = av_get_default_channel_layout(t->codec_ctx->channels);
t->swr_ctx = swr_alloc_set_opts(
NULL,
AV_CH_LAYOUT_MONO, // out: mono
AV_SAMPLE_FMT_S16, // out: signed 16-bit
8000, // out: 8 kHz
in_ch_layout, // in: from decoder
t->codec_ctx->sample_fmt, // in: from decoder
t->codec_ctx->sample_rate, // in: from decoder
0, NULL);
if (!t->swr_ctx) return -1;
if (swr_init(t->swr_ctx) < 0) {
swr_free(&t->swr_ctx);
return -1;
}
t->in_sample_rate = t->codec_ctx->sample_rate;
t->in_channels = t->codec_ctx->channels;
t->swr_initialized = 1;
return 0;
}
// ── Transcode ADTS → 8 kHz mono S16 PCM ────────────────────────────────
// Caller must free *out_pcm with av_free() when non-NULL.
static int aac_transcode_to_pcm(aac_transcoder_t *t,
const uint8_t *data, int data_size,
uint8_t **out_pcm, int *out_size) {
*out_pcm = NULL;
*out_size = 0;
if (!data || data_size <= 0) return 0;
int buf_cap = 8192;
uint8_t *buf = (uint8_t*)av_malloc(buf_cap);
if (!buf) return -1;
int buf_len = 0;
while (data_size > 0) {
uint8_t *pout = NULL;
int pout_size = 0;
int used = av_parser_parse2(t->parser, t->codec_ctx,
&pout, &pout_size,
data, data_size,
AV_NOPTS_VALUE, AV_NOPTS_VALUE, 0);
if (used < 0) break;
data += used;
data_size -= used;
if (pout_size == 0) continue;
// Feed parsed frame to decoder
t->pkt->data = pout;
t->pkt->size = pout_size;
if (avcodec_send_packet(t->codec_ctx, t->pkt) < 0) continue;
// Pull all decoded frames
while (avcodec_receive_frame(t->codec_ctx, t->frame) == 0) {
if (!t->swr_initialized) {
if (aac_init_swr(t) < 0) {
av_frame_unref(t->frame);
av_free(buf);
return -1;
}
}
int out_samples = swr_get_out_samples(t->swr_ctx,
t->frame->nb_samples);
if (out_samples <= 0) out_samples = t->frame->nb_samples;
int needed = buf_len + out_samples * 2; // S16 = 2 bytes/sample
if (needed > buf_cap) {
buf_cap = needed * 2;
uint8_t *tmp = (uint8_t*)av_realloc(buf, buf_cap);
if (!tmp) { av_frame_unref(t->frame); av_free(buf); return -1; }
buf = tmp;
}
uint8_t *dst = buf + buf_len;
int converted = swr_convert(t->swr_ctx,
&dst, out_samples,
(const uint8_t**)t->frame->extended_data,
t->frame->nb_samples);
if (converted > 0)
buf_len += converted * 2;
av_frame_unref(t->frame);
}
}
if (buf_len == 0) {
av_free(buf);
return 0;
}
*out_pcm = buf;
*out_size = buf_len;
return 0;
}
*/
import "C"
import (
"errors"
"fmt"
"unsafe"
"github.com/kerberos-io/agent/machinery/src/log"
"github.com/zaf/g711"
)
// AACTranscodingAvailable reports whether AAC→PCMU transcoding
// is compiled in (requires the "ffmpeg" build tag).
func AACTranscodingAvailable() bool { return true }
// AACTranscoder decodes ADTS-wrapped AAC audio to 8 kHz mono PCM
// and encodes it as G.711 µ-law for WebRTC transport.
type AACTranscoder struct {
handle *C.aac_transcoder_t
}
// NewAACTranscoder creates a transcoder backed by FFmpeg's AAC decoder.
func NewAACTranscoder() (*AACTranscoder, error) {
h := C.aac_transcoder_create()
if h == nil {
return nil, errors.New("failed to create AAC transcoder (FFmpeg AAC decoder not available?)")
}
log.Log.Info("webrtc.aac_transcoder: AAC → G.711 µ-law transcoder initialised (FFmpeg)")
return &AACTranscoder{handle: h}, nil
}
// Transcode converts an ADTS buffer (one or more AAC frames) into
// G.711 µ-law encoded audio suitable for a PCMU WebRTC track.
func (t *AACTranscoder) Transcode(adtsData []byte) ([]byte, error) {
if t == nil || t.handle == nil || len(adtsData) == 0 {
return nil, nil
}
var outPCM *C.uint8_t
var outSize C.int
ret := C.aac_transcode_to_pcm(
t.handle,
(*C.uint8_t)(unsafe.Pointer(&adtsData[0])),
C.int(len(adtsData)),
&outPCM, &outSize,
)
if ret < 0 {
return nil, errors.New("AAC decode/resample failed")
}
if outSize == 0 || outPCM == nil {
return nil, nil // decoder buffering, no output yet
}
defer C.av_free(unsafe.Pointer(outPCM))
// Copy S16LE PCM to Go slice, then encode to µ-law.
pcm := C.GoBytes(unsafe.Pointer(outPCM), outSize)
ulaw := g711.EncodeUlaw(pcm)
// Log resampler details once.
if t.handle.swr_initialized == 1 && t.handle.in_sample_rate != 0 {
log.Log.Info(fmt.Sprintf(
"webrtc.aac_transcoder: first output resampling %d Hz / %d ch → 8000 Hz mono → µ-law",
int(t.handle.in_sample_rate), int(t.handle.in_channels)))
// Prevent repeated logging by zeroing the field we check.
t.handle.in_sample_rate = 0
}
return ulaw, nil
}
// Close releases all FFmpeg resources held by the transcoder.
func (t *AACTranscoder) Close() {
if t != nil && t.handle != nil {
C.aac_transcoder_destroy(t.handle)
t.handle = nil
log.Log.Info("webrtc.aac_transcoder: transcoder closed")
}
}

View File

@@ -0,0 +1,205 @@
// AAC transcoding fallback that uses the ffmpeg binary at runtime.
// Build with -tags ffmpeg to use the in-process CGO implementation instead.
//
//go:build !ffmpeg
package webrtc
import (
"bytes"
"errors"
"io"
"os/exec"
"strconv"
"strings"
"sync"
"time"
"github.com/kerberos-io/agent/machinery/src/log"
)
// AACTranscodingAvailable reports whether AAC→PCMU transcoding
// is available in the current runtime.
func AACTranscodingAvailable() bool {
_, err := exec.LookPath("ffmpeg")
return err == nil
}
// AACTranscoder uses an ffmpeg subprocess to convert ADTS AAC to raw PCMU.
type AACTranscoder struct {
cmd *exec.Cmd
stdin io.WriteCloser
stdout io.ReadCloser
stderrBuf bytes.Buffer
mu sync.Mutex
outMu sync.Mutex
outBuf bytes.Buffer
closed bool
closeOnce sync.Once
}
// NewAACTranscoder creates a runtime ffmpeg-based transcoder.
func NewAACTranscoder() (*AACTranscoder, error) {
ffmpegPath, err := exec.LookPath("ffmpeg")
if err != nil {
return nil, errors.New("AAC transcoding not available: ffmpeg binary not found in PATH")
}
log.Log.Info("webrtc.aac_transcoder: using ffmpeg binary at " + ffmpegPath)
cmd := exec.Command(
ffmpegPath,
"-hide_banner",
"-loglevel", "error",
"-fflags", "+nobuffer",
"-flags", "low_delay",
"-f", "aac",
"-i", "pipe:0",
"-vn",
"-ac", "1",
"-ar", "8000",
"-acodec", "pcm_mulaw",
"-f", "mulaw",
"pipe:1",
)
stdin, err := cmd.StdinPipe()
if err != nil {
return nil, err
}
stdout, err := cmd.StdoutPipe()
if err != nil {
return nil, err
}
cmd.Stderr = &bytes.Buffer{}
if err := cmd.Start(); err != nil {
return nil, err
}
t := &AACTranscoder{
cmd: cmd,
stdin: stdin,
stdout: stdout,
}
if stderrBuf, ok := cmd.Stderr.(*bytes.Buffer); ok {
t.stderrBuf = *stderrBuf
}
go func() {
buf := make([]byte, 4096)
for {
n, readErr := stdout.Read(buf)
if n > 0 {
t.outMu.Lock()
_, _ = t.outBuf.Write(buf[:n])
buffered := t.outBuf.Len()
t.outMu.Unlock()
if buffered <= 8192 || buffered%16000 == 0 {
log.Log.Info("webrtc.aac_transcoder: ffmpeg produced PCMU bytes, buffered=" + strconv.Itoa(buffered))
}
}
if readErr != nil {
if readErr != io.EOF {
log.Log.Warning("webrtc.aac_transcoder: stdout reader stopped: " + readErr.Error())
}
return
}
}
}()
log.Log.Info("webrtc.aac_transcoder: AAC → PCMU transcoder initialised (ffmpeg process)")
return t, nil
}
// Transcode writes ADTS AAC to ffmpeg and returns any PCMU bytes produced.
func (t *AACTranscoder) Transcode(adtsData []byte) ([]byte, error) {
if t == nil || len(adtsData) == 0 {
return nil, nil
}
t.mu.Lock()
defer t.mu.Unlock()
if t.closed {
return nil, errors.New("AAC transcoder is closed")
}
if _, err := t.stdin.Write(adtsData); err != nil {
return nil, err
}
if len(adtsData) <= 512 || len(adtsData)%1024 == 0 {
log.Log.Info("webrtc.aac_transcoder: wrote AAC bytes to ffmpeg, input=" + strconv.Itoa(len(adtsData)))
}
deadline := time.Now().Add(75 * time.Millisecond)
for {
data := t.readAvailable()
if len(data) > 0 {
log.Log.Info("webrtc.aac_transcoder: returning PCMU bytes=" + strconv.Itoa(len(data)))
return data, nil
}
if time.Now().After(deadline) {
if stderr := t.stderrString(); stderr != "" {
log.Log.Warning("webrtc.aac_transcoder: no output before deadline, ffmpeg stderr: " + stderr)
} else {
log.Log.Info("webrtc.aac_transcoder: no PCMU output before deadline")
}
return nil, nil
}
time.Sleep(5 * time.Millisecond)
}
}
func (t *AACTranscoder) readAvailable() []byte {
t.outMu.Lock()
defer t.outMu.Unlock()
if t.outBuf.Len() == 0 {
return nil
}
out := make([]byte, t.outBuf.Len())
copy(out, t.outBuf.Bytes())
t.outBuf.Reset()
return out
}
func (t *AACTranscoder) stderrString() string {
if t == nil {
return ""
}
if stderrBuf, ok := t.cmd.Stderr.(*bytes.Buffer); ok {
return strings.TrimSpace(stderrBuf.String())
}
return strings.TrimSpace(t.stderrBuf.String())
}
// Close stops the ffmpeg subprocess.
func (t *AACTranscoder) Close() {
if t == nil {
return
}
t.closeOnce.Do(func() {
t.mu.Lock()
t.closed = true
if t.stdin != nil {
_ = t.stdin.Close()
}
t.mu.Unlock()
if t.stdout != nil {
_ = t.stdout.Close()
}
if t.cmd != nil {
_ = t.cmd.Process.Kill()
_, _ = t.cmd.Process.Wait()
if stderr := t.stderrString(); stderr != "" {
log.Log.Info("webrtc.aac_transcoder: ffmpeg stderr on close: " + stderr)
}
}
})
}

View File

@@ -0,0 +1,137 @@
package webrtc
import (
"io"
"sync"
"github.com/kerberos-io/agent/machinery/src/log"
pionWebRTC "github.com/pion/webrtc/v4"
pionMedia "github.com/pion/webrtc/v4/pkg/media"
)
const (
// peerSampleBuffer controls how many samples can be buffered per peer before
// dropping. Keeps slow peers from blocking the broadcaster.
peerSampleBuffer = 60
)
// peerTrack is a per-peer track with its own non-blocking sample channel.
type peerTrack struct {
track *pionWebRTC.TrackLocalStaticSample
samples chan pionMedia.Sample
done chan struct{}
}
// TrackBroadcaster fans out media samples to multiple peer-specific tracks
// without blocking. Each peer gets its own TrackLocalStaticSample and a
// goroutine that drains samples independently, so a slow/congested peer
// cannot stall the others.
type TrackBroadcaster struct {
mu sync.RWMutex
peers map[string]*peerTrack
mimeType string
id string
streamID string
}
// NewTrackBroadcaster creates a new broadcaster for either video or audio.
func NewTrackBroadcaster(mimeType string, id string, streamID string) *TrackBroadcaster {
return &TrackBroadcaster{
peers: make(map[string]*peerTrack),
mimeType: mimeType,
id: id,
streamID: streamID,
}
}
// AddPeer creates a new per-peer track and starts a writer goroutine.
// Returns the track to be added to the PeerConnection via AddTrack().
func (b *TrackBroadcaster) AddPeer(sessionKey string) (*pionWebRTC.TrackLocalStaticSample, error) {
track, err := pionWebRTC.NewTrackLocalStaticSample(
pionWebRTC.RTPCodecCapability{MimeType: b.mimeType},
b.id,
b.streamID,
)
if err != nil {
return nil, err
}
pt := &peerTrack{
track: track,
samples: make(chan pionMedia.Sample, peerSampleBuffer),
done: make(chan struct{}),
}
b.mu.Lock()
b.peers[sessionKey] = pt
b.mu.Unlock()
// Per-peer writer goroutine — drains samples independently.
go func() {
defer close(pt.done)
for sample := range pt.samples {
if err := pt.track.WriteSample(sample); err != nil {
if err == io.ErrClosedPipe {
return
}
log.Log.Error("webrtc.broadcaster.peerWriter(): error writing sample for " + sessionKey + ": " + err.Error())
}
}
}()
log.Log.Info("webrtc.broadcaster.AddPeer(): added peer track for " + sessionKey)
return track, nil
}
// RemovePeer stops the writer goroutine and removes the peer.
func (b *TrackBroadcaster) RemovePeer(sessionKey string) {
b.mu.Lock()
pt, exists := b.peers[sessionKey]
if exists {
delete(b.peers, sessionKey)
}
b.mu.Unlock()
if exists {
close(pt.samples)
<-pt.done // wait for writer goroutine to finish
log.Log.Info("webrtc.broadcaster.RemovePeer(): removed peer track for " + sessionKey)
}
}
// WriteSample fans out a sample to all connected peers without blocking.
// If a peer's buffer is full (slow consumer), the sample is dropped for
// that peer only — other peers are unaffected.
func (b *TrackBroadcaster) WriteSample(sample pionMedia.Sample) {
b.mu.RLock()
defer b.mu.RUnlock()
for sessionKey, pt := range b.peers {
select {
case pt.samples <- sample:
default:
log.Log.Warning("webrtc.broadcaster.WriteSample(): dropping sample for slow peer " + sessionKey)
}
}
}
// PeerCount returns the current number of connected peers.
func (b *TrackBroadcaster) PeerCount() int {
b.mu.RLock()
defer b.mu.RUnlock()
return len(b.peers)
}
// Close removes all peers and stops all writer goroutines.
func (b *TrackBroadcaster) Close() {
b.mu.Lock()
keys := make([]string, 0, len(b.peers))
for k := range b.peers {
keys = append(keys, k)
}
b.mu.Unlock()
for _, key := range keys {
b.RemovePeer(key)
}
}

View File

@@ -4,13 +4,14 @@ import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
//"github.com/izern/go-fdkaac/fdkaac"
"github.com/kerberos-io/agent/machinery/src/capture"
"github.com/kerberos-io/agent/machinery/src/log"
"github.com/kerberos-io/agent/machinery/src/models"
@@ -29,8 +30,10 @@ const (
rtcpBufferSize = 1500
// Timeouts and intervals
keepAliveTimeout = 15 * time.Second
defaultTimeout = 10 * time.Second
keepAliveTimeout = 15 * time.Second
defaultTimeout = 10 * time.Second
maxLivePacketAge = 1500 * time.Millisecond
disconnectGracePeriod = 5 * time.Second
// Track identifiers
trackStreamID = "kerberos-stream"
@@ -46,10 +49,16 @@ type ConnectionManager struct {
// peerConnectionWrapper wraps a peer connection with additional metadata
type peerConnectionWrapper struct {
conn *pionWebRTC.PeerConnection
cancelCtx context.CancelFunc
done chan struct{}
closeOnce sync.Once
conn *pionWebRTC.PeerConnection
cancelCtx context.CancelFunc
done chan struct{}
closeOnce sync.Once
connected atomic.Bool
disconnectMu sync.Mutex
disconnectTimer *time.Timer
sessionKey string
videoBroadcaster *TrackBroadcaster
audioBroadcaster *TrackBroadcaster
}
var globalConnectionManager = NewConnectionManager()
@@ -88,22 +97,41 @@ func (cm *ConnectionManager) CloseCandidateChannel(sessionKey string) {
}
// AddPeerConnection adds a peer connection to the manager
func (cm *ConnectionManager) AddPeerConnection(sessionID string, wrapper *peerConnectionWrapper) {
func (cm *ConnectionManager) AddPeerConnection(sessionKey string, wrapper *peerConnectionWrapper) {
cm.mu.Lock()
defer cm.mu.Unlock()
cm.peerConnections[sessionID] = wrapper
cm.peerConnections[sessionKey] = wrapper
}
// RemovePeerConnection removes a peer connection from the manager
func (cm *ConnectionManager) RemovePeerConnection(sessionID string) {
func (cm *ConnectionManager) RemovePeerConnection(sessionKey string) {
cm.mu.Lock()
defer cm.mu.Unlock()
if wrapper, exists := cm.peerConnections[sessionID]; exists {
if wrapper, exists := cm.peerConnections[sessionKey]; exists {
if wrapper.cancelCtx != nil {
wrapper.cancelCtx()
}
delete(cm.peerConnections, sessionID)
delete(cm.peerConnections, sessionKey)
}
}
// QueueCandidate safely queues a candidate for a session without racing with channel closure.
func (cm *ConnectionManager) QueueCandidate(sessionKey string, candidate string) bool {
cm.mu.Lock()
defer cm.mu.Unlock()
ch, exists := cm.candidateChannels[sessionKey]
if !exists {
ch = make(chan string, candidateChannelBuffer)
cm.candidateChannels[sessionKey] = ch
}
select {
case ch <- candidate:
return true
default:
return false
}
}
@@ -112,6 +140,11 @@ func (cm *ConnectionManager) GetPeerConnectionCount() int64 {
return atomic.LoadInt64(&cm.peerConnectionCount)
}
// GetActivePeerConnectionCount returns the current number of connected WebRTC readers.
func GetActivePeerConnectionCount() int64 {
return globalConnectionManager.GetPeerConnectionCount()
}
// IncrementPeerCount atomically increments the peer connection count
func (cm *ConnectionManager) IncrementPeerCount() int64 {
return atomic.AddInt64(&cm.peerConnectionCount, 1)
@@ -122,6 +155,35 @@ func (cm *ConnectionManager) DecrementPeerCount() int64 {
return atomic.AddInt64(&cm.peerConnectionCount, -1)
}
func cleanupPeerConnection(sessionKey string, wrapper *peerConnectionWrapper) {
wrapper.closeOnce.Do(func() {
if wrapper.connected.Swap(false) {
count := globalConnectionManager.DecrementPeerCount()
log.Log.Info("webrtc.main.cleanupPeerConnection(): Peer disconnected. Active peers: " + strconv.FormatInt(count, 10))
}
// Remove per-peer tracks from broadcasters so the fan-out stops
// writing to this peer immediately.
if wrapper.videoBroadcaster != nil {
wrapper.videoBroadcaster.RemovePeer(sessionKey)
}
if wrapper.audioBroadcaster != nil {
wrapper.audioBroadcaster.RemovePeer(sessionKey)
}
globalConnectionManager.CloseCandidateChannel(sessionKey)
if wrapper.conn != nil {
if err := wrapper.conn.Close(); err != nil {
log.Log.Error("webrtc.main.cleanupPeerConnection(): error closing peer connection: " + err.Error())
}
}
globalConnectionManager.RemovePeerConnection(sessionKey)
close(wrapper.done)
})
}
type WebRTC struct {
Name string
StunServers []string
@@ -161,16 +223,27 @@ func (w WebRTC) CreateOffer(sd []byte) pionWebRTC.SessionDescription {
}
func RegisterCandidates(key string, candidate models.ReceiveHDCandidatesPayload) {
ch := globalConnectionManager.GetOrCreateCandidateChannel(key)
log.Log.Info("webrtc.main.RegisterCandidates(): " + candidate.Candidate)
select {
case ch <- candidate.Candidate:
default:
if !globalConnectionManager.QueueCandidate(key, candidate.Candidate) {
log.Log.Info("webrtc.main.RegisterCandidates(): channel is full, dropping candidate")
}
}
func decodeICECandidate(candidate string) (pionWebRTC.ICECandidateInit, error) {
if candidate == "" {
return pionWebRTC.ICECandidateInit{}, io.EOF
}
var candidateInit pionWebRTC.ICECandidateInit
if err := json.Unmarshal([]byte(candidate), &candidateInit); err == nil {
if candidateInit.Candidate != "" {
return candidateInit, nil
}
}
return pionWebRTC.ICECandidateInit{Candidate: candidate}, nil
}
func RegisterDefaultInterceptors(mediaEngine *pionWebRTC.MediaEngine, interceptorRegistry *interceptor.Registry) error {
if err := pionWebRTC.ConfigureNack(mediaEngine, interceptorRegistry); err != nil {
return err
@@ -184,7 +257,79 @@ func RegisterDefaultInterceptors(mediaEngine *pionWebRTC.MediaEngine, intercepto
return nil
}
func InitializeWebRTCConnection(configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, videoTrack *pionWebRTC.TrackLocalStaticSample, audioTrack *pionWebRTC.TrackLocalStaticSample, handshake models.RequestHDStreamPayload) {
func publishSignalingMessageAsync(mqttClient mqtt.Client, topic string, payload []byte, description string) {
if mqttClient == nil {
log.Log.Error("webrtc.main.publishSignalingMessageAsync(): mqtt client is nil for " + description)
return
}
token := mqttClient.Publish(topic, 2, false, payload)
go func() {
if !token.WaitTimeout(5 * time.Second) {
log.Log.Warning("webrtc.main.publishSignalingMessageAsync(): timed out publishing " + description)
return
}
if err := token.Error(); err != nil {
log.Log.Error("webrtc.main.publishSignalingMessageAsync(): failed publishing " + description + ": " + err.Error())
}
}()
}
func sendCandidateSignal(configuration *models.Configuration, mqttClient mqtt.Client, hubKey string, handshake models.LiveHDHandshake, candidateJSON []byte) {
if handshake.Signaling != nil && handshake.Signaling.SendCandidate != nil {
if err := handshake.Signaling.SendCandidate(handshake.Payload.SessionID, string(candidateJSON)); err != nil {
log.Log.Error("webrtc.main.sendCandidateSignal(): " + err.Error())
}
return
}
message := models.Message{
Payload: models.Payload{
Action: "receive-hd-candidates",
DeviceId: configuration.Config.Key,
Value: map[string]interface{}{
"candidate": string(candidateJSON),
"session_id": handshake.Payload.SessionID,
},
},
}
payload, err := models.PackageMQTTMessage(configuration, message)
if err == nil {
publishSignalingMessageAsync(mqttClient, "kerberos/hub/"+hubKey, payload, "ICE candidate for session "+handshake.Payload.SessionID)
} else {
log.Log.Info("webrtc.main.sendCandidateSignal(): while packaging mqtt message: " + err.Error())
}
}
func sendAnswerSignal(configuration *models.Configuration, mqttClient mqtt.Client, hubKey string, handshake models.LiveHDHandshake, answer pionWebRTC.SessionDescription) {
encodedAnswer := base64.StdEncoding.EncodeToString([]byte(answer.SDP))
if handshake.Signaling != nil && handshake.Signaling.SendAnswer != nil {
if err := handshake.Signaling.SendAnswer(handshake.Payload.SessionID, encodedAnswer); err != nil {
log.Log.Error("webrtc.main.sendAnswerSignal(): " + err.Error())
}
return
}
message := models.Message{
Payload: models.Payload{
Action: "receive-hd-answer",
DeviceId: configuration.Config.Key,
Value: map[string]interface{}{
"sdp": []byte(encodedAnswer),
"session_id": handshake.Payload.SessionID,
},
},
}
payload, err := models.PackageMQTTMessage(configuration, message)
if err == nil {
publishSignalingMessageAsync(mqttClient, "kerberos/hub/"+hubKey, payload, "SDP answer for session "+handshake.Payload.SessionID)
} else {
log.Log.Info("webrtc.main.sendAnswerSignal(): while packaging mqtt message: " + err.Error())
}
}
func InitializeWebRTCConnection(configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, videoBroadcaster *TrackBroadcaster, audioBroadcaster *TrackBroadcaster, handshake models.LiveHDHandshake) {
config := configuration.Config
deviceKey := config.Key
@@ -192,14 +337,15 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
turnServers := []string{config.TURNURI}
turnServersUsername := config.TURNUsername
turnServersCredential := config.TURNPassword
handshakePayload := handshake.Payload
// We create a channel which will hold the candidates for this session.
sessionKey := config.Key + "/" + handshake.SessionID
sessionKey := config.Key + "/" + handshakePayload.SessionID
candidateChannel := globalConnectionManager.GetOrCreateCandidateChannel(sessionKey)
// Set variables
hubKey := handshake.HubKey
sessionDescription := handshake.SessionDescription
hubKey := handshakePayload.HubKey
sessionDescription := handshakePayload.SessionDescription
// Create WebRTC object
w := CreateWebRTC(deviceKey, stunServers, turnServers, turnServersUsername, turnServersCredential)
@@ -264,16 +410,27 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
// Create context for this connection
ctx, cancel := context.WithCancel(context.Background())
wrapper := &peerConnectionWrapper{
conn: peerConnection,
cancelCtx: cancel,
done: make(chan struct{}),
conn: peerConnection,
cancelCtx: cancel,
done: make(chan struct{}),
sessionKey: sessionKey,
videoBroadcaster: videoBroadcaster,
audioBroadcaster: audioBroadcaster,
}
// Create a per-peer video track from the broadcaster so writes
// to this peer are independent and non-blocking.
var videoSender *pionWebRTC.RTPSender = nil
if videoTrack != nil {
if videoSender, err = peerConnection.AddTrack(videoTrack); err != nil {
if videoBroadcaster != nil {
peerVideoTrack, trackErr := videoBroadcaster.AddPeer(sessionKey)
if trackErr != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error creating per-peer video track: " + trackErr.Error())
cleanupPeerConnection(sessionKey, wrapper)
return
}
if videoSender, err = peerConnection.AddTrack(peerVideoTrack); err != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error adding video track: " + err.Error())
cancel()
cleanupPeerConnection(sessionKey, wrapper)
return
}
} else {
@@ -302,11 +459,18 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
}()
}
// Create a per-peer audio track from the broadcaster.
var audioSender *pionWebRTC.RTPSender = nil
if audioTrack != nil {
if audioSender, err = peerConnection.AddTrack(audioTrack); err != nil {
if audioBroadcaster != nil {
peerAudioTrack, trackErr := audioBroadcaster.AddPeer(sessionKey)
if trackErr != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error creating per-peer audio track: " + trackErr.Error())
cleanupPeerConnection(sessionKey, wrapper)
return
}
if audioSender, err = peerConnection.AddTrack(peerAudioTrack); err != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error adding audio track: " + err.Error())
cancel()
cleanupPeerConnection(sessionKey, wrapper)
return
}
} else {
@@ -335,71 +499,71 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
}()
}
// Log ICE connection state changes for diagnostics
peerConnection.OnICEConnectionStateChange(func(iceState pionWebRTC.ICEConnectionState) {
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): ICE connection state changed to: " + iceState.String() +
" (session: " + handshakePayload.SessionID + ")")
})
peerConnection.OnConnectionStateChange(func(connectionState pionWebRTC.PeerConnectionState) {
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): connection state changed to: " + connectionState.String())
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): connection state changed to: " + connectionState.String() +
" (session: " + handshakePayload.SessionID + ")")
switch connectionState {
case pionWebRTC.PeerConnectionStateDisconnected, pionWebRTC.PeerConnectionStateClosed:
wrapper.closeOnce.Do(func() {
count := globalConnectionManager.DecrementPeerCount()
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Peer disconnected. Active peers: " + string(rune(count)))
// Clean up resources
globalConnectionManager.CloseCandidateChannel(sessionKey)
if err := peerConnection.Close(); err != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error closing peer connection: " + err.Error())
}
globalConnectionManager.RemovePeerConnection(handshake.SessionID)
close(wrapper.done)
})
case pionWebRTC.PeerConnectionStateConnected:
count := globalConnectionManager.IncrementPeerCount()
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Peer connected. Active peers: " + string(rune(count)))
case pionWebRTC.PeerConnectionStateDisconnected:
// Disconnected is a transient state that can recover.
// Start a grace period timer; if we don't recover, then cleanup.
wrapper.disconnectMu.Lock()
if wrapper.disconnectTimer == nil {
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): peer disconnected, waiting " +
disconnectGracePeriod.String() + " for recovery (session: " + handshakePayload.SessionID + ")")
wrapper.disconnectTimer = time.AfterFunc(disconnectGracePeriod, func() {
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): disconnect grace period expired, closing connection (session: " + handshakePayload.SessionID + ")")
cleanupPeerConnection(sessionKey, wrapper)
})
}
wrapper.disconnectMu.Unlock()
case pionWebRTC.PeerConnectionStateFailed:
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): ICE connection failed")
// Stop any pending disconnect timer
wrapper.disconnectMu.Lock()
if wrapper.disconnectTimer != nil {
wrapper.disconnectTimer.Stop()
wrapper.disconnectTimer = nil
}
wrapper.disconnectMu.Unlock()
cleanupPeerConnection(sessionKey, wrapper)
case pionWebRTC.PeerConnectionStateClosed:
// Stop any pending disconnect timer
wrapper.disconnectMu.Lock()
if wrapper.disconnectTimer != nil {
wrapper.disconnectTimer.Stop()
wrapper.disconnectTimer = nil
}
wrapper.disconnectMu.Unlock()
cleanupPeerConnection(sessionKey, wrapper)
case pionWebRTC.PeerConnectionStateConnected:
// Cancel any pending disconnect timer — connection recovered
wrapper.disconnectMu.Lock()
if wrapper.disconnectTimer != nil {
wrapper.disconnectTimer.Stop()
wrapper.disconnectTimer = nil
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): connection recovered from disconnected state (session: " + handshakePayload.SessionID + ")")
}
wrapper.disconnectMu.Unlock()
if wrapper.connected.CompareAndSwap(false, true) {
count := globalConnectionManager.IncrementPeerCount()
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Peer connected. Active peers: " + strconv.FormatInt(count, 10))
}
}
})
go func() {
defer func() {
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): candidate processor stopped for session: " + handshake.SessionID)
}()
// Iterate over the candidates and send them to the remote client
for {
select {
case <-ctx.Done():
return
case candidate, ok := <-candidateChannel:
if !ok {
return
}
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Received candidate from channel: " + candidate)
if candidateErr := peerConnection.AddICECandidate(pionWebRTC.ICECandidateInit{Candidate: candidate}); candidateErr != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error adding candidate: " + candidateErr.Error())
}
}
}
}()
offer := w.CreateOffer(sd)
if err = peerConnection.SetRemoteDescription(offer); err != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): something went wrong while setting remote description: " + err.Error())
}
answer, err := peerConnection.CreateAnswer(nil)
if err != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): something went wrong while creating answer: " + err.Error())
} else if err = peerConnection.SetLocalDescription(answer); err != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): something went wrong while setting local description: " + err.Error())
}
// When an ICE candidate is available send to the other peer using the signaling server (MQTT).
// The other peer will add this candidate by calling AddICECandidate
// The other peer will add this candidate by calling AddICECandidate.
// This handler must be registered before setting the local description, otherwise early candidates can be missed.
var hasRelayCandidates bool
peerConnection.OnICECandidate(func(candidate *pionWebRTC.ICECandidate) {
@@ -444,63 +608,115 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
candateBinary, err := json.Marshal(candidateJSON)
if err == nil {
valueMap["candidate"] = string(candateBinary)
// SDP is not needed to be send..
//valueMap["sdp"] = []byte(base64.StdEncoding.EncodeToString([]byte(answer.SDP)))
valueMap["session_id"] = handshake.SessionID
valueMap["session_id"] = handshakePayload.SessionID
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): sending " + candidateType + " candidate to hub")
} else {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): failed to marshal candidate: " + err.Error())
}
// We'll send the candidate to the hub
message := models.Message{
Payload: models.Payload{
Action: "receive-hd-candidates",
DeviceId: configuration.Config.Key,
Value: valueMap,
},
}
payload, err := models.PackageMQTTMessage(configuration, message)
if err == nil {
token := mqttClient.Publish("kerberos/hub/"+hubKey, 2, false, payload)
token.Wait()
} else {
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): while packaging mqtt message: " + err.Error())
}
sendCandidateSignal(configuration, mqttClient, hubKey, handshake, candateBinary)
})
// Store peer connection in manager
globalConnectionManager.AddPeerConnection(handshake.SessionID, wrapper)
if err == nil {
// Create a config map
valueMap := make(map[string]interface{})
valueMap["sdp"] = []byte(base64.StdEncoding.EncodeToString([]byte(answer.SDP)))
valueMap["session_id"] = handshake.SessionID
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Send SDP answer")
// We'll send the candidate to the hub
message := models.Message{
Payload: models.Payload{
Action: "receive-hd-answer",
DeviceId: configuration.Config.Key,
Value: valueMap,
},
}
payload, err := models.PackageMQTTMessage(configuration, message)
if err == nil {
token := mqttClient.Publish("kerberos/hub/"+hubKey, 2, false, payload)
token.Wait()
} else {
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): while packaging mqtt message: " + err.Error())
}
offer := w.CreateOffer(sd)
if err = peerConnection.SetRemoteDescription(offer); err != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): something went wrong while setting remote description: " + err.Error())
cleanupPeerConnection(sessionKey, wrapper)
return
}
go func() {
defer func() {
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): candidate processor stopped for session: " + handshakePayload.SessionID)
}()
// Process remote candidates only after the remote description is set.
// MQTT can deliver candidates before the SDP offer handling completes,
// and Pion rejects AddICECandidate calls until SetRemoteDescription succeeds.
for {
select {
case <-ctx.Done():
return
case candidate, ok := <-candidateChannel:
if !ok {
return
}
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Received candidate from channel: " + candidate)
candidateInit, decodeErr := decodeICECandidate(candidate)
if decodeErr != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error decoding candidate: " + decodeErr.Error())
continue
}
if candidateErr := peerConnection.AddICECandidate(candidateInit); candidateErr != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error adding candidate: " + candidateErr.Error())
}
}
}
}()
answer, err := peerConnection.CreateAnswer(nil)
if err != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): something went wrong while creating answer: " + err.Error())
cleanupPeerConnection(sessionKey, wrapper)
return
} else if err = peerConnection.SetLocalDescription(answer); err != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): something went wrong while setting local description: " + err.Error())
cleanupPeerConnection(sessionKey, wrapper)
return
}
// Store peer connection in manager
globalConnectionManager.AddPeerConnection(sessionKey, wrapper)
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Send SDP answer")
sendAnswerSignal(configuration, mqttClient, hubKey, handshake, answer)
}
} else {
log.Log.Error("Initializwebrtc.main.InitializeWebRTCConnection()eWebRTCConnection: NewPeerConnection failed: " + err.Error())
globalConnectionManager.CloseCandidateChannel(sessionKey)
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): failed to decode remote session description: " + err.Error())
}
}
func NewVideoBroadcaster(streams []packets.Stream) *TrackBroadcaster {
// Verify H264 is available (same check as NewVideoTrack)
for _, s := range streams {
if s.Name == "H264" {
return NewTrackBroadcaster(pionWebRTC.MimeTypeH264, "video", trackStreamID)
}
}
log.Log.Error("webrtc.main.NewVideoBroadcaster(): no H264 stream found")
return nil
}
func NewAudioBroadcaster(streams []packets.Stream) *TrackBroadcaster {
var audioCodecNames []string
hasAAC := false
for _, s := range streams {
if s.IsAudio {
audioCodecNames = append(audioCodecNames, s.Name)
}
switch s.Name {
case "OPUS":
return NewTrackBroadcaster(pionWebRTC.MimeTypeOpus, "audio", trackStreamID)
case "PCM_MULAW":
return NewTrackBroadcaster(pionWebRTC.MimeTypePCMU, "audio", trackStreamID)
case "PCM_ALAW":
return NewTrackBroadcaster(pionWebRTC.MimeTypePCMA, "audio", trackStreamID)
case "AAC":
hasAAC = true
}
}
if hasAAC {
log.Log.Info("webrtc.main.NewAudioBroadcaster(): AAC detected, creating PCMU audio track for transcoded output")
return NewTrackBroadcaster(pionWebRTC.MimeTypePCMU, "audio", trackStreamID)
} else if len(audioCodecNames) > 0 {
log.Log.Error(fmt.Sprintf("webrtc.main.NewAudioBroadcaster(): no supported audio codec found (detected: %s; supported: OPUS, PCM_MULAW, PCM_ALAW)", strings.Join(audioCodecNames, ", ")))
} else {
log.Log.Info("webrtc.main.NewAudioBroadcaster(): no audio stream found in camera feed")
}
return nil
}
func NewVideoTrack(streams []packets.Stream) *pionWebRTC.TrackLocalStaticSample {
mimeType := pionWebRTC.MimeTypeH264
outboundVideoTrack, err := pionWebRTC.NewTrackLocalStaticSample(pionWebRTC.RTPCodecCapability{MimeType: mimeType}, "video", trackStreamID)
@@ -513,18 +729,33 @@ func NewVideoTrack(streams []packets.Stream) *pionWebRTC.TrackLocalStaticSample
func NewAudioTrack(streams []packets.Stream) *pionWebRTC.TrackLocalStaticSample {
var mimeType string
var audioCodecNames []string
hasAAC := false
for _, stream := range streams {
if stream.IsAudio {
audioCodecNames = append(audioCodecNames, stream.Name)
}
if stream.Name == "OPUS" {
mimeType = pionWebRTC.MimeTypeOpus
} else if stream.Name == "PCM_MULAW" {
mimeType = pionWebRTC.MimeTypePCMU
} else if stream.Name == "PCM_ALAW" {
mimeType = pionWebRTC.MimeTypePCMA
} else if stream.Name == "AAC" {
hasAAC = true
}
}
if mimeType == "" {
log.Log.Error("webrtc.main.NewAudioTrack(): no supported audio codec found")
return nil
if hasAAC {
mimeType = pionWebRTC.MimeTypePCMU
log.Log.Info("webrtc.main.NewAudioTrack(): AAC detected, creating PCMU audio track for transcoded output")
} else if len(audioCodecNames) > 0 {
log.Log.Error(fmt.Sprintf("webrtc.main.NewAudioTrack(): no supported audio codec found (detected: %s; supported: OPUS, PCM_MULAW, PCM_ALAW)", strings.Join(audioCodecNames, ", ")))
return nil
} else {
log.Log.Info("webrtc.main.NewAudioTrack(): no audio stream found in camera feed")
return nil
}
}
outboundAudioTrack, err := pionWebRTC.NewTrackLocalStaticSample(pionWebRTC.RTPCodecCapability{MimeType: mimeType}, "audio", trackStreamID)
if err != nil {
@@ -539,9 +770,15 @@ type streamState struct {
lastKeepAlive int64
peerCount int64
start bool
catchingUp bool
receivedKeyFrame bool
lastAudioSample *pionMedia.Sample
lastVideoSample *pionMedia.Sample
audioPacketsSeen int64
aacPacketsSeen int64
audioSamplesSent int64
aacNoOutput int64
aacErrors int64
}
// codecSupport tracks which codecs are available in the stream
@@ -613,23 +850,54 @@ func updateStreamState(communication *models.Communication, state *streamState)
}
// writeFinalSamples writes any remaining buffered samples
func writeFinalSamples(state *streamState, videoTrack, audioTrack *pionWebRTC.TrackLocalStaticSample) {
if state.lastVideoSample != nil && videoTrack != nil {
if err := videoTrack.WriteSample(*state.lastVideoSample); err != nil && err != io.ErrClosedPipe {
log.Log.Error("webrtc.main.writeFinalSamples(): error writing final video sample: " + err.Error())
}
func writeFinalSamples(state *streamState, videoBroadcaster, audioBroadcaster *TrackBroadcaster) {
if state.lastVideoSample != nil && videoBroadcaster != nil {
videoBroadcaster.WriteSample(*state.lastVideoSample)
}
if state.lastAudioSample != nil && audioTrack != nil {
if err := audioTrack.WriteSample(*state.lastAudioSample); err != nil && err != io.ErrClosedPipe {
log.Log.Error("webrtc.main.writeFinalSamples(): error writing final audio sample: " + err.Error())
}
if state.lastAudioSample != nil && audioBroadcaster != nil {
audioBroadcaster.WriteSample(*state.lastAudioSample)
}
}
// processVideoPacket processes a video packet and writes samples to the track
func processVideoPacket(pkt packets.Packet, state *streamState, videoTrack *pionWebRTC.TrackLocalStaticSample, config models.Config) {
if videoTrack == nil {
func sampleTimestamp(pkt packets.Packet) uint32 {
if pkt.TimeLegacy > 0 {
return uint32(pkt.TimeLegacy.Milliseconds())
}
if pkt.Time > 0 {
return uint32(pkt.Time)
}
return 0
}
func sampleDuration(current packets.Packet, previousTimestamp uint32, fallback time.Duration) time.Duration {
if current.TimeLegacy > 0 {
currentDurationMs := current.TimeLegacy.Milliseconds()
previousDurationMs := int64(previousTimestamp)
if currentDurationMs > previousDurationMs {
duration := time.Duration(currentDurationMs-previousDurationMs) * time.Millisecond
if duration > 0 {
return duration
}
}
}
currentTimestamp := sampleTimestamp(current)
if currentTimestamp > previousTimestamp {
duration := time.Duration(currentTimestamp-previousTimestamp) * time.Millisecond
if duration > 0 {
return duration
}
}
return fallback
}
// processVideoPacket processes a video packet and writes samples to the broadcaster
func processVideoPacket(pkt packets.Packet, state *streamState, videoBroadcaster *TrackBroadcaster, config models.Config) {
if videoBroadcaster == nil {
return
}
@@ -642,7 +910,7 @@ func processVideoPacket(pkt packets.Packet, state *streamState, videoTrack *pion
return
}
sample := pionMedia.Sample{Data: pkt.Data, PacketTimestamp: uint32(pkt.Time)}
sample := pionMedia.Sample{Data: pkt.Data, PacketTimestamp: sampleTimestamp(pkt)}
if config.Capture.ForwardWebRTC == "true" {
// Remote forwarding not yet implemented
@@ -651,50 +919,83 @@ func processVideoPacket(pkt packets.Packet, state *streamState, videoTrack *pion
}
if state.lastVideoSample != nil {
duration := sample.PacketTimestamp - state.lastVideoSample.PacketTimestamp
state.lastVideoSample.Duration = time.Duration(duration) * time.Millisecond
if err := videoTrack.WriteSample(*state.lastVideoSample); err != nil && err != io.ErrClosedPipe {
log.Log.Error("webrtc.main.processVideoPacket(): error writing video sample: " + err.Error())
}
state.lastVideoSample.Duration = sampleDuration(pkt, state.lastVideoSample.PacketTimestamp, 33*time.Millisecond)
videoBroadcaster.WriteSample(*state.lastVideoSample)
}
state.lastVideoSample = &sample
}
// processAudioPacket processes an audio packet and writes samples to the track
func processAudioPacket(pkt packets.Packet, state *streamState, audioTrack *pionWebRTC.TrackLocalStaticSample, hasAAC bool) {
if audioTrack == nil {
// processAudioPacket processes an audio packet and writes samples to the broadcaster.
// When the packet carries AAC and a transcoder is provided, the audio is transcoded
// to G.711 µ-law on the fly so it can be sent over a PCMU WebRTC track.
func processAudioPacket(pkt packets.Packet, state *streamState, audioBroadcaster *TrackBroadcaster, transcoder *AACTranscoder) {
if audioBroadcaster == nil {
return
}
if hasAAC {
// AAC transcoding not yet implemented
// TODO: Implement AAC to PCM_MULAW transcoding
return
state.audioPacketsSeen++
audioData := pkt.Data
if pkt.Codec == "AAC" {
state.aacPacketsSeen++
if transcoder == nil {
state.aacErrors++
if state.aacErrors <= 3 || state.aacErrors%100 == 0 {
log.Log.Warning(fmt.Sprintf("webrtc.main.processAudioPacket(): AAC packet dropped because transcoder is nil (aac_packets=%d, input_bytes=%d)", state.aacPacketsSeen, len(pkt.Data)))
}
return // no transcoder silently drop
}
pcmu, err := transcoder.Transcode(pkt.Data)
if err != nil {
state.aacErrors++
log.Log.Error("webrtc.main.processAudioPacket(): AAC transcode error: " + err.Error())
return
}
if len(pcmu) == 0 {
state.aacNoOutput++
if state.aacNoOutput <= 5 || state.aacNoOutput%100 == 0 {
log.Log.Info(fmt.Sprintf("webrtc.main.processAudioPacket(): AAC packet produced no PCMU output yet (aac_packets=%d, no_output=%d, input_bytes=%d)", state.aacPacketsSeen, state.aacNoOutput, len(pkt.Data)))
}
return // decoder still buffering
}
if state.aacPacketsSeen <= 5 || state.aacPacketsSeen%100 == 0 {
log.Log.Info(fmt.Sprintf("webrtc.main.processAudioPacket(): AAC transcoded to PCMU (aac_packets=%d, input_bytes=%d, output_bytes=%d, peers=%d)", state.aacPacketsSeen, len(pkt.Data), len(pcmu), audioBroadcaster.PeerCount()))
}
audioData = pcmu
}
sample := pionMedia.Sample{Data: pkt.Data, PacketTimestamp: uint32(pkt.Time)}
sample := pionMedia.Sample{Data: audioData, PacketTimestamp: sampleTimestamp(pkt)}
if state.lastAudioSample != nil {
duration := sample.PacketTimestamp - state.lastAudioSample.PacketTimestamp
state.lastAudioSample.Duration = time.Duration(duration) * time.Millisecond
if err := audioTrack.WriteSample(*state.lastAudioSample); err != nil && err != io.ErrClosedPipe {
log.Log.Error("webrtc.main.processAudioPacket(): error writing audio sample: " + err.Error())
state.lastAudioSample.Duration = sampleDuration(pkt, state.lastAudioSample.PacketTimestamp, 20*time.Millisecond)
state.audioSamplesSent++
if state.audioSamplesSent <= 5 || state.audioSamplesSent%100 == 0 {
log.Log.Info(fmt.Sprintf("webrtc.main.processAudioPacket(): queueing audio sample (samples=%d, codec=%s, bytes=%d, duration_ms=%d, peers=%d)", state.audioSamplesSent, pkt.Codec, len(state.lastAudioSample.Data), state.lastAudioSample.Duration.Milliseconds(), audioBroadcaster.PeerCount()))
}
audioBroadcaster.WriteSample(*state.lastAudioSample)
}
state.lastAudioSample = &sample
}
func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, videoTrack *pionWebRTC.TrackLocalStaticSample, audioTrack *pionWebRTC.TrackLocalStaticSample, rtspClient capture.RTSPClient) {
func shouldDropPacketForLatency(pkt packets.Packet) bool {
if pkt.CurrentTime == 0 {
return false
}
age := time.Since(time.UnixMilli(pkt.CurrentTime))
return age > maxLivePacketAge
}
func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, videoBroadcaster *TrackBroadcaster, audioBroadcaster *TrackBroadcaster, rtspClient capture.RTSPClient) {
config := configuration.Config
// Check if at least one track is available
if videoTrack == nil && audioTrack == nil {
log.Log.Error("webrtc.main.WriteToTrack(): both video and audio tracks are nil, cannot proceed")
// Check if at least one broadcaster is available
if videoBroadcaster == nil && audioBroadcaster == nil {
log.Log.Error("webrtc.main.WriteToTrack(): both video and audio broadcasters are nil, cannot proceed")
return
}
@@ -706,8 +1007,22 @@ func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.C
return
}
// Create AAC transcoder if needed (AAC → G.711 µ-law).
var aacTranscoder *AACTranscoder
if codecs.hasAAC && audioBroadcaster != nil {
log.Log.Info(fmt.Sprintf("webrtc.main.WriteToTrack(): AAC audio detected, creating transcoder (audio_peers=%d)", audioBroadcaster.PeerCount()))
t, err := NewAACTranscoder()
if err != nil {
log.Log.Error("webrtc.main.WriteToTrack(): failed to create AAC transcoder: " + err.Error())
} else {
aacTranscoder = t
log.Log.Info("webrtc.main.WriteToTrack(): AAC transcoder created successfully")
defer aacTranscoder.Close()
}
}
if config.Capture.TranscodingWebRTC == "true" {
log.Log.Info("webrtc.main.WriteToTrack(): transcoding enabled but not yet implemented")
log.Log.Info("webrtc.main.WriteToTrack(): transcoding config enabled")
}
// Initialize streaming state
@@ -717,7 +1032,13 @@ func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.C
}
defer func() {
writeFinalSamples(state, videoTrack, audioTrack)
log.Log.Info(fmt.Sprintf("webrtc.main.WriteToTrack(): audio summary packets=%d aac_packets=%d sent=%d aac_no_output=%d aac_errors=%d peers=%d", state.audioPacketsSeen, state.aacPacketsSeen, state.audioSamplesSent, state.aacNoOutput, state.aacErrors, func() int {
if audioBroadcaster == nil {
return 0
}
return audioBroadcaster.PeerCount()
}()))
writeFinalSamples(state, videoBroadcaster, audioBroadcaster)
log.Log.Info("webrtc.main.WriteToTrack(): stopped writing to track")
}()
@@ -747,6 +1068,31 @@ func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.C
continue
}
// Keep live WebRTC close to realtime.
// If audio+video load makes this consumer fall behind, skip old packets and
// wait for a recent keyframe before resuming video.
if shouldDropPacketForLatency(pkt) {
if !state.catchingUp {
log.Log.Warning("webrtc.main.WriteToTrack(): stream is lagging behind, dropping old packets until the next recent keyframe")
}
state.catchingUp = true
state.start = false
state.receivedKeyFrame = false
state.lastAudioSample = nil
state.lastVideoSample = nil
continue
}
if state.catchingUp {
if !(pkt.IsVideo && pkt.IsKeyFrame) {
continue
}
state.catchingUp = false
state.start = false
state.receivedKeyFrame = false
log.Log.Info("webrtc.main.WriteToTrack(): caught up with live stream at a recent keyframe")
}
// Wait for first keyframe before processing
if !state.receivedKeyFrame {
if pkt.IsKeyFrame {
@@ -758,9 +1104,9 @@ func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.C
// Process video or audio packets
if pkt.IsVideo {
processVideoPacket(pkt, state, videoTrack, config)
processVideoPacket(pkt, state, videoBroadcaster, config)
} else if pkt.IsAudio {
processAudioPacket(pkt, state, audioTrack, codecs.hasAAC)
processAudioPacket(pkt, state, audioBroadcaster, aacTranscoder)
}
}
}

View File

@@ -45,9 +45,9 @@
"crypto": false
},
"scripts": {
"start": "react-scripts start",
"build": "GENERATE_SOURCEMAP=false REACT_APP_ENVIRONMENT=production react-scripts build && rm -rf ../machinery/www && mv build ../machinery/www",
"test": "react-scripts test",
"start": "DISABLE_ESLINT_PLUGIN=true react-scripts start",
"build": "DISABLE_ESLINT_PLUGIN=true GENERATE_SOURCEMAP=false REACT_APP_ENVIRONMENT=production react-scripts build && rm -rf ../machinery/www && mv build ../machinery/www",
"test": "DISABLE_ESLINT_PLUGIN=true react-scripts test",
"eject": "react-scripts eject",
"lint": "eslint --debug 'src/**/*.{js,jsx,ts,tsx}'",
"format": "prettier --write \"**/*.{js,jsx,json,md}\""

View File

@@ -237,4 +237,4 @@
"remove_after_upload_enabled": "Enable delete on upload"
}
}
}
}

View File

@@ -194,7 +194,7 @@ export const getDashboardInformation = (onSuccess, onError) => {
};
};
export const getEvents = (eventfilter, onSuccess, onError) => {
export const getEvents = (eventfilter, onSuccess, onError, append = false) => {
return (dispatch) => {
doGetEvents(
eventfilter,
@@ -203,6 +203,7 @@ export const getEvents = (eventfilter, onSuccess, onError) => {
type: 'GET_EVENTS',
events: data.events,
filter: eventfilter,
append,
});
if (onSuccess) {
onSuccess();

View File

@@ -7,6 +7,7 @@ import './ImageCanvas.css';
class ImageCanvas extends React.Component {
componentDidMount() {
this.isUnmounted = false;
this.width = 0;
this.height = 0;
@@ -58,6 +59,9 @@ class ImageCanvas extends React.Component {
const { image } = this.props;
this.loadImage(image, (img) => {
if (this.isUnmounted || !this.editor) {
return;
}
if (this.width !== img.width || this.height !== img.height) {
this.width = img.width;
this.height = img.height;
@@ -71,6 +75,9 @@ class ImageCanvas extends React.Component {
componentDidUpdate() {
const { image } = this.props;
this.loadImage(image, (img) => {
if (this.isUnmounted || !this.editor) {
return;
}
if (this.width !== img.width || this.height !== img.height) {
this.width = img.width;
this.height = img.height;
@@ -82,11 +89,57 @@ class ImageCanvas extends React.Component {
});
}
componentWillUnmount() {
this.isUnmounted = true;
if (this.pendingImage) {
this.pendingImage.onload = null;
this.pendingImage.src = '';
this.pendingImage = null;
}
if (this.editor) {
this.editor.onSelectionEnd = null;
this.editor.onRegionMoveEnd = null;
this.editor.onRegionDelete = null;
if (this.editor.RM) {
this.editor.RM.deleteAllRegions();
}
if (typeof this.editor.dispose === 'function') {
this.editor.dispose();
} else if (typeof this.editor.destroy === 'function') {
this.editor.destroy();
}
this.editor = null;
}
if (this.toolbarContainer) {
this.toolbarContainer.innerHTML = '';
this.toolbarContainer = null;
}
if (this.editorContainer) {
this.editorContainer.innerHTML = '';
this.editorContainer = null;
}
}
loadData = (image) => {
if (!this.editor) {
return;
}
const w = image.width;
const h = image.height;
this.editor.addContentSource(image).then(() => {
if (this.isUnmounted || !this.editor) {
return;
}
// Add exisiting polygons
this.editor.RM.deleteAllRegions();
const { polygons } = this.props;
@@ -152,11 +205,19 @@ class ImageCanvas extends React.Component {
// eslint-disable-next-line class-methods-use-this
loadImage = (path, onready) => {
if (this.pendingImage) {
this.pendingImage.onload = null;
}
const image = new Image();
image.src = path;
image.addEventListener('load', (e) => {
this.pendingImage = image;
image.onload = (e) => {
if (this.pendingImage === image) {
this.pendingImage = null;
}
onready(e.target);
});
};
image.src = path;
};
// eslint-disable-next-line class-methods-use-this

View File

@@ -26,6 +26,23 @@ import {
import './Dashboard.scss';
import ReactTooltip from 'react-tooltip';
import config from '../../config';
import { getConfig } from '../../actions/agent';
function createUUID() {
if (
typeof window !== 'undefined' &&
window.crypto &&
typeof window.crypto.randomUUID === 'function'
) {
return window.crypto.randomUUID();
}
return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, (char) => {
const random = Math.floor(Math.random() * 16);
const value = char === 'x' ? random : 8 + Math.floor(random / 4);
return value.toString(16);
});
}
// eslint-disable-next-line react/prefer-stateless-function
class Dashboard extends React.Component {
@@ -33,43 +50,55 @@ class Dashboard extends React.Component {
super();
this.state = {
liveviewLoaded: false,
liveviewMode: 'webrtc',
open: false,
currentRecording: '',
initialised: false,
};
this.videoRef = React.createRef();
this.pendingRemoteCandidates = [];
this.initialiseLiveview = this.initialiseLiveview.bind(this);
this.initialiseSDLiveview = this.initialiseSDLiveview.bind(this);
this.startWebRTCLiveview = this.startWebRTCLiveview.bind(this);
this.handleWebRTCSignalMessage = this.handleWebRTCSignalMessage.bind(this);
this.fallbackToSDLiveview = this.fallbackToSDLiveview.bind(this);
}
componentDidMount() {
const liveview = document.getElementsByClassName('videocard-video');
if (liveview && liveview.length > 0) {
liveview[0].addEventListener('load', () => {
this.setState({
liveviewLoaded: true,
});
});
}
const { dispatchGetConfig } = this.props;
dispatchGetConfig(() => this.initialiseLiveview());
this.initialiseLiveview();
}
componentDidUpdate() {
this.initialiseLiveview();
componentDidUpdate(prevProps) {
const { images, dashboard } = this.props;
const { liveviewLoaded, liveviewMode } = this.state;
const configLoaded = this.hasAgentConfig(this.props);
const prevConfigLoaded = this.hasAgentConfig(prevProps);
if (!prevConfigLoaded && configLoaded) {
this.initialiseLiveview();
}
if (
liveviewMode === 'sd' &&
!liveviewLoaded &&
prevProps.images !== images &&
images.length > 0
) {
this.setState({
liveviewLoaded: true,
});
}
if (!prevProps.dashboard.cameraOnline && dashboard.cameraOnline) {
this.initialiseLiveview();
}
}
componentWillUnmount() {
const liveview = document.getElementsByClassName('videocard-video');
if (liveview && liveview.length > 0) {
liveview[0].remove();
}
if (this.requestStreamSubscription) {
this.requestStreamSubscription.unsubscribe();
}
const { dispatchSend } = this.props;
const message = {
message_type: 'stop-sd',
};
dispatchSend(message);
this.stopSDLiveview();
this.stopWebRTCLiveview();
}
handleClose() {
@@ -79,32 +108,378 @@ class Dashboard extends React.Component {
});
}
getCurrentTimestamp() {
return Math.round(Date.now() / 1000);
// eslint-disable-next-line react/sort-comp
hasAgentConfig(props) {
const currentProps = props || this.props;
const { config: configResponse } = currentProps;
return !!(configResponse && configResponse.config);
}
browserSupportsWebRTC() {
return (
typeof window !== 'undefined' &&
typeof window.RTCPeerConnection !== 'undefined'
);
}
buildPeerConnectionConfig() {
const { config: configResponse } = this.props;
const agentConfig =
configResponse && configResponse.config ? configResponse.config : {};
const iceServers = [];
if (agentConfig.stunuri) {
iceServers.push({
urls: [agentConfig.stunuri],
});
}
if (agentConfig.turnuri) {
const turnServer = {
urls: [agentConfig.turnuri],
};
if (agentConfig.turn_username) {
turnServer.username = agentConfig.turn_username;
}
if (agentConfig.turn_password) {
turnServer.credential = agentConfig.turn_password;
}
iceServers.push(turnServer);
}
return {
iceServers,
iceTransportPolicy: agentConfig.turn_force === 'true' ? 'relay' : 'all',
};
}
initialiseLiveview() {
const { initialised } = this.state;
if (!initialised) {
const message = {
message_type: 'stream-sd',
};
const { connected, dispatchSend } = this.props;
if (connected) {
const { dashboard } = this.props;
if (initialised || !dashboard.cameraOnline) {
return;
}
if (!this.hasAgentConfig()) {
return;
}
if (this.browserSupportsWebRTC()) {
this.startWebRTCLiveview();
} else {
this.fallbackToSDLiveview('WebRTC is not supported in this browser.');
}
}
initialiseSDLiveview() {
if (this.requestStreamSubscription) {
return;
}
const message = {
message_type: 'stream-sd',
};
const { connected, dispatchSend } = this.props;
if (connected) {
dispatchSend(message);
}
const requestStreamInterval = interval(2000);
this.requestStreamSubscription = requestStreamInterval.subscribe(() => {
const { connected: isConnected } = this.props;
if (isConnected) {
dispatchSend(message);
}
});
}
const requestStreamInterval = interval(2000);
this.requestStreamSubscription = requestStreamInterval.subscribe(() => {
const { connected: isConnected } = this.props;
if (isConnected) {
dispatchSend(message);
}
});
this.setState({
initialised: true,
});
stopSDLiveview() {
if (this.requestStreamSubscription) {
this.requestStreamSubscription.unsubscribe();
this.requestStreamSubscription = null;
}
const { dispatchSend } = this.props;
dispatchSend({
message_type: 'stop-sd',
});
}
stopWebRTCLiveview() {
if (this.webrtcTimeout) {
window.clearTimeout(this.webrtcTimeout);
this.webrtcTimeout = null;
}
if (this.webrtcSocket) {
this.webrtcSocket.onopen = null;
this.webrtcSocket.onmessage = null;
this.webrtcSocket.onerror = null;
this.webrtcSocket.onclose = null;
this.webrtcSocket.close();
this.webrtcSocket = null;
}
if (this.webrtcPeerConnection) {
this.webrtcPeerConnection.ontrack = null;
this.webrtcPeerConnection.onicecandidate = null;
this.webrtcPeerConnection.onconnectionstatechange = null;
this.webrtcPeerConnection.close();
this.webrtcPeerConnection = null;
}
this.pendingRemoteCandidates = [];
this.webrtcOfferStarted = false;
this.webrtcSessionId = null;
this.webrtcClientId = null;
if (this.videoRef.current) {
this.videoRef.current.srcObject = null;
}
}
sendWebRTCMessage(messageType, message = {}) {
if (!this.webrtcSocket || this.webrtcSocket.readyState !== WebSocket.OPEN) {
return;
}
this.webrtcSocket.send(
JSON.stringify({
client_id: this.webrtcClientId,
message_type: messageType,
message,
})
);
}
async handleWebRTCSignalMessage(event) {
let data;
try {
data = JSON.parse(event.data);
} catch (error) {
return;
}
const { message_type: messageType, message = {} } = data;
const { session_id: sessionID, sdp, candidate } = message;
if (messageType === 'hello-back') {
await this.beginWebRTCLiveview();
return;
}
if (sessionID && sessionID !== this.webrtcSessionId) {
return;
}
switch (messageType) {
case 'webrtc-answer':
try {
await this.webrtcPeerConnection.setRemoteDescription({
type: 'answer',
sdp: window.atob(sdp),
});
await this.flushPendingRemoteCandidates();
} catch (error) {
this.fallbackToSDLiveview(
`Unable to apply WebRTC answer: ${error.message}`
);
}
break;
case 'webrtc-candidate': {
try {
const candidateInit = JSON.parse(candidate);
if (
this.webrtcPeerConnection.remoteDescription &&
this.webrtcPeerConnection.remoteDescription.type
) {
await this.webrtcPeerConnection.addIceCandidate(candidateInit);
} else {
this.pendingRemoteCandidates.push(candidateInit);
}
} catch (error) {
this.fallbackToSDLiveview(
`Unable to apply WebRTC candidate: ${error.message}`
);
}
break;
}
case 'webrtc-error':
this.fallbackToSDLiveview(
message.message || 'The agent could not start the WebRTC liveview.'
);
break;
default:
break;
}
}
async beginWebRTCLiveview() {
if (!this.webrtcPeerConnection || this.webrtcOfferStarted) {
return;
}
try {
this.webrtcOfferStarted = true;
const offer = await this.webrtcPeerConnection.createOffer({
offerToReceiveAudio: true,
offerToReceiveVideo: true,
});
await this.webrtcPeerConnection.setLocalDescription(offer);
this.sendWebRTCMessage('stream-hd', {
session_id: this.webrtcSessionId,
sdp: window.btoa(this.webrtcPeerConnection.localDescription.sdp),
});
} catch (error) {
this.fallbackToSDLiveview(
`Unable to initialise WebRTC liveview: ${error.message}`,
);
}
}
async flushPendingRemoteCandidates() {
if (
!this.webrtcPeerConnection ||
!this.webrtcPeerConnection.remoteDescription
) {
return;
}
while (this.pendingRemoteCandidates.length > 0) {
const candidateInit = this.pendingRemoteCandidates.shift();
try {
// eslint-disable-next-line no-await-in-loop
await this.webrtcPeerConnection.addIceCandidate(candidateInit);
} catch (error) {
this.fallbackToSDLiveview(
`Unable to add remote ICE candidate: ${error.message}`,
);
return;
}
}
}
startWebRTCLiveview() {
if (this.webrtcPeerConnection || this.webrtcSocket) {
return;
}
this.stopSDLiveview();
this.webrtcClientId = createUUID();
this.webrtcSessionId = createUUID();
this.pendingRemoteCandidates = [];
this.webrtcPeerConnection = new window.RTCPeerConnection(
this.buildPeerConnectionConfig()
);
this.webrtcPeerConnection.ontrack = (event) => {
const [remoteStream] = event.streams;
if (this.videoRef.current && remoteStream) {
this.videoRef.current.srcObject = remoteStream;
const playPromise = this.videoRef.current.play();
if (playPromise && playPromise.catch) {
playPromise.catch(() => {});
}
}
this.setState({
liveviewLoaded: true,
});
};
this.webrtcPeerConnection.onicecandidate = (event) => {
if (!event.candidate) {
return;
}
this.sendWebRTCMessage('webrtc-candidate', {
session_id: this.webrtcSessionId,
candidate: JSON.stringify(event.candidate.toJSON()),
});
};
this.webrtcPeerConnection.onconnectionstatechange = () => {
const { connectionState } = this.webrtcPeerConnection;
if (connectionState === 'connected') {
this.setState({
liveviewLoaded: true,
});
}
if (
connectionState === 'failed' ||
connectionState === 'disconnected' ||
connectionState === 'closed'
) {
this.fallbackToSDLiveview(
`WebRTC connection ${connectionState}, falling back to SD liveview.`,
);
}
};
this.webrtcSocket = new WebSocket(config.WS_URL);
this.webrtcSocket.onopen = () => {
this.sendWebRTCMessage('hello', {});
};
this.webrtcSocket.onmessage = this.handleWebRTCSignalMessage;
this.webrtcSocket.onerror = () => {
this.fallbackToSDLiveview('Unable to open the WebRTC signaling channel.');
};
this.webrtcSocket.onclose = () => {
const { liveviewLoaded } = this.state;
if (!liveviewLoaded) {
this.fallbackToSDLiveview('WebRTC signaling channel closed early.');
}
};
this.webrtcTimeout = window.setTimeout(() => {
const { liveviewLoaded } = this.state;
if (!liveviewLoaded) {
this.fallbackToSDLiveview(
'WebRTC connection timed out, falling back to SD liveview.'
);
}
}, 10000);
this.setState({
initialised: true,
liveviewLoaded: false,
liveviewMode: 'webrtc',
});
}
fallbackToSDLiveview(errorMessage) {
const { liveviewMode } = this.state;
if (liveviewMode === 'sd' && this.requestStreamSubscription) {
return;
}
this.stopWebRTCLiveview();
if (errorMessage) {
// eslint-disable-next-line no-console
console.warn(errorMessage);
}
this.setState(
{
initialised: true,
liveviewLoaded: false,
liveviewMode: 'sd',
},
() => {
this.initialiseSDLiveview();
}
);
}
openModal(file) {
@@ -116,7 +491,8 @@ class Dashboard extends React.Component {
render() {
const { dashboard, t, images } = this.props;
const { liveviewLoaded, open, currentRecording } = this.state;
const { liveviewLoaded, liveviewMode, open, currentRecording } = this.state;
const listenerCount = dashboard.webrtcReaders ? dashboard.webrtcReaders : 0;
// We check if the camera was getting a valid frame
// during the last 5 seconds, otherwise we assume the camera is offline.
@@ -170,7 +546,6 @@ class Dashboard extends React.Component {
divider="0"
footer={t('dashboard.total_recordings')}
/>
<Link to="/settings">
<Card
title="IP Camera"
@@ -309,7 +684,9 @@ class Dashboard extends React.Component {
)}
</div>
<div>
<h2>{t('dashboard.live_view')}</h2>
<h2>
{t('dashboard.live_view')} ({listenerCount})
</h2>
{(!liveviewLoaded || !isCameraOnline) && (
<SetupBox
btnicon="preferences"
@@ -326,12 +703,16 @@ class Dashboard extends React.Component {
liveviewLoaded && isCameraOnline ? 'visible' : 'hidden',
}}
>
<ImageCard
imageSrc={`data:image/png;base64, ${
images.length ? images[0] : ''
}`}
onerror=""
/>
{liveviewMode === 'webrtc' ? (
<video ref={this.videoRef} autoPlay muted playsInline />
) : (
<ImageCard
imageSrc={`data:image/png;base64, ${
images.length ? images[0] : ''
}`}
onerror=""
/>
)}
</div>
</div>
</div>
@@ -343,20 +724,25 @@ class Dashboard extends React.Component {
const mapStateToProps = (state /* , ownProps */) => ({
dashboard: state.agent.dashboard,
config: state.agent.config,
connected: state.wss.connected,
images: state.wss.images,
});
const mapDispatchToProps = (dispatch) => ({
dispatchSend: (message) => dispatch(send(message)),
dispatchGetConfig: (onSuccess, onError) =>
dispatch(getConfig(onSuccess, onError)),
});
Dashboard.propTypes = {
dashboard: PropTypes.object.isRequired,
config: PropTypes.object.isRequired,
connected: PropTypes.bool.isRequired,
images: PropTypes.array.isRequired,
t: PropTypes.func.isRequired,
dispatchSend: PropTypes.func.isRequired,
dispatchGetConfig: PropTypes.func.isRequired,
};
export default withTranslation()(

View File

@@ -3,6 +3,7 @@ import PropTypes from 'prop-types';
import { withTranslation } from 'react-i18next';
import {
Breadcrumb,
ControlBar,
VideoCard,
Button,
Modal,
@@ -16,14 +17,53 @@ import { getEvents } from '../../actions/agent';
import config from '../../config';
import './Media.scss';
function formatDateTimeLocal(date) {
const year = date.getFullYear();
const month = String(date.getMonth() + 1).padStart(2, '0');
const day = String(date.getDate()).padStart(2, '0');
const hours = String(date.getHours()).padStart(2, '0');
const minutes = String(date.getMinutes()).padStart(2, '0');
return `${year}-${month}-${day}T${hours}:${minutes}`;
}
function getDefaultTimeWindow() {
const endDate = new Date();
const startDate = new Date(endDate.getTime() - 60 * 60 * 1000);
return {
startDateTime: formatDateTimeLocal(startDate),
endDateTime: formatDateTimeLocal(endDate),
timestamp_offset_start: Math.floor(startDate.getTime() / 1000),
timestamp_offset_end: Math.floor(endDate.getTime() / 1000) + 59,
};
}
function normalizeInputValue(valueOrEvent) {
if (valueOrEvent && valueOrEvent.target) {
return valueOrEvent.target.value;
}
return valueOrEvent;
}
// eslint-disable-next-line react/prefer-stateless-function
class Media extends React.Component {
constructor() {
super();
this.state = {
timestamp_offset_start: 0,
timestamp_offset_end: 0,
const defaultTimeWindow = getDefaultTimeWindow();
const initialFilter = {
timestamp_offset_start: defaultTimeWindow.timestamp_offset_start,
timestamp_offset_end: defaultTimeWindow.timestamp_offset_end,
number_of_elements: 12,
};
this.state = {
appliedFilter: initialFilter,
startDateTime: defaultTimeWindow.startDateTime,
endDateTime: defaultTimeWindow.endDateTime,
isScrolling: false,
open: false,
currentRecording: '',
@@ -32,7 +72,8 @@ class Media extends React.Component {
componentDidMount() {
const { dispatchGetEvents } = this.props;
dispatchGetEvents(this.state);
const { appliedFilter } = this.state;
dispatchGetEvents(appliedFilter);
document.addEventListener('scroll', this.trackScrolling);
}
@@ -49,29 +90,107 @@ class Media extends React.Component {
trackScrolling = () => {
const { events, dispatchGetEvents } = this.props;
const { isScrolling } = this.state;
const { isScrolling, appliedFilter } = this.state;
const wrappedElement = document.getElementById('loader');
if (!isScrolling && this.isBottom(wrappedElement)) {
this.setState({
isScrolling: true,
});
// Get last element
const lastElement = events[events.length - 1];
if (lastElement) {
this.setState({
if (!wrappedElement || isScrolling || !this.isBottom(wrappedElement)) {
return;
}
this.setState({
isScrolling: true,
});
// Get last element
const lastElement = events[events.length - 1];
if (lastElement) {
dispatchGetEvents(
{
...appliedFilter,
timestamp_offset_end: parseInt(lastElement.timestamp, 10),
});
dispatchGetEvents(this.state, () => {
},
() => {
setTimeout(() => {
this.setState({
isScrolling: false,
});
}, 1000);
});
}
},
() => {
this.setState({
isScrolling: false,
});
},
true
);
} else {
this.setState({
isScrolling: false,
});
}
};
buildEventFilter(startDateTime, endDateTime) {
const { appliedFilter } = this.state;
return {
timestamp_offset_start: this.getTimestampFromInput(
startDateTime,
'start'
),
timestamp_offset_end: this.getTimestampFromInput(endDateTime, 'end'),
number_of_elements: appliedFilter.number_of_elements,
};
}
handleDateFilterChange(field, value) {
const { dispatchGetEvents } = this.props;
const { startDateTime, endDateTime } = this.state;
const normalizedValue = normalizeInputValue(value);
const nextStartDateTime =
field === 'startDateTime' ? normalizedValue : startDateTime;
const nextEndDateTime =
field === 'endDateTime' ? normalizedValue : endDateTime;
const nextFilter = this.buildEventFilter(
nextStartDateTime,
nextEndDateTime
);
const shouldApplyFilter =
(nextStartDateTime === '' || nextStartDateTime.length === 16) &&
(nextEndDateTime === '' || nextEndDateTime.length === 16);
this.setState(
{
[field]: normalizedValue,
appliedFilter: shouldApplyFilter
? nextFilter
: this.state.appliedFilter,
isScrolling: false,
},
() => {
if (shouldApplyFilter) {
dispatchGetEvents(nextFilter);
}
}
);
}
getTimestampFromInput(value, boundary) {
if (!value) {
return 0;
}
const date = new Date(value);
if (Number.isNaN(date.getTime())) {
return 0;
}
const seconds = Math.floor(date.getTime() / 1000);
if (boundary === 'end') {
return seconds + 59;
}
return seconds;
}
isBottom(el) {
return el.getBoundingClientRect().bottom + 50 <= window.innerHeight;
}
@@ -85,7 +204,9 @@ class Media extends React.Component {
render() {
const { events, eventsLoaded, t } = this.props;
const { isScrolling, open, currentRecording } = this.state;
const { isScrolling, open, currentRecording, startDateTime, endDateTime } =
this.state;
return (
<div id="media">
<Breadcrumb
@@ -102,6 +223,37 @@ class Media extends React.Component {
</Link>
</Breadcrumb>
<div className="media-control-bar">
<ControlBar>
<div className="media-filters">
<div className="media-filters__field">
<label htmlFor="recordings-start-time">Start time</label>
<input
className="media-filters__input"
id="recordings-start-time"
type="datetime-local"
value={startDateTime}
onChange={(value) =>
this.handleDateFilterChange('startDateTime', value)
}
/>
</div>
<div className="media-filters__field">
<label htmlFor="recordings-end-time">End time</label>
<input
className="media-filters__input"
id="recordings-end-time"
type="datetime-local"
value={endDateTime}
onChange={(value) =>
this.handleDateFilterChange('endDateTime', value)
}
/>
</div>
</div>
</ControlBar>
</div>
<div className="stats grid-container --four-columns">
{events.map((event) => (
<div
@@ -123,6 +275,11 @@ class Media extends React.Component {
</div>
))}
</div>
{events.length === 0 && eventsLoaded === 0 && (
<div className="media-empty-state">
No recordings found in the selected time range.
</div>
)}
{open && (
<Modal>
<ModalHeader
@@ -182,13 +339,13 @@ const mapStateToProps = (state /* , ownProps */) => ({
});
const mapDispatchToProps = (dispatch) => ({
dispatchGetEvents: (eventFilter, success, error) =>
dispatch(getEvents(eventFilter, success, error)),
dispatchGetEvents: (eventFilter, success, error, append) =>
dispatch(getEvents(eventFilter, success, error, append)),
});
Media.propTypes = {
t: PropTypes.func.isRequired,
events: PropTypes.objectOf(PropTypes.object).isRequired,
events: PropTypes.arrayOf(PropTypes.object).isRequired,
eventsLoaded: PropTypes.number.isRequired,
dispatchGetEvents: PropTypes.func.isRequired,
};

View File

@@ -1,4 +1,103 @@
#media {
.media-control-bar {
.control-bar {
display: block;
padding: 0 var(--main-content-gutter);
}
.control-bar .filtering {
display: block;
}
.control-bar .filtering > * {
border-right: 0;
flex: 1 1 100% !important;
max-width: none;
width: 100%;
}
}
.media-filters {
align-items: stretch;
display: grid;
gap: 0;
grid-template-columns: repeat(2, minmax(0, 1fr));
min-width: 0;
width: 100%;
}
.media-filters__field {
border-right: 1px solid var(--bg-muted);
min-width: 0;
padding: 16px 24px;
label {
display: block;
font-size: 14px;
font-weight: 600;
margin-bottom: 8px;
white-space: nowrap;
}
}
.media-filters__field:first-child {
padding-left: 0;
}
.media-filters__input {
appearance: none;
background: var(--white);
border: 1px solid var(--grey-light);
border-radius: 8px;
box-sizing: border-box;
color: var(--black);
font-size: 16px;
min-height: 48px;
padding: 0 14px 0 0;
width: 100%;
}
.media-filters__input::-webkit-datetime-edit,
.media-filters__input::-webkit-datetime-edit-fields-wrapper {
padding: 0;
}
.media-filters__input:focus {
border-color: var(--oss);
outline: 0;
}
.media-filters__field:first-child .media-filters__input {
padding-left: 0;
}
.media-filters__field:last-child {
border-right: 0;
padding-right: 0;
}
@media (max-width: 700px) {
.media-filters {
grid-template-columns: 1fr;
}
.media-filters__field {
border-right: 0;
border-bottom: 1px solid var(--bg-muted);
padding-left: 0;
padding-right: 0;
}
.media-filters__field:last-child {
border-bottom: 0;
}
}
.media-empty-state {
margin: 24px 0;
opacity: 0.8;
text-align: center;
}
#loader {
display: flex;

View File

@@ -159,7 +159,10 @@ class Settings extends React.Component {
componentWillUnmount() {
document.removeEventListener('keydown', this.escFunction, false);
clearInterval(this.interval);
if (this.requestStreamSubscription) {
this.requestStreamSubscription.unsubscribe();
this.requestStreamSubscription = null;
}
const { dispatchSend } = this.props;
const message = {

View File

@@ -123,16 +123,12 @@ const agent = (
};
case 'GET_EVENTS':
const { timestamp_offset_end } = action.filter;
const { events } = action;
return {
...state,
eventsLoaded: events.length,
events:
timestamp_offset_end === 0
? [...events]
: [...state.events, ...events],
eventfilter: action.eventfilter,
events: action.append ? [...state.events, ...events] : [...events],
eventfilter: action.filter,
};
default: