Compare commits

..

109 Commits

Author SHA1 Message Date
Cédric Verstraeten
8657765e5d Merge pull request #262 from kerberos-io/feature/concurrency-webrtc
feature/concurrency-webrtc
2026-03-09 21:37:04 +01:00
Cédric Verstraeten
76a136abc9 Add trailing commas to fallback calls
Add trailing commas to the arguments passed to fallbackToSDLiveview in ui/src/pages/Dashboard/Dashboard.jsx. This is a non-functional formatting change applied to the WebRTC initialization, ICE candidate handling, and connection-state fallback calls to align with the project's code style/formatter.
2026-03-09 21:34:17 +01:00
Cédric Verstraeten
5475b79459 Remove extraneous trailing commas in Dashboard
Clean up trailing commas and minor formatting in ui/src/pages/Dashboard/Dashboard.jsx. Adjusts object/argument commas and formatting around WebRTC message handling, peer connection setup, error handling, and SD liveview fallback callbacks to avoid potential syntax/lint issues.
2026-03-09 21:32:10 +01:00
Cédric Verstraeten
2ad768780f Format Dashboard.jsx: add trailing commas
Apply consistent formatting to ui/src/pages/Dashboard/Dashboard.jsx by adding trailing commas in object literals, function call argument lists, and callbacks (primarily around WebRTC handling and error messages). This is a non-functional style change to match the project's code style (e.g., Prettier/ESLint) and should not affect runtime behavior.
2026-03-09 21:29:30 +01:00
Cédric Verstraeten
f64b5fb65b Replace uuidv4 with local createUUID
Remove the uuidv4 import and introduce a local createUUID helper that uses window.crypto.randomUUID when available and falls back to a v4-style generator. Update webrtcClientId and webrtcSessionId to use createUUID(), removing the external dependency while preserving UUID generation for WebRTC session/client IDs.
2026-03-09 21:27:01 +01:00
Cédric Verstraeten
bb773316a2 Add trailing commas and tidy Media.scss
Add trailing commas to multi-line function calls and RTCPeerConnection instantiation in Dashboard.jsx for consistent formatting. In Media.scss remove an extra blank line and relocate the .media-filters__field:first-child rule to consolidate related styles. Purely stylistic/organizational changes with no intended behavior change.
2026-03-09 21:23:16 +01:00
Cédric Verstraeten
fc6fa9d425 Format Media.jsx and add newline
Reformat code in ui/src/pages/Media/Media.jsx for readability: wrap long argument lists (getTimestampFromInput, buildEventFilter) and reflow the appliedFilter ternary onto multiple lines. Also add the missing trailing newline to ui/public/locales/en/translation.json. No functional changes.
2026-03-09 21:18:52 +01:00
Cédric Verstraeten
aa183ee0fb Enable MQTT persistent sessions and resume subs
Switch MQTT client to persistent sessions by setting CleanSession to false, enabling ResumeSubs and using an in-memory store. Previously CleanSession was true and resume/store were commented out, which could drop subscriptions on reconnect; these changes ensure subscriptions are preserved across reconnects and re-subscribed from memory.
2026-03-09 21:12:22 +01:00
Cédric Verstraeten
730b1b2a40 Add WebRTC liveview signaling and UI fallback
Introduce structured WebRTC handshake signaling and client-side fallbacks. Changes:

- machinery: replace HandleLiveHDHandshake channel to carry LiveHDHandshake (payload + signaling callbacks) and expose active WebRTC reader count in dashboard data.
- routers: MQTT and WebSocket handlers now send/receive LiveHDHandshake structs; websocket supports stream-hd and webrtc-candidate messages and uses callback-based signaling to reply over the WS connection.
- webrtc: add helper functions to send MQTT or callback answers/candidates, adapt InitializeWebRTCConnection to the new handshake type, and expose GetActivePeerConnectionCount.
- utils: minor GetMediaFormatted filtering fix and unit test for timestamp range behavior.
- ui: Dashboard gains native WebRTC liveview with fallback to SD image stream, shows active listener count, and handles signaling/candidates; Media page adds datetime range filters, infinite-scroll append behavior, and styles; reducer/action updates to support appending events; package.json scripts disable ESLint plugin during start/build/test.

These changes enable browser-based HD liveviews with dual signaling paths (websocket callbacks or MQTT), improve media filtering, and provide graceful fallback to SD streaming when WebRTC fails.
2026-03-09 21:10:18 +01:00
Cédric Verstraeten
4efc80fecb Enhance WebRTC signaling robustness
Increase HD handshake channel buffer and harden signaling flow: enlarge HandleLiveHDHandshake buffer from 10 to 100 and add a nil-check to drop and log requests when the channel is not initialized. Add publishSignalingMessageAsync to publish MQTT messages with timeout and error logging, and replace blocking Publish().Wait() calls for ICE candidates and SDP answers with the async publisher. Reintroduce the remote-candidate processor goroutine after remote description handling to avoid AddICECandidate races. These changes reduce blocking, improve error handling, and make WebRTC/MQTT signaling more resilient.
2026-03-09 20:05:00 +01:00
Cédric Verstraeten
4fbee60e9f Merge pull request #261 from kerberos-io/feature/add-webrtc-aac-transcoder
feature/add-webrtc-aac-transcoder
2026-03-09 17:46:17 +01:00
Cédric Verstraeten
d6c25df280 Add missing imports for strconv and strings in AAC transcoder stub 2026-03-09 16:42:42 +00:00
Cédric Verstraeten
72a2d28e1e Update aac_transcoder_stub.go 2026-03-09 17:41:54 +01:00
Cédric Verstraeten
eb0972084f Implement AAC transcoding for WebRTC using FFmpeg; update Dockerfiles and launch configuration 2026-03-09 16:34:52 +00:00
Cédric Verstraeten
41a1d221fc Merge pull request #260 from kerberos-io/fix/set-clean-state
fix/set-clean-state
2026-03-09 16:56:36 +01:00
Cédric Verstraeten
eaacc93d2f Set MQTT clean session to true and disable resume subscriptions 2026-03-09 15:50:40 +00:00
Cédric Verstraeten
0e6a004c23 Merge pull request #259 from kerberos-io/fix/add-grace-period
feature/add-broadcasting-feature
2026-03-09 16:20:39 +01:00
Cédric Verstraeten
617f854534 Merge branch 'master' into fix/add-grace-period 2026-03-09 16:17:35 +01:00
Cédric Verstraeten
1bf8006055 Refactor WebRTC handling to use per-peer broadcasters for video and audio tracks 2026-03-09 15:12:01 +00:00
Cédric Verstraeten
ca0e426382 Add max signaling age constant and discard stale WebRTC messages 2026-03-09 14:50:00 +00:00
Cédric Verstraeten
726d0722d9 Merge pull request #258 from kerberos-io/fix/add-grace-period
fix/add-grace-period
2026-03-09 15:20:53 +01:00
Cédric Verstraeten
d8f320b040 Add disconnect grace period handling in WebRTC connection manager 2026-03-09 14:15:50 +00:00
Cédric Verstraeten
0131b87692 Merge pull request #257 from kerberos-io/security/middleware-exposure
security/middleware-exposure
2026-03-09 14:18:11 +01:00
Cédric Verstraeten
54e8198b65 Apply suggestions from code review
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-03-09 14:18:00 +01:00
Cédric Verstraeten
3bfb68f950 Update port configuration and secure routes with JWT authentication middleware 2026-03-09 12:42:05 +00:00
Cédric Verstraeten
c05e59c936 Merge pull request #255 from kerberos-io/feature/improve-mqtt-concurrency
feature/improve-mqtt-concurrency
2026-03-09 13:25:26 +01:00
Cédric Verstraeten
b42d63b668 Enhance WebRTC packet processing for improved latency handling and keyframe synchronization 2026-03-09 12:17:38 +00:00
Cédric Verstraeten
0ca007e424 Refactor session key usage in ConnectionManager and enhance candidate queuing 2026-03-09 12:09:22 +00:00
Cédric Verstraeten
229d085de7 Merge pull request #253 from kerberos-io/fix/mqtt-reconnection
fix/mqtt-reconnection
2026-03-09 12:43:42 +01:00
Cédric Verstraeten
30e2b8318d Refactor build workflow to support multi-architecture builds and enhance MQTT connection handling 2026-03-09 11:40:24 +00:00
Cédric Verstraeten
dbcf4e242c Enhance MQTT reconnection handling and improve WebRTC connection cleanup
- Enable automatic reconnection for MQTT with configurable intervals and timeouts.
- Add logging for connection loss and reconnection attempts.
- Refactor WebRTC connection cleanup to ensure proper resource management on disconnection.
- Improve event handling in ImageCanvas and Dashboard components for better performance and reliability.
2026-03-09 11:04:10 +00:00
Cédric Verstraeten
ccf4034cc8 Merge pull request #252 from kerberos-io/fix/close-mp4-after-started
fix/close-mp4-after-started
2026-03-03 15:21:12 +01:00
Cédric Verstraeten
a34836e8f4 Delay MP4 creation until the first keyframe is received to ensure valid recordings 2026-03-03 14:16:39 +00:00
Cédric Verstraeten
dd1464d1be Fix recording closure condition to ensure it only triggers after recording has started 2026-03-03 14:03:11 +00:00
Cédric Verstraeten
2c02e0aeb1 Merge pull request #250 from kerberos-io/fix/add-avc-description-fallback
fix/add-avc-description-fallback
2026-02-27 11:48:34 +01:00
cedricve
d5464362bb Add AVC descriptor fallback for SPS parse errors
When setting the AVC descriptor fails in MP4.Close(), attempt a fallback that constructs an AvcC/avc1 sample entry from available SPS/PPS NALUs. Adds github.com/Eyevinn/mp4ff/avc import and two helpers: addAVCDescriptorFallback (builds a visual sample entry, sets tkhd width/height if available, and inserts it into stsd) and buildAVCDecConfRecFromSPS (creates an avc.DecConfRec from SPS/PPS bytes by extracting profile/compat/level and filling defaults). Logs errors and warns when the fallback is used. This provides resilience against SPS parsing errors when writing the MP4 track descriptor.
2026-02-27 11:35:22 +01:00
Cédric Verstraeten
5bcefd0015 Merge pull request #249 from kerberos-io/feature/enhance-avc-hevc-ssp-nalus
feature/enhance-avc-hevc-ssp-nalus
2026-02-27 11:12:03 +01:00
cedricve
5bb9def42d Normalize and debug H264/H265 parameter sets
Replace direct sanitizeParameterSets usage with normalizeH264ParameterSets and normalizeH265ParameterSets in mp4.Close. The new functions split Annex-B blobs, strip start codes, detect NALU types (SPS/PPS for AVC; VPS/SPS/PPS for HEVC), aggregate distinct parameter sets and fall back to sanitizeParameterSets if none are found. Added splitParamSetNALUs and formatNaluDebug helpers and debug logging to output concise parameter-set summaries before setting AVC/HEVC descriptors. These changes improve handling of concatenated Annex-B parameter set blobs and make debugging parameter extraction easier.
2026-02-27 11:09:28 +01:00
Cédric Verstraeten
ff38ccbadf Merge pull request #248 from kerberos-io/fix/sanitize-parameter-sets
fix/sanitize-parameter-sets
2026-02-26 20:43:53 +01:00
cedricve
f64e899de9 Populate/sanitize NALUs and avoid empty MP4
Fill missing SPS/PPS/VPS from camera config before closing recordings and warn when parameter sets are incomplete (for both continuous and motion-detection flows). Sanitize parameter sets (remove Annex-B start codes and drop empty NALUs) before writing AVC/HEVC descriptors. Prevent creation of empty MP4 files by flushing/closing and removing files when no audio/video samples were added, and only add an audio track when audio samples exist.
2026-02-26 20:37:10 +01:00
Cédric Verstraeten
b8a81d18af Merge pull request #247 from kerberos-io/fix/ensure-stsd
fix/ensure-stsd
2026-02-26 17:13:45 +01:00
cedricve
8c2e3e4cdd Recover video parameter sets from Annex B NALUs
Add updateVideoParameterSetsFromAnnexB to parse Annex B NALUs and populate missing SPS/PPS/VPS for H.264/H.265 streams. Call this helper when adding video samples so in-band parameter sets can be recovered early. Also add error logging in Close() when setting AVC/HEVC descriptors fails. These changes improve robustness for streams that carry SPS/PPS/VPS inline.
2026-02-26 17:05:09 +01:00
Cédric Verstraeten
11c4ee518d Merge pull request #246 from kerberos-io/fix/handle-sps-pps-unknown-state
fix/handle-sps-pps-unknown-state
2026-02-26 16:24:54 +01:00
cedricve
51b9d76973 Improve SPS/PPS handling: add warnings for missing SPS/PPS during recording start 2026-02-26 15:24:34 +00:00
cedricve
f3c1cb9b82 Enhance SPS/PPS handling for main stream in gortsplib: add fallback for missing SDP 2026-02-26 15:21:54 +00:00
Cédric Verstraeten
a1368361e4 Merge pull request #242 from kerberos-io/fix/update-workflows-for-nightly-build
fix/update-workflows-for-nightly-build
2026-02-16 12:44:40 +01:00
Cédric Verstraeten
abfdea0179 Update issue-userstory-create.yml 2026-02-16 12:37:49 +01:00
Cédric Verstraeten
8aaeb62fa3 Merge pull request #241 from kerberos-io/fix/update-workflows-for-nightly-build
fix/update-workflows-for-nightly-build
2026-02-16 12:21:06 +01:00
Cédric Verstraeten
e30dd7d4a0 Add nightly build workflow for Docker images 2026-02-16 12:16:39 +01:00
Cédric Verstraeten
ac3f9aa4e8 Merge pull request #240 from kerberos-io/feature/add-issue-generator-workflow
feature/add-issue-generator-workflow
2026-02-16 11:58:06 +01:00
Cédric Verstraeten
04c568f488 Add workflow to create user story issues with customizable inputs 2026-02-16 11:54:07 +01:00
Cédric Verstraeten
e270223968 Merge pull request #238 from kerberos-io/fix/docker-build-release-action
fix/docker-build-release-action
2026-02-13 22:17:33 +01:00
cedricve
01ab1a9218 Disable build provenance in Docker builds
Add --provenance=false to docker build invocations in .github/workflows/release-create.yml (both default and arm64 steps) to suppress Docker provenance metadata during CI builds.
2026-02-13 22:16:23 +01:00
Cédric Verstraeten
6f0794b09c Merge pull request #237 from kerberos-io/feature/fix-quicktime-duration
feature/fix-quicktime-duration
2026-02-13 21:55:41 +01:00
cedricve
1ae6a46d88 Embed build version into binaries
Pass VERSION from CI into Docker builds and embed it into the Go binary via ldflags. Updated .github workflow to supply --build-arg VERSION for both architectures. Added ARG VERSION and logic in Dockerfile and Dockerfile.arm64 to derive the version from git (git describe --tags) or fall back to the provided build-arg, then set it with -X during go build. Changed VERSION in machinery/src/utils/main.go from a const to a var defaulting to "0.0.0" and documented that it is overridden at build time. This ensures released images contain the correct agent version while local/dev builds keep a sensible default.
2026-02-13 21:50:09 +01:00
cedricve
9d83cab5cc Set mdhd.Duration to 0 for fragmented MP4
Uncomment and explicitly set mdhd.Duration = 0 in machinery/src/video/mp4.go for relevant tracks (video H264/H265 and audio track). This ensures mdhd.Duration is zero for fragmented MP4 so players derive duration from fragments (avoiding QuickTime adding fragment durations and doubling the reported duration).
2026-02-13 21:46:32 +01:00
cedricve
6f559c2f00 Align MP4 headers to fragment durations
Compute actual video duration from SegmentDurations and ensure container headers reflect fragment durations. Set mvhd.Duration and mvex/mehd.FragmentDuration to the maximum of video (sum of segments) and audio durations so the overall mvhd matches the longest track. Use the summed segment duration for track tkhd.Duration and keep mdhd.Duration at 0 for fragmented MP4s (to avoid double-counting). Add a warning log when accumulated video duration differs from the recorded VideoTotalDuration. Harden fingerprint generation and private key handling with nil checks.

Add mp4_duration_test.go: unit test that creates a simulated H.264 fragmented MP4 (150 frames at 40ms), closes it, parses the output and verifies that mvhd/mehd and trun sample durations are consistent and that mdhd.Duration is zero.
2026-02-13 21:35:57 +01:00
cedricve
c147944f5a Convert MP4 timestamps to Mac HFS epoch
Add MacEpochOffset constant and convert mp4.StartTime to Mac HFS time for QuickTime compatibility. Compute macTime = mp4.StartTime + MacEpochOffset and use it for mvhd CreationTime/ModificationTime, as well as track tkhd and mdhd creation/modification timestamps for video and audio tracks. Also set mvhd Rate, Volume and NextTrackID. These changes ensure generated MP4s use QuickTime-compatible epoch and include proper mvhd metadata.
2026-02-13 21:01:45 +01:00
Cédric Verstraeten
e8ca776e4e Merge pull request #236 from kerberos-io/fix/debugging-lost-keyframes
fix/debugging-lost-keyframes
2026-02-11 16:51:07 +01:00
Cédric Verstraeten
de5c4b6e0a Merge branch 'master' into fix/debugging-lost-keyframes 2026-02-11 16:48:08 +01:00
Cédric Verstraeten
9ba64de090 add additional logging 2026-02-11 16:48:01 +01:00
Cédric Verstraeten
7ceeebe76e Merge pull request #235 from kerberos-io/fix/debugging-lost-keyframes
fix/debugging-lost-keyframes
2026-02-11 16:15:57 +01:00
Cédric Verstraeten
bd7dbcfcf2 Enhance FPS tracking and logging for keyframes in gortsplib and mp4 modules 2026-02-11 15:11:52 +00:00
Cédric Verstraeten
8c7a46e3ae Merge pull request #234 from kerberos-io/fix/fps-gop-size
fix/fps-gop-size
2026-02-11 15:05:31 +01:00
Cédric Verstraeten
57ccfaabf5 Merge branch 'fix/fps-gop-size' of github.com:kerberos-io/agent into fix/fps-gop-size 2026-02-11 14:59:34 +01:00
Cédric Verstraeten
4a9cb51e95 Update machinery/src/capture/gortsplib.go
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-02-11 14:59:15 +01:00
Cédric Verstraeten
ab6f621e76 Merge branch 'fix/fps-gop-size' of github.com:kerberos-io/agent into fix/fps-gop-size 2026-02-11 14:58:44 +01:00
Cédric Verstraeten
c365ae5af2 Ensure thread-safe closure of peer connections in InitializeWebRTCConnection 2026-02-11 13:58:29 +00:00
Cédric Verstraeten
b05c3d1baa Update machinery/src/capture/gortsplib.go
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-02-11 14:52:40 +01:00
Cédric Verstraeten
c7c7203fad Merge branch 'master' into fix/fps-gop-size 2026-02-11 14:48:05 +01:00
Cédric Verstraeten
d93f85b4f3 Refactor FPS calculation to use per-stream trackers for improved accuracy 2026-02-11 13:45:07 +00:00
Cédric Verstraeten
031212b98c Merge pull request #232 from kerberos-io/fix/fps-gop-size
fix/fps-gop-size
2026-02-11 14:27:18 +01:00
Cédric Verstraeten
a4837b3cb3 Implement PTS-based FPS calculation and GOP size adjustments 2026-02-11 13:14:29 +00:00
Cédric Verstraeten
77629ac9b8 Merge pull request #231 from kerberos-io/feature/improve-keyframe-interval
feature/improve-keyframe-interval
2026-02-11 12:28:33 +01:00
cedricve
59608394af Use Warning instead of Warn in mp4.go
Replace call to log.Log.Warn with log.Log.Warning in MP4.flushPendingVideoSample to match the logger API. This is a non-functional change that preserves the original message and behavior while using the correct logging method name.
2026-02-11 12:26:18 +01:00
cedricve
9dfcaa466f Refactor video sample flushing logic into a dedicated function 2026-02-11 11:48:15 +01:00
cedricve
88442e4525 Add pending video sample to segment before flush
Before flushing a segment when mp4.Start is true, add any pending VideoFullSample for the current video track to the current fragment. The change computes and updates LastVideoSampleDTS and VideoTotalDuration, adjusts the sample DecodeTime and Dur, calls AddFullSampleToTrack, logs errors, and clears VideoFullSample so the pending sample is included in the segment before starting a new one. This ensures segments contain all frames up to (but not including) the keyframe that triggered the flush.
2026-02-11 11:38:51 +01:00
Cédric Verstraeten
891ae2e5d5 Merge pull request #230 from kerberos-io/feature/improve-video-format
feature/improve-video-format
2026-02-10 17:25:23 +01:00
Cédric Verstraeten
32b471f570 Update machinery/src/video/mp4.go
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-02-10 17:20:40 +01:00
Cédric Verstraeten
5d745fc989 Update machinery/src/video/mp4.go
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-02-10 17:20:29 +01:00
Cédric Verstraeten
edfa6ec4c6 Update machinery/src/video/mp4.go
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-02-10 17:20:16 +01:00
Cédric Verstraeten
0c460efea6 Refactor PR description workflow to include organization variable and correct pull request URL format 2026-02-10 16:17:10 +00:00
Cédric Verstraeten
96df049e59 Enhance MP4 initialization by adding max recording duration parameter, improving placeholder size calculation for segments. 2026-02-10 15:59:59 +00:00
Cédric Verstraeten
2cb454e618 Merge branch 'master' into feature/improve-video-format 2026-02-10 16:57:47 +01:00
Cédric Verstraeten
7f2ebb655e Fix sidx.FirstOffset calculation and re-encode init segment for accurate MP4 structure 2026-02-10 15:56:10 +00:00
Cédric Verstraeten
63857fb5cc Merge pull request #229 from kerberos-io/feature/improve-video-format
feature/improve-video-format
2026-02-10 16:53:34 +01:00
Cédric Verstraeten
f4c75f9aa9 Add environment variables for PR number and project name in workflow 2026-02-10 15:31:37 +00:00
Cédric Verstraeten
c3936dc884 Enhance MP4 segment handling by adding segment durations and base decode times, improving fragment management and data integrity 2026-02-10 14:47:47 +00:00
Cédric Verstraeten
2868ddc499 Add fragment duration handling and improve MP4 segment management 2026-02-10 13:52:58 +00:00
Cédric Verstraeten
176610a694 Update mp4.go 2026-02-10 13:39:55 +01:00
Cédric Verstraeten
f60aff4fd6 Enhance MP4 closing process by adding final video and audio samples, ensuring data integrity and updating track metadata 2026-02-10 12:45:46 +01:00
Cédric Verstraeten
847f62303a Merge pull request #228 from kerberos-io/feature/improve-webrtc-tracing
feature/improve-webrtc-tracing
2026-01-23 15:22:45 +01:00
Cédric Verstraeten
f174e2697e Enhance WebRTC handling with connection management and error logging improvements 2026-01-23 14:16:55 +00:00
Cédric Verstraeten
acac2d5d42 Refactor main function to improve code structure and readability 2026-01-23 13:48:24 +00:00
Cédric Verstraeten
f304c2ed3e Merge pull request #219 from kerberos-io/fix/release-process
fix/release-process
2025-09-17 16:32:58 +02:00
cedricve
2003a38cdc Add release creation workflow with multi-arch Docker builds and artifact handling 2025-09-17 14:32:06 +00:00
Cédric Verstraeten
a67c5a1f39 Merge pull request #216 from kerberos-io/feature/upgrade-build-process-avoid-base
feature/upgrade-build-process-avoid-base
2025-09-11 16:22:53 +02:00
Cédric Verstraeten
b7a87f95e5 Update Docker workflow to use Ubuntu 24.04 and simplify build steps for multi-arch images 2025-09-11 15:00:37 +02:00
Cédric Verstraeten
0aa0b8ad8f Refactor build steps in PR workflow to streamline Docker operations and improve artifact handling 2025-09-11 14:09:22 +02:00
Cédric Verstraeten
2bff868de6 Update upload artifact action to v4 in PR build workflow 2025-09-11 13:45:34 +02:00
Cédric Verstraeten
8b59828126 Add steps to strip binary and upload artifact in PR build workflow 2025-09-11 13:39:27 +02:00
Cédric Verstraeten
f55e25db07 Remove Golang build steps from Dockerfiles for amd64 and arm64 2025-09-11 10:29:05 +02:00
Cédric Verstraeten
243c969666 Add missing go version check in Dockerfile build step 2025-09-11 10:26:54 +02:00
Cédric Verstraeten
ec7f2e0303 Update ARM64 build step to specify Dockerfile for architecture 2025-09-11 10:18:19 +02:00
Cédric Verstraeten
a4a032d994 Update GitHub Actions workflow and Dockerfiles for architecture support and dependency management 2025-09-11 10:17:51 +02:00
Cédric Verstraeten
0a84744e49 Remove arm-v6 architecture from build matrix in PR workflow 2025-09-09 14:38:51 +00:00
Cédric Verstraeten
1425430376 Update .gitignore to include __debug* and change Dockerfile base image to golang:1.24.5-bullseye 2025-09-09 14:36:32 +00:00
Cédric Verstraeten
ca8d88ffce Update GitHub Actions workflow to support multiple architectures in build matrix 2025-09-09 14:34:39 +00:00
Cédric Verstraeten
af3f8bb639 Add GitHub Actions workflow for pull request builds and update Dockerfile dependencies 2025-09-09 16:28:19 +02:00
36 changed files with 4104 additions and 1000 deletions

View File

@@ -1,58 +0,0 @@
name: Docker development build
on:
push:
branches: [develop]
jobs:
build-amd64:
runs-on: ubuntu-latest
strategy:
matrix:
architecture: [amd64]
steps:
- name: Login to DockerHub
uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Checkout
uses: actions/checkout@v3
- name: Set up QEMU
uses: docker/setup-qemu-action@v2
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
- name: Available platforms
run: echo ${{ steps.buildx.outputs.platforms }}
- name: Run Buildx
run: docker buildx build --platform linux/${{matrix.architecture}} -t kerberos/agent-dev:arch-$(echo ${{matrix.architecture}} | tr / -)-$(echo $GITHUB_SHA | cut -c1-7) --push .
- name: Create new and append to manifest
run: docker buildx imagetools create -t kerberos/agent-dev:$(echo $GITHUB_SHA | cut -c1-7) kerberos/agent-dev:arch-$(echo ${{matrix.architecture}} | tr / -)-$(echo $GITHUB_SHA | cut -c1-7)
- name: Create new and append to latest manifest
run: docker buildx imagetools create -t kerberos/agent-dev:latest kerberos/agent-dev:arch-$(echo ${{matrix.architecture}} | tr / -)-$(echo $GITHUB_SHA | cut -c1-7)
build-other:
runs-on: ubuntu-latest
strategy:
matrix:
#architecture: [arm64, arm/v7, arm/v6]
architecture: [arm64, arm/v7]
steps:
- name: Login to DockerHub
uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Checkout
uses: actions/checkout@v3
- name: Set up QEMU
uses: docker/setup-qemu-action@v2
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
- name: Available platforms
run: echo ${{ steps.buildx.outputs.platforms }}
- name: Run Buildx
run: docker buildx build --platform linux/${{matrix.architecture}} -t kerberos/agent-dev:arch-$(echo ${{matrix.architecture}} | tr / -)-$(echo $GITHUB_SHA | cut -c1-7) --push .
- name: Create new and append to manifest
run: docker buildx imagetools create --append -t kerberos/agent-dev:$(echo $GITHUB_SHA | cut -c1-7) kerberos/agent-dev:arch-$(echo ${{matrix.architecture}} | tr / -)-$(echo $GITHUB_SHA | cut -c1-7)
- name: Create new and append to manifest latest
run: docker buildx imagetools create --append -t kerberos/agent-dev:latest kerberos/agent-dev:arch-$(echo ${{matrix.architecture}} | tr / -)-$(echo $GITHUB_SHA | cut -c1-7)

View File

@@ -1,120 +0,0 @@
name: Create a new release
on:
release:
types: [created]
workflow_dispatch:
inputs:
tag:
description: "Tag for the Docker image"
required: true
default: "test"
env:
REPO: kerberos/agent
jobs:
build-amd64:
runs-on: ubuntu-latest
permissions:
contents: write
strategy:
matrix:
architecture: [amd64]
steps:
- name: Login to DockerHub
uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Checkout
uses: actions/checkout@v3
- uses: benjlevesque/short-sha@v2.1
id: short-sha
with:
length: 7
- name: Set up QEMU
uses: docker/setup-qemu-action@v2
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
- name: Available platforms
run: echo ${{ steps.buildx.outputs.platforms }}
- name: Run Buildx
run: docker buildx build --platform linux/$(echo ${{matrix.architecture}} | tr - /) -t $REPO-arch:arch-${{matrix.architecture}}-${{github.event.inputs.tag || github.ref_name}} --push .
- name: Create new and append to manifest
run: docker buildx imagetools create -t $REPO:${{ github.event.inputs.tag || github.ref_name }} $REPO-arch:arch-${{matrix.architecture}}-${{github.event.inputs.tag || github.ref_name}}
- name: Create new and append to manifest latest
run: docker buildx imagetools create -t $REPO:latest $REPO-arch:arch-${{matrix.architecture}}-${{github.event.inputs.tag || github.ref_name}}
if: github.event.inputs.tag == 'test'
- name: Run Buildx with output
run: docker buildx build --platform linux/$(echo ${{matrix.architecture}} | tr - /) -t $REPO-arch:arch-$(echo ${{matrix.architecture}} | tr / -)-${{github.event.inputs.tag || github.ref_name}} --output type=tar,dest=output-${{matrix.architecture}}.tar .
- name: Strip binary
run: mkdir -p output/ && tar -xf output-${{matrix.architecture}}.tar -C output && rm output-${{matrix.architecture}}.tar && cd output/ && tar -cf ../agent-${{matrix.architecture}}.tar -C home/agent . && rm -rf output
- name: Create a release
uses: ncipollo/release-action@v1
with:
latest: true
allowUpdates: true
name: ${{ github.event.inputs.tag || github.ref_name }}
tag: ${{ github.event.inputs.tag || github.ref_name }}
generateReleaseNotes: false
omitBodyDuringUpdate: true
artifacts: "agent-${{matrix.architecture}}.tar"
# Taken from GoReleaser's own release workflow.
# The available Snapcraft Action has some bugs described in the issue below.
# The mkdirs are a hack for https://github.com/goreleaser/goreleaser/issues/1715.
#- name: Setup Snapcraft
# run: |
# sudo apt-get update
# sudo apt-get -yq --no-install-suggests --no-install-recommends install snapcraft
# mkdir -p $HOME/.cache/snapcraft/download
# mkdir -p $HOME/.cache/snapcraft/stage-packages
#- name: Use Snapcraft
# run: tar -xf agent-${{matrix.architecture}}.tar && snapcraft
build-other:
runs-on: ubuntu-latest
permissions:
contents: write
needs: build-amd64
strategy:
matrix:
architecture: [arm64, arm-v7, arm-v6]
#architecture: [arm64, arm-v7]
steps:
- name: Login to DockerHub
uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Checkout
uses: actions/checkout@v3
- uses: benjlevesque/short-sha@v2.1
id: short-sha
with:
length: 7
- name: Set up QEMU
uses: docker/setup-qemu-action@v2
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
- name: Available platforms
run: echo ${{ steps.buildx.outputs.platforms }}
- name: Run Buildx
run: docker buildx build --platform linux/$(echo ${{matrix.architecture}} | tr - /) -t $REPO-arch:arch-${{matrix.architecture}}-${{github.event.inputs.tag || github.ref_name}} --push .
- name: Create new and append to manifest
run: docker buildx imagetools create --append -t $REPO:${{ github.event.inputs.tag || github.ref_name }} $REPO-arch:arch-${{matrix.architecture}}-${{github.event.inputs.tag || github.ref_name}}
- name: Create new and append to manifest latest
run: docker buildx imagetools create --append -t $REPO:latest $REPO-arch:arch-${{matrix.architecture}}-${{github.event.inputs.tag || github.ref_name}}
if: github.event.inputs.tag == 'test'
- name: Run Buildx with output
run: docker buildx build --platform linux/$(echo ${{matrix.architecture}} | tr - /) -t $REPO-arch:arch-$(echo ${{matrix.architecture}} | tr / -)-${{github.event.inputs.tag || github.ref_name}} --output type=tar,dest=output-${{matrix.architecture}}.tar .
- name: Strip binary
run: mkdir -p output/ && tar -xf output-${{matrix.architecture}}.tar -C output && rm output-${{matrix.architecture}}.tar && cd output/ && tar -cf ../agent-${{matrix.architecture}}.tar -C home/agent . && rm -rf output
- name: Create a release
uses: ncipollo/release-action@v1
with:
latest: true
allowUpdates: true
name: ${{ github.event.inputs.tag || github.ref_name }}
tag: ${{ github.event.inputs.tag || github.ref_name }}
generateReleaseNotes: false
omitBodyDuringUpdate: true
artifacts: "agent-${{matrix.architecture}}.tar"

View File

@@ -0,0 +1,51 @@
name: Create User Story Issue
on:
workflow_dispatch:
inputs:
issue_title:
description: 'Title for the issue'
required: true
issue_description:
description: 'Brief description of the feature'
required: true
complexity:
description: 'Complexity of the feature'
required: true
type: choice
options:
- 'Low'
- 'Medium'
- 'High'
default: 'Medium'
duration:
description: 'Estimated duration'
required: true
type: choice
options:
- '1 day'
- '3 days'
- '1 week'
- '2 weeks'
- '1 month'
default: '1 week'
jobs:
create-issue:
runs-on: ubuntu-latest
permissions:
issues: write
steps:
- name: Create Issue with User Story
uses: cedricve/llm-create-issue-user-story@main
with:
github_token: ${{ secrets.GITHUB_TOKEN }}
azure_openai_api_key: ${{ secrets.AZURE_OPENAI_API_KEY }}
azure_openai_endpoint: ${{ secrets.AZURE_OPENAI_ENDPOINT }}
azure_openai_version: ${{ secrets.AZURE_OPENAI_VERSION }}
openai_model: ${{ secrets.OPENAI_MODEL }}
issue_title: ${{ github.event.inputs.issue_title }}
issue_description: ${{ github.event.inputs.issue_description }}
complexity: ${{ github.event.inputs.complexity }}
duration: ${{ github.event.inputs.duration }}
labels: 'user-story,feature'
assignees: ${{ github.actor }}

View File

@@ -1,12 +1,14 @@
name: Docker nightly build
name: Nightly build
on:
# Triggers the workflow every day at 9PM (CET).
schedule:
- cron: "0 22 * * *"
# Allows manual triggering from the Actions tab.
workflow_dispatch:
jobs:
build-amd64:
nightly-build-amd64:
runs-on: ubuntu-latest
strategy:
matrix:
@@ -18,7 +20,9 @@ jobs:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Checkout
run: git clone https://github.com/kerberos-io/agent && cd agent
uses: actions/checkout@v4
with:
ref: master
- name: Set up QEMU
uses: docker/setup-qemu-action@v2
- name: Set up Docker Buildx
@@ -26,10 +30,10 @@ jobs:
- name: Available platforms
run: echo ${{ steps.buildx.outputs.platforms }}
- name: Run Buildx
run: cd agent && docker buildx build --platform linux/${{matrix.architecture}} -t kerberos/agent-nightly:arch-$(echo ${{matrix.architecture}} | tr / -)-$(echo $GITHUB_SHA | cut -c1-7) --push .
run: docker buildx build --platform linux/${{matrix.architecture}} -t kerberos/agent-nightly:arch-$(echo ${{matrix.architecture}} | tr / -)-$(echo $GITHUB_SHA | cut -c1-7) --push .
- name: Create new and append to manifest
run: cd agent && docker buildx imagetools create -t kerberos/agent-nightly:$(echo $GITHUB_SHA | cut -c1-7) kerberos/agent-nightly:arch-$(echo ${{matrix.architecture}} | tr / -)-$(echo $GITHUB_SHA | cut -c1-7)
build-other:
run: docker buildx imagetools create -t kerberos/agent-nightly:$(echo $GITHUB_SHA | cut -c1-7) kerberos/agent-nightly:arch-$(echo ${{matrix.architecture}} | tr / -)-$(echo $GITHUB_SHA | cut -c1-7)
nightly-build-other:
runs-on: ubuntu-latest
strategy:
matrix:
@@ -41,7 +45,9 @@ jobs:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Checkout
run: git clone https://github.com/kerberos-io/agent && cd agent
uses: actions/checkout@v4
with:
ref: master
- name: Set up QEMU
uses: docker/setup-qemu-action@v2
- name: Set up Docker Buildx
@@ -49,6 +55,6 @@ jobs:
- name: Available platforms
run: echo ${{ steps.buildx.outputs.platforms }}
- name: Run Buildx
run: cd agent && docker buildx build --platform linux/${{matrix.architecture}} -t kerberos/agent-nightly:arch-$(echo ${{matrix.architecture}} | tr / -)-$(echo $GITHUB_SHA | cut -c1-7) --push .
run: docker buildx build --platform linux/${{matrix.architecture}} -t kerberos/agent-nightly:arch-$(echo ${{matrix.architecture}} | tr / -)-$(echo $GITHUB_SHA | cut -c1-7) --push .
- name: Create new and append to manifest
run: cd agent && docker buildx imagetools create --append -t kerberos/agent-nightly:$(echo $GITHUB_SHA | cut -c1-7) kerberos/agent-nightly:arch-$(echo ${{matrix.architecture}} | tr / -)-$(echo $GITHUB_SHA | cut -c1-7)
run: docker buildx imagetools create --append -t kerberos/agent-nightly:$(echo $GITHUB_SHA | cut -c1-7) kerberos/agent-nightly:arch-$(echo ${{matrix.architecture}} | tr / -)-$(echo $GITHUB_SHA | cut -c1-7)

48
.github/workflows/pr-build.yml vendored Normal file
View File

@@ -0,0 +1,48 @@
name: Build pull request
on:
pull_request:
types: [opened, synchronize]
env:
REPO: kerberos/agent
jobs:
build:
runs-on: ${{ matrix.runner }}
permissions:
contents: write
strategy:
matrix:
include:
- architecture: amd64
runner: ubuntu-24.04
dockerfile: Dockerfile
- architecture: arm64
runner: ubuntu-24.04-arm
dockerfile: Dockerfile.arm64
steps:
- name: Checkout
uses: actions/checkout@v3
- name: Login to DockerHub
uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- uses: benjlevesque/short-sha@v2.1
id: short-sha
with:
length: 7
- name: Run Build
run: |
docker build -t ${{ matrix.architecture }} -f ${{ matrix.dockerfile }} .
CID=$(docker create ${{matrix.architecture}})
docker cp ${CID}:/home/agent ./output-${{matrix.architecture}}
docker rm ${CID}
- name: Strip binary
run: tar -cf agent-${{matrix.architecture}}.tar -C output-${{matrix.architecture}} . && rm -rf output-${{matrix.architecture}}
- name: Upload artifact
uses: actions/upload-artifact@v4
with:
name: agent-${{matrix.architecture}}.tar
path: agent-${{matrix.architecture}}.tar

View File

@@ -2,6 +2,11 @@ name: Autofill PR description
on: pull_request
env:
ORGANIZATION: uugai
PROJECT: ${{ github.event.repository.name }}
PR_NUMBER: ${{ github.event.number }}
jobs:
openai-pr-description:
runs-on: ubuntu-22.04
@@ -16,4 +21,6 @@ jobs:
azure_openai_api_key: ${{ secrets.AZURE_OPENAI_API_KEY }}
azure_openai_endpoint: ${{ secrets.AZURE_OPENAI_ENDPOINT }}
azure_openai_version: ${{ secrets.AZURE_OPENAI_VERSION }}
openai_model: ${{ secrets.OPENAI_MODEL }}
pull_request_url: https://pr${{ env.PR_NUMBER }}.api.kerberos.lol
overwrite_description: true

130
.github/workflows/release-create.yml vendored Normal file
View File

@@ -0,0 +1,130 @@
name: Create a new release
on:
release:
types: [created]
workflow_dispatch:
inputs:
tag:
description: "Tag for the Docker image"
required: true
default: "test"
env:
REPO: kerberos/agent
jobs:
build-amd64:
runs-on: ubuntu-24.04
permissions:
contents: write
strategy:
matrix:
architecture: [amd64]
steps:
- name: Login to DockerHub
uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Checkout
uses: actions/checkout@v3
- uses: benjlevesque/short-sha@v2.1
id: short-sha
with:
length: 7
- name: Run Build
run: |
docker build --provenance=false --build-arg VERSION=${{github.event.inputs.tag || github.ref_name}} -t ${{matrix.architecture}} .
CID=$(docker create ${{matrix.architecture}})
docker cp ${CID}:/home/agent ./output-${{matrix.architecture}}
docker rm ${CID}
- name: Strip binary
run: tar -cf agent-${{matrix.architecture}}.tar -C output-${{matrix.architecture}} . && rm -rf output-${{matrix.architecture}}
- name: Build and push Docker image
run: |
docker tag ${{matrix.architecture}} $REPO-arch:arch-${{matrix.architecture}}-${{github.event.inputs.tag || github.ref_name}}
docker push $REPO-arch:arch-${{matrix.architecture}}-${{github.event.inputs.tag || github.ref_name}}
- name: Upload artifact
uses: actions/upload-artifact@v4
with:
name: agent-${{matrix.architecture}}.tar
path: agent-${{matrix.architecture}}.tar
build-arm64:
runs-on: ubuntu-24.04-arm
permissions:
contents: write
strategy:
matrix:
architecture: [arm64]
steps:
- name: Login to DockerHub
uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Checkout
uses: actions/checkout@v3
- uses: benjlevesque/short-sha@v2.1
id: short-sha
with:
length: 7
- name: Run Build
run: |
docker build --provenance=false --build-arg VERSION=${{github.event.inputs.tag || github.ref_name}} -t ${{matrix.architecture}} -f Dockerfile.arm64 .
CID=$(docker create ${{matrix.architecture}})
docker cp ${CID}:/home/agent ./output-${{matrix.architecture}}
docker rm ${CID}
- name: Strip binary
run: tar -cf agent-${{matrix.architecture}}.tar -C output-${{matrix.architecture}} . && rm -rf output-${{matrix.architecture}}
- name: Build and push Docker image
run: |
docker tag ${{matrix.architecture}} $REPO-arch:arch-${{matrix.architecture}}-${{github.event.inputs.tag || github.ref_name}}
docker push $REPO-arch:arch-${{matrix.architecture}}-${{github.event.inputs.tag || github.ref_name}}
- name: Upload artifact
uses: actions/upload-artifact@v4
with:
name: agent-${{matrix.architecture}}.tar
path: agent-${{matrix.architecture}}.tar
create-manifest:
runs-on: ubuntu-24.04
needs: [build-amd64, build-arm64]
steps:
- name: Login to DockerHub
uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Create and push multi-arch manifest
run: |
docker manifest create $REPO:${{ github.event.inputs.tag || github.ref_name }} \
$REPO-arch:arch-amd64-${{github.event.inputs.tag || github.ref_name}} \
$REPO-arch:arch-arm64-${{github.event.inputs.tag || github.ref_name}}
docker manifest push $REPO:${{ github.event.inputs.tag || github.ref_name }}
- name: Create and push latest manifest
run: |
docker manifest create $REPO:latest \
$REPO-arch:arch-amd64-${{github.event.inputs.tag || github.ref_name}} \
$REPO-arch:arch-arm64-${{github.event.inputs.tag || github.ref_name}}
docker manifest push $REPO:latest
if: github.event.inputs.tag == 'test'
create-release:
runs-on: ubuntu-24.04
needs: [build-amd64, build-arm64]
permissions:
contents: write
steps:
- name: Download all artifacts
uses: actions/download-artifact@v4
- name: Create a release
uses: ncipollo/release-action@v1
with:
latest: true
allowUpdates: true
name: ${{ github.event.inputs.tag || github.ref_name }}
tag: ${{ github.event.inputs.tag || github.ref_name }}
generateReleaseNotes: false
omitBodyDuringUpdate: true
artifacts: "agent-*.tar/agent-*.tar"

5
.gitignore vendored
View File

@@ -1,6 +1,8 @@
ui/node_modules
ui/build
ui/public/assets/env.js
.DS_Store
__debug*
.idea
machinery/www
yarn.lock
@@ -12,4 +14,5 @@ machinery/test*
machinery/init-dev.sh
machinery/.env.local
machinery/vendor
deployments/docker/private-docker-compose.yaml
deployments/docker/private-docker-compose.yaml
video.mp4

View File

@@ -1,7 +1,8 @@
ARG BASE_IMAGE_VERSION=70ec57e
ARG BASE_IMAGE_VERSION=amd64-ddbe40e
ARG VERSION=0.0.0
FROM kerberos/base:${BASE_IMAGE_VERSION} AS build-machinery
LABEL AUTHOR=Kerberos.io
LABEL AUTHOR=uug.ai
ENV GOROOT=/usr/local/go
ENV GOPATH=/go
@@ -34,7 +35,8 @@ RUN cat /go/src/github.com/kerberos-io/agent/machinery/version
RUN cd /go/src/github.com/kerberos-io/agent/machinery && \
go mod download && \
go build -tags timetzdata,netgo,osusergo --ldflags '-s -w -extldflags "-static -latomic"' main.go && \
VERSION=$(cd /go/src/github.com/kerberos-io/agent && git describe --tags --always 2>/dev/null || echo "${VERSION}") && \
go build -tags timetzdata,netgo,osusergo --ldflags "-s -w -X github.com/kerberos-io/agent/machinery/src/utils.VERSION=${VERSION} -extldflags '-static -latomic'" main.go && \
mkdir -p /agent && \
mv main /agent && \
mv version /agent && \
@@ -93,7 +95,7 @@ RUN addgroup -S kerberosio && adduser -S agent -G kerberosio && addgroup agent v
COPY --chown=0:0 --from=build-machinery /dist /
COPY --chown=0:0 --from=build-ui /dist /
RUN apk update && apk add ca-certificates curl libstdc++ libc6-compat --no-cache && rm -rf /var/cache/apk/*
RUN apk update && apk add ca-certificates curl ffmpeg libstdc++ libc6-compat --no-cache && rm -rf /var/cache/apk/*
##################
# Try running agent

140
Dockerfile.arm64 Normal file
View File

@@ -0,0 +1,140 @@
ARG BASE_IMAGE_VERSION=arm64-ddbe40e
ARG VERSION=0.0.0
FROM kerberos/base:${BASE_IMAGE_VERSION} AS build-machinery
LABEL AUTHOR=uug.ai
ENV GOROOT=/usr/local/go
ENV GOPATH=/go
ENV PATH=$GOPATH/bin:$GOROOT/bin:/usr/local/lib:$PATH
ENV GOSUMDB=off
##########################################
# Installing some additional dependencies.
RUN apt-get upgrade -y && apt-get update && apt-get install -y --fix-missing --no-install-recommends \
git build-essential cmake pkg-config unzip libgtk2.0-dev \
curl ca-certificates libcurl4-openssl-dev libssl-dev libjpeg62-turbo-dev && \
rm -rf /var/lib/apt/lists/*
##############################################################################
# Copy all the relevant source code in the Docker image, so we can build this.
RUN mkdir -p /go/src/github.com/kerberos-io/agent
COPY machinery /go/src/github.com/kerberos-io/agent/machinery
RUN rm -rf /go/src/github.com/kerberos-io/agent/machinery/.env
##################################################################
# Get the latest commit hash, so we know which version we're running
COPY .git /go/src/github.com/kerberos-io/agent/.git
RUN cd /go/src/github.com/kerberos-io/agent/.git && git log --format="%H" -n 1 | head -c7 > /go/src/github.com/kerberos-io/agent/machinery/version
RUN cat /go/src/github.com/kerberos-io/agent/machinery/version
##################
# Build Machinery
RUN cd /go/src/github.com/kerberos-io/agent/machinery && \
go mod download && \
VERSION=$(cd /go/src/github.com/kerberos-io/agent && git describe --tags --always 2>/dev/null || echo "${VERSION}") && \
go build -tags timetzdata,netgo,osusergo --ldflags "-s -w -X github.com/kerberos-io/agent/machinery/src/utils.VERSION=${VERSION} -extldflags '-static -latomic'" main.go && \
mkdir -p /agent && \
mv main /agent && \
mv version /agent && \
mv data /agent && \
mkdir -p /agent/data/cloud && \
mkdir -p /agent/data/snapshots && \
mkdir -p /agent/data/log && \
mkdir -p /agent/data/recordings && \
mkdir -p /agent/data/capture-test && \
mkdir -p /agent/data/config
####################################
# Let's create a /dist folder containing just the files necessary for runtime.
# Later, it will be copied as the / (root) of the output image.
WORKDIR /dist
RUN cp -r /agent ./
####################################################################################
# This will collect dependent libraries so they're later copied to the final image.
RUN /dist/agent/main version
FROM node:18.14.0-alpine3.16 AS build-ui
RUN apk update && apk upgrade --available && sync
########################
# Build Web (React app)
RUN mkdir -p /go/src/github.com/kerberos-io/agent/machinery/www
COPY ui /go/src/github.com/kerberos-io/agent/ui
RUN cd /go/src/github.com/kerberos-io/agent/ui && rm -rf yarn.lock && yarn config set network-timeout 300000 && \
yarn && yarn build
####################################
# Let's create a /dist folder containing just the files necessary for runtime.
# Later, it will be copied as the / (root) of the output image.
WORKDIR /dist
RUN mkdir -p ./agent && cp -r /go/src/github.com/kerberos-io/agent/machinery/www ./agent/
############################################
# Publish main binary to GitHub release
FROM alpine:latest
############################
# Protect by non-root user.
RUN addgroup -S kerberosio && adduser -S agent -G kerberosio && addgroup agent video
#################################
# Copy files from previous images
COPY --chown=0:0 --from=build-machinery /dist /
COPY --chown=0:0 --from=build-ui /dist /
RUN apk update && apk add ca-certificates curl ffmpeg libstdc++ libc6-compat --no-cache && rm -rf /var/cache/apk/*
##################
# Try running agent
RUN mv /agent/* /home/agent/
RUN /home/agent/main version
#######################
# Make template config
RUN cp /home/agent/data/config/config.json /home/agent/data/config.template.json
###########################
# Set permissions correctly
RUN chown -R agent:kerberosio /home/agent/data
RUN chown -R agent:kerberosio /home/agent/www
###########################
# Grant the necessary root capabilities to the process trying to bind to the privileged port
RUN apk add libcap && setcap 'cap_net_bind_service=+ep' /home/agent/main
###################
# Run non-root user
USER agent
######################################
# By default the app runs on port 80
EXPOSE 80
######################################
# Check if agent is still running
HEALTHCHECK CMD curl --fail http://localhost:80 || exit 1
###################################################
# Leeeeettttt'ssss goooooo!!!
# Run the shizzle from the right working directory.
WORKDIR /home/agent
CMD ["./main", "-action", "run", "-port", "80"]

View File

@@ -101,31 +101,35 @@ func main() {
switch action {
case "version":
log.Log.Info("main.Main(): You are currrently running Kerberos Agent " + VERSION)
{
log.Log.Info("main.Main(): You are currrently running Kerberos Agent " + VERSION)
}
case "discover":
// Convert duration to int
timeout, err := time.ParseDuration(timeout + "ms")
if err != nil {
log.Log.Fatal("main.Main(): could not parse timeout: " + err.Error())
return
{
// Convert duration to int
timeout, err := time.ParseDuration(timeout + "ms")
if err != nil {
log.Log.Fatal("main.Main(): could not parse timeout: " + err.Error())
return
}
onvif.Discover(timeout)
}
onvif.Discover(timeout)
case "decrypt":
log.Log.Info("main.Main(): Decrypting: " + flag.Arg(0) + " with key: " + flag.Arg(1))
symmetricKey := []byte(flag.Arg(1))
{
log.Log.Info("main.Main(): Decrypting: " + flag.Arg(0) + " with key: " + flag.Arg(1))
symmetricKey := []byte(flag.Arg(1))
if symmetricKey == nil || len(symmetricKey) == 0 {
log.Log.Fatal("main.Main(): symmetric key should not be empty")
return
}
if len(symmetricKey) != 32 {
log.Log.Fatal("main.Main(): symmetric key should be 32 bytes")
return
}
if len(symmetricKey) == 0 {
log.Log.Fatal("main.Main(): symmetric key should not be empty")
return
}
if len(symmetricKey) != 32 {
log.Log.Fatal("main.Main(): symmetric key should be 32 bytes")
return
}
utils.Decrypt(flag.Arg(0), symmetricKey)
utils.Decrypt(flag.Arg(0), symmetricKey)
}
case "run":
{
@@ -213,6 +217,8 @@ func main() {
routers.StartWebserver(configDirectory, &configuration, &communication, &capture)
}
default:
log.Log.Error("main.Main(): Sorry I don't understand :(")
{
log.Log.Error("main.Main(): Sorry I don't understand :(")
}
}
}

View File

@@ -85,12 +85,8 @@ type Golibrtsp struct {
Streams []packets.Stream
// FPS calculation fields
lastFrameTime time.Time
frameTimeBuffer []time.Duration
frameBufferSize int
frameBufferIndex int
fpsMutex sync.Mutex
// Per-stream FPS calculation (keyed by stream index)
fpsTrackers map[int8]*fpsTracker
// I-frame interval tracking fields
packetsSinceLastKeyframe int
@@ -101,6 +97,78 @@ type Golibrtsp struct {
keyframeMutex sync.Mutex
}
// fpsTracker holds per-stream state for PTS-based FPS calculation.
// Each video stream (H264 / H265) gets its own tracker so PTS
// samples from different codecs never interleave.
type fpsTracker struct {
mu sync.Mutex
lastPTS time.Duration
hasPTS bool
frameTimeBuffer []time.Duration
bufferSize int
bufferIndex int
cachedFPS float64 // latest computed FPS
}
func newFPSTracker(bufferSize int) *fpsTracker {
return &fpsTracker{
frameTimeBuffer: make([]time.Duration, bufferSize),
bufferSize: bufferSize,
}
}
// update records a new PTS sample and returns the latest FPS estimate.
// It must be called once per complete decoded frame (after Decode()
// succeeds), not on every RTP packet fragment.
func (ft *fpsTracker) update(pts time.Duration) float64 {
ft.mu.Lock()
defer ft.mu.Unlock()
if !ft.hasPTS {
ft.lastPTS = pts
ft.hasPTS = true
return 0
}
interval := pts - ft.lastPTS
ft.lastPTS = pts
// Skip invalid intervals (zero, negative, or very large which
// indicate a PTS discontinuity or wrap).
if interval <= 0 || interval > 5*time.Second {
return ft.cachedFPS
}
ft.frameTimeBuffer[ft.bufferIndex] = interval
ft.bufferIndex = (ft.bufferIndex + 1) % ft.bufferSize
var totalInterval time.Duration
validSamples := 0
for _, iv := range ft.frameTimeBuffer {
if iv > 0 {
totalInterval += iv
validSamples++
}
}
if validSamples == 0 {
return ft.cachedFPS
}
avgInterval := totalInterval / time.Duration(validSamples)
if avgInterval == 0 {
return ft.cachedFPS
}
ft.cachedFPS = float64(time.Second) / float64(avgInterval)
return ft.cachedFPS
}
// fps returns the most recent FPS estimate without recording a new sample.
func (ft *fpsTracker) fps() float64 {
ft.mu.Lock()
defer ft.mu.Unlock()
return ft.cachedFPS
}
// Init function
var H264FrameDecoder *Decoder
var H265FrameDecoder *Decoder
@@ -548,18 +616,17 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets
if len(rtppkt.Payload) > 0 {
// decode timestamp
pts, ok := g.Client.PacketPTS(g.VideoH264Media, rtppkt)
pts2, ok := g.Client.PacketPTS2(g.VideoH264Media, rtppkt)
if !ok {
log.Log.Debug("capture.golibrtsp.Start(): " + "unable to get PTS")
// decode timestamps — validate each call separately
pts, okPTS := g.Client.PacketPTS(g.VideoH264Media, rtppkt)
pts2, okPTS2 := g.Client.PacketPTS2(g.VideoH264Media, rtppkt)
if !okPTS2 {
log.Log.Debug("capture.golibrtsp.Start(): unable to get PTS2 from PacketPTS2")
return
}
// Extract access units from RTP packets
// We need to do this, because the decoder expects a full
// access unit. Once we have a full access unit, we can
// decode it, and know if it's a keyframe or not.
// Extract access units from RTP packets.
// We need a complete access unit to determine whether
// this is a keyframe.
au, errDecode := g.VideoH264Decoder.Decode(rtppkt)
if errDecode != nil {
if errDecode != rtph264.ErrNonStartingPacketAndNoPrevious && errDecode != rtph264.ErrMorePacketsNeeded {
@@ -568,6 +635,18 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets
return
}
// Frame is complete — update per-stream FPS from PTS.
if okPTS {
ft := g.fpsTrackers[g.VideoH264Index]
if ft == nil {
ft = newFPSTracker(30)
g.fpsTrackers[g.VideoH264Index] = ft
}
if ptsFPS := ft.update(pts); ptsFPS > 0 && ptsFPS <= 120 {
g.Streams[g.VideoH264Index].FPS = ptsFPS
}
}
// We'll need to read out a few things.
// prepend an AUD. This is required by some players
filteredAU = [][]byte{
@@ -578,8 +657,10 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets
nonIDRPresent := false
idrPresent := false
var naluTypes []string
for _, nalu := range au {
typ := h264.NALUType(nalu[0] & 0x1F)
naluTypes = append(naluTypes, fmt.Sprintf("%s(%d,sz=%d)", typ.String(), int(typ), len(nalu)))
switch typ {
case h264.NALUTypeAccessUnitDelimiter:
continue
@@ -614,18 +695,46 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets
g.Streams[g.VideoH264Index].FPS = fps
log.Log.Debug(fmt.Sprintf("capture.golibrtsp.Start(%s): Final FPS=%.2f", streamType, fps))
g.VideoH264Forma.SPS = nalu
if streamType == "main" && len(nalu) > 0 {
// Fallback: store SPS from in-band NALUs when SDP was missing it.
configuration.Config.Capture.IPCamera.SPSNALUs = [][]byte{nalu}
}
}
case h264.NALUTypePPS:
g.VideoH264Forma.PPS = nalu
if streamType == "main" && len(nalu) > 0 {
// Fallback: store PPS from in-band NALUs when SDP was missing it.
configuration.Config.Capture.IPCamera.PPSNALUs = [][]byte{nalu}
}
}
filteredAU = append(filteredAU, nalu)
}
if idrPresent && streamType == "main" {
// Ensure config has parameter sets before recordings start.
if len(configuration.Config.Capture.IPCamera.SPSNALUs) == 0 && len(g.VideoH264Forma.SPS) > 0 {
configuration.Config.Capture.IPCamera.SPSNALUs = [][]byte{g.VideoH264Forma.SPS}
log.Log.Warning("capture.golibrtsp.Start(main): fallback SPS set from keyframe")
}
if len(configuration.Config.Capture.IPCamera.PPSNALUs) == 0 && len(g.VideoH264Forma.PPS) > 0 {
configuration.Config.Capture.IPCamera.PPSNALUs = [][]byte{g.VideoH264Forma.PPS}
log.Log.Warning("capture.golibrtsp.Start(main): fallback PPS set from keyframe")
}
if len(configuration.Config.Capture.IPCamera.SPSNALUs) == 0 || len(configuration.Config.Capture.IPCamera.PPSNALUs) == 0 {
log.Log.Warning("capture.golibrtsp.Start(main): SPS/PPS still missing after IDR keyframe")
}
}
if len(filteredAU) <= 1 || (!nonIDRPresent && !idrPresent) {
return
}
if idrPresent {
log.Log.Debug(fmt.Sprintf("capture.golibrtsp.Start(%s): IDR frame NALUs: [%s]",
streamType, fmt.Sprintf("%v", naluTypes)))
}
// Convert to packet.
enc, err := h264.AnnexBMarshal(filteredAU)
if err != nil {
@@ -651,7 +760,11 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets
keyframeInterval := g.trackKeyframeInterval(idrPresent)
if idrPresent && keyframeInterval > 0 {
avgInterval := g.getAverageKeyframeInterval()
gopDuration := float64(keyframeInterval) / g.Streams[g.VideoH265Index].FPS
fps := g.Streams[g.VideoH264Index].FPS
if fps <= 0 {
fps = 25.0 // Default fallback FPS
}
gopDuration := float64(keyframeInterval) / fps
gopSize := int(avgInterval) // Store GOP size in a separate variable
g.Streams[g.VideoH264Index].GopSize = gopSize
log.Log.Debug(fmt.Sprintf("capture.golibrtsp.Start(%s): Keyframe interval=%d packets, Avg=%.1f, GOP=%.1fs, GOPSize=%d",
@@ -716,18 +829,17 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets
if len(rtppkt.Payload) > 0 {
// decode timestamp
pts, ok := g.Client.PacketPTS(g.VideoH265Media, rtppkt)
pts2, ok := g.Client.PacketPTS2(g.VideoH265Media, rtppkt)
if !ok {
log.Log.Debug("capture.golibrtsp.Start(): " + "unable to get PTS")
// decode timestamps — validate each call separately
pts, okPTS := g.Client.PacketPTS(g.VideoH265Media, rtppkt)
pts2, okPTS2 := g.Client.PacketPTS2(g.VideoH265Media, rtppkt)
if !okPTS2 {
log.Log.Debug("capture.golibrtsp.Start(): unable to get PTS")
return
}
// Extract access units from RTP packets
// We need to do this, because the decoder expects a full
// access unit. Once we have a full access unit, we can
// decode it, and know if it's a keyframe or not.
// Extract access units from RTP packets.
// We need a complete access unit to determine whether
// this is a keyframe.
au, errDecode := g.VideoH265Decoder.Decode(rtppkt)
if errDecode != nil {
if errDecode != rtph265.ErrNonStartingPacketAndNoPrevious && errDecode != rtph265.ErrMorePacketsNeeded {
@@ -736,6 +848,18 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets
return
}
// Frame is complete — update per-stream FPS from PTS.
if okPTS {
ft := g.fpsTrackers[g.VideoH265Index]
if ft == nil {
ft = newFPSTracker(30)
g.fpsTrackers[g.VideoH265Index] = ft
}
if ptsFPS := ft.update(pts); ptsFPS > 0 && ptsFPS <= 120 {
g.Streams[g.VideoH265Index].FPS = ptsFPS
}
}
filteredAU = [][]byte{
{byte(h265.NALUType_AUD_NUT) << 1, 1, 0x50},
}
@@ -796,7 +920,11 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets
keyframeInterval := g.trackKeyframeInterval(isRandomAccess)
if isRandomAccess && keyframeInterval > 0 {
avgInterval := g.getAverageKeyframeInterval()
gopDuration := float64(keyframeInterval) / g.Streams[g.VideoH265Index].FPS
fps := g.Streams[g.VideoH265Index].FPS
if fps <= 0 {
fps = 25.0 // Default fallback FPS
}
gopDuration := float64(keyframeInterval) / fps
gopSize := int(avgInterval) // Store GOP size in a separate variable
g.Streams[g.VideoH265Index].GopSize = gopSize
log.Log.Debug(fmt.Sprintf("capture.golibrtsp.Start(%s): Keyframe interval=%d packets, Avg=%.1f, GOP=%.1fs, GOPSize=%d",
@@ -1179,10 +1307,11 @@ func WriteMPEG4Audio(forma *format.MPEG4Audio, aus [][]byte) ([]byte, error) {
// Initialize FPS calculation buffers
func (g *Golibrtsp) initFPSCalculation() {
g.frameBufferSize = 30 // Store last 30 frame intervals
g.frameTimeBuffer = make([]time.Duration, g.frameBufferSize)
g.frameBufferIndex = 0
g.lastFrameTime = time.Time{}
// Ensure the per-stream FPS trackers map exists. Individual trackers
// can be created lazily when a given stream index is first used.
if g.fpsTrackers == nil {
g.fpsTrackers = make(map[int8]*fpsTracker)
}
// Initialize I-frame interval tracking
g.keyframeBufferSize = 10 // Store last 10 keyframe intervals
@@ -1192,50 +1321,11 @@ func (g *Golibrtsp) initFPSCalculation() {
g.lastKeyframePacketCount = 0
}
// Calculate FPS from frame timestamps
func (g *Golibrtsp) calculateFPSFromTimestamps() float64 {
g.fpsMutex.Lock()
defer g.fpsMutex.Unlock()
if g.lastFrameTime.IsZero() {
g.lastFrameTime = time.Now()
return 0
}
now := time.Now()
interval := now.Sub(g.lastFrameTime)
g.lastFrameTime = now
// Store the interval
g.frameTimeBuffer[g.frameBufferIndex] = interval
g.frameBufferIndex = (g.frameBufferIndex + 1) % g.frameBufferSize
// Calculate average FPS from stored intervals
var totalInterval time.Duration
validSamples := 0
for _, interval := range g.frameTimeBuffer {
if interval > 0 {
totalInterval += interval
validSamples++
}
}
if validSamples == 0 {
return 0
}
avgInterval := totalInterval / time.Duration(validSamples)
if avgInterval == 0 {
return 0
}
return float64(time.Second) / float64(avgInterval)
}
// Get enhanced FPS information from SPS with fallback
// Get enhanced FPS information from SPS with fallback to PTS-based calculation.
// The PTS-based FPS is computed per completed frame via fpsTracker.update(),
// so by the time this is called we already have a good estimate.
func (g *Golibrtsp) getEnhancedFPS(sps *h264.SPS, streamIndex int8) float64 {
// First try to get FPS from SPS
// First try to get FPS from SPS VUI parameters
spsFPS := sps.FPS()
// Check if SPS FPS is reasonable (between 1 and 120 fps)
@@ -1244,11 +1334,13 @@ func (g *Golibrtsp) getEnhancedFPS(sps *h264.SPS, streamIndex int8) float64 {
return spsFPS
}
// Fallback to timestamp-based calculation
timestampFPS := g.calculateFPSFromTimestamps()
if timestampFPS > 0 && timestampFPS <= 120 {
log.Log.Debug(fmt.Sprintf("capture.golibrtsp.getEnhancedFPS(): Timestamp FPS: %.2f", timestampFPS))
return timestampFPS
// Fallback to PTS-based FPS (already calculated per-frame)
if ft := g.fpsTrackers[streamIndex]; ft != nil {
ptsFPS := ft.fps()
if ptsFPS > 0 && ptsFPS <= 120 {
log.Log.Debug(fmt.Sprintf("capture.golibrtsp.getEnhancedFPS(): PTS FPS: %.2f", ptsFPS))
return ptsFPS
}
}
// Return SPS FPS even if it seems unreasonable, or default

View File

@@ -159,6 +159,19 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
}
// Close mp4
if len(mp4Video.SPSNALUs) == 0 && len(configuration.Config.Capture.IPCamera.SPSNALUs) > 0 {
mp4Video.SPSNALUs = configuration.Config.Capture.IPCamera.SPSNALUs
}
if len(mp4Video.PPSNALUs) == 0 && len(configuration.Config.Capture.IPCamera.PPSNALUs) > 0 {
mp4Video.PPSNALUs = configuration.Config.Capture.IPCamera.PPSNALUs
}
if len(mp4Video.VPSNALUs) == 0 && len(configuration.Config.Capture.IPCamera.VPSNALUs) > 0 {
mp4Video.VPSNALUs = configuration.Config.Capture.IPCamera.VPSNALUs
}
if (videoCodec == "H264" && (len(mp4Video.SPSNALUs) == 0 || len(mp4Video.PPSNALUs) == 0)) ||
(videoCodec == "H265" && (len(mp4Video.VPSNALUs) == 0 || len(mp4Video.SPSNALUs) == 0 || len(mp4Video.PPSNALUs) == 0)) {
log.Log.Warning("capture.main.HandleRecordStream(continuous): closing MP4 without full parameter sets, moov may be incomplete")
}
mp4Video.Close(&config)
log.Log.Info("capture.main.HandleRecordStream(continuous): recording finished: file save: " + name)
@@ -279,8 +292,11 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
ppsNALUS := configuration.Config.Capture.IPCamera.PPSNALUs
vpsNALUS := configuration.Config.Capture.IPCamera.VPSNALUs
if len(spsNALUS) == 0 || len(ppsNALUS) == 0 {
log.Log.Warning("capture.main.HandleRecordStream(continuous): missing SPS/PPS at recording start")
}
// Create a video file, and set the dimensions.
mp4Video = video.NewMP4(fullName, spsNALUS, ppsNALUS, vpsNALUS)
mp4Video = video.NewMP4(fullName, spsNALUS, ppsNALUS, vpsNALUS, configuration.Config.Capture.MaxLengthRecording)
mp4Video.SetWidth(width)
mp4Video.SetHeight(height)
@@ -499,21 +515,11 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
ppsNALUS := configuration.Config.Capture.IPCamera.PPSNALUs
vpsNALUS := configuration.Config.Capture.IPCamera.VPSNALUs
// Create a video file, and set the dimensions.
mp4Video := video.NewMP4(fullName, spsNALUS, ppsNALUS, vpsNALUS)
mp4Video.SetWidth(width)
mp4Video.SetHeight(height)
if videoCodec == "H264" {
videoTrack = mp4Video.AddVideoTrack("H264")
} else if videoCodec == "H265" {
videoTrack = mp4Video.AddVideoTrack("H265")
}
if audioCodec == "AAC" {
audioTrack = mp4Video.AddAudioTrack("AAC")
} else if audioCodec == "PCM_MULAW" {
log.Log.Debug("capture.main.HandleRecordStream(continuous): no AAC audio codec detected, skipping audio track.")
if len(spsNALUS) == 0 || len(ppsNALUS) == 0 {
log.Log.Warning("capture.main.HandleRecordStream(motiondetection): missing SPS/PPS at recording start")
}
// Create the MP4 only once the first keyframe arrives.
var mp4Video *video.MP4
for cursorError == nil {
@@ -532,7 +538,7 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
default:
}
if (motionTimestamp+postRecording-now < 0 || now-startRecording > maxRecordingPeriod-500) && nextPkt.IsKeyFrame {
if start && (motionTimestamp+postRecording-now < 0 || now-startRecording > maxRecordingPeriod-500) && nextPkt.IsKeyFrame {
log.Log.Info("capture.main.HandleRecordStream(motiondetection): timestamp+postRecording-now < 0 - " + strconv.FormatInt(motionTimestamp+postRecording-now, 10) + " < 0")
log.Log.Info("capture.main.HandleRecordStream(motiondetection): now-startRecording > maxRecordingPeriod-500 - " + strconv.FormatInt(now-startRecording, 10) + " > " + strconv.FormatInt(maxRecordingPeriod-500, 10))
log.Log.Info("capture.main.HandleRecordStream(motiondetection): closing recording (timestamp: " + strconv.FormatInt(motionTimestamp, 10) + ", postRecording: " + strconv.FormatInt(postRecording, 10) + ", now: " + strconv.FormatInt(now, 10) + ", startRecording: " + strconv.FormatInt(startRecording, 10) + ", maxRecordingPeriod: " + strconv.FormatInt(maxRecordingPeriod, 10))
@@ -542,20 +548,44 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
// We start the recording if we have a keyframe and the last duration is 0 or less than the current packet time.
// It could be start we start from the beginning of the recording.
log.Log.Debug("capture.main.HandleRecordStream(motiondetection): write frames")
log.Log.Debug("capture.main.HandleRecordStream(motiondetection): recording started on keyframe")
// Align duration timers with the first keyframe.
startRecording = pkt.CurrentTime
// Create a video file, and set the dimensions.
mp4Video = video.NewMP4(fullName, spsNALUS, ppsNALUS, vpsNALUS, configuration.Config.Capture.MaxLengthRecording)
mp4Video.SetWidth(width)
mp4Video.SetHeight(height)
if videoCodec == "H264" {
videoTrack = mp4Video.AddVideoTrack("H264")
} else if videoCodec == "H265" {
videoTrack = mp4Video.AddVideoTrack("H265")
}
if audioCodec == "AAC" {
audioTrack = mp4Video.AddAudioTrack("AAC")
} else if audioCodec == "PCM_MULAW" {
log.Log.Debug("capture.main.HandleRecordStream(continuous): no AAC audio codec detected, skipping audio track.")
}
start = true
}
if start {
pts := convertPTS(pkt.TimeLegacy)
if pkt.IsVideo {
log.Log.Debug("capture.main.HandleRecordStream(motiondetection): add video sample")
if err := mp4Video.AddSampleToTrack(videoTrack, pkt.IsKeyFrame, pkt.Data, pts); err != nil {
log.Log.Error("capture.main.HandleRecordStream(motiondetection): " + err.Error())
if mp4Video != nil {
if err := mp4Video.AddSampleToTrack(videoTrack, pkt.IsKeyFrame, pkt.Data, pts); err != nil {
log.Log.Error("capture.main.HandleRecordStream(motiondetection): " + err.Error())
}
}
} else if pkt.IsAudio {
log.Log.Debug("capture.main.HandleRecordStream(motiondetection): add audio sample")
if pkt.Codec == "AAC" {
if err := mp4Video.AddSampleToTrack(audioTrack, pkt.IsKeyFrame, pkt.Data, pts); err != nil {
log.Log.Error("capture.main.HandleRecordStream(motiondetection): " + err.Error())
if mp4Video != nil {
if err := mp4Video.AddSampleToTrack(audioTrack, pkt.IsKeyFrame, pkt.Data, pts); err != nil {
log.Log.Error("capture.main.HandleRecordStream(motiondetection): " + err.Error())
}
}
} else if pkt.Codec == "PCM_MULAW" {
// TODO: transcode to AAC, some work to do..
@@ -573,7 +603,25 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
// This is used to determine if we need to start a new recording.
lastRecordingTime = pkt.CurrentTime
if mp4Video == nil {
log.Log.Warning("capture.main.HandleRecordStream(motiondetection): recording closed without keyframe; no MP4 created")
continue
}
// This will close the recording and write the last packet.
if len(mp4Video.SPSNALUs) == 0 && len(configuration.Config.Capture.IPCamera.SPSNALUs) > 0 {
mp4Video.SPSNALUs = configuration.Config.Capture.IPCamera.SPSNALUs
}
if len(mp4Video.PPSNALUs) == 0 && len(configuration.Config.Capture.IPCamera.PPSNALUs) > 0 {
mp4Video.PPSNALUs = configuration.Config.Capture.IPCamera.PPSNALUs
}
if len(mp4Video.VPSNALUs) == 0 && len(configuration.Config.Capture.IPCamera.VPSNALUs) > 0 {
mp4Video.VPSNALUs = configuration.Config.Capture.IPCamera.VPSNALUs
}
if (videoCodec == "H264" && (len(mp4Video.SPSNALUs) == 0 || len(mp4Video.PPSNALUs) == 0)) ||
(videoCodec == "H265" && (len(mp4Video.VPSNALUs) == 0 || len(mp4Video.SPSNALUs) == 0 || len(mp4Video.PPSNALUs) == 0)) {
log.Log.Warning("capture.main.HandleRecordStream(motiondetection): closing MP4 without full parameter sets, moov may be incomplete")
}
mp4Video.Close(&config)
log.Log.Info("capture.main.HandleRecordStream(motiondetection): file save: " + name)

View File

@@ -800,11 +800,19 @@ func HandleLiveStreamHD(livestreamCursor *packets.QueueCursor, configuration *mo
// Check if we need to enable the live stream
if config.Capture.Liveview != "false" {
// Should create a track here.
// Create per-peer broadcasters instead of shared tracks.
// Each viewer gets its own track with independent, non-blocking writes
// so a slow/congested peer cannot stall the others.
streams, _ := rtspClient.GetStreams()
videoTrack := webrtc.NewVideoTrack(streams)
audioTrack := webrtc.NewAudioTrack(streams)
go webrtc.WriteToTrack(livestreamCursor, configuration, communication, mqttClient, videoTrack, audioTrack, rtspClient)
videoBroadcaster := webrtc.NewVideoBroadcaster(streams)
audioBroadcaster := webrtc.NewAudioBroadcaster(streams)
if videoBroadcaster == nil && audioBroadcaster == nil {
log.Log.Error("cloud.HandleLiveStreamHD(): failed to create both video and audio broadcasters")
return
}
go webrtc.WriteToTrack(livestreamCursor, configuration, communication, mqttClient, videoBroadcaster, audioBroadcaster, rtspClient)
if config.Capture.ForwardWebRTC == "true" {
@@ -812,7 +820,7 @@ func HandleLiveStreamHD(livestreamCursor *packets.QueueCursor, configuration *mo
log.Log.Info("cloud.HandleLiveStreamHD(): Waiting for peer connections.")
for handshake := range communication.HandleLiveHDHandshake {
log.Log.Info("cloud.HandleLiveStreamHD(): setting up a peer connection.")
go webrtc.InitializeWebRTCConnection(configuration, communication, mqttClient, videoTrack, audioTrack, handshake)
go webrtc.InitializeWebRTCConnection(configuration, communication, mqttClient, videoBroadcaster, audioBroadcaster, handshake)
}
}

View File

@@ -21,6 +21,7 @@ import (
"github.com/kerberos-io/agent/machinery/src/packets"
routers "github.com/kerberos-io/agent/machinery/src/routers/mqtt"
"github.com/kerberos-io/agent/machinery/src/utils"
"github.com/kerberos-io/agent/machinery/src/webrtc"
"github.com/tevino/abool"
)
@@ -303,7 +304,7 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu
}
// Handle livestream HD (high resolution over WEBRTC)
communication.HandleLiveHDHandshake = make(chan models.RequestHDStreamPayload, 10)
communication.HandleLiveHDHandshake = make(chan models.LiveHDHandshake, 100)
if subStreamEnabled {
livestreamHDCursor := subQueue.Latest()
go cloud.HandleLiveStreamHD(livestreamHDCursor, configuration, communication, mqttClient, rtspSubClient)
@@ -552,6 +553,11 @@ func GetDashboard(c *gin.Context, configDirectory string, configuration *models.
// The total number of recordings stored in the directory.
recordingDirectory := configDirectory + "/data/recordings"
numberOfRecordings := utils.NumberOfMP4sInDirectory(recordingDirectory)
activeWebRTCReaders := webrtc.GetActivePeerConnectionCount()
pendingWebRTCHandshakes := 0
if communication.HandleLiveHDHandshake != nil {
pendingWebRTCHandshakes = len(communication.HandleLiveHDHandshake)
}
// All days stored in this agent.
days := []string{}
@@ -574,6 +580,8 @@ func GetDashboard(c *gin.Context, configDirectory string, configuration *models.
"cameraOnline": cameraIsOnline,
"cloudOnline": cloudIsOnline,
"numberOfRecordings": numberOfRecordings,
"webrtcReaders": activeWebRTCReaders,
"webrtcPending": pendingWebRTCHandshakes,
"days": days,
"latestEvents": latestEvents,
})

View File

@@ -8,6 +8,17 @@ import (
"github.com/tevino/abool"
)
type LiveHDSignalingCallbacks struct {
SendAnswer func(sessionID string, sdp string) error
SendCandidate func(sessionID string, candidate string) error
SendError func(sessionID string, message string) error
}
type LiveHDHandshake struct {
Payload RequestHDStreamPayload
Signaling *LiveHDSignalingCallbacks
}
// The communication struct that is managing
// all the communication between the different goroutines.
type Communication struct {
@@ -27,7 +38,7 @@ type Communication struct {
HandleHeartBeat chan string
HandleLiveSD chan int64
HandleLiveHDKeepalive chan string
HandleLiveHDHandshake chan RequestHDStreamPayload
HandleLiveHDHandshake chan LiveHDHandshake
HandleLiveHDPeers chan string
HandleONVIF chan OnvifAction
IsConfiguring *abool.AtomicBool

View File

@@ -20,96 +20,93 @@ func AddRoutes(r *gin.Engine, authMiddleware *jwt.GinJWTMiddleware, configDirect
// This is legacy should be removed in future! Now everything
// lives under the /api prefix.
r.GET("/config", func(c *gin.Context) {
r.GET("/config", authMiddleware.MiddlewareFunc(), func(c *gin.Context) {
components.GetConfig(c, captureDevice, configuration, communication)
})
// This is legacy should be removed in future! Now everything
// lives under the /api prefix.
r.POST("/config", func(c *gin.Context) {
r.POST("/config", authMiddleware.MiddlewareFunc(), func(c *gin.Context) {
components.UpdateConfig(c, configDirectory, configuration, communication)
})
api := r.Group("/api")
{
// Public endpoints (no authentication required)
api.POST("/login", authMiddleware.LoginHandler)
api.GET("/dashboard", func(c *gin.Context) {
components.GetDashboard(c, configDirectory, configuration, communication)
})
api.POST("/latest-events", func(c *gin.Context) {
components.GetLatestEvents(c, configDirectory, configuration, communication)
})
api.GET("/days", func(c *gin.Context) {
components.GetDays(c, configDirectory, configuration, communication)
})
api.GET("/config", func(c *gin.Context) {
components.GetConfig(c, captureDevice, configuration, communication)
})
api.POST("/config", func(c *gin.Context) {
components.UpdateConfig(c, configDirectory, configuration, communication)
})
// Will verify the hub settings.
api.POST("/hub/verify", func(c *gin.Context) {
cloud.VerifyHub(c)
})
// Will verify the persistence settings.
api.POST("/persistence/verify", func(c *gin.Context) {
cloud.VerifyPersistence(c, configDirectory)
})
// Will verify the secondary persistence settings.
api.POST("/persistence/secondary/verify", func(c *gin.Context) {
cloud.VerifySecondaryPersistence(c, configDirectory)
})
// Camera specific methods. Doesn't require any authorization.
// These are available for anyone, but require the agent, to reach
// the camera.
api.POST("/camera/restart", func(c *gin.Context) {
components.RestartAgent(c, communication)
})
api.POST("/camera/stop", func(c *gin.Context) {
components.StopAgent(c, communication)
})
api.POST("/camera/record", func(c *gin.Context) {
components.MakeRecording(c, communication)
})
api.GET("/camera/snapshot/jpeg", func(c *gin.Context) {
components.GetSnapshotRaw(c, captureDevice, configuration, communication)
})
api.GET("/camera/snapshot/base64", func(c *gin.Context) {
components.GetSnapshotBase64(c, captureDevice, configuration, communication)
})
// Onvif specific methods. Doesn't require any authorization.
// Will verify the current onvif settings.
api.POST("/camera/onvif/verify", onvif.VerifyOnvifConnection)
api.POST("/camera/onvif/login", LoginToOnvif)
api.POST("/camera/onvif/capabilities", GetOnvifCapabilities)
api.POST("/camera/onvif/presets", GetOnvifPresets)
api.POST("/camera/onvif/gotopreset", GoToOnvifPreset)
api.POST("/camera/onvif/pantilt", DoOnvifPanTilt)
api.POST("/camera/onvif/zoom", DoOnvifZoom)
api.POST("/camera/onvif/inputs", DoGetDigitalInputs)
api.POST("/camera/onvif/outputs", DoGetRelayOutputs)
api.POST("/camera/onvif/outputs/:output", DoTriggerRelayOutput)
api.POST("/camera/verify/:streamType", capture.VerifyCamera)
// Secured endpoints..
// Apply JWT authentication middleware.
// All routes registered below this line require a valid JWT token.
api.Use(authMiddleware.MiddlewareFunc())
{
api.GET("/dashboard", func(c *gin.Context) {
components.GetDashboard(c, configDirectory, configuration, communication)
})
api.POST("/latest-events", func(c *gin.Context) {
components.GetLatestEvents(c, configDirectory, configuration, communication)
})
api.GET("/days", func(c *gin.Context) {
components.GetDays(c, configDirectory, configuration, communication)
})
api.GET("/config", func(c *gin.Context) {
components.GetConfig(c, captureDevice, configuration, communication)
})
api.POST("/config", func(c *gin.Context) {
components.UpdateConfig(c, configDirectory, configuration, communication)
})
// Will verify the hub settings.
api.POST("/hub/verify", func(c *gin.Context) {
cloud.VerifyHub(c)
})
// Will verify the persistence settings.
api.POST("/persistence/verify", func(c *gin.Context) {
cloud.VerifyPersistence(c, configDirectory)
})
// Will verify the secondary persistence settings.
api.POST("/persistence/secondary/verify", func(c *gin.Context) {
cloud.VerifySecondaryPersistence(c, configDirectory)
})
// Camera specific methods.
api.POST("/camera/restart", func(c *gin.Context) {
components.RestartAgent(c, communication)
})
api.POST("/camera/stop", func(c *gin.Context) {
components.StopAgent(c, communication)
})
api.POST("/camera/record", func(c *gin.Context) {
components.MakeRecording(c, communication)
})
api.GET("/camera/snapshot/jpeg", func(c *gin.Context) {
components.GetSnapshotRaw(c, captureDevice, configuration, communication)
})
api.GET("/camera/snapshot/base64", func(c *gin.Context) {
components.GetSnapshotBase64(c, captureDevice, configuration, communication)
})
// Onvif specific methods.
api.POST("/camera/onvif/verify", onvif.VerifyOnvifConnection)
api.POST("/camera/onvif/login", LoginToOnvif)
api.POST("/camera/onvif/capabilities", GetOnvifCapabilities)
api.POST("/camera/onvif/presets", GetOnvifPresets)
api.POST("/camera/onvif/gotopreset", GoToOnvifPreset)
api.POST("/camera/onvif/pantilt", DoOnvifPanTilt)
api.POST("/camera/onvif/zoom", DoOnvifZoom)
api.POST("/camera/onvif/inputs", DoGetDigitalInputs)
api.POST("/camera/onvif/outputs", DoGetRelayOutputs)
api.POST("/camera/onvif/outputs/:output", DoTriggerRelayOutput)
api.POST("/camera/verify/:streamType", capture.VerifyCamera)
}
}
return api

View File

@@ -90,10 +90,32 @@ func ConfigureMQTT(configDirectory string, configuration *models.Configuration,
// Some extra options to make sure the connection behaves
// properly. More information here: github.com/eclipse/paho.mqtt.golang.
opts.SetCleanSession(true)
//opts.SetCleanSession(true)
opts.SetCleanSession(false)
opts.SetResumeSubs(true)
opts.SetStore(mqtt.NewMemoryStore())
opts.SetConnectRetry(true)
//opts.SetAutoReconnect(true)
opts.SetAutoReconnect(true)
opts.SetConnectRetryInterval(5 * time.Second)
opts.SetMaxReconnectInterval(1 * time.Minute)
opts.SetKeepAlive(30 * time.Second)
opts.SetPingTimeout(10 * time.Second)
opts.SetWriteTimeout(10 * time.Second)
opts.SetOrderMatters(false)
opts.SetConnectTimeout(30 * time.Second)
opts.SetConnectionLostHandler(func(client mqtt.Client, err error) {
if err != nil {
log.Log.Error("routers.mqtt.main.ConfigureMQTT(): MQTT connection lost: " + err.Error())
} else {
log.Log.Error("routers.mqtt.main.ConfigureMQTT(): MQTT connection lost")
}
})
opts.SetReconnectingHandler(func(client mqtt.Client, options *mqtt.ClientOptions) {
log.Log.Warning("routers.mqtt.main.ConfigureMQTT(): reconnecting to MQTT broker")
})
opts.SetOnConnectHandler(func(c mqtt.Client) {
log.Log.Info("routers.mqtt.main.ConfigureMQTT(): MQTT session is online")
})
hubKey := ""
// This is the old way ;)
@@ -123,7 +145,6 @@ func ConfigureMQTT(configDirectory string, configuration *models.Configuration,
opts.SetClientID(mqttClientID)
log.Log.Info("routers.mqtt.main.ConfigureMQTT(): Set ClientID " + mqttClientID)
rand.Seed(time.Now().UnixNano())
webrtc.CandidateArrays = make(map[string](chan string))
opts.OnConnect = func(c mqtt.Client) {
// We managed to connect to the MQTT broker, hurray!
@@ -134,10 +155,14 @@ func ConfigureMQTT(configDirectory string, configuration *models.Configuration,
}
}
mqc := mqtt.NewClient(opts)
if token := mqc.Connect(); token.WaitTimeout(3 * time.Second) {
if token := mqc.Connect(); token.WaitTimeout(30 * time.Second) {
if token.Error() != nil {
log.Log.Error("routers.mqtt.main.ConfigureMQTT(): unable to establish mqtt broker connection, error was: " + token.Error().Error())
} else {
log.Log.Info("routers.mqtt.main.ConfigureMQTT(): initial MQTT connection established")
}
} else {
log.Log.Error("routers.mqtt.main.ConfigureMQTT(): timed out while establishing mqtt broker connection")
}
return mqc
}
@@ -145,12 +170,18 @@ func ConfigureMQTT(configDirectory string, configuration *models.Configuration,
return nil
}
// maxSignalingAge is the maximum age of a WebRTC signaling message (request-hd-stream,
// receive-hd-candidates) before it is considered stale and discarded. With CleanSession=false
// the MQTT broker may replay queued messages from previous sessions; this prevents the agent
// from setting up peer connections for viewers that are no longer waiting.
const maxSignalingAge = 30 * time.Second
func MQTTListenerHandler(mqttClient mqtt.Client, hubKey string, configDirectory string, configuration *models.Configuration, communication *models.Communication) {
if hubKey == "" {
log.Log.Info("routers.mqtt.main.MQTTListenerHandler(): no hub key provided, not subscribing to kerberos/hub/{hubkey}")
} else {
agentListener := fmt.Sprintf("kerberos/agent/%s", hubKey)
mqttClient.Subscribe(agentListener, 1, func(c mqtt.Client, msg mqtt.Message) {
token := mqttClient.Subscribe(agentListener, 1, func(c mqtt.Client, msg mqtt.Message) {
// Decode the message, we are expecting following format.
// {
@@ -250,6 +281,18 @@ func MQTTListenerHandler(mqttClient mqtt.Client, hubKey string, configDirectory
// We'll find out which message we received, and act accordingly.
log.Log.Info("routers.mqtt.main.MQTTListenerHandler(): received message with action: " + payload.Action)
// For time-sensitive WebRTC signaling messages, discard stale ones that may
// have been queued by the broker while CleanSession=false.
if payload.Action == "request-hd-stream" || payload.Action == "receive-hd-candidates" {
messageAge := time.Since(time.Unix(message.Timestamp, 0))
if messageAge > maxSignalingAge {
log.Log.Info("routers.mqtt.main.MQTTListenerHandler(): discarding stale " + payload.Action +
" message (age: " + messageAge.Round(time.Second).String() + ")")
return
}
}
switch payload.Action {
case "record":
go HandleRecording(mqttClient, hubKey, payload, configuration, communication)
@@ -277,6 +320,16 @@ func MQTTListenerHandler(mqttClient mqtt.Client, hubKey string, configDirectory
}
})
if token.WaitTimeout(10 * time.Second) {
if token.Error() != nil {
log.Log.Error("routers.mqtt.main.MQTTListenerHandler(): failed to subscribe to " + agentListener + ": " + token.Error().Error())
} else {
log.Log.Info("routers.mqtt.main.MQTTListenerHandler(): subscribed to " + agentListener)
}
} else {
log.Log.Error("routers.mqtt.main.MQTTListenerHandler(): timed out while subscribing to " + agentListener)
}
}
}
@@ -485,9 +538,13 @@ func HandleRequestHDStream(mqttClient mqtt.Client, hubKey string, payload models
if communication.CameraConnected {
// Set the Hub key, so we can send back the answer.
requestHDStreamPayload.HubKey = hubKey
select {
case communication.HandleLiveHDHandshake <- requestHDStreamPayload:
default:
if communication.HandleLiveHDHandshake == nil {
log.Log.Error("routers.mqtt.main.HandleRequestHDStream(): handshake channel is nil, dropping request")
return
}
communication.HandleLiveHDHandshake <- models.LiveHDHandshake{
Payload: requestHDStreamPayload,
}
log.Log.Info("routers.mqtt.main.HandleRequestHDStream(): received request to setup webrtc.")
} else {

View File

@@ -6,6 +6,7 @@ import (
"image"
"net/http"
"sync"
"time"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
@@ -14,6 +15,7 @@ import (
"github.com/kerberos-io/agent/machinery/src/models"
"github.com/kerberos-io/agent/machinery/src/packets"
"github.com/kerberos-io/agent/machinery/src/utils"
"github.com/kerberos-io/agent/machinery/src/webrtc"
)
type Message struct {
@@ -28,6 +30,23 @@ type Connection struct {
Cancels map[string]context.CancelFunc
}
func writeWebRTCError(connection *Connection, clientID string, sessionID string, errorMessage string) {
if connection == nil {
return
}
if err := connection.WriteJson(Message{
ClientID: clientID,
MessageType: "webrtc-error",
Message: map[string]string{
"session_id": sessionID,
"message": errorMessage,
},
}); err != nil {
log.Log.Error("routers.websocket.main.writeWebRTCError(): " + err.Error())
}
}
// Concurrency handling - sending messages
func (c *Connection) WriteJson(message Message) error {
c.mu.Lock()
@@ -115,6 +134,82 @@ func WebsocketHandler(c *gin.Context, configuration *models.Configuration, commu
go ForwardSDStream(ctx, clientID, sockets[clientID], configuration, communication, captureDevice)
}
}
case "stream-hd":
sessionID := message.Message["session_id"]
sessionDescription := message.Message["sdp"]
if sessionID == "" || sessionDescription == "" {
writeWebRTCError(sockets[clientID], clientID, sessionID, "missing session_id or sdp")
break
}
if !communication.CameraConnected {
writeWebRTCError(sockets[clientID], clientID, sessionID, "camera is not connected")
break
}
if communication.HandleLiveHDHandshake == nil {
writeWebRTCError(sockets[clientID], clientID, sessionID, "webrtc liveview is not available")
break
}
handshake := models.LiveHDHandshake{
Payload: models.RequestHDStreamPayload{
Timestamp: time.Now().Unix(),
SessionID: sessionID,
SessionDescription: sessionDescription,
},
Signaling: &models.LiveHDSignalingCallbacks{
SendAnswer: func(callbackSessionID string, sdp string) error {
return sockets[clientID].WriteJson(Message{
ClientID: clientID,
MessageType: "webrtc-answer",
Message: map[string]string{
"session_id": callbackSessionID,
"sdp": sdp,
},
})
},
SendCandidate: func(callbackSessionID string, candidate string) error {
return sockets[clientID].WriteJson(Message{
ClientID: clientID,
MessageType: "webrtc-candidate",
Message: map[string]string{
"session_id": callbackSessionID,
"candidate": candidate,
},
})
},
SendError: func(callbackSessionID string, errorMessage string) error {
writeWebRTCError(sockets[clientID], clientID, callbackSessionID, errorMessage)
return nil
},
},
}
communication.HandleLiveHDHandshake <- handshake
case "webrtc-candidate":
sessionID := message.Message["session_id"]
candidate := message.Message["candidate"]
if sessionID == "" || candidate == "" {
writeWebRTCError(sockets[clientID], clientID, sessionID, "missing session_id or candidate")
break
}
if !communication.CameraConnected {
writeWebRTCError(sockets[clientID], clientID, sessionID, "camera is not connected")
break
}
key := configuration.Config.Key + "/" + sessionID
go webrtc.RegisterCandidates(key, models.ReceiveHDCandidatesPayload{
Timestamp: time.Now().Unix(),
SessionID: sessionID,
Candidate: candidate,
})
}
err = conn.ReadJSON(&message)

View File

@@ -25,7 +25,11 @@ import (
"github.com/nfnt/resize"
)
const VERSION = "3.5.0"
// VERSION is the agent version. It defaults to "0.0.0" for local dev builds
// and is overridden at build time via:
//
// go build -ldflags "-X github.com/kerberos-io/agent/machinery/src/utils.VERSION=v1.2.3"
var VERSION = "0.0.0"
const letterBytes = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"
@@ -195,6 +199,13 @@ func GetMediaFormatted(files []os.FileInfo, recordingDirectory string, configura
timestampInt, err := strconv.ParseInt(timestamp, 10, 64)
if err == nil {
if eventFilter.TimestampOffsetStart > 0 {
// TimestampOffsetStart represents the newest lower bound to include.
if timestampInt < eventFilter.TimestampOffsetStart {
continue
}
}
// If we have an offset we will check if we should skip or not
if eventFilter.TimestampOffsetEnd > 0 {
// Medias are sorted from new to older. TimestampOffsetEnd holds the oldest

View File

@@ -0,0 +1,54 @@
package utils
import (
"os"
"testing"
"time"
"github.com/kerberos-io/agent/machinery/src/models"
)
type stubFileInfo struct {
name string
}
func (s stubFileInfo) Name() string { return s.name }
func (s stubFileInfo) Size() int64 { return 0 }
func (s stubFileInfo) Mode() os.FileMode { return 0 }
func (s stubFileInfo) ModTime() time.Time { return time.Unix(0, 0) }
func (s stubFileInfo) IsDir() bool { return false }
func (s stubFileInfo) Sys() interface{} { return nil }
func TestGetMediaFormattedHonorsTimestampRange(t *testing.T) {
configuration := &models.Configuration{}
configuration.Config.Timezone = "UTC"
configuration.Config.Name = "Front Door"
configuration.Config.Key = "camera-1"
files := []os.FileInfo{
stubFileInfo{name: "1700000200_6_7_8_9_10.mp4"},
stubFileInfo{name: "1700000100_6_7_8_9_10.mp4"},
stubFileInfo{name: "1700000000_6_7_8_9_10.mp4"},
}
media := GetMediaFormatted(files, "/tmp/recordings", configuration, models.EventFilter{
TimestampOffsetStart: 1700000050,
TimestampOffsetEnd: 1700000200,
NumberOfElements: 10,
})
if len(media) != 1 {
t.Fatalf("expected 1 media item in time range, got %d", len(media))
}
if media[0].Timestamp != "1700000100" {
t.Fatalf("expected timestamp 1700000100, got %s", media[0].Timestamp)
}
if media[0].CameraName != "Front Door" {
t.Fatalf("expected camera name to be preserved, got %s", media[0].CameraName)
}
if media[0].CameraKey != "camera-1" {
t.Fatalf("expected camera key to be preserved, got %s", media[0].CameraKey)
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,176 @@
package video
import (
"fmt"
"os"
"testing"
mp4ff "github.com/Eyevinn/mp4ff/mp4"
"github.com/kerberos-io/agent/machinery/src/models"
)
// TestMP4Duration creates an MP4 file simulating a 5-second video recording
// and verifies that the durations in all boxes match the sum of sample durations.
func TestMP4Duration(t *testing.T) {
tmpFile := "/tmp/test_duration.mp4"
defer os.Remove(tmpFile)
// Minimal SPS for H.264 (baseline, 640x480) - proper Annex B format with start code
sps := []byte{0x67, 0x42, 0xc0, 0x1e, 0xd9, 0x00, 0xa0, 0x47, 0xfe, 0xc8}
pps := []byte{0x68, 0xce, 0x38, 0x80}
mp4Video := NewMP4(tmpFile, [][]byte{sps}, [][]byte{pps}, nil, 10)
mp4Video.SetWidth(640)
mp4Video.SetHeight(480)
videoTrack := mp4Video.AddVideoTrack("H264")
// Simulate 5 seconds at 25fps (200 frames, keyframe every 50 frames = 2s)
// PTS in milliseconds (timescale=1000)
frameDuration := uint64(40) // 40ms per frame = 25fps
numFrames := 150
gopSize := 50
// Create a fake Annex B NAL unit (keyframe IDR = type 5, non-keyframe = type 1)
makeFrame := func(isKey bool) []byte {
nalType := byte(0x01) // non-IDR slice
if isKey {
nalType = 0x65 // IDR slice
}
// Start code (4 bytes) + NAL header + some data
frame := []byte{0x00, 0x00, 0x00, 0x01, nalType}
// Add some padding data
for i := 0; i < 100; i++ {
frame = append(frame, byte(i))
}
return frame
}
var expectedDuration uint64
for i := 0; i < numFrames; i++ {
pts := uint64(i) * frameDuration
isKeyframe := i%gopSize == 0
err := mp4Video.AddSampleToTrack(videoTrack, isKeyframe, makeFrame(isKeyframe), pts)
if err != nil {
t.Fatalf("AddSampleToTrack failed at frame %d: %v", i, err)
}
}
expectedDuration = uint64(numFrames) * frameDuration // Should be 6000ms (150 * 40)
// Close with config that has signing key to avoid nil panics
config := &models.Config{
Signing: &models.Signing{
PrivateKey: "",
},
}
mp4Video.Close(config)
// Log what the code computed
t.Logf("VideoTotalDuration: %d ms", mp4Video.VideoTotalDuration)
t.Logf("Expected duration: %d ms", expectedDuration)
t.Logf("Segments: %d", len(mp4Video.SegmentDurations))
var sumSegDur uint64
for i, d := range mp4Video.SegmentDurations {
t.Logf(" Segment %d: duration=%d ms", i, d)
sumSegDur += d
}
t.Logf("Sum of segment durations: %d ms", sumSegDur)
// Now read back the file and inspect the boxes
f, err := os.Open(tmpFile)
if err != nil {
t.Fatalf("Failed to open output file: %v", err)
}
defer f.Close()
fi, err := f.Stat()
if err != nil {
t.Fatalf("Failed to stat output file: %v", err)
}
parsedFile, err := mp4ff.DecodeFile(f)
if err != nil {
t.Fatalf("Failed to decode MP4: %v", err)
}
t.Logf("File size: %d bytes", fi.Size())
// Check moov box
if parsedFile.Moov == nil {
t.Fatal("No moov box found")
}
// Check mvhd duration
mvhd := parsedFile.Moov.Mvhd
t.Logf("mvhd.Duration: %d (timescale=%d) = %.2f seconds", mvhd.Duration, mvhd.Timescale, float64(mvhd.Duration)/float64(mvhd.Timescale))
t.Logf("mvhd.Rate: 0x%08x", mvhd.Rate)
t.Logf("mvhd.Volume: 0x%04x", mvhd.Volume)
// Check each trak
for i, trak := range parsedFile.Moov.Traks {
t.Logf("Track %d:", i)
t.Logf(" tkhd.Duration: %d", trak.Tkhd.Duration)
t.Logf(" mdhd.Duration: %d (timescale=%d) = %.2f seconds", trak.Mdia.Mdhd.Duration, trak.Mdia.Mdhd.Timescale, float64(trak.Mdia.Mdhd.Duration)/float64(trak.Mdia.Mdhd.Timescale))
}
// Check mvex/mehd
if parsedFile.Moov.Mvex != nil && parsedFile.Moov.Mvex.Mehd != nil {
t.Logf("mehd.FragmentDuration: %d", parsedFile.Moov.Mvex.Mehd.FragmentDuration)
}
// Sum up actual sample durations from trun boxes in all segments
var actualTrunDuration uint64
var sampleCount int
for _, seg := range parsedFile.Segments {
for _, frag := range seg.Fragments {
for _, traf := range frag.Moof.Trafs {
// Only count video track (track 1)
if traf.Tfhd.TrackID == 1 {
for _, trun := range traf.Truns {
for _, s := range trun.Samples {
actualTrunDuration += uint64(s.Dur)
sampleCount++
}
}
}
}
}
}
t.Logf("Actual trun sample count: %d", sampleCount)
t.Logf("Actual trun total duration: %d ms", actualTrunDuration)
// Check sidx
if parsedFile.Sidx != nil {
var sidxDuration uint64
for _, ref := range parsedFile.Sidx.SidxRefs {
sidxDuration += uint64(ref.SubSegmentDuration)
}
t.Logf("sidx total duration: %d ms", sidxDuration)
}
// VERIFY: All duration values should be consistent
// The expected duration for 150 frames at 40ms each:
// - The sample-buffering pattern means the LAST sample uses LastVideoSampleDTS as duration
// - So all 150 samples should produce 150 * 40ms = 6000ms total
// But due to the pending sample pattern, the actual trun durations might differ
fmt.Println()
fmt.Println("=== DURATION CONSISTENCY CHECK ===")
fmt.Printf("Expected (150 * 40ms): %d ms\n", expectedDuration)
fmt.Printf("mvhd.Duration: %d ms\n", mvhd.Duration)
fmt.Printf("tkhd.Duration: %d ms\n", parsedFile.Moov.Traks[0].Tkhd.Duration)
fmt.Printf("mdhd.Duration: %d ms\n", parsedFile.Moov.Traks[0].Mdia.Mdhd.Duration)
fmt.Printf("Actual trun durations sum: %d ms\n", actualTrunDuration)
fmt.Printf("VideoTotalDuration: %d ms\n", mp4Video.VideoTotalDuration)
fmt.Printf("Sum of SegmentDurations: %d ms\n", sumSegDur)
fmt.Println()
// The key assertion: header duration must equal trun sum
if mvhd.Duration != actualTrunDuration {
t.Errorf("MISMATCH: mvhd.Duration (%d) != actual trun sum (%d), diff = %d ms",
mvhd.Duration, actualTrunDuration, int64(mvhd.Duration)-int64(actualTrunDuration))
}
if parsedFile.Moov.Traks[0].Mdia.Mdhd.Duration != 0 {
t.Errorf("MISMATCH: mdhd.Duration should be 0 for fragmented MP4, got %d",
parsedFile.Moov.Traks[0].Mdia.Mdhd.Duration)
}
}

View File

@@ -0,0 +1,270 @@
// AAC to G.711 µ-law transcoder using FFmpeg (libavcodec + libswresample).
// Build with: go build -tags ffmpeg ...
//
// Requires: libavcodec-dev, libavutil-dev, libswresample-dev (FFmpeg ≥ 5.x)
// and an AAC decoder compiled into the FFmpeg build (usually the default).
//
//go:build ffmpeg
package webrtc
/*
#cgo pkg-config: libavcodec libavutil libswresample
#cgo CFLAGS: -Wno-deprecated-declarations
#include <libavcodec/avcodec.h>
#include <libavutil/channel_layout.h>
#include <libavutil/frame.h>
#include <libavutil/mem.h>
#include <libavutil/opt.h>
#include <libswresample/swresample.h>
#include <stdlib.h>
#include <string.h>
// ── Transcoder handle ───────────────────────────────────────────────────
typedef struct {
AVCodecContext *codec_ctx;
AVCodecParserContext *parser;
SwrContext *swr_ctx;
AVFrame *frame;
AVPacket *pkt;
int swr_initialized;
int in_sample_rate;
int in_channels;
} aac_transcoder_t;
// ── Create / Destroy ────────────────────────────────────────────────────
static aac_transcoder_t* aac_transcoder_create(void) {
const AVCodec *codec = avcodec_find_decoder(AV_CODEC_ID_AAC);
if (!codec) return NULL;
aac_transcoder_t *t = (aac_transcoder_t*)calloc(1, sizeof(aac_transcoder_t));
if (!t) return NULL;
t->codec_ctx = avcodec_alloc_context3(codec);
if (!t->codec_ctx) { free(t); return NULL; }
if (avcodec_open2(t->codec_ctx, codec, NULL) < 0) {
avcodec_free_context(&t->codec_ctx);
free(t);
return NULL;
}
t->parser = av_parser_init(AV_CODEC_ID_AAC);
if (!t->parser) {
avcodec_free_context(&t->codec_ctx);
free(t);
return NULL;
}
t->frame = av_frame_alloc();
t->pkt = av_packet_alloc();
if (!t->frame || !t->pkt) {
if (t->frame) av_frame_free(&t->frame);
if (t->pkt) av_packet_free(&t->pkt);
av_parser_close(t->parser);
avcodec_free_context(&t->codec_ctx);
free(t);
return NULL;
}
return t;
}
static void aac_transcoder_destroy(aac_transcoder_t *t) {
if (!t) return;
if (t->swr_ctx) swr_free(&t->swr_ctx);
if (t->frame) av_frame_free(&t->frame);
if (t->pkt) av_packet_free(&t->pkt);
if (t->parser) av_parser_close(t->parser);
if (t->codec_ctx) avcodec_free_context(&t->codec_ctx);
free(t);
}
// ── Lazy resampler init (called after the first decoded frame) ──────────
static int aac_init_swr(aac_transcoder_t *t) {
int64_t in_ch_layout = (int64_t)t->codec_ctx->channel_layout;
if (in_ch_layout == 0)
in_ch_layout = av_get_default_channel_layout(t->codec_ctx->channels);
t->swr_ctx = swr_alloc_set_opts(
NULL,
AV_CH_LAYOUT_MONO, // out: mono
AV_SAMPLE_FMT_S16, // out: signed 16-bit
8000, // out: 8 kHz
in_ch_layout, // in: from decoder
t->codec_ctx->sample_fmt, // in: from decoder
t->codec_ctx->sample_rate, // in: from decoder
0, NULL);
if (!t->swr_ctx) return -1;
if (swr_init(t->swr_ctx) < 0) {
swr_free(&t->swr_ctx);
return -1;
}
t->in_sample_rate = t->codec_ctx->sample_rate;
t->in_channels = t->codec_ctx->channels;
t->swr_initialized = 1;
return 0;
}
// ── Transcode ADTS → 8 kHz mono S16 PCM ────────────────────────────────
// Caller must free *out_pcm with av_free() when non-NULL.
static int aac_transcode_to_pcm(aac_transcoder_t *t,
const uint8_t *data, int data_size,
uint8_t **out_pcm, int *out_size) {
*out_pcm = NULL;
*out_size = 0;
if (!data || data_size <= 0) return 0;
int buf_cap = 8192;
uint8_t *buf = (uint8_t*)av_malloc(buf_cap);
if (!buf) return -1;
int buf_len = 0;
while (data_size > 0) {
uint8_t *pout = NULL;
int pout_size = 0;
int used = av_parser_parse2(t->parser, t->codec_ctx,
&pout, &pout_size,
data, data_size,
AV_NOPTS_VALUE, AV_NOPTS_VALUE, 0);
if (used < 0) break;
data += used;
data_size -= used;
if (pout_size == 0) continue;
// Feed parsed frame to decoder
t->pkt->data = pout;
t->pkt->size = pout_size;
if (avcodec_send_packet(t->codec_ctx, t->pkt) < 0) continue;
// Pull all decoded frames
while (avcodec_receive_frame(t->codec_ctx, t->frame) == 0) {
if (!t->swr_initialized) {
if (aac_init_swr(t) < 0) {
av_frame_unref(t->frame);
av_free(buf);
return -1;
}
}
int out_samples = swr_get_out_samples(t->swr_ctx,
t->frame->nb_samples);
if (out_samples <= 0) out_samples = t->frame->nb_samples;
int needed = buf_len + out_samples * 2; // S16 = 2 bytes/sample
if (needed > buf_cap) {
buf_cap = needed * 2;
uint8_t *tmp = (uint8_t*)av_realloc(buf, buf_cap);
if (!tmp) { av_frame_unref(t->frame); av_free(buf); return -1; }
buf = tmp;
}
uint8_t *dst = buf + buf_len;
int converted = swr_convert(t->swr_ctx,
&dst, out_samples,
(const uint8_t**)t->frame->extended_data,
t->frame->nb_samples);
if (converted > 0)
buf_len += converted * 2;
av_frame_unref(t->frame);
}
}
if (buf_len == 0) {
av_free(buf);
return 0;
}
*out_pcm = buf;
*out_size = buf_len;
return 0;
}
*/
import "C"
import (
"errors"
"fmt"
"unsafe"
"github.com/kerberos-io/agent/machinery/src/log"
"github.com/zaf/g711"
)
// AACTranscodingAvailable reports whether AAC→PCMU transcoding
// is compiled in (requires the "ffmpeg" build tag).
func AACTranscodingAvailable() bool { return true }
// AACTranscoder decodes ADTS-wrapped AAC audio to 8 kHz mono PCM
// and encodes it as G.711 µ-law for WebRTC transport.
type AACTranscoder struct {
handle *C.aac_transcoder_t
}
// NewAACTranscoder creates a transcoder backed by FFmpeg's AAC decoder.
func NewAACTranscoder() (*AACTranscoder, error) {
h := C.aac_transcoder_create()
if h == nil {
return nil, errors.New("failed to create AAC transcoder (FFmpeg AAC decoder not available?)")
}
log.Log.Info("webrtc.aac_transcoder: AAC → G.711 µ-law transcoder initialised (FFmpeg)")
return &AACTranscoder{handle: h}, nil
}
// Transcode converts an ADTS buffer (one or more AAC frames) into
// G.711 µ-law encoded audio suitable for a PCMU WebRTC track.
func (t *AACTranscoder) Transcode(adtsData []byte) ([]byte, error) {
if t == nil || t.handle == nil || len(adtsData) == 0 {
return nil, nil
}
var outPCM *C.uint8_t
var outSize C.int
ret := C.aac_transcode_to_pcm(
t.handle,
(*C.uint8_t)(unsafe.Pointer(&adtsData[0])),
C.int(len(adtsData)),
&outPCM, &outSize,
)
if ret < 0 {
return nil, errors.New("AAC decode/resample failed")
}
if outSize == 0 || outPCM == nil {
return nil, nil // decoder buffering, no output yet
}
defer C.av_free(unsafe.Pointer(outPCM))
// Copy S16LE PCM to Go slice, then encode to µ-law.
pcm := C.GoBytes(unsafe.Pointer(outPCM), outSize)
ulaw := g711.EncodeUlaw(pcm)
// Log resampler details once.
if t.handle.swr_initialized == 1 && t.handle.in_sample_rate != 0 {
log.Log.Info(fmt.Sprintf(
"webrtc.aac_transcoder: first output resampling %d Hz / %d ch → 8000 Hz mono → µ-law",
int(t.handle.in_sample_rate), int(t.handle.in_channels)))
// Prevent repeated logging by zeroing the field we check.
t.handle.in_sample_rate = 0
}
return ulaw, nil
}
// Close releases all FFmpeg resources held by the transcoder.
func (t *AACTranscoder) Close() {
if t != nil && t.handle != nil {
C.aac_transcoder_destroy(t.handle)
t.handle = nil
log.Log.Info("webrtc.aac_transcoder: transcoder closed")
}
}

View File

@@ -0,0 +1,205 @@
// AAC transcoding fallback that uses the ffmpeg binary at runtime.
// Build with -tags ffmpeg to use the in-process CGO implementation instead.
//
//go:build !ffmpeg
package webrtc
import (
"bytes"
"errors"
"io"
"os/exec"
"strconv"
"strings"
"sync"
"time"
"github.com/kerberos-io/agent/machinery/src/log"
)
// AACTranscodingAvailable reports whether AAC→PCMU transcoding
// is available in the current runtime.
func AACTranscodingAvailable() bool {
_, err := exec.LookPath("ffmpeg")
return err == nil
}
// AACTranscoder uses an ffmpeg subprocess to convert ADTS AAC to raw PCMU.
type AACTranscoder struct {
cmd *exec.Cmd
stdin io.WriteCloser
stdout io.ReadCloser
stderrBuf bytes.Buffer
mu sync.Mutex
outMu sync.Mutex
outBuf bytes.Buffer
closed bool
closeOnce sync.Once
}
// NewAACTranscoder creates a runtime ffmpeg-based transcoder.
func NewAACTranscoder() (*AACTranscoder, error) {
ffmpegPath, err := exec.LookPath("ffmpeg")
if err != nil {
return nil, errors.New("AAC transcoding not available: ffmpeg binary not found in PATH")
}
log.Log.Info("webrtc.aac_transcoder: using ffmpeg binary at " + ffmpegPath)
cmd := exec.Command(
ffmpegPath,
"-hide_banner",
"-loglevel", "error",
"-fflags", "+nobuffer",
"-flags", "low_delay",
"-f", "aac",
"-i", "pipe:0",
"-vn",
"-ac", "1",
"-ar", "8000",
"-acodec", "pcm_mulaw",
"-f", "mulaw",
"pipe:1",
)
stdin, err := cmd.StdinPipe()
if err != nil {
return nil, err
}
stdout, err := cmd.StdoutPipe()
if err != nil {
return nil, err
}
cmd.Stderr = &bytes.Buffer{}
if err := cmd.Start(); err != nil {
return nil, err
}
t := &AACTranscoder{
cmd: cmd,
stdin: stdin,
stdout: stdout,
}
if stderrBuf, ok := cmd.Stderr.(*bytes.Buffer); ok {
t.stderrBuf = *stderrBuf
}
go func() {
buf := make([]byte, 4096)
for {
n, readErr := stdout.Read(buf)
if n > 0 {
t.outMu.Lock()
_, _ = t.outBuf.Write(buf[:n])
buffered := t.outBuf.Len()
t.outMu.Unlock()
if buffered <= 8192 || buffered%16000 == 0 {
log.Log.Info("webrtc.aac_transcoder: ffmpeg produced PCMU bytes, buffered=" + strconv.Itoa(buffered))
}
}
if readErr != nil {
if readErr != io.EOF {
log.Log.Warning("webrtc.aac_transcoder: stdout reader stopped: " + readErr.Error())
}
return
}
}
}()
log.Log.Info("webrtc.aac_transcoder: AAC → PCMU transcoder initialised (ffmpeg process)")
return t, nil
}
// Transcode writes ADTS AAC to ffmpeg and returns any PCMU bytes produced.
func (t *AACTranscoder) Transcode(adtsData []byte) ([]byte, error) {
if t == nil || len(adtsData) == 0 {
return nil, nil
}
t.mu.Lock()
defer t.mu.Unlock()
if t.closed {
return nil, errors.New("AAC transcoder is closed")
}
if _, err := t.stdin.Write(adtsData); err != nil {
return nil, err
}
if len(adtsData) <= 512 || len(adtsData)%1024 == 0 {
log.Log.Info("webrtc.aac_transcoder: wrote AAC bytes to ffmpeg, input=" + strconv.Itoa(len(adtsData)))
}
deadline := time.Now().Add(75 * time.Millisecond)
for {
data := t.readAvailable()
if len(data) > 0 {
log.Log.Info("webrtc.aac_transcoder: returning PCMU bytes=" + strconv.Itoa(len(data)))
return data, nil
}
if time.Now().After(deadline) {
if stderr := t.stderrString(); stderr != "" {
log.Log.Warning("webrtc.aac_transcoder: no output before deadline, ffmpeg stderr: " + stderr)
} else {
log.Log.Info("webrtc.aac_transcoder: no PCMU output before deadline")
}
return nil, nil
}
time.Sleep(5 * time.Millisecond)
}
}
func (t *AACTranscoder) readAvailable() []byte {
t.outMu.Lock()
defer t.outMu.Unlock()
if t.outBuf.Len() == 0 {
return nil
}
out := make([]byte, t.outBuf.Len())
copy(out, t.outBuf.Bytes())
t.outBuf.Reset()
return out
}
func (t *AACTranscoder) stderrString() string {
if t == nil {
return ""
}
if stderrBuf, ok := t.cmd.Stderr.(*bytes.Buffer); ok {
return strings.TrimSpace(stderrBuf.String())
}
return strings.TrimSpace(t.stderrBuf.String())
}
// Close stops the ffmpeg subprocess.
func (t *AACTranscoder) Close() {
if t == nil {
return
}
t.closeOnce.Do(func() {
t.mu.Lock()
t.closed = true
if t.stdin != nil {
_ = t.stdin.Close()
}
t.mu.Unlock()
if t.stdout != nil {
_ = t.stdout.Close()
}
if t.cmd != nil {
_ = t.cmd.Process.Kill()
_, _ = t.cmd.Process.Wait()
if stderr := t.stderrString(); stderr != "" {
log.Log.Info("webrtc.aac_transcoder: ffmpeg stderr on close: " + stderr)
}
}
})
}

View File

@@ -0,0 +1,137 @@
package webrtc
import (
"io"
"sync"
"github.com/kerberos-io/agent/machinery/src/log"
pionWebRTC "github.com/pion/webrtc/v4"
pionMedia "github.com/pion/webrtc/v4/pkg/media"
)
const (
// peerSampleBuffer controls how many samples can be buffered per peer before
// dropping. Keeps slow peers from blocking the broadcaster.
peerSampleBuffer = 60
)
// peerTrack is a per-peer track with its own non-blocking sample channel.
type peerTrack struct {
track *pionWebRTC.TrackLocalStaticSample
samples chan pionMedia.Sample
done chan struct{}
}
// TrackBroadcaster fans out media samples to multiple peer-specific tracks
// without blocking. Each peer gets its own TrackLocalStaticSample and a
// goroutine that drains samples independently, so a slow/congested peer
// cannot stall the others.
type TrackBroadcaster struct {
mu sync.RWMutex
peers map[string]*peerTrack
mimeType string
id string
streamID string
}
// NewTrackBroadcaster creates a new broadcaster for either video or audio.
func NewTrackBroadcaster(mimeType string, id string, streamID string) *TrackBroadcaster {
return &TrackBroadcaster{
peers: make(map[string]*peerTrack),
mimeType: mimeType,
id: id,
streamID: streamID,
}
}
// AddPeer creates a new per-peer track and starts a writer goroutine.
// Returns the track to be added to the PeerConnection via AddTrack().
func (b *TrackBroadcaster) AddPeer(sessionKey string) (*pionWebRTC.TrackLocalStaticSample, error) {
track, err := pionWebRTC.NewTrackLocalStaticSample(
pionWebRTC.RTPCodecCapability{MimeType: b.mimeType},
b.id,
b.streamID,
)
if err != nil {
return nil, err
}
pt := &peerTrack{
track: track,
samples: make(chan pionMedia.Sample, peerSampleBuffer),
done: make(chan struct{}),
}
b.mu.Lock()
b.peers[sessionKey] = pt
b.mu.Unlock()
// Per-peer writer goroutine — drains samples independently.
go func() {
defer close(pt.done)
for sample := range pt.samples {
if err := pt.track.WriteSample(sample); err != nil {
if err == io.ErrClosedPipe {
return
}
log.Log.Error("webrtc.broadcaster.peerWriter(): error writing sample for " + sessionKey + ": " + err.Error())
}
}
}()
log.Log.Info("webrtc.broadcaster.AddPeer(): added peer track for " + sessionKey)
return track, nil
}
// RemovePeer stops the writer goroutine and removes the peer.
func (b *TrackBroadcaster) RemovePeer(sessionKey string) {
b.mu.Lock()
pt, exists := b.peers[sessionKey]
if exists {
delete(b.peers, sessionKey)
}
b.mu.Unlock()
if exists {
close(pt.samples)
<-pt.done // wait for writer goroutine to finish
log.Log.Info("webrtc.broadcaster.RemovePeer(): removed peer track for " + sessionKey)
}
}
// WriteSample fans out a sample to all connected peers without blocking.
// If a peer's buffer is full (slow consumer), the sample is dropped for
// that peer only — other peers are unaffected.
func (b *TrackBroadcaster) WriteSample(sample pionMedia.Sample) {
b.mu.RLock()
defer b.mu.RUnlock()
for sessionKey, pt := range b.peers {
select {
case pt.samples <- sample:
default:
log.Log.Warning("webrtc.broadcaster.WriteSample(): dropping sample for slow peer " + sessionKey)
}
}
}
// PeerCount returns the current number of connected peers.
func (b *TrackBroadcaster) PeerCount() int {
b.mu.RLock()
defer b.mu.RUnlock()
return len(b.peers)
}
// Close removes all peers and stops all writer goroutines.
func (b *TrackBroadcaster) Close() {
b.mu.Lock()
keys := make([]string, 0, len(b.peers))
for k := range b.peers {
keys = append(keys, k)
}
b.mu.Unlock()
for _, key := range keys {
b.RemovePeer(key)
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -45,9 +45,9 @@
"crypto": false
},
"scripts": {
"start": "react-scripts start",
"build": "GENERATE_SOURCEMAP=false REACT_APP_ENVIRONMENT=production react-scripts build && rm -rf ../machinery/www && mv build ../machinery/www",
"test": "react-scripts test",
"start": "DISABLE_ESLINT_PLUGIN=true react-scripts start",
"build": "DISABLE_ESLINT_PLUGIN=true GENERATE_SOURCEMAP=false REACT_APP_ENVIRONMENT=production react-scripts build && rm -rf ../machinery/www && mv build ../machinery/www",
"test": "DISABLE_ESLINT_PLUGIN=true react-scripts test",
"eject": "react-scripts eject",
"lint": "eslint --debug 'src/**/*.{js,jsx,ts,tsx}'",
"format": "prettier --write \"**/*.{js,jsx,json,md}\""

View File

@@ -237,4 +237,4 @@
"remove_after_upload_enabled": "Enable delete on upload"
}
}
}
}

View File

@@ -194,7 +194,7 @@ export const getDashboardInformation = (onSuccess, onError) => {
};
};
export const getEvents = (eventfilter, onSuccess, onError) => {
export const getEvents = (eventfilter, onSuccess, onError, append = false) => {
return (dispatch) => {
doGetEvents(
eventfilter,
@@ -203,6 +203,7 @@ export const getEvents = (eventfilter, onSuccess, onError) => {
type: 'GET_EVENTS',
events: data.events,
filter: eventfilter,
append,
});
if (onSuccess) {
onSuccess();

View File

@@ -7,6 +7,7 @@ import './ImageCanvas.css';
class ImageCanvas extends React.Component {
componentDidMount() {
this.isUnmounted = false;
this.width = 0;
this.height = 0;
@@ -58,6 +59,9 @@ class ImageCanvas extends React.Component {
const { image } = this.props;
this.loadImage(image, (img) => {
if (this.isUnmounted || !this.editor) {
return;
}
if (this.width !== img.width || this.height !== img.height) {
this.width = img.width;
this.height = img.height;
@@ -71,6 +75,9 @@ class ImageCanvas extends React.Component {
componentDidUpdate() {
const { image } = this.props;
this.loadImage(image, (img) => {
if (this.isUnmounted || !this.editor) {
return;
}
if (this.width !== img.width || this.height !== img.height) {
this.width = img.width;
this.height = img.height;
@@ -82,11 +89,57 @@ class ImageCanvas extends React.Component {
});
}
componentWillUnmount() {
this.isUnmounted = true;
if (this.pendingImage) {
this.pendingImage.onload = null;
this.pendingImage.src = '';
this.pendingImage = null;
}
if (this.editor) {
this.editor.onSelectionEnd = null;
this.editor.onRegionMoveEnd = null;
this.editor.onRegionDelete = null;
if (this.editor.RM) {
this.editor.RM.deleteAllRegions();
}
if (typeof this.editor.dispose === 'function') {
this.editor.dispose();
} else if (typeof this.editor.destroy === 'function') {
this.editor.destroy();
}
this.editor = null;
}
if (this.toolbarContainer) {
this.toolbarContainer.innerHTML = '';
this.toolbarContainer = null;
}
if (this.editorContainer) {
this.editorContainer.innerHTML = '';
this.editorContainer = null;
}
}
loadData = (image) => {
if (!this.editor) {
return;
}
const w = image.width;
const h = image.height;
this.editor.addContentSource(image).then(() => {
if (this.isUnmounted || !this.editor) {
return;
}
// Add exisiting polygons
this.editor.RM.deleteAllRegions();
const { polygons } = this.props;
@@ -152,11 +205,19 @@ class ImageCanvas extends React.Component {
// eslint-disable-next-line class-methods-use-this
loadImage = (path, onready) => {
if (this.pendingImage) {
this.pendingImage.onload = null;
}
const image = new Image();
image.src = path;
image.addEventListener('load', (e) => {
this.pendingImage = image;
image.onload = (e) => {
if (this.pendingImage === image) {
this.pendingImage = null;
}
onready(e.target);
});
};
image.src = path;
};
// eslint-disable-next-line class-methods-use-this

View File

@@ -26,6 +26,23 @@ import {
import './Dashboard.scss';
import ReactTooltip from 'react-tooltip';
import config from '../../config';
import { getConfig } from '../../actions/agent';
function createUUID() {
if (
typeof window !== 'undefined' &&
window.crypto &&
typeof window.crypto.randomUUID === 'function'
) {
return window.crypto.randomUUID();
}
return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, (char) => {
const random = Math.floor(Math.random() * 16);
const value = char === 'x' ? random : 8 + Math.floor(random / 4);
return value.toString(16);
});
}
// eslint-disable-next-line react/prefer-stateless-function
class Dashboard extends React.Component {
@@ -33,43 +50,55 @@ class Dashboard extends React.Component {
super();
this.state = {
liveviewLoaded: false,
liveviewMode: 'webrtc',
open: false,
currentRecording: '',
initialised: false,
};
this.videoRef = React.createRef();
this.pendingRemoteCandidates = [];
this.initialiseLiveview = this.initialiseLiveview.bind(this);
this.initialiseSDLiveview = this.initialiseSDLiveview.bind(this);
this.startWebRTCLiveview = this.startWebRTCLiveview.bind(this);
this.handleWebRTCSignalMessage = this.handleWebRTCSignalMessage.bind(this);
this.fallbackToSDLiveview = this.fallbackToSDLiveview.bind(this);
}
componentDidMount() {
const liveview = document.getElementsByClassName('videocard-video');
if (liveview && liveview.length > 0) {
liveview[0].addEventListener('load', () => {
this.setState({
liveviewLoaded: true,
});
});
}
const { dispatchGetConfig } = this.props;
dispatchGetConfig(() => this.initialiseLiveview());
this.initialiseLiveview();
}
componentDidUpdate() {
this.initialiseLiveview();
componentDidUpdate(prevProps) {
const { images, dashboard } = this.props;
const { liveviewLoaded, liveviewMode } = this.state;
const configLoaded = this.hasAgentConfig(this.props);
const prevConfigLoaded = this.hasAgentConfig(prevProps);
if (!prevConfigLoaded && configLoaded) {
this.initialiseLiveview();
}
if (
liveviewMode === 'sd' &&
!liveviewLoaded &&
prevProps.images !== images &&
images.length > 0
) {
this.setState({
liveviewLoaded: true,
});
}
if (!prevProps.dashboard.cameraOnline && dashboard.cameraOnline) {
this.initialiseLiveview();
}
}
componentWillUnmount() {
const liveview = document.getElementsByClassName('videocard-video');
if (liveview && liveview.length > 0) {
liveview[0].remove();
}
if (this.requestStreamSubscription) {
this.requestStreamSubscription.unsubscribe();
}
const { dispatchSend } = this.props;
const message = {
message_type: 'stop-sd',
};
dispatchSend(message);
this.stopSDLiveview();
this.stopWebRTCLiveview();
}
handleClose() {
@@ -79,32 +108,378 @@ class Dashboard extends React.Component {
});
}
getCurrentTimestamp() {
return Math.round(Date.now() / 1000);
// eslint-disable-next-line react/sort-comp
hasAgentConfig(props) {
const currentProps = props || this.props;
const { config: configResponse } = currentProps;
return !!(configResponse && configResponse.config);
}
browserSupportsWebRTC() {
return (
typeof window !== 'undefined' &&
typeof window.RTCPeerConnection !== 'undefined'
);
}
buildPeerConnectionConfig() {
const { config: configResponse } = this.props;
const agentConfig =
configResponse && configResponse.config ? configResponse.config : {};
const iceServers = [];
if (agentConfig.stunuri) {
iceServers.push({
urls: [agentConfig.stunuri],
});
}
if (agentConfig.turnuri) {
const turnServer = {
urls: [agentConfig.turnuri],
};
if (agentConfig.turn_username) {
turnServer.username = agentConfig.turn_username;
}
if (agentConfig.turn_password) {
turnServer.credential = agentConfig.turn_password;
}
iceServers.push(turnServer);
}
return {
iceServers,
iceTransportPolicy: agentConfig.turn_force === 'true' ? 'relay' : 'all',
};
}
initialiseLiveview() {
const { initialised } = this.state;
if (!initialised) {
const message = {
message_type: 'stream-sd',
};
const { connected, dispatchSend } = this.props;
if (connected) {
const { dashboard } = this.props;
if (initialised || !dashboard.cameraOnline) {
return;
}
if (!this.hasAgentConfig()) {
return;
}
if (this.browserSupportsWebRTC()) {
this.startWebRTCLiveview();
} else {
this.fallbackToSDLiveview('WebRTC is not supported in this browser.');
}
}
initialiseSDLiveview() {
if (this.requestStreamSubscription) {
return;
}
const message = {
message_type: 'stream-sd',
};
const { connected, dispatchSend } = this.props;
if (connected) {
dispatchSend(message);
}
const requestStreamInterval = interval(2000);
this.requestStreamSubscription = requestStreamInterval.subscribe(() => {
const { connected: isConnected } = this.props;
if (isConnected) {
dispatchSend(message);
}
});
}
const requestStreamInterval = interval(2000);
this.requestStreamSubscription = requestStreamInterval.subscribe(() => {
const { connected: isConnected } = this.props;
if (isConnected) {
dispatchSend(message);
}
});
this.setState({
initialised: true,
});
stopSDLiveview() {
if (this.requestStreamSubscription) {
this.requestStreamSubscription.unsubscribe();
this.requestStreamSubscription = null;
}
const { dispatchSend } = this.props;
dispatchSend({
message_type: 'stop-sd',
});
}
stopWebRTCLiveview() {
if (this.webrtcTimeout) {
window.clearTimeout(this.webrtcTimeout);
this.webrtcTimeout = null;
}
if (this.webrtcSocket) {
this.webrtcSocket.onopen = null;
this.webrtcSocket.onmessage = null;
this.webrtcSocket.onerror = null;
this.webrtcSocket.onclose = null;
this.webrtcSocket.close();
this.webrtcSocket = null;
}
if (this.webrtcPeerConnection) {
this.webrtcPeerConnection.ontrack = null;
this.webrtcPeerConnection.onicecandidate = null;
this.webrtcPeerConnection.onconnectionstatechange = null;
this.webrtcPeerConnection.close();
this.webrtcPeerConnection = null;
}
this.pendingRemoteCandidates = [];
this.webrtcOfferStarted = false;
this.webrtcSessionId = null;
this.webrtcClientId = null;
if (this.videoRef.current) {
this.videoRef.current.srcObject = null;
}
}
sendWebRTCMessage(messageType, message = {}) {
if (!this.webrtcSocket || this.webrtcSocket.readyState !== WebSocket.OPEN) {
return;
}
this.webrtcSocket.send(
JSON.stringify({
client_id: this.webrtcClientId,
message_type: messageType,
message,
})
);
}
async handleWebRTCSignalMessage(event) {
let data;
try {
data = JSON.parse(event.data);
} catch (error) {
return;
}
const { message_type: messageType, message = {} } = data;
const { session_id: sessionID, sdp, candidate } = message;
if (messageType === 'hello-back') {
await this.beginWebRTCLiveview();
return;
}
if (sessionID && sessionID !== this.webrtcSessionId) {
return;
}
switch (messageType) {
case 'webrtc-answer':
try {
await this.webrtcPeerConnection.setRemoteDescription({
type: 'answer',
sdp: window.atob(sdp),
});
await this.flushPendingRemoteCandidates();
} catch (error) {
this.fallbackToSDLiveview(
`Unable to apply WebRTC answer: ${error.message}`
);
}
break;
case 'webrtc-candidate': {
try {
const candidateInit = JSON.parse(candidate);
if (
this.webrtcPeerConnection.remoteDescription &&
this.webrtcPeerConnection.remoteDescription.type
) {
await this.webrtcPeerConnection.addIceCandidate(candidateInit);
} else {
this.pendingRemoteCandidates.push(candidateInit);
}
} catch (error) {
this.fallbackToSDLiveview(
`Unable to apply WebRTC candidate: ${error.message}`
);
}
break;
}
case 'webrtc-error':
this.fallbackToSDLiveview(
message.message || 'The agent could not start the WebRTC liveview.'
);
break;
default:
break;
}
}
async beginWebRTCLiveview() {
if (!this.webrtcPeerConnection || this.webrtcOfferStarted) {
return;
}
try {
this.webrtcOfferStarted = true;
const offer = await this.webrtcPeerConnection.createOffer({
offerToReceiveAudio: true,
offerToReceiveVideo: true,
});
await this.webrtcPeerConnection.setLocalDescription(offer);
this.sendWebRTCMessage('stream-hd', {
session_id: this.webrtcSessionId,
sdp: window.btoa(this.webrtcPeerConnection.localDescription.sdp),
});
} catch (error) {
this.fallbackToSDLiveview(
`Unable to initialise WebRTC liveview: ${error.message}`,
);
}
}
async flushPendingRemoteCandidates() {
if (
!this.webrtcPeerConnection ||
!this.webrtcPeerConnection.remoteDescription
) {
return;
}
while (this.pendingRemoteCandidates.length > 0) {
const candidateInit = this.pendingRemoteCandidates.shift();
try {
// eslint-disable-next-line no-await-in-loop
await this.webrtcPeerConnection.addIceCandidate(candidateInit);
} catch (error) {
this.fallbackToSDLiveview(
`Unable to add remote ICE candidate: ${error.message}`,
);
return;
}
}
}
startWebRTCLiveview() {
if (this.webrtcPeerConnection || this.webrtcSocket) {
return;
}
this.stopSDLiveview();
this.webrtcClientId = createUUID();
this.webrtcSessionId = createUUID();
this.pendingRemoteCandidates = [];
this.webrtcPeerConnection = new window.RTCPeerConnection(
this.buildPeerConnectionConfig()
);
this.webrtcPeerConnection.ontrack = (event) => {
const [remoteStream] = event.streams;
if (this.videoRef.current && remoteStream) {
this.videoRef.current.srcObject = remoteStream;
const playPromise = this.videoRef.current.play();
if (playPromise && playPromise.catch) {
playPromise.catch(() => {});
}
}
this.setState({
liveviewLoaded: true,
});
};
this.webrtcPeerConnection.onicecandidate = (event) => {
if (!event.candidate) {
return;
}
this.sendWebRTCMessage('webrtc-candidate', {
session_id: this.webrtcSessionId,
candidate: JSON.stringify(event.candidate.toJSON()),
});
};
this.webrtcPeerConnection.onconnectionstatechange = () => {
const { connectionState } = this.webrtcPeerConnection;
if (connectionState === 'connected') {
this.setState({
liveviewLoaded: true,
});
}
if (
connectionState === 'failed' ||
connectionState === 'disconnected' ||
connectionState === 'closed'
) {
this.fallbackToSDLiveview(
`WebRTC connection ${connectionState}, falling back to SD liveview.`,
);
}
};
this.webrtcSocket = new WebSocket(config.WS_URL);
this.webrtcSocket.onopen = () => {
this.sendWebRTCMessage('hello', {});
};
this.webrtcSocket.onmessage = this.handleWebRTCSignalMessage;
this.webrtcSocket.onerror = () => {
this.fallbackToSDLiveview('Unable to open the WebRTC signaling channel.');
};
this.webrtcSocket.onclose = () => {
const { liveviewLoaded } = this.state;
if (!liveviewLoaded) {
this.fallbackToSDLiveview('WebRTC signaling channel closed early.');
}
};
this.webrtcTimeout = window.setTimeout(() => {
const { liveviewLoaded } = this.state;
if (!liveviewLoaded) {
this.fallbackToSDLiveview(
'WebRTC connection timed out, falling back to SD liveview.'
);
}
}, 10000);
this.setState({
initialised: true,
liveviewLoaded: false,
liveviewMode: 'webrtc',
});
}
fallbackToSDLiveview(errorMessage) {
const { liveviewMode } = this.state;
if (liveviewMode === 'sd' && this.requestStreamSubscription) {
return;
}
this.stopWebRTCLiveview();
if (errorMessage) {
// eslint-disable-next-line no-console
console.warn(errorMessage);
}
this.setState(
{
initialised: true,
liveviewLoaded: false,
liveviewMode: 'sd',
},
() => {
this.initialiseSDLiveview();
}
);
}
openModal(file) {
@@ -116,7 +491,8 @@ class Dashboard extends React.Component {
render() {
const { dashboard, t, images } = this.props;
const { liveviewLoaded, open, currentRecording } = this.state;
const { liveviewLoaded, liveviewMode, open, currentRecording } = this.state;
const listenerCount = dashboard.webrtcReaders ? dashboard.webrtcReaders : 0;
// We check if the camera was getting a valid frame
// during the last 5 seconds, otherwise we assume the camera is offline.
@@ -170,7 +546,6 @@ class Dashboard extends React.Component {
divider="0"
footer={t('dashboard.total_recordings')}
/>
<Link to="/settings">
<Card
title="IP Camera"
@@ -309,7 +684,9 @@ class Dashboard extends React.Component {
)}
</div>
<div>
<h2>{t('dashboard.live_view')}</h2>
<h2>
{t('dashboard.live_view')} ({listenerCount})
</h2>
{(!liveviewLoaded || !isCameraOnline) && (
<SetupBox
btnicon="preferences"
@@ -326,12 +703,16 @@ class Dashboard extends React.Component {
liveviewLoaded && isCameraOnline ? 'visible' : 'hidden',
}}
>
<ImageCard
imageSrc={`data:image/png;base64, ${
images.length ? images[0] : ''
}`}
onerror=""
/>
{liveviewMode === 'webrtc' ? (
<video ref={this.videoRef} autoPlay muted playsInline />
) : (
<ImageCard
imageSrc={`data:image/png;base64, ${
images.length ? images[0] : ''
}`}
onerror=""
/>
)}
</div>
</div>
</div>
@@ -343,20 +724,25 @@ class Dashboard extends React.Component {
const mapStateToProps = (state /* , ownProps */) => ({
dashboard: state.agent.dashboard,
config: state.agent.config,
connected: state.wss.connected,
images: state.wss.images,
});
const mapDispatchToProps = (dispatch) => ({
dispatchSend: (message) => dispatch(send(message)),
dispatchGetConfig: (onSuccess, onError) =>
dispatch(getConfig(onSuccess, onError)),
});
Dashboard.propTypes = {
dashboard: PropTypes.object.isRequired,
config: PropTypes.object.isRequired,
connected: PropTypes.bool.isRequired,
images: PropTypes.array.isRequired,
t: PropTypes.func.isRequired,
dispatchSend: PropTypes.func.isRequired,
dispatchGetConfig: PropTypes.func.isRequired,
};
export default withTranslation()(

View File

@@ -3,6 +3,7 @@ import PropTypes from 'prop-types';
import { withTranslation } from 'react-i18next';
import {
Breadcrumb,
ControlBar,
VideoCard,
Button,
Modal,
@@ -16,14 +17,53 @@ import { getEvents } from '../../actions/agent';
import config from '../../config';
import './Media.scss';
function formatDateTimeLocal(date) {
const year = date.getFullYear();
const month = String(date.getMonth() + 1).padStart(2, '0');
const day = String(date.getDate()).padStart(2, '0');
const hours = String(date.getHours()).padStart(2, '0');
const minutes = String(date.getMinutes()).padStart(2, '0');
return `${year}-${month}-${day}T${hours}:${minutes}`;
}
function getDefaultTimeWindow() {
const endDate = new Date();
const startDate = new Date(endDate.getTime() - 60 * 60 * 1000);
return {
startDateTime: formatDateTimeLocal(startDate),
endDateTime: formatDateTimeLocal(endDate),
timestamp_offset_start: Math.floor(startDate.getTime() / 1000),
timestamp_offset_end: Math.floor(endDate.getTime() / 1000) + 59,
};
}
function normalizeInputValue(valueOrEvent) {
if (valueOrEvent && valueOrEvent.target) {
return valueOrEvent.target.value;
}
return valueOrEvent;
}
// eslint-disable-next-line react/prefer-stateless-function
class Media extends React.Component {
constructor() {
super();
this.state = {
timestamp_offset_start: 0,
timestamp_offset_end: 0,
const defaultTimeWindow = getDefaultTimeWindow();
const initialFilter = {
timestamp_offset_start: defaultTimeWindow.timestamp_offset_start,
timestamp_offset_end: defaultTimeWindow.timestamp_offset_end,
number_of_elements: 12,
};
this.state = {
appliedFilter: initialFilter,
startDateTime: defaultTimeWindow.startDateTime,
endDateTime: defaultTimeWindow.endDateTime,
isScrolling: false,
open: false,
currentRecording: '',
@@ -32,7 +72,8 @@ class Media extends React.Component {
componentDidMount() {
const { dispatchGetEvents } = this.props;
dispatchGetEvents(this.state);
const { appliedFilter } = this.state;
dispatchGetEvents(appliedFilter);
document.addEventListener('scroll', this.trackScrolling);
}
@@ -49,29 +90,107 @@ class Media extends React.Component {
trackScrolling = () => {
const { events, dispatchGetEvents } = this.props;
const { isScrolling } = this.state;
const { isScrolling, appliedFilter } = this.state;
const wrappedElement = document.getElementById('loader');
if (!isScrolling && this.isBottom(wrappedElement)) {
this.setState({
isScrolling: true,
});
// Get last element
const lastElement = events[events.length - 1];
if (lastElement) {
this.setState({
if (!wrappedElement || isScrolling || !this.isBottom(wrappedElement)) {
return;
}
this.setState({
isScrolling: true,
});
// Get last element
const lastElement = events[events.length - 1];
if (lastElement) {
dispatchGetEvents(
{
...appliedFilter,
timestamp_offset_end: parseInt(lastElement.timestamp, 10),
});
dispatchGetEvents(this.state, () => {
},
() => {
setTimeout(() => {
this.setState({
isScrolling: false,
});
}, 1000);
});
}
},
() => {
this.setState({
isScrolling: false,
});
},
true
);
} else {
this.setState({
isScrolling: false,
});
}
};
buildEventFilter(startDateTime, endDateTime) {
const { appliedFilter } = this.state;
return {
timestamp_offset_start: this.getTimestampFromInput(
startDateTime,
'start'
),
timestamp_offset_end: this.getTimestampFromInput(endDateTime, 'end'),
number_of_elements: appliedFilter.number_of_elements,
};
}
handleDateFilterChange(field, value) {
const { dispatchGetEvents } = this.props;
const { startDateTime, endDateTime } = this.state;
const normalizedValue = normalizeInputValue(value);
const nextStartDateTime =
field === 'startDateTime' ? normalizedValue : startDateTime;
const nextEndDateTime =
field === 'endDateTime' ? normalizedValue : endDateTime;
const nextFilter = this.buildEventFilter(
nextStartDateTime,
nextEndDateTime
);
const shouldApplyFilter =
(nextStartDateTime === '' || nextStartDateTime.length === 16) &&
(nextEndDateTime === '' || nextEndDateTime.length === 16);
this.setState(
{
[field]: normalizedValue,
appliedFilter: shouldApplyFilter
? nextFilter
: this.state.appliedFilter,
isScrolling: false,
},
() => {
if (shouldApplyFilter) {
dispatchGetEvents(nextFilter);
}
}
);
}
getTimestampFromInput(value, boundary) {
if (!value) {
return 0;
}
const date = new Date(value);
if (Number.isNaN(date.getTime())) {
return 0;
}
const seconds = Math.floor(date.getTime() / 1000);
if (boundary === 'end') {
return seconds + 59;
}
return seconds;
}
isBottom(el) {
return el.getBoundingClientRect().bottom + 50 <= window.innerHeight;
}
@@ -85,7 +204,9 @@ class Media extends React.Component {
render() {
const { events, eventsLoaded, t } = this.props;
const { isScrolling, open, currentRecording } = this.state;
const { isScrolling, open, currentRecording, startDateTime, endDateTime } =
this.state;
return (
<div id="media">
<Breadcrumb
@@ -102,6 +223,37 @@ class Media extends React.Component {
</Link>
</Breadcrumb>
<div className="media-control-bar">
<ControlBar>
<div className="media-filters">
<div className="media-filters__field">
<label htmlFor="recordings-start-time">Start time</label>
<input
className="media-filters__input"
id="recordings-start-time"
type="datetime-local"
value={startDateTime}
onChange={(value) =>
this.handleDateFilterChange('startDateTime', value)
}
/>
</div>
<div className="media-filters__field">
<label htmlFor="recordings-end-time">End time</label>
<input
className="media-filters__input"
id="recordings-end-time"
type="datetime-local"
value={endDateTime}
onChange={(value) =>
this.handleDateFilterChange('endDateTime', value)
}
/>
</div>
</div>
</ControlBar>
</div>
<div className="stats grid-container --four-columns">
{events.map((event) => (
<div
@@ -123,6 +275,11 @@ class Media extends React.Component {
</div>
))}
</div>
{events.length === 0 && eventsLoaded === 0 && (
<div className="media-empty-state">
No recordings found in the selected time range.
</div>
)}
{open && (
<Modal>
<ModalHeader
@@ -182,13 +339,13 @@ const mapStateToProps = (state /* , ownProps */) => ({
});
const mapDispatchToProps = (dispatch) => ({
dispatchGetEvents: (eventFilter, success, error) =>
dispatch(getEvents(eventFilter, success, error)),
dispatchGetEvents: (eventFilter, success, error, append) =>
dispatch(getEvents(eventFilter, success, error, append)),
});
Media.propTypes = {
t: PropTypes.func.isRequired,
events: PropTypes.objectOf(PropTypes.object).isRequired,
events: PropTypes.arrayOf(PropTypes.object).isRequired,
eventsLoaded: PropTypes.number.isRequired,
dispatchGetEvents: PropTypes.func.isRequired,
};

View File

@@ -1,4 +1,103 @@
#media {
.media-control-bar {
.control-bar {
display: block;
padding: 0 var(--main-content-gutter);
}
.control-bar .filtering {
display: block;
}
.control-bar .filtering > * {
border-right: 0;
flex: 1 1 100% !important;
max-width: none;
width: 100%;
}
}
.media-filters {
align-items: stretch;
display: grid;
gap: 0;
grid-template-columns: repeat(2, minmax(0, 1fr));
min-width: 0;
width: 100%;
}
.media-filters__field {
border-right: 1px solid var(--bg-muted);
min-width: 0;
padding: 16px 24px;
label {
display: block;
font-size: 14px;
font-weight: 600;
margin-bottom: 8px;
white-space: nowrap;
}
}
.media-filters__field:first-child {
padding-left: 0;
}
.media-filters__input {
appearance: none;
background: var(--white);
border: 1px solid var(--grey-light);
border-radius: 8px;
box-sizing: border-box;
color: var(--black);
font-size: 16px;
min-height: 48px;
padding: 0 14px 0 0;
width: 100%;
}
.media-filters__input::-webkit-datetime-edit,
.media-filters__input::-webkit-datetime-edit-fields-wrapper {
padding: 0;
}
.media-filters__input:focus {
border-color: var(--oss);
outline: 0;
}
.media-filters__field:first-child .media-filters__input {
padding-left: 0;
}
.media-filters__field:last-child {
border-right: 0;
padding-right: 0;
}
@media (max-width: 700px) {
.media-filters {
grid-template-columns: 1fr;
}
.media-filters__field {
border-right: 0;
border-bottom: 1px solid var(--bg-muted);
padding-left: 0;
padding-right: 0;
}
.media-filters__field:last-child {
border-bottom: 0;
}
}
.media-empty-state {
margin: 24px 0;
opacity: 0.8;
text-align: center;
}
#loader {
display: flex;

View File

@@ -159,7 +159,10 @@ class Settings extends React.Component {
componentWillUnmount() {
document.removeEventListener('keydown', this.escFunction, false);
clearInterval(this.interval);
if (this.requestStreamSubscription) {
this.requestStreamSubscription.unsubscribe();
this.requestStreamSubscription = null;
}
const { dispatchSend } = this.props;
const message = {

View File

@@ -123,16 +123,12 @@ const agent = (
};
case 'GET_EVENTS':
const { timestamp_offset_end } = action.filter;
const { events } = action;
return {
...state,
eventsLoaded: events.length,
events:
timestamp_offset_end === 0
? [...events]
: [...state.events, ...events],
eventfilter: action.eventfilter,
events: action.append ? [...state.events, ...events] : [...events],
eventfilter: action.filter,
};
default: