Compare commits

...

51 Commits

Author SHA1 Message Date
Cédric Verstraeten
4fbee60e9f Merge pull request #261 from kerberos-io/feature/add-webrtc-aac-transcoder
feature/add-webrtc-aac-transcoder
2026-03-09 17:46:17 +01:00
Cédric Verstraeten
d6c25df280 Add missing imports for strconv and strings in AAC transcoder stub 2026-03-09 16:42:42 +00:00
Cédric Verstraeten
72a2d28e1e Update aac_transcoder_stub.go 2026-03-09 17:41:54 +01:00
Cédric Verstraeten
eb0972084f Implement AAC transcoding for WebRTC using FFmpeg; update Dockerfiles and launch configuration 2026-03-09 16:34:52 +00:00
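The transcoding approach in this commit shells out to FFmpeg rather than linking a codec. A minimal sketch of the kind of invocation involved — the actual flags, sample rate, and channel layout used by the agent are not shown here and are assumptions:

```go
package main

import (
	"fmt"
	"os/exec"
	"strconv"
)

// buildTranscodeArgs sketches an ffmpeg invocation that converts raw
// G.711 mu-law audio on stdin into ADTS-framed AAC on stdout, the shape
// a WebRTC AAC transcoder would need. Sample rate and channel count
// here are illustrative, not the agent's actual values.
func buildTranscodeArgs(sampleRate, channels int) []string {
	return []string{
		"-f", "mulaw", // raw mu-law input
		"-ar", strconv.Itoa(sampleRate),
		"-ac", strconv.Itoa(channels),
		"-i", "pipe:0", // read from stdin
		"-c:a", "aac",
		"-f", "adts", // ADTS framing so the output is self-describing
		"pipe:1", // write to stdout
	}
}

func main() {
	args := buildTranscodeArgs(8000, 1)
	cmd := exec.Command("ffmpeg", args...)
	fmt.Println(len(cmd.Args)) // program name + 13 arguments
}
```

This is also why the Dockerfiles in this change add the `ffmpeg` package: the binary must be present at runtime.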
Cédric Verstraeten
41a1d221fc Merge pull request #260 from kerberos-io/fix/set-clean-state
fix/set-clean-state
2026-03-09 16:56:36 +01:00
Cédric Verstraeten
eaacc93d2f Set MQTT clean session to true and disable resume subscriptions 2026-03-09 15:50:40 +00:00
Cédric Verstraeten
0e6a004c23 Merge pull request #259 from kerberos-io/fix/add-grace-period
feature/add-broadcasting-feature
2026-03-09 16:20:39 +01:00
Cédric Verstraeten
617f854534 Merge branch 'master' into fix/add-grace-period 2026-03-09 16:17:35 +01:00
Cédric Verstraeten
1bf8006055 Refactor WebRTC handling to use per-peer broadcasters for video and audio tracks 2026-03-09 15:12:01 +00:00
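The per-peer broadcaster pattern referenced here can be sketched as follows — names are illustrative, not the agent's actual `webrtc` package API. The key property is the non-blocking write: a full buffer drops the sample for that peer only.

```go
package main

import "fmt"

// Sample stands in for an RTP packet or media sample.
type Sample struct{ Data []byte }

// Broadcaster fans one stream out to per-peer buffered channels so a
// slow or congested viewer cannot stall the others. Minimal sketch:
// not goroutine-safe; the real implementation needs locking around peers.
type Broadcaster struct {
	peers map[string]chan Sample
}

func NewBroadcaster() *Broadcaster {
	return &Broadcaster{peers: make(map[string]chan Sample)}
}

// AddPeer registers a viewer and returns its private sample channel.
func (b *Broadcaster) AddPeer(id string) <-chan Sample {
	ch := make(chan Sample, 64) // small per-peer buffer
	b.peers[id] = ch
	return ch
}

// Write delivers a sample to every peer without ever blocking.
func (b *Broadcaster) Write(s Sample) (delivered, dropped int) {
	for _, ch := range b.peers {
		select {
		case ch <- s:
			delivered++
		default: // this peer's buffer is full: drop, keep going
			dropped++
		}
	}
	return
}

func main() {
	b := NewBroadcaster()
	b.AddPeer("viewer-1")
	d, _ := b.Write(Sample{Data: []byte{0x00}})
	fmt.Println("delivered:", d)
}
```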
Cédric Verstraeten
ca0e426382 Add max signaling age constant and discard stale WebRTC messages 2026-03-09 14:50:00 +00:00
Cédric Verstraeten
726d0722d9 Merge pull request #258 from kerberos-io/fix/add-grace-period
fix/add-grace-period
2026-03-09 15:20:53 +01:00
Cédric Verstraeten
d8f320b040 Add disconnect grace period handling in WebRTC connection manager 2026-03-09 14:15:50 +00:00
Cédric Verstraeten
0131b87692 Merge pull request #257 from kerberos-io/security/middleware-exposure
security/middleware-exposure
2026-03-09 14:18:11 +01:00
Cédric Verstraeten
54e8198b65 Apply suggestions from code review
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-03-09 14:18:00 +01:00
Cédric Verstraeten
3bfb68f950 Update port configuration and secure routes with JWT authentication middleware 2026-03-09 12:42:05 +00:00
Cédric Verstraeten
c05e59c936 Merge pull request #255 from kerberos-io/feature/improve-mqtt-concurrency
feature/improve-mqtt-concurrency
2026-03-09 13:25:26 +01:00
Cédric Verstraeten
b42d63b668 Enhance WebRTC packet processing for improved latency handling and keyframe synchronization 2026-03-09 12:17:38 +00:00
Cédric Verstraeten
0ca007e424 Refactor session key usage in ConnectionManager and enhance candidate queuing 2026-03-09 12:09:22 +00:00
Cédric Verstraeten
229d085de7 Merge pull request #253 from kerberos-io/fix/mqtt-reconnection
fix/mqtt-reconnection
2026-03-09 12:43:42 +01:00
Cédric Verstraeten
30e2b8318d Refactor build workflow to support multi-architecture builds and enhance MQTT connection handling 2026-03-09 11:40:24 +00:00
Cédric Verstraeten
dbcf4e242c Enhance MQTT reconnection handling and improve WebRTC connection cleanup
- Enable automatic reconnection for MQTT with configurable intervals and timeouts.
- Add logging for connection loss and reconnection attempts.
- Refactor WebRTC connection cleanup to ensure proper resource management on disconnection.
- Improve event handling in ImageCanvas and Dashboard components for better performance and reliability.
2026-03-09 11:04:10 +00:00
Cédric Verstraeten
ccf4034cc8 Merge pull request #252 from kerberos-io/fix/close-mp4-after-started
fix/close-mp4-after-started
2026-03-03 15:21:12 +01:00
Cédric Verstraeten
a34836e8f4 Delay MP4 creation until the first keyframe is received to ensure valid recordings 2026-03-03 14:16:39 +00:00
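The gating logic behind this fix: a recording that starts on a delta frame cannot be decoded from its beginning, so no MP4 is created until the first keyframe arrives. A minimal sketch with a stand-in packet type:

```go
package main

import "fmt"

// Packet is a stand-in for the agent's packet type.
type Packet struct {
	IsKeyFrame bool
	Data       []byte
}

// firstKeyframeIndex returns the index at which recording may begin:
// everything before the first keyframe is dropped, and if no keyframe
// ever arrives no file is written at all.
func firstKeyframeIndex(pkts []Packet) int {
	for i, p := range pkts {
		if p.IsKeyFrame {
			return i // create the MP4 here
		}
	}
	return -1 // no keyframe seen
}

func main() {
	pkts := []Packet{{false, nil}, {false, nil}, {true, nil}, {false, nil}}
	fmt.Println(firstKeyframeIndex(pkts)) // recording starts at index 2
}
```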
Cédric Verstraeten
dd1464d1be Fix recording closure condition to ensure it only triggers after recording has started 2026-03-03 14:03:11 +00:00
Cédric Verstraeten
2c02e0aeb1 Merge pull request #250 from kerberos-io/fix/add-avc-description-fallback
fix/add-avc-description-fallback
2026-02-27 11:48:34 +01:00
cedricve
d5464362bb Add AVC descriptor fallback for SPS parse errors
When setting the AVC descriptor fails in MP4.Close(), attempt a fallback that constructs an AvcC/avc1 sample entry from available SPS/PPS NALUs. Adds github.com/Eyevinn/mp4ff/avc import and two helpers: addAVCDescriptorFallback (builds a visual sample entry, sets tkhd width/height if available, and inserts it into stsd) and buildAVCDecConfRecFromSPS (creates an avc.DecConfRec from SPS/PPS bytes by extracting profile/compat/level and filling defaults). Logs errors and warns when the fallback is used. This provides resilience against SPS parsing errors when writing the MP4 track descriptor.
2026-02-27 11:35:22 +01:00
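The core of a fallback like the one described is that an AVCDecoderConfigurationRecord only strictly needs three bytes from the SPS, and they sit at fixed offsets. A sketch of that extraction, independent of the mp4ff library (the helper name is illustrative):

```go
package main

import "fmt"

// avcCHeaderFromSPS pulls the three bytes an AVC decoder configuration
// record needs from a bare SPS NALU: profile_idc, profile_compatibility
// and level_idc are at offsets 1-3, right after the NAL header byte.
// This is what makes a fallback possible when full SPS parsing fails.
func avcCHeaderFromSPS(sps []byte) (profile, compat, level byte, ok bool) {
	if len(sps) < 4 || sps[0]&0x1F != 7 { // must be an SPS (NALU type 7)
		return 0, 0, 0, false
	}
	return sps[1], sps[2], sps[3], true
}

func main() {
	// Baseline profile (66), level 3.1 (31) -- illustrative values.
	sps := []byte{0x67, 0x42, 0x00, 0x1F}
	p, c, l, ok := avcCHeaderFromSPS(sps)
	fmt.Println(p, c, l, ok)
}
```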
Cédric Verstraeten
5bcefd0015 Merge pull request #249 from kerberos-io/feature/enhance-avc-hevc-ssp-nalus
feature/enhance-avc-hevc-ssp-nalus
2026-02-27 11:12:03 +01:00
cedricve
5bb9def42d Normalize and debug H264/H265 parameter sets
Replace direct sanitizeParameterSets usage with normalizeH264ParameterSets and normalizeH265ParameterSets in mp4.Close. The new functions split Annex-B blobs, strip start codes, detect NALU types (SPS/PPS for AVC; VPS/SPS/PPS for HEVC), aggregate distinct parameter sets and fall back to sanitizeParameterSets if none are found. Added splitParamSetNALUs and formatNaluDebug helpers and debug logging to output concise parameter-set summaries before setting AVC/HEVC descriptors. These changes improve handling of concatenated Annex-B parameter set blobs and make debugging parameter extraction easier.
2026-02-27 11:09:28 +01:00
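A `splitParamSetNALUs`-style helper has to cut a concatenated Annex-B blob on both 3- and 4-byte start codes before individual SPS/PPS/VPS units can be classified. A sketch of that splitting (assumptions: parameter sets do not legitimately end in a 0x00 byte, which the trailing-zero trim relies on):

```go
package main

import (
	"bytes"
	"fmt"
)

// splitAnnexB splits a blob on 00 00 01 / 00 00 00 01 start codes and
// returns the bare NALUs with start codes stripped. The TrimSuffix
// handles the extra leading zero of a following 4-byte start code.
func splitAnnexB(blob []byte) [][]byte {
	var nalus [][]byte
	start := -1
	i := 0
	for i+2 < len(blob) {
		if blob[i] == 0 && blob[i+1] == 0 && blob[i+2] == 1 {
			if start >= 0 {
				nalus = append(nalus, bytes.TrimSuffix(blob[start:i], []byte{0}))
			}
			i += 3
			start = i
			continue
		}
		i++
	}
	if start >= 0 && start < len(blob) {
		nalus = append(nalus, blob[start:])
	}
	return nalus
}

func main() {
	// A 4-byte start code before an SPS, a 3-byte one before a PPS.
	blob := []byte{0, 0, 0, 1, 0x67, 0x42, 0, 0, 1, 0x68, 0xCE}
	for _, n := range splitAnnexB(blob) {
		fmt.Printf("%x\n", n)
	}
}
```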
Cédric Verstraeten
ff38ccbadf Merge pull request #248 from kerberos-io/fix/sanitize-parameter-sets
fix/sanitize-parameter-sets
2026-02-26 20:43:53 +01:00
cedricve
f64e899de9 Populate/sanitize NALUs and avoid empty MP4
Fill missing SPS/PPS/VPS from camera config before closing recordings and warn when parameter sets are incomplete (for both continuous and motion-detection flows). Sanitize parameter sets (remove Annex-B start codes and drop empty NALUs) before writing AVC/HEVC descriptors. Prevent creation of empty MP4 files by flushing/closing and removing files when no audio/video samples were added, and only add an audio track when audio samples exist.
2026-02-26 20:37:10 +01:00
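The sanitization step matters because MP4 avcC/hvcC descriptors must carry bare NALUs; leaving Annex-B start codes in produces unplayable files. A sketch of the described behaviour — strip start codes, drop empties, keep distinct sets (the function name mirrors the commit; the body is illustrative):

```go
package main

import (
	"bytes"
	"fmt"
)

// sanitizeParameterSets strips any leading Annex-B start code, drops
// empty NALUs, and keeps only distinct parameter sets.
func sanitizeParameterSets(nalus [][]byte) [][]byte {
	var out [][]byte
	seen := make(map[string]bool)
	for _, n := range nalus {
		n = bytes.TrimPrefix(n, []byte{0, 0, 0, 1})
		n = bytes.TrimPrefix(n, []byte{0, 0, 1})
		if len(n) == 0 || seen[string(n)] {
			continue
		}
		seen[string(n)] = true
		out = append(out, n)
	}
	return out
}

func main() {
	in := [][]byte{
		{0, 0, 0, 1, 0x67, 0x42}, // SPS with start code
		{},                       // empty: dropped
		{0x67, 0x42},             // duplicate of the first: dropped
		{0x68, 0xCE},             // PPS, already bare
	}
	fmt.Println(len(sanitizeParameterSets(in))) // two distinct sets remain
}
```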
Cédric Verstraeten
b8a81d18af Merge pull request #247 from kerberos-io/fix/ensure-stsd
fix/ensure-stsd
2026-02-26 17:13:45 +01:00
cedricve
8c2e3e4cdd Recover video parameter sets from Annex B NALUs
Add updateVideoParameterSetsFromAnnexB to parse Annex B NALUs and populate missing SPS/PPS/VPS for H.264/H.265 streams. Call this helper when adding video samples so in-band parameter sets can be recovered early. Also add error logging in Close() when setting AVC/HEVC descriptors fails. These changes improve robustness for streams that carry SPS/PPS/VPS inline.
2026-02-26 17:05:09 +01:00
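Recovering parameter sets from in-band NALUs hinges on classifying each NALU by the type bits in its header. These bit layouts are defined by the H.264/H.265 specs; the helper names below are illustrative:

```go
package main

import "fmt"

// NALU type values for the parameter sets the recovery helper looks for.
const (
	h264SPS = 7
	h264PPS = 8
	h265VPS = 32
	h265SPS = 33
	h265PPS = 34
)

// h264NaluType reads the 5-bit type from the first byte of an H.264 NALU.
func h264NaluType(nalu []byte) int { return int(nalu[0] & 0x1F) }

// h265NaluType reads the 6-bit type from the first byte of an H.265 NALU
// header (bit 7 is forbidden_zero_bit, bits 6..1 are the type).
func h265NaluType(nalu []byte) int { return int(nalu[0]>>1) & 0x3F }

func main() {
	sps := []byte{0x67, 0x42, 0x00, 0x1F} // typical H.264 SPS header byte
	fmt.Println(h264NaluType(sps) == h264SPS)
	vps := []byte{0x40, 0x01} // H.265 VPS header byte (type 32)
	fmt.Println(h265NaluType(vps) == h265VPS)
}
```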
Cédric Verstraeten
11c4ee518d Merge pull request #246 from kerberos-io/fix/handle-sps-pps-unknown-state
fix/handle-sps-pps-unknown-state
2026-02-26 16:24:54 +01:00
cedricve
51b9d76973 Improve SPS/PPS handling: add warnings for missing SPS/PPS during recording start 2026-02-26 15:24:34 +00:00
cedricve
f3c1cb9b82 Enhance SPS/PPS handling for main stream in gortsplib: add fallback for missing SDP 2026-02-26 15:21:54 +00:00
Cédric Verstraeten
a1368361e4 Merge pull request #242 from kerberos-io/fix/update-workflows-for-nightly-build
fix/update-workflows-for-nightly-build
2026-02-16 12:44:40 +01:00
Cédric Verstraeten
abfdea0179 Update issue-userstory-create.yml 2026-02-16 12:37:49 +01:00
Cédric Verstraeten
8aaeb62fa3 Merge pull request #241 from kerberos-io/fix/update-workflows-for-nightly-build
fix/update-workflows-for-nightly-build
2026-02-16 12:21:06 +01:00
Cédric Verstraeten
e30dd7d4a0 Add nightly build workflow for Docker images 2026-02-16 12:16:39 +01:00
Cédric Verstraeten
ac3f9aa4e8 Merge pull request #240 from kerberos-io/feature/add-issue-generator-workflow
feature/add-issue-generator-workflow
2026-02-16 11:58:06 +01:00
Cédric Verstraeten
04c568f488 Add workflow to create user story issues with customizable inputs 2026-02-16 11:54:07 +01:00
Cédric Verstraeten
e270223968 Merge pull request #238 from kerberos-io/fix/docker-build-release-action
fix/docker-build-release-action
2026-02-13 22:17:33 +01:00
cedricve
01ab1a9218 Disable build provenance in Docker builds
Add --provenance=false to docker build invocations in .github/workflows/release-create.yml (both default and arm64 steps) to suppress Docker provenance metadata during CI builds.
2026-02-13 22:16:23 +01:00
Cédric Verstraeten
6f0794b09c Merge pull request #237 from kerberos-io/feature/fix-quicktime-duration
feature/fix-quicktime-duration
2026-02-13 21:55:41 +01:00
cedricve
1ae6a46d88 Embed build version into binaries
Pass VERSION from CI into Docker builds and embed it into the Go binary via ldflags. Updated .github workflow to supply --build-arg VERSION for both architectures. Added ARG VERSION and logic in Dockerfile and Dockerfile.arm64 to derive the version from git (git describe --tags) or fall back to the provided build-arg, then set it with -X during go build. Changed VERSION in machinery/src/utils/main.go from a const to a var defaulting to "0.0.0" and documented that it is overridden at build time. This ensures released images contain the correct agent version while local/dev builds keep a sensible default.
2026-02-13 21:50:09 +01:00
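The Go side of this change is small: a package-level `var` that the linker can override. A self-contained sketch — in the agent the variable lives in `machinery/src/utils`, so the `-X` path differs; `main.VERSION` here is only so the example compiles on its own:

```go
package main

import "fmt"

// VERSION defaults to "0.0.0" for local/dev builds and is overridden at
// build time via ldflags, e.g.:
//
//	go build -ldflags "-X main.VERSION=$(git describe --tags --always)"
//
// A const cannot be set this way, which is why the commit changes it to
// a var.
var VERSION = "0.0.0"

func main() {
	fmt.Println("agent version:", VERSION)
}
```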
cedricve
9d83cab5cc Set mdhd.Duration to 0 for fragmented MP4
Uncomment and explicitly set mdhd.Duration = 0 in machinery/src/video/mp4.go for relevant tracks (video H264/H265 and audio track). This ensures mdhd.Duration is zero for fragmented MP4 so players derive duration from fragments (avoiding QuickTime adding fragment durations and doubling the reported duration).
2026-02-13 21:46:32 +01:00
cedricve
6f559c2f00 Align MP4 headers to fragment durations
Compute actual video duration from SegmentDurations and ensure container headers reflect fragment durations. Set mvhd.Duration and mvex/mehd.FragmentDuration to the maximum of video (sum of segments) and audio durations so the overall mvhd matches the longest track. Use the summed segment duration for track tkhd.Duration and keep mdhd.Duration at 0 for fragmented MP4s (to avoid double-counting). Add a warning log when accumulated video duration differs from the recorded VideoTotalDuration. Harden fingerprint generation and private key handling with nil checks.

Add mp4_duration_test.go: unit test that creates a simulated H.264 fragmented MP4 (150 frames at 40ms), closes it, parses the output and verifies that mvhd/mehd and trun sample durations are consistent and that mdhd.Duration is zero.
2026-02-13 21:35:57 +01:00
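The header arithmetic described above can be sketched as follows (field names match the MP4 boxes; the function itself is illustrative, not the agent's code):

```go
package main

import "fmt"

// headerDurations computes the values written into the MP4 headers:
// tkhd gets the summed video segment durations, mvhd/mehd get the
// maximum of the video and audio totals so the movie header matches the
// longest track, and mdhd stays 0 for fragmented MP4 so players derive
// per-track duration from the fragments instead of double-counting.
func headerDurations(segmentDurations []uint64, audioTotal uint64) (tkhd, mvhd, mdhd uint64) {
	for _, d := range segmentDurations {
		tkhd += d
	}
	mvhd = tkhd
	if audioTotal > mvhd {
		mvhd = audioTotal
	}
	return tkhd, mvhd, 0
}

func main() {
	// 150 frames at 40 ms each, as in the unit test described above,
	// expressed in a millisecond timescale.
	segs := make([]uint64, 150)
	for i := range segs {
		segs[i] = 40
	}
	tkhd, mvhd, mdhd := headerDurations(segs, 5900)
	fmt.Println(tkhd, mvhd, mdhd) // video is the longest track
}
```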
cedricve
c147944f5a Convert MP4 timestamps to Mac HFS epoch
Add MacEpochOffset constant and convert mp4.StartTime to Mac HFS time for QuickTime compatibility. Compute macTime = mp4.StartTime + MacEpochOffset and use it for mvhd CreationTime/ModificationTime, as well as track tkhd and mdhd creation/modification timestamps for video and audio tracks. Also set mvhd Rate, Volume and NextTrackID. These changes ensure generated MP4s use QuickTime-compatible epoch and include proper mvhd metadata.
2026-02-13 21:01:45 +01:00
Cédric Verstraeten
e8ca776e4e Merge pull request #236 from kerberos-io/fix/debugging-lost-keyframes
fix/debugging-lost-keyframes
2026-02-11 16:51:07 +01:00
Cédric Verstraeten
de5c4b6e0a Merge branch 'master' into fix/debugging-lost-keyframes 2026-02-11 16:48:08 +01:00
Cédric Verstraeten
9ba64de090 add additional logging 2026-02-11 16:48:01 +01:00
23 changed files with 1999 additions and 417 deletions


@@ -1,58 +0,0 @@
name: Docker development build
on:
push:
branches: [develop]
jobs:
build-amd64:
runs-on: ubuntu-latest
strategy:
matrix:
architecture: [amd64]
steps:
- name: Login to DockerHub
uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Checkout
uses: actions/checkout@v3
- name: Set up QEMU
uses: docker/setup-qemu-action@v2
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
- name: Available platforms
run: echo ${{ steps.buildx.outputs.platforms }}
- name: Run Buildx
run: docker buildx build --platform linux/${{matrix.architecture}} -t kerberos/agent-dev:arch-$(echo ${{matrix.architecture}} | tr / -)-$(echo $GITHUB_SHA | cut -c1-7) --push .
- name: Create new and append to manifest
run: docker buildx imagetools create -t kerberos/agent-dev:$(echo $GITHUB_SHA | cut -c1-7) kerberos/agent-dev:arch-$(echo ${{matrix.architecture}} | tr / -)-$(echo $GITHUB_SHA | cut -c1-7)
- name: Create new and append to latest manifest
run: docker buildx imagetools create -t kerberos/agent-dev:latest kerberos/agent-dev:arch-$(echo ${{matrix.architecture}} | tr / -)-$(echo $GITHUB_SHA | cut -c1-7)
build-other:
runs-on: ubuntu-latest
strategy:
matrix:
#architecture: [arm64, arm/v7, arm/v6]
architecture: [arm64, arm/v7]
steps:
- name: Login to DockerHub
uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Checkout
uses: actions/checkout@v3
- name: Set up QEMU
uses: docker/setup-qemu-action@v2
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
- name: Available platforms
run: echo ${{ steps.buildx.outputs.platforms }}
- name: Run Buildx
run: docker buildx build --platform linux/${{matrix.architecture}} -t kerberos/agent-dev:arch-$(echo ${{matrix.architecture}} | tr / -)-$(echo $GITHUB_SHA | cut -c1-7) --push .
- name: Create new and append to manifest
run: docker buildx imagetools create --append -t kerberos/agent-dev:$(echo $GITHUB_SHA | cut -c1-7) kerberos/agent-dev:arch-$(echo ${{matrix.architecture}} | tr / -)-$(echo $GITHUB_SHA | cut -c1-7)
- name: Create new and append to manifest latest
run: docker buildx imagetools create --append -t kerberos/agent-dev:latest kerberos/agent-dev:arch-$(echo ${{matrix.architecture}} | tr / -)-$(echo $GITHUB_SHA | cut -c1-7)


@@ -0,0 +1,51 @@
name: Create User Story Issue
on:
workflow_dispatch:
inputs:
issue_title:
description: 'Title for the issue'
required: true
issue_description:
description: 'Brief description of the feature'
required: true
complexity:
description: 'Complexity of the feature'
required: true
type: choice
options:
- 'Low'
- 'Medium'
- 'High'
default: 'Medium'
duration:
description: 'Estimated duration'
required: true
type: choice
options:
- '1 day'
- '3 days'
- '1 week'
- '2 weeks'
- '1 month'
default: '1 week'
jobs:
create-issue:
runs-on: ubuntu-latest
permissions:
issues: write
steps:
- name: Create Issue with User Story
uses: cedricve/llm-create-issue-user-story@main
with:
github_token: ${{ secrets.GITHUB_TOKEN }}
azure_openai_api_key: ${{ secrets.AZURE_OPENAI_API_KEY }}
azure_openai_endpoint: ${{ secrets.AZURE_OPENAI_ENDPOINT }}
azure_openai_version: ${{ secrets.AZURE_OPENAI_VERSION }}
openai_model: ${{ secrets.OPENAI_MODEL }}
issue_title: ${{ github.event.inputs.issue_title }}
issue_description: ${{ github.event.inputs.issue_description }}
complexity: ${{ github.event.inputs.complexity }}
duration: ${{ github.event.inputs.duration }}
labels: 'user-story,feature'
assignees: ${{ github.actor }}


@@ -1,12 +1,14 @@
name: Docker nightly build
name: Nightly build
on:
# Triggers the workflow every day at 9PM (CET).
schedule:
- cron: "0 22 * * *"
# Allows manual triggering from the Actions tab.
workflow_dispatch:
jobs:
build-amd64:
nightly-build-amd64:
runs-on: ubuntu-latest
strategy:
matrix:
@@ -18,7 +20,9 @@ jobs:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Checkout
run: git clone https://github.com/kerberos-io/agent && cd agent
uses: actions/checkout@v4
with:
ref: master
- name: Set up QEMU
uses: docker/setup-qemu-action@v2
- name: Set up Docker Buildx
@@ -26,10 +30,10 @@ jobs:
- name: Available platforms
run: echo ${{ steps.buildx.outputs.platforms }}
- name: Run Buildx
run: cd agent && docker buildx build --platform linux/${{matrix.architecture}} -t kerberos/agent-nightly:arch-$(echo ${{matrix.architecture}} | tr / -)-$(echo $GITHUB_SHA | cut -c1-7) --push .
run: docker buildx build --platform linux/${{matrix.architecture}} -t kerberos/agent-nightly:arch-$(echo ${{matrix.architecture}} | tr / -)-$(echo $GITHUB_SHA | cut -c1-7) --push .
- name: Create new and append to manifest
run: cd agent && docker buildx imagetools create -t kerberos/agent-nightly:$(echo $GITHUB_SHA | cut -c1-7) kerberos/agent-nightly:arch-$(echo ${{matrix.architecture}} | tr / -)-$(echo $GITHUB_SHA | cut -c1-7)
build-other:
run: docker buildx imagetools create -t kerberos/agent-nightly:$(echo $GITHUB_SHA | cut -c1-7) kerberos/agent-nightly:arch-$(echo ${{matrix.architecture}} | tr / -)-$(echo $GITHUB_SHA | cut -c1-7)
nightly-build-other:
runs-on: ubuntu-latest
strategy:
matrix:
@@ -41,7 +45,9 @@ jobs:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Checkout
run: git clone https://github.com/kerberos-io/agent && cd agent
uses: actions/checkout@v4
with:
ref: master
- name: Set up QEMU
uses: docker/setup-qemu-action@v2
- name: Set up Docker Buildx
@@ -49,6 +55,6 @@ jobs:
- name: Available platforms
run: echo ${{ steps.buildx.outputs.platforms }}
- name: Run Buildx
run: cd agent && docker buildx build --platform linux/${{matrix.architecture}} -t kerberos/agent-nightly:arch-$(echo ${{matrix.architecture}} | tr / -)-$(echo $GITHUB_SHA | cut -c1-7) --push .
run: docker buildx build --platform linux/${{matrix.architecture}} -t kerberos/agent-nightly:arch-$(echo ${{matrix.architecture}} | tr / -)-$(echo $GITHUB_SHA | cut -c1-7) --push .
- name: Create new and append to manifest
run: cd agent && docker buildx imagetools create --append -t kerberos/agent-nightly:$(echo $GITHUB_SHA | cut -c1-7) kerberos/agent-nightly:arch-$(echo ${{matrix.architecture}} | tr / -)-$(echo $GITHUB_SHA | cut -c1-7)
run: docker buildx imagetools create --append -t kerberos/agent-nightly:$(echo $GITHUB_SHA | cut -c1-7) kerberos/agent-nightly:arch-$(echo ${{matrix.architecture}} | tr / -)-$(echo $GITHUB_SHA | cut -c1-7)


@@ -7,61 +7,34 @@ env:
REPO: kerberos/agent
jobs:
build-amd64:
runs-on: ubuntu-24.04
build:
runs-on: ${{ matrix.runner }}
permissions:
contents: write
strategy:
matrix:
architecture: [amd64]
include:
- architecture: amd64
runner: ubuntu-24.04
dockerfile: Dockerfile
- architecture: arm64
runner: ubuntu-24.04-arm
dockerfile: Dockerfile.arm64
steps:
- name: Checkout
uses: actions/checkout@v3
- name: Login to DockerHub
uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Checkout
uses: actions/checkout@v3
- uses: benjlevesque/short-sha@v2.1
id: short-sha
with:
length: 7
- name: Run Build
run: |
docker build -t ${{matrix.architecture}} .
CID=$(docker create ${{matrix.architecture}})
docker cp ${CID}:/home/agent ./output-${{matrix.architecture}}
docker rm ${CID}
- name: Strip binary
run: tar -cf agent-${{matrix.architecture}}.tar -C output-${{matrix.architecture}} . && rm -rf output-${{matrix.architecture}}
- name: Upload artifact
uses: actions/upload-artifact@v4
with:
name: agent-${{matrix.architecture}}.tar
path: agent-${{matrix.architecture}}.tar
build-arm64:
runs-on: ubuntu-24.04-arm
permissions:
contents: write
strategy:
matrix:
architecture: [arm64]
steps:
- name: Login to DockerHub
uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Checkout
uses: actions/checkout@v3
- uses: benjlevesque/short-sha@v2.1
id: short-sha
with:
length: 7
- name: Run Build
run: |
docker build -t ${{matrix.architecture}} -f Dockerfile.arm64 .
docker build -t ${{ matrix.architecture }} -f ${{ matrix.dockerfile }} .
CID=$(docker create ${{matrix.architecture}})
docker cp ${CID}:/home/agent ./output-${{matrix.architecture}}
docker rm ${CID}


@@ -34,7 +34,7 @@ jobs:
length: 7
- name: Run Build
run: |
docker build -t ${{matrix.architecture}} .
docker build --provenance=false --build-arg VERSION=${{github.event.inputs.tag || github.ref_name}} -t ${{matrix.architecture}} .
CID=$(docker create ${{matrix.architecture}})
docker cp ${CID}:/home/agent ./output-${{matrix.architecture}}
docker rm ${CID}
@@ -71,7 +71,7 @@ jobs:
length: 7
- name: Run Build
run: |
docker build -t ${{matrix.architecture}} -f Dockerfile.arm64 .
docker build --provenance=false --build-arg VERSION=${{github.event.inputs.tag || github.ref_name}} -t ${{matrix.architecture}} -f Dockerfile.arm64 .
CID=$(docker create ${{matrix.architecture}})
docker cp ${CID}:/home/agent ./output-${{matrix.architecture}}
docker rm ${CID}

.gitignore vendored

@@ -14,4 +14,5 @@ machinery/test*
machinery/init-dev.sh
machinery/.env.local
machinery/vendor
deployments/docker/private-docker-compose.yaml
deployments/docker/private-docker-compose.yaml
video.mp4


@@ -1,5 +1,6 @@
ARG BASE_IMAGE_VERSION=amd64-ddbe40e
ARG VERSION=0.0.0
FROM kerberos/base:${BASE_IMAGE_VERSION} AS build-machinery
LABEL AUTHOR=uug.ai
@@ -34,7 +35,8 @@ RUN cat /go/src/github.com/kerberos-io/agent/machinery/version
RUN cd /go/src/github.com/kerberos-io/agent/machinery && \
go mod download && \
go build -tags timetzdata,netgo,osusergo --ldflags '-s -w -extldflags "-static -latomic"' main.go && \
VERSION=$(cd /go/src/github.com/kerberos-io/agent && git describe --tags --always 2>/dev/null || echo "${VERSION}") && \
go build -tags timetzdata,netgo,osusergo --ldflags "-s -w -X github.com/kerberos-io/agent/machinery/src/utils.VERSION=${VERSION} -extldflags '-static -latomic'" main.go && \
mkdir -p /agent && \
mv main /agent && \
mv version /agent && \
@@ -93,7 +95,7 @@ RUN addgroup -S kerberosio && adduser -S agent -G kerberosio && addgroup agent v
COPY --chown=0:0 --from=build-machinery /dist /
COPY --chown=0:0 --from=build-ui /dist /
RUN apk update && apk add ca-certificates curl libstdc++ libc6-compat --no-cache && rm -rf /var/cache/apk/*
RUN apk update && apk add ca-certificates curl ffmpeg libstdc++ libc6-compat --no-cache && rm -rf /var/cache/apk/*
##################
# Try running agent


@@ -1,5 +1,6 @@
ARG BASE_IMAGE_VERSION=arm64-ddbe40e
ARG VERSION=0.0.0
FROM kerberos/base:${BASE_IMAGE_VERSION} AS build-machinery
LABEL AUTHOR=uug.ai
@@ -34,7 +35,8 @@ RUN cat /go/src/github.com/kerberos-io/agent/machinery/version
RUN cd /go/src/github.com/kerberos-io/agent/machinery && \
go mod download && \
go build -tags timetzdata,netgo,osusergo --ldflags '-s -w -extldflags "-static -latomic"' main.go && \
VERSION=$(cd /go/src/github.com/kerberos-io/agent && git describe --tags --always 2>/dev/null || echo "${VERSION}") && \
go build -tags timetzdata,netgo,osusergo --ldflags "-s -w -X github.com/kerberos-io/agent/machinery/src/utils.VERSION=${VERSION} -extldflags '-static -latomic'" main.go && \
mkdir -p /agent && \
mv main /agent && \
mv version /agent && \
@@ -93,7 +95,7 @@ RUN addgroup -S kerberosio && adduser -S agent -G kerberosio && addgroup agent v
COPY --chown=0:0 --from=build-machinery /dist /
COPY --chown=0:0 --from=build-ui /dist /
RUN apk update && apk add ca-certificates curl libstdc++ libc6-compat --no-cache && rm -rf /var/cache/apk/*
RUN apk update && apk add ca-certificates curl ffmpeg libstdc++ libc6-compat --no-cache && rm -rf /var/cache/apk/*
##################
# Try running agent


@@ -695,14 +695,37 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets
g.Streams[g.VideoH264Index].FPS = fps
log.Log.Debug(fmt.Sprintf("capture.golibrtsp.Start(%s): Final FPS=%.2f", streamType, fps))
g.VideoH264Forma.SPS = nalu
if streamType == "main" && len(nalu) > 0 {
// Fallback: store SPS from in-band NALUs when SDP was missing it.
configuration.Config.Capture.IPCamera.SPSNALUs = [][]byte{nalu}
}
}
case h264.NALUTypePPS:
g.VideoH264Forma.PPS = nalu
if streamType == "main" && len(nalu) > 0 {
// Fallback: store PPS from in-band NALUs when SDP was missing it.
configuration.Config.Capture.IPCamera.PPSNALUs = [][]byte{nalu}
}
}
filteredAU = append(filteredAU, nalu)
}
if idrPresent && streamType == "main" {
// Ensure config has parameter sets before recordings start.
if len(configuration.Config.Capture.IPCamera.SPSNALUs) == 0 && len(g.VideoH264Forma.SPS) > 0 {
configuration.Config.Capture.IPCamera.SPSNALUs = [][]byte{g.VideoH264Forma.SPS}
log.Log.Warning("capture.golibrtsp.Start(main): fallback SPS set from keyframe")
}
if len(configuration.Config.Capture.IPCamera.PPSNALUs) == 0 && len(g.VideoH264Forma.PPS) > 0 {
configuration.Config.Capture.IPCamera.PPSNALUs = [][]byte{g.VideoH264Forma.PPS}
log.Log.Warning("capture.golibrtsp.Start(main): fallback PPS set from keyframe")
}
if len(configuration.Config.Capture.IPCamera.SPSNALUs) == 0 || len(configuration.Config.Capture.IPCamera.PPSNALUs) == 0 {
log.Log.Warning("capture.golibrtsp.Start(main): SPS/PPS still missing after IDR keyframe")
}
}
if len(filteredAU) <= 1 || (!nonIDRPresent && !idrPresent) {
return
}


@@ -159,6 +159,19 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
}
// Close mp4
if len(mp4Video.SPSNALUs) == 0 && len(configuration.Config.Capture.IPCamera.SPSNALUs) > 0 {
mp4Video.SPSNALUs = configuration.Config.Capture.IPCamera.SPSNALUs
}
if len(mp4Video.PPSNALUs) == 0 && len(configuration.Config.Capture.IPCamera.PPSNALUs) > 0 {
mp4Video.PPSNALUs = configuration.Config.Capture.IPCamera.PPSNALUs
}
if len(mp4Video.VPSNALUs) == 0 && len(configuration.Config.Capture.IPCamera.VPSNALUs) > 0 {
mp4Video.VPSNALUs = configuration.Config.Capture.IPCamera.VPSNALUs
}
if (videoCodec == "H264" && (len(mp4Video.SPSNALUs) == 0 || len(mp4Video.PPSNALUs) == 0)) ||
(videoCodec == "H265" && (len(mp4Video.VPSNALUs) == 0 || len(mp4Video.SPSNALUs) == 0 || len(mp4Video.PPSNALUs) == 0)) {
log.Log.Warning("capture.main.HandleRecordStream(continuous): closing MP4 without full parameter sets, moov may be incomplete")
}
mp4Video.Close(&config)
log.Log.Info("capture.main.HandleRecordStream(continuous): recording finished: file save: " + name)
@@ -279,6 +292,9 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
ppsNALUS := configuration.Config.Capture.IPCamera.PPSNALUs
vpsNALUS := configuration.Config.Capture.IPCamera.VPSNALUs
if len(spsNALUS) == 0 || len(ppsNALUS) == 0 {
log.Log.Warning("capture.main.HandleRecordStream(continuous): missing SPS/PPS at recording start")
}
// Create a video file, and set the dimensions.
mp4Video = video.NewMP4(fullName, spsNALUS, ppsNALUS, vpsNALUS, configuration.Config.Capture.MaxLengthRecording)
mp4Video.SetWidth(width)
@@ -499,21 +515,11 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
ppsNALUS := configuration.Config.Capture.IPCamera.PPSNALUs
vpsNALUS := configuration.Config.Capture.IPCamera.VPSNALUs
// Create a video file, and set the dimensions.
mp4Video := video.NewMP4(fullName, spsNALUS, ppsNALUS, vpsNALUS, configuration.Config.Capture.MaxLengthRecording)
mp4Video.SetWidth(width)
mp4Video.SetHeight(height)
if videoCodec == "H264" {
videoTrack = mp4Video.AddVideoTrack("H264")
} else if videoCodec == "H265" {
videoTrack = mp4Video.AddVideoTrack("H265")
}
if audioCodec == "AAC" {
audioTrack = mp4Video.AddAudioTrack("AAC")
} else if audioCodec == "PCM_MULAW" {
log.Log.Debug("capture.main.HandleRecordStream(continuous): no AAC audio codec detected, skipping audio track.")
if len(spsNALUS) == 0 || len(ppsNALUS) == 0 {
log.Log.Warning("capture.main.HandleRecordStream(motiondetection): missing SPS/PPS at recording start")
}
// Create the MP4 only once the first keyframe arrives.
var mp4Video *video.MP4
for cursorError == nil {
@@ -532,7 +538,7 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
default:
}
if (motionTimestamp+postRecording-now < 0 || now-startRecording > maxRecordingPeriod-500) && nextPkt.IsKeyFrame {
if start && (motionTimestamp+postRecording-now < 0 || now-startRecording > maxRecordingPeriod-500) && nextPkt.IsKeyFrame {
log.Log.Info("capture.main.HandleRecordStream(motiondetection): timestamp+postRecording-now < 0 - " + strconv.FormatInt(motionTimestamp+postRecording-now, 10) + " < 0")
log.Log.Info("capture.main.HandleRecordStream(motiondetection): now-startRecording > maxRecordingPeriod-500 - " + strconv.FormatInt(now-startRecording, 10) + " > " + strconv.FormatInt(maxRecordingPeriod-500, 10))
log.Log.Info("capture.main.HandleRecordStream(motiondetection): closing recording (timestamp: " + strconv.FormatInt(motionTimestamp, 10) + ", postRecording: " + strconv.FormatInt(postRecording, 10) + ", now: " + strconv.FormatInt(now, 10) + ", startRecording: " + strconv.FormatInt(startRecording, 10) + ", maxRecordingPeriod: " + strconv.FormatInt(maxRecordingPeriod, 10))
@@ -542,20 +548,44 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
// We start the recording if we have a keyframe and the last duration is 0 or less than the current packet time.
// It could be start we start from the beginning of the recording.
log.Log.Debug("capture.main.HandleRecordStream(motiondetection): write frames")
log.Log.Debug("capture.main.HandleRecordStream(motiondetection): recording started on keyframe")
// Align duration timers with the first keyframe.
startRecording = pkt.CurrentTime
// Create a video file, and set the dimensions.
mp4Video = video.NewMP4(fullName, spsNALUS, ppsNALUS, vpsNALUS, configuration.Config.Capture.MaxLengthRecording)
mp4Video.SetWidth(width)
mp4Video.SetHeight(height)
if videoCodec == "H264" {
videoTrack = mp4Video.AddVideoTrack("H264")
} else if videoCodec == "H265" {
videoTrack = mp4Video.AddVideoTrack("H265")
}
if audioCodec == "AAC" {
audioTrack = mp4Video.AddAudioTrack("AAC")
} else if audioCodec == "PCM_MULAW" {
log.Log.Debug("capture.main.HandleRecordStream(continuous): no AAC audio codec detected, skipping audio track.")
}
start = true
}
if start {
pts := convertPTS(pkt.TimeLegacy)
if pkt.IsVideo {
log.Log.Debug("capture.main.HandleRecordStream(motiondetection): add video sample")
if err := mp4Video.AddSampleToTrack(videoTrack, pkt.IsKeyFrame, pkt.Data, pts); err != nil {
log.Log.Error("capture.main.HandleRecordStream(motiondetection): " + err.Error())
if mp4Video != nil {
if err := mp4Video.AddSampleToTrack(videoTrack, pkt.IsKeyFrame, pkt.Data, pts); err != nil {
log.Log.Error("capture.main.HandleRecordStream(motiondetection): " + err.Error())
}
}
} else if pkt.IsAudio {
log.Log.Debug("capture.main.HandleRecordStream(motiondetection): add audio sample")
if pkt.Codec == "AAC" {
if err := mp4Video.AddSampleToTrack(audioTrack, pkt.IsKeyFrame, pkt.Data, pts); err != nil {
log.Log.Error("capture.main.HandleRecordStream(motiondetection): " + err.Error())
if mp4Video != nil {
if err := mp4Video.AddSampleToTrack(audioTrack, pkt.IsKeyFrame, pkt.Data, pts); err != nil {
log.Log.Error("capture.main.HandleRecordStream(motiondetection): " + err.Error())
}
}
} else if pkt.Codec == "PCM_MULAW" {
// TODO: transcode to AAC, some work to do..
@@ -573,7 +603,25 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat
// This is used to determine if we need to start a new recording.
lastRecordingTime = pkt.CurrentTime
if mp4Video == nil {
log.Log.Warning("capture.main.HandleRecordStream(motiondetection): recording closed without keyframe; no MP4 created")
continue
}
// This will close the recording and write the last packet.
if len(mp4Video.SPSNALUs) == 0 && len(configuration.Config.Capture.IPCamera.SPSNALUs) > 0 {
mp4Video.SPSNALUs = configuration.Config.Capture.IPCamera.SPSNALUs
}
if len(mp4Video.PPSNALUs) == 0 && len(configuration.Config.Capture.IPCamera.PPSNALUs) > 0 {
mp4Video.PPSNALUs = configuration.Config.Capture.IPCamera.PPSNALUs
}
if len(mp4Video.VPSNALUs) == 0 && len(configuration.Config.Capture.IPCamera.VPSNALUs) > 0 {
mp4Video.VPSNALUs = configuration.Config.Capture.IPCamera.VPSNALUs
}
if (videoCodec == "H264" && (len(mp4Video.SPSNALUs) == 0 || len(mp4Video.PPSNALUs) == 0)) ||
(videoCodec == "H265" && (len(mp4Video.VPSNALUs) == 0 || len(mp4Video.SPSNALUs) == 0 || len(mp4Video.PPSNALUs) == 0)) {
log.Log.Warning("capture.main.HandleRecordStream(motiondetection): closing MP4 without full parameter sets, moov may be incomplete")
}
mp4Video.Close(&config)
log.Log.Info("capture.main.HandleRecordStream(motiondetection): file save: " + name)

View File

@@ -800,17 +800,19 @@ func HandleLiveStreamHD(livestreamCursor *packets.QueueCursor, configuration *mo
// Check if we need to enable the live stream
if config.Capture.Liveview != "false" {
// Create per-peer broadcasters instead of shared tracks.
// Each viewer gets its own track with independent, non-blocking writes
// so a slow/congested peer cannot stall the others.
streams, _ := rtspClient.GetStreams()
videoBroadcaster := webrtc.NewVideoBroadcaster(streams)
audioBroadcaster := webrtc.NewAudioBroadcaster(streams)
if videoBroadcaster == nil && audioBroadcaster == nil {
log.Log.Error("cloud.HandleLiveStreamHD(): failed to create both video and audio broadcasters")
return
}
go webrtc.WriteToTrack(livestreamCursor, configuration, communication, mqttClient, videoBroadcaster, audioBroadcaster, rtspClient)
if config.Capture.ForwardWebRTC == "true" {
@@ -818,7 +820,7 @@ func HandleLiveStreamHD(livestreamCursor *packets.QueueCursor, configuration *mo
log.Log.Info("cloud.HandleLiveStreamHD(): Waiting for peer connections.")
for handshake := range communication.HandleLiveHDHandshake {
log.Log.Info("cloud.HandleLiveStreamHD(): setting up a peer connection.")
go webrtc.InitializeWebRTCConnection(configuration, communication, mqttClient, videoBroadcaster, audioBroadcaster, handshake)
}
}
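The per-peer broadcaster rationale above (independent, non-blocking writes so one congested viewer cannot stall the rest) can be sketched with buffered channels and a non-blocking send. This is a minimal sketch, not the agent's actual `webrtc.NewVideoBroadcaster` implementation; `Packet`, `Broadcaster`, and the buffer size of 16 are illustrative assumptions:

```go
package main

import (
	"fmt"
	"sync"
)

// Packet stands in for an encoded media sample (illustrative type).
type Packet struct{ Seq int }

// Broadcaster fans packets out to per-peer buffered channels.
// A full (slow) peer's channel is skipped with a non-blocking send,
// so congestion on one viewer never stalls the others.
type Broadcaster struct {
	mu    sync.Mutex
	peers map[string]chan Packet
}

func NewBroadcaster() *Broadcaster {
	return &Broadcaster{peers: make(map[string]chan Packet)}
}

// Subscribe registers a viewer and returns its private channel.
func (b *Broadcaster) Subscribe(id string) <-chan Packet {
	b.mu.Lock()
	defer b.mu.Unlock()
	ch := make(chan Packet, 16) // small per-peer buffer (assumed size)
	b.peers[id] = ch
	return ch
}

// Unsubscribe removes a viewer and closes its channel.
func (b *Broadcaster) Unsubscribe(id string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if ch, ok := b.peers[id]; ok {
		close(ch)
		delete(b.peers, id)
	}
}

// Publish delivers a packet to every peer without ever blocking.
func (b *Broadcaster) Publish(p Packet) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for _, ch := range b.peers {
		select {
		case ch <- p: // delivered
		default: // peer buffer full: drop instead of blocking
		}
	}
}

func main() {
	b := NewBroadcaster()
	viewer := b.Subscribe("viewer-1")
	b.Publish(Packet{Seq: 1})
	fmt.Println((<-viewer).Seq) // 1
}
```

Dropping on a full buffer trades occasional frame loss for a slow peer against liveness for every other viewer, which matches the rationale in the comment above.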

View File

@@ -20,96 +20,93 @@ func AddRoutes(r *gin.Engine, authMiddleware *jwt.GinJWTMiddleware, configDirect
// This is legacy and should be removed in the future! Everything
// now lives under the /api prefix.
r.GET("/config", authMiddleware.MiddlewareFunc(), func(c *gin.Context) {
components.GetConfig(c, captureDevice, configuration, communication)
})
// This is legacy and should be removed in the future! Everything
// now lives under the /api prefix.
r.POST("/config", authMiddleware.MiddlewareFunc(), func(c *gin.Context) {
components.UpdateConfig(c, configDirectory, configuration, communication)
})
api := r.Group("/api")
{
// Public endpoints (no authentication required)
api.POST("/login", authMiddleware.LoginHandler)
// Secured endpoints: apply the JWT authentication middleware.
// All routes registered below this line require a valid JWT token.
api.Use(authMiddleware.MiddlewareFunc())
{
api.GET("/dashboard", func(c *gin.Context) {
components.GetDashboard(c, configDirectory, configuration, communication)
})
api.POST("/latest-events", func(c *gin.Context) {
components.GetLatestEvents(c, configDirectory, configuration, communication)
})
api.GET("/days", func(c *gin.Context) {
components.GetDays(c, configDirectory, configuration, communication)
})
api.GET("/config", func(c *gin.Context) {
components.GetConfig(c, captureDevice, configuration, communication)
})
api.POST("/config", func(c *gin.Context) {
components.UpdateConfig(c, configDirectory, configuration, communication)
})
// Will verify the hub settings.
api.POST("/hub/verify", func(c *gin.Context) {
cloud.VerifyHub(c)
})
// Will verify the persistence settings.
api.POST("/persistence/verify", func(c *gin.Context) {
cloud.VerifyPersistence(c, configDirectory)
})
// Will verify the secondary persistence settings.
api.POST("/persistence/secondary/verify", func(c *gin.Context) {
cloud.VerifySecondaryPersistence(c, configDirectory)
})
// Camera specific methods.
api.POST("/camera/restart", func(c *gin.Context) {
components.RestartAgent(c, communication)
})
api.POST("/camera/stop", func(c *gin.Context) {
components.StopAgent(c, communication)
})
api.POST("/camera/record", func(c *gin.Context) {
components.MakeRecording(c, communication)
})
api.GET("/camera/snapshot/jpeg", func(c *gin.Context) {
components.GetSnapshotRaw(c, captureDevice, configuration, communication)
})
api.GET("/camera/snapshot/base64", func(c *gin.Context) {
components.GetSnapshotBase64(c, captureDevice, configuration, communication)
})
// Onvif specific methods.
api.POST("/camera/onvif/verify", onvif.VerifyOnvifConnection)
api.POST("/camera/onvif/login", LoginToOnvif)
api.POST("/camera/onvif/capabilities", GetOnvifCapabilities)
api.POST("/camera/onvif/presets", GetOnvifPresets)
api.POST("/camera/onvif/gotopreset", GoToOnvifPreset)
api.POST("/camera/onvif/pantilt", DoOnvifPanTilt)
api.POST("/camera/onvif/zoom", DoOnvifZoom)
api.POST("/camera/onvif/inputs", DoGetDigitalInputs)
api.POST("/camera/onvif/outputs", DoGetRelayOutputs)
api.POST("/camera/onvif/outputs/:output", DoTriggerRelayOutput)
api.POST("/camera/verify/:streamType", capture.VerifyCamera)
}
}
return api

View File

@@ -91,9 +91,30 @@ func ConfigureMQTT(configDirectory string, configuration *models.Configuration,
// Some extra options to make sure the connection behaves
// properly. More information here: github.com/eclipse/paho.mqtt.golang.
opts.SetCleanSession(true)
//opts.SetResumeSubs(true)
//opts.SetStore(mqtt.NewMemoryStore())
opts.SetConnectRetry(true)
opts.SetAutoReconnect(true)
opts.SetConnectRetryInterval(5 * time.Second)
opts.SetMaxReconnectInterval(1 * time.Minute)
opts.SetKeepAlive(30 * time.Second)
opts.SetPingTimeout(10 * time.Second)
opts.SetWriteTimeout(10 * time.Second)
opts.SetOrderMatters(false)
opts.SetConnectTimeout(30 * time.Second)
opts.SetConnectionLostHandler(func(client mqtt.Client, err error) {
if err != nil {
log.Log.Error("routers.mqtt.main.ConfigureMQTT(): MQTT connection lost: " + err.Error())
} else {
log.Log.Error("routers.mqtt.main.ConfigureMQTT(): MQTT connection lost")
}
})
opts.SetReconnectingHandler(func(client mqtt.Client, options *mqtt.ClientOptions) {
log.Log.Warning("routers.mqtt.main.ConfigureMQTT(): reconnecting to MQTT broker")
})
opts.SetOnConnectHandler(func(c mqtt.Client) {
log.Log.Info("routers.mqtt.main.ConfigureMQTT(): MQTT session is online")
})
hubKey := ""
// This is the old way ;)
@@ -133,10 +154,14 @@ func ConfigureMQTT(configDirectory string, configuration *models.Configuration,
}
}
mqc := mqtt.NewClient(opts)
if token := mqc.Connect(); token.WaitTimeout(30 * time.Second) {
if token.Error() != nil {
log.Log.Error("routers.mqtt.main.ConfigureMQTT(): unable to establish mqtt broker connection, error was: " + token.Error().Error())
} else {
log.Log.Info("routers.mqtt.main.ConfigureMQTT(): initial MQTT connection established")
}
} else {
log.Log.Error("routers.mqtt.main.ConfigureMQTT(): timed out while establishing mqtt broker connection")
}
return mqc
}
@@ -144,12 +169,18 @@ func ConfigureMQTT(configDirectory string, configuration *models.Configuration,
return nil
}
// maxSignalingAge is the maximum age of a WebRTC signaling message (request-hd-stream,
// receive-hd-candidates) before it is considered stale and discarded. When the MQTT broker
// retains session state (e.g. CleanSession=false), it may replay queued messages from
// previous sessions; this prevents the agent from setting up peer connections for viewers
// that are no longer waiting.
const maxSignalingAge = 30 * time.Second
func MQTTListenerHandler(mqttClient mqtt.Client, hubKey string, configDirectory string, configuration *models.Configuration, communication *models.Communication) {
if hubKey == "" {
log.Log.Info("routers.mqtt.main.MQTTListenerHandler(): no hub key provided, not subscribing to kerberos/agent/{hubKey}")
} else {
agentListener := fmt.Sprintf("kerberos/agent/%s", hubKey)
token := mqttClient.Subscribe(agentListener, 1, func(c mqtt.Client, msg mqtt.Message) {
// Decode the message, we are expecting following format.
// {
@@ -249,6 +280,18 @@ func MQTTListenerHandler(mqttClient mqtt.Client, hubKey string, configDirectory
// We'll find out which message we received, and act accordingly.
log.Log.Info("routers.mqtt.main.MQTTListenerHandler(): received message with action: " + payload.Action)
// For time-sensitive WebRTC signaling messages, discard stale ones that the broker
// may have queued and replayed from a previous session.
if payload.Action == "request-hd-stream" || payload.Action == "receive-hd-candidates" {
messageAge := time.Since(time.Unix(message.Timestamp, 0))
if messageAge > maxSignalingAge {
log.Log.Info("routers.mqtt.main.MQTTListenerHandler(): discarding stale " + payload.Action +
" message (age: " + messageAge.Round(time.Second).String() + ")")
return
}
}
switch payload.Action {
case "record":
go HandleRecording(mqttClient, hubKey, payload, configuration, communication)
@@ -276,6 +319,16 @@ func MQTTListenerHandler(mqttClient mqtt.Client, hubKey string, configDirectory
}
})
if token.WaitTimeout(10 * time.Second) {
if token.Error() != nil {
log.Log.Error("routers.mqtt.main.MQTTListenerHandler(): failed to subscribe to " + agentListener + ": " + token.Error().Error())
} else {
log.Log.Info("routers.mqtt.main.MQTTListenerHandler(): subscribed to " + agentListener)
}
} else {
log.Log.Error("routers.mqtt.main.MQTTListenerHandler(): timed out while subscribing to " + agentListener)
}
}
}

View File

@@ -25,7 +25,10 @@ import (
"github.com/nfnt/resize"
)
// VERSION is the agent version. It defaults to "0.0.0" for local dev builds
// and is overridden at build time via:
// go build -ldflags "-X github.com/kerberos-io/agent/machinery/src/utils.VERSION=v1.2.3"
var VERSION = "0.0.0"
const letterBytes = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"

View File

@@ -13,6 +13,7 @@ import (
"strings"
"time"
"github.com/Eyevinn/mp4ff/avc"
mp4ff "github.com/Eyevinn/mp4ff/mp4"
"github.com/kerberos-io/agent/machinery/src/encryption"
"github.com/kerberos-io/agent/machinery/src/log"
@@ -22,6 +23,10 @@ import (
var LastPTS uint64 = 0 // Last PTS for the current segment
// MacEpochOffset is the number of seconds between Mac HFS epoch (1904-01-01)
// and Unix epoch (1970-01-01). QuickTime requires timestamps in Mac HFS format.
const MacEpochOffset uint64 = 2082844800
// FragmentDurationMs is the target duration for each fragment in milliseconds.
// Fragments will be flushed at the first keyframe after this duration has elapsed,
// resulting in ~3 second fragments (assuming a typical GOP interval).
@@ -29,42 +34,46 @@ const FragmentDurationMs = 3000
type MP4 struct {
// FileName is the name of the file
FileName string
width int
height int
Segments []*mp4ff.MediaSegment // List of media segments
Segment *mp4ff.MediaSegment
MultiTrackFragment *mp4ff.Fragment
TrackIDs []uint32
FileWriter *os.File
Writer *bufio.Writer
SegmentCount int
SampleCount int
StartPTS uint64
VideoTotalDuration uint64
AudioTotalDuration uint64
AudioPTS uint64
Start bool
SPSNALUs [][]byte // SPS NALUs for H264
PPSNALUs [][]byte // PPS NALUs for H264
VPSNALUs [][]byte // VPS NALUs for H265
FreeBoxSize int64
FragmentStartRawPTS uint64 // Raw PTS for timing when to flush fragments
FragmentStartDTS uint64 // Accumulated VideoTotalDuration at fragment start (matches tfdt)
MoofBoxes int64 // Number of moof boxes in the file
MoofBoxSizes []int64 // Sizes of each moof box
SegmentDurations []uint64 // Duration of each segment in timescale units
SegmentBaseDecTimes []uint64 // Base decode time of each segment
StartTime uint64 // Start time of the MP4 file
VideoTrackName string // Name of the video track
VideoTrack int // Track ID for the video track
AudioTrackName string // Name of the audio track
AudioTrack int // Track ID for the audio track
VideoFullSample *mp4ff.FullSample // Full sample for video track
AudioFullSample *mp4ff.FullSample // Full sample for audio track
LastAudioSampleDTS uint64 // Last DTS for audio sample
LastVideoSampleDTS uint64 // Last DTS for video sample
SampleType string // Type of the sample (e.g., "video", "audio", "subtitle")
TotalKeyframesReceived int // Total keyframes received by AddSampleToTrack
TotalKeyframesWritten int // Total keyframes written to trun boxes
FragmentKeyframeCount int // Keyframes in the current fragment
PendingSampleIsKeyframe bool // Whether the pending video sample is a keyframe
}
// NewMP4 creates a new MP4 object.
@@ -150,6 +159,68 @@ func (mp4 *MP4) AddAudioTrack(codec string) uint32 {
func (mp4 *MP4) AddMediaSegment(segNr int) {
}
// updateVideoParameterSetsFromAnnexB inspects Annex B data to fill missing SPS/PPS/VPS.
func (mp4 *MP4) updateVideoParameterSetsFromAnnexB(data []byte) {
if len(data) == 0 {
return
}
needSPS := len(mp4.SPSNALUs) == 0
needPPS := len(mp4.PPSNALUs) == 0
needVPS := len(mp4.VPSNALUs) == 0
if !(needSPS || needPPS || needVPS) {
return
}
for _, nalu := range splitNALUs(data) {
nalu = removeAnnexBStartCode(nalu)
if len(nalu) == 0 {
continue
}
switch mp4.VideoTrackName {
case "H264", "AVC1":
nalType := nalu[0] & 0x1F
switch nalType {
case 7: // SPS
if needSPS {
mp4.SPSNALUs = [][]byte{nalu}
needSPS = false
log.Log.Warning("mp4.updateVideoParameterSetsFromAnnexB(): SPS recovered from in-band NALU")
}
case 8: // PPS
if needPPS {
mp4.PPSNALUs = [][]byte{nalu}
needPPS = false
log.Log.Warning("mp4.updateVideoParameterSetsFromAnnexB(): PPS recovered from in-band NALU")
}
}
case "H265", "HVC1":
nalType := (nalu[0] >> 1) & 0x3F
switch nalType {
case 32: // VPS
if needVPS {
mp4.VPSNALUs = [][]byte{nalu}
needVPS = false
log.Log.Warning("mp4.updateVideoParameterSetsFromAnnexB(): VPS recovered from in-band NALU")
}
case 33: // SPS
if needSPS {
mp4.SPSNALUs = [][]byte{nalu}
needSPS = false
log.Log.Warning("mp4.updateVideoParameterSetsFromAnnexB(): SPS recovered from in-band NALU")
}
case 34: // PPS
if needPPS {
mp4.PPSNALUs = [][]byte{nalu}
needPPS = false
log.Log.Warning("mp4.updateVideoParameterSetsFromAnnexB(): PPS recovered from in-band NALU")
}
}
}
}
}
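The NAL unit type masks used above are worth calling out: H.264 keeps the type in the low 5 bits of the first header byte (SPS = 7, PPS = 8), while H.265 stores it one bit higher as a 6-bit field (VPS = 32, SPS = 33, PPS = 34). A standalone sketch of the same extraction (helper names are illustrative):

```go
package main

import "fmt"

// h264NALType extracts the 5-bit NAL unit type from an H.264 NAL
// header byte, matching the `nalu[0] & 0x1F` mask above.
func h264NALType(nalu []byte) int {
	if len(nalu) == 0 {
		return -1
	}
	return int(nalu[0] & 0x1F)
}

// h265NALType extracts the 6-bit NAL unit type from an H.265 NAL
// header byte, matching the `(nalu[0] >> 1) & 0x3F` mask above.
func h265NALType(nalu []byte) int {
	if len(nalu) == 0 {
		return -1
	}
	return int((nalu[0] >> 1) & 0x3F)
}

func main() {
	fmt.Println(h264NALType([]byte{0x67})) // 0x67 & 0x1F = 7 (SPS)
	fmt.Println(h265NALType([]byte{0x40})) // (0x40 >> 1) & 0x3F = 32 (VPS)
}
```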
// flushPendingVideoSample writes the pending video sample to the current fragment.
// If nextPTS is provided (non-zero), it calculates duration from the PTS difference.
// If nextPTS is 0 (e.g., at Close time), it uses the last known duration.
@@ -178,20 +249,33 @@ func (mp4 *MP4) flushPendingVideoSample(nextPTS uint64) bool {
mp4.VideoFullSample.DecodeTime = mp4.VideoTotalDuration - duration
mp4.VideoFullSample.Sample.Dur = uint32(duration)
isKF := mp4.PendingSampleIsKeyframe
err := mp4.MultiTrackFragment.AddFullSampleToTrack(*mp4.VideoFullSample, uint32(mp4.VideoTrack))
if err != nil {
log.Log.Error("mp4.flushPendingVideoSample(): error adding sample: " + err.Error())
}
if isKF {
mp4.TotalKeyframesWritten++
mp4.FragmentKeyframeCount++
log.Log.Debug(fmt.Sprintf("mp4.flushPendingVideoSample(): KEYFRAME WRITTEN to trun - totalWritten=%d, fragmentKF=%d, flags=0x%08x, dur=%d, DTS=%d",
mp4.TotalKeyframesWritten, mp4.FragmentKeyframeCount, mp4.VideoFullSample.Sample.Flags, duration, mp4.VideoFullSample.DecodeTime))
}
mp4.VideoFullSample = nil
mp4.PendingSampleIsKeyframe = false
return true
}
func (mp4 *MP4) AddSampleToTrack(trackID uint32, isKeyframe bool, data []byte, pts uint64) error {
if isKeyframe && trackID == uint32(mp4.VideoTrack) {
mp4.TotalKeyframesReceived++
elapsedDbg := uint64(0)
if mp4.Start {
elapsedDbg = pts - mp4.FragmentStartRawPTS
}
log.Log.Debug(fmt.Sprintf("mp4.AddSampleToTrack(): KEYFRAME #%d received - PTS=%d, size=%d, elapsed=%dms, started=%t, segment=%d, fragKF=%d",
mp4.TotalKeyframesReceived, pts, len(data), elapsedDbg, mp4.Start, mp4.SegmentCount, mp4.FragmentKeyframeCount))
}
if isKeyframe {
@@ -215,6 +299,8 @@ func (mp4 *MP4) AddSampleToTrack(trackID uint32, isKeyframe bool, data []byte, p
mp4.flushPendingVideoSample(pts)
}
log.Log.Debug(fmt.Sprintf("mp4.AddSampleToTrack(): FLUSHING segment #%d - keyframes_in_fragment=%d, totalKF_received=%d, totalKF_written=%d",
mp4.SegmentCount, mp4.FragmentKeyframeCount, mp4.TotalKeyframesReceived, mp4.TotalKeyframesWritten))
mp4.MoofBoxes = mp4.MoofBoxes + 1
mp4.MoofBoxSizes = append(mp4.MoofBoxSizes, int64(mp4.Segment.Size()))
// Track the segment's duration and base decode time for sidx.
@@ -253,12 +339,14 @@ func (mp4 *MP4) AddSampleToTrack(trackID uint32, isKeyframe bool, data []byte, p
mp4.StartPTS = pts
mp4.FragmentStartRawPTS = pts
mp4.FragmentStartDTS = mp4.VideoTotalDuration
mp4.FragmentKeyframeCount = 0 // Reset keyframe counter for new fragment
}
}
if mp4.Start {
if trackID == uint32(mp4.VideoTrack) {
mp4.updateVideoParameterSetsFromAnnexB(data)
var lengthPrefixed []byte
var err error
@@ -290,6 +378,7 @@ func (mp4 *MP4) AddSampleToTrack(trackID uint32, isKeyframe bool, data []byte, p
CompositionTimeOffset: 0, // No composition time offset for video
}
mp4.VideoFullSample = &fullSample
mp4.PendingSampleIsKeyframe = isKeyframe
mp4.SampleType = "video"
}
} else if trackID == uint32(mp4.AudioTrack) {
@@ -339,8 +428,16 @@ func (mp4 *MP4) AddSampleToTrack(trackID uint32, isKeyframe bool, data []byte, p
func (mp4 *MP4) Close(config *models.Config) {
log.Log.Info(fmt.Sprintf("mp4.Close(): KEYFRAME SUMMARY - totalReceived=%d, totalWritten=%d, segments=%d, lastFragmentKF=%d",
mp4.TotalKeyframesReceived, mp4.TotalKeyframesWritten, mp4.SegmentCount, mp4.FragmentKeyframeCount))
if mp4.VideoTotalDuration == 0 && mp4.AudioTotalDuration == 0 {
log.Log.Error("mp4.Close(): no video or audio samples added, removing empty MP4 file")
mp4.Writer.Flush()
_ = mp4.FileWriter.Sync()
_ = mp4.FileWriter.Close()
_ = os.Remove(mp4.FileName)
return
}
// Add final pending samples before closing
@@ -412,22 +509,50 @@ func (mp4 *MP4) Close(config *models.Config) {
moov := mp4ff.NewMoovBox()
init.AddChild(moov)
// Set the creation time and modification time for the moov box
// Compute the actual video duration by summing segment durations.
// This must exactly match the sum of sample durations in the trun boxes
// that were written to the file, ensuring QuickTime (which strictly trusts
// header durations) displays the correct value.
var actualVideoDuration uint64
for _, d := range mp4.SegmentDurations {
actualVideoDuration += d
}
if actualVideoDuration != mp4.VideoTotalDuration {
log.Log.Warning(fmt.Sprintf("mp4.Close(): duration mismatch: accumulated VideoTotalDuration=%d, sum of segment durations=%d (diff=%d ms)",
mp4.VideoTotalDuration, actualVideoDuration, int64(mp4.VideoTotalDuration)-int64(actualVideoDuration)))
}
// Set the creation time and modification time for the moov box.
// QuickTime requires timestamps in Mac HFS format (seconds since 1904-01-01),
// so we convert from Unix epoch by adding MacEpochOffset.
videoTimescale := uint32(1000)
audioTimescale := uint32(1000)
macTime := mp4.StartTime + MacEpochOffset
nextTrackID := uint32(len(mp4.TrackIDs) + 1)
// mvhd.Duration must be the duration of the longest track.
// Start with video; if audio is longer, we update below.
movDuration := actualVideoDuration
if mp4.AudioTotalDuration > movDuration {
movDuration = mp4.AudioTotalDuration
}
mvhd := &mp4ff.MvhdBox{
Version: 0,
Flags: 0,
CreationTime: macTime,
ModificationTime: macTime,
Timescale: videoTimescale,
Duration: movDuration,
Rate: 0x00010000, // 1.0 playback speed (16.16 fixed point)
Volume: 0x0100, // 1.0 full volume (8.8 fixed point)
NextTrackID: nextTrackID,
}
init.Moov.AddChild(mvhd)
// Set the total duration in the moov box
mvex := mp4ff.NewMvexBox()
mvex.AddChild(&mp4ff.MehdBox{FragmentDuration: int64(movDuration)})
init.Moov.AddChild(mvex)
// Add a track for the video
@@ -435,29 +560,52 @@ func (mp4 *MP4) Close(config *models.Config) {
case "H264", "AVC1":
init.AddEmptyTrack(videoTimescale, "video", "und")
includePS := true
spsNALUs, ppsNALUs := normalizeH264ParameterSets(mp4.SPSNALUs, mp4.PPSNALUs)
log.Log.Debug("mp4.Close(): AVC parameter sets: SPS=" + formatNaluDebug(spsNALUs) + ", PPS=" + formatNaluDebug(ppsNALUs))
err := init.Moov.Traks[0].SetAVCDescriptor("avc1", spsNALUs, ppsNALUs, includePS)
if err != nil {
log.Log.Error("mp4.Close(): error setting AVC descriptor: " + err.Error())
if fallbackErr := addAVCDescriptorFallback(init.Moov.Traks[0], spsNALUs, ppsNALUs, uint16(mp4.width), uint16(mp4.height)); fallbackErr != nil {
log.Log.Error("mp4.Close(): error setting AVC descriptor fallback: " + fallbackErr.Error())
} else {
log.Log.Warning("mp4.Close(): AVC descriptor fallback used due to SPS parse error")
}
}
init.Moov.Traks[0].Tkhd.Duration = actualVideoDuration
init.Moov.Traks[0].Tkhd.Width = mp4ff.Fixed32(uint32(mp4.width) << 16)
init.Moov.Traks[0].Tkhd.Height = mp4ff.Fixed32(uint32(mp4.height) << 16)
init.Moov.Traks[0].Tkhd.CreationTime = macTime
init.Moov.Traks[0].Tkhd.ModificationTime = macTime
init.Moov.Traks[0].Mdia.Hdlr.Name = "agent " + utils.VERSION
// mdhd.Duration MUST be 0 for fragmented MP4. QuickTime adds mdhd.Duration
// to the fragment durations (mehd/sidx), so setting it non-zero doubles the
// reported duration. Leave it at 0 so the player derives duration from fragments.
init.Moov.Traks[0].Mdia.Mdhd.Duration = 0
init.Moov.Traks[0].Mdia.Mdhd.CreationTime = macTime
init.Moov.Traks[0].Mdia.Mdhd.ModificationTime = macTime
case "H265", "HVC1":
init.AddEmptyTrack(videoTimescale, "video", "und")
includePS := true
vpsNALUs, spsNALUs, ppsNALUs := normalizeH265ParameterSets(mp4.VPSNALUs, mp4.SPSNALUs, mp4.PPSNALUs)
log.Log.Debug("mp4.Close(): HEVC parameter sets: VPS=" + formatNaluDebug(vpsNALUs) + ", SPS=" + formatNaluDebug(spsNALUs) + ", PPS=" + formatNaluDebug(ppsNALUs))
err := init.Moov.Traks[0].SetHEVCDescriptor("hvc1", vpsNALUs, spsNALUs, ppsNALUs, [][]byte{}, includePS)
if err != nil {
log.Log.Error("mp4.Close(): error setting HEVC descriptor: " + err.Error())
}
init.Moov.Traks[0].Tkhd.Duration = actualVideoDuration
init.Moov.Traks[0].Tkhd.Width = mp4ff.Fixed32(uint32(mp4.width) << 16)
init.Moov.Traks[0].Tkhd.Height = mp4ff.Fixed32(uint32(mp4.height) << 16)
init.Moov.Traks[0].Tkhd.CreationTime = macTime
init.Moov.Traks[0].Tkhd.ModificationTime = macTime
init.Moov.Traks[0].Mdia.Hdlr.Name = "agent " + utils.VERSION
// mdhd.Duration MUST be 0 for fragmented MP4 (see H264 case above).
init.Moov.Traks[0].Mdia.Mdhd.Duration = 0
init.Moov.Traks[0].Mdia.Mdhd.CreationTime = macTime
init.Moov.Traks[0].Mdia.Mdhd.ModificationTime = macTime
}
// Try adding audio track if available
// Try adding audio track if available and samples were recorded.
if (mp4.AudioTrackName == "AAC" || mp4.AudioTrackName == "MP4A") && mp4.AudioTotalDuration > 0 {
// Add an audio track to the moov box
init.AddEmptyTrack(audioTimescale, "audio", "und")
@@ -471,8 +619,13 @@ func (mp4 *MP4) Close(config *models.Config) {
if err != nil {
log.Log.Error("mp4.Close(): error setting audio sample descriptor: " + err.Error())
}
init.Moov.Traks[1].Tkhd.Duration = mp4.AudioTotalDuration
init.Moov.Traks[1].Tkhd.CreationTime = macTime
init.Moov.Traks[1].Tkhd.ModificationTime = macTime
init.Moov.Traks[1].Mdia.Hdlr.Name = "agent " + utils.VERSION
// mdhd.Duration MUST be 0 for fragmented MP4 (see video track comment).
init.Moov.Traks[1].Mdia.Mdhd.Duration = 0
init.Moov.Traks[1].Mdia.Mdhd.CreationTime = macTime
init.Moov.Traks[1].Mdia.Mdhd.ModificationTime = macTime
}
// Try adding subtitle track if available
@@ -503,9 +656,11 @@ func (mp4 *MP4) Close(config *models.Config) {
// and encrypted with the public key.
fingerprint := fmt.Sprintf("%d", init.Moov.Mvhd.CreationTime) + "_" +
fmt.Sprintf("%d", init.Moov.Mvhd.Duration) + "_"
if init.Moov.Trak != nil {
fingerprint += init.Moov.Trak.Mdia.Hdlr.Name + "_"
}
fingerprint += fmt.Sprintf("%d", mp4.MoofBoxes) + "_" // Number of moof boxes
for i, size := range mp4.MoofBoxSizes {
fingerprint += fmt.Sprintf("%d", size)
@@ -519,7 +674,10 @@ func (mp4 *MP4) Close(config *models.Config) {
}
// Load the private key from the configuration
var privateKey string
if config.Signing != nil {
privateKey = config.Signing.PrivateKey
}
r := strings.NewReader(privateKey)
pemBytes, _ := ioutil.ReadAll(r)
block, _ := pem.Decode(pemBytes)
@@ -685,6 +843,172 @@ func removeAnnexBStartCode(nalu []byte) []byte {
return nalu
}
// sanitizeParameterSets removes Annex B start codes and drops empty NALUs.
func sanitizeParameterSets(nalus [][]byte) [][]byte {
if len(nalus) == 0 {
return nalus
}
clean := make([][]byte, 0, len(nalus))
for _, nalu := range nalus {
trimmed := removeAnnexBStartCode(nalu)
if len(trimmed) == 0 {
continue
}
clean = append(clean, trimmed)
}
return clean
}
// normalizeH264ParameterSets splits Annex B blobs and extracts SPS/PPS NALUs.
func normalizeH264ParameterSets(spsIn [][]byte, ppsIn [][]byte) ([][]byte, [][]byte) {
all := make([][]byte, 0, len(spsIn)+len(ppsIn))
all = append(all, spsIn...)
all = append(all, ppsIn...)
var spsOut [][]byte
var ppsOut [][]byte
for _, blob := range all {
for _, nalu := range splitParamSetNALUs(blob) {
nalu = removeAnnexBStartCode(nalu)
if len(nalu) == 0 {
continue
}
typ := nalu[0] & 0x1F
switch typ {
case 7:
spsOut = append(spsOut, nalu)
case 8:
ppsOut = append(ppsOut, nalu)
}
}
}
if len(spsOut) == 0 {
spsOut = sanitizeParameterSets(spsIn)
}
if len(ppsOut) == 0 {
ppsOut = sanitizeParameterSets(ppsIn)
}
return spsOut, ppsOut
}
// normalizeH265ParameterSets splits Annex B blobs and extracts VPS/SPS/PPS NALUs.
func normalizeH265ParameterSets(vpsIn [][]byte, spsIn [][]byte, ppsIn [][]byte) ([][]byte, [][]byte, [][]byte) {
all := make([][]byte, 0, len(vpsIn)+len(spsIn)+len(ppsIn))
all = append(all, vpsIn...)
all = append(all, spsIn...)
all = append(all, ppsIn...)
var vpsOut [][]byte
var spsOut [][]byte
var ppsOut [][]byte
for _, blob := range all {
for _, nalu := range splitParamSetNALUs(blob) {
nalu = removeAnnexBStartCode(nalu)
if len(nalu) == 0 {
continue
}
typ := (nalu[0] >> 1) & 0x3F
switch typ {
case 32:
vpsOut = append(vpsOut, nalu)
case 33:
spsOut = append(spsOut, nalu)
case 34:
ppsOut = append(ppsOut, nalu)
}
}
}
if len(vpsOut) == 0 {
vpsOut = sanitizeParameterSets(vpsIn)
}
if len(spsOut) == 0 {
spsOut = sanitizeParameterSets(spsIn)
}
if len(ppsOut) == 0 {
ppsOut = sanitizeParameterSets(ppsIn)
}
return vpsOut, spsOut, ppsOut
}
// splitParamSetNALUs splits Annex B parameter set blobs; raw NALUs are returned as-is.
func splitParamSetNALUs(blob []byte) [][]byte {
if len(blob) == 0 {
return nil
}
if findStartCode(blob, 0) >= 0 {
return splitNALUs(blob)
}
return [][]byte{blob}
}
func formatNaluDebug(nalus [][]byte) string {
if len(nalus) == 0 {
return "none"
}
parts := make([]string, 0, len(nalus))
for _, nalu := range nalus {
if len(nalu) == 0 {
parts = append(parts, "len=0")
continue
}
max := 8
if len(nalu) < max {
max = len(nalu)
}
parts = append(parts, fmt.Sprintf("len=%d head=%x", len(nalu), nalu[:max]))
}
return strings.Join(parts, "; ")
}
func addAVCDescriptorFallback(trak *mp4ff.TrakBox, spsNALUs, ppsNALUs [][]byte, width, height uint16) error {
if trak == nil || trak.Mdia == nil || trak.Mdia.Minf == nil || trak.Mdia.Minf.Stbl == nil || trak.Mdia.Minf.Stbl.Stsd == nil {
return fmt.Errorf("missing trak stsd")
}
if len(spsNALUs) == 0 {
return fmt.Errorf("no SPS NALU available")
}
decConfRec, err := buildAVCDecConfRecFromSPS(spsNALUs, ppsNALUs)
if err != nil {
return err
}
if width == 0 && trak.Tkhd != nil {
width = uint16(uint32(trak.Tkhd.Width) >> 16)
}
if height == 0 && trak.Tkhd != nil {
height = uint16(uint32(trak.Tkhd.Height) >> 16)
}
if width > 0 && height > 0 && trak.Tkhd != nil {
trak.Tkhd.Width = mp4ff.Fixed32(uint32(width) << 16)
trak.Tkhd.Height = mp4ff.Fixed32(uint32(height) << 16)
}
avcC := &mp4ff.AvcCBox{DecConfRec: *decConfRec}
avcx := mp4ff.CreateVisualSampleEntryBox("avc1", width, height, avcC)
trak.Mdia.Minf.Stbl.Stsd.AddChild(avcx)
return nil
}
func buildAVCDecConfRecFromSPS(spsNALUs, ppsNALUs [][]byte) (*avc.DecConfRec, error) {
if len(spsNALUs) == 0 {
return nil, fmt.Errorf("no SPS NALU available")
}
sps := spsNALUs[0]
if len(sps) < 4 {
return nil, fmt.Errorf("SPS too short: len=%d", len(sps))
}
// SPS NALU: byte 0 is NAL header, next 3 bytes are profile/compat/level.
dec := &avc.DecConfRec{
AVCProfileIndication: sps[1],
ProfileCompatibility: sps[2],
AVCLevelIndication: sps[3],
SPSnalus: spsNALUs,
PPSnalus: ppsNALUs,
ChromaFormat: 1,
BitDepthLumaMinus1: 0,
BitDepthChromaMinus1: 0,
NumSPSExt: 0,
NoTrailingInfo: true,
}
return dec, nil
}
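The mapping in buildAVCDecConfRecFromSPS can be checked by hand: in a raw SPS NALU the byte after the NAL header is profile_idc, followed by the constraint flags and level_idc. With a typical Baseline SPS prefix (sample bytes chosen for illustration):

```go
package main

import "fmt"

func main() {
	// NAL header (0x67 = SPS) followed by profile/compat/level bytes.
	sps := []byte{0x67, 0x42, 0xc0, 0x1e}
	profile := sps[1] // 0x42 = 66 → Baseline profile
	compat := sps[2]  // constraint_set flags
	level := sps[3]   // 0x1e = 30 → Level 3.0
	fmt.Printf("profile=%d compat=0x%02x level=%d\n", profile, compat, level)
}
```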
// splitNALUs splits Annex B data into raw NAL units without start codes.
func splitNALUs(data []byte) [][]byte {
var nalus [][]byte

View File

@@ -0,0 +1,176 @@
package video
import (
"fmt"
"os"
"testing"
mp4ff "github.com/Eyevinn/mp4ff/mp4"
"github.com/kerberos-io/agent/machinery/src/models"
)
// TestMP4Duration creates an MP4 file simulating a 6-second video recording
// and verifies that the durations in all boxes match the sum of sample durations.
func TestMP4Duration(t *testing.T) {
tmpFile := "/tmp/test_duration.mp4"
defer os.Remove(tmpFile)
// Minimal SPS/PPS for H.264 (Baseline, 640x480) as raw NALUs, without Annex B start codes
sps := []byte{0x67, 0x42, 0xc0, 0x1e, 0xd9, 0x00, 0xa0, 0x47, 0xfe, 0xc8}
pps := []byte{0x68, 0xce, 0x38, 0x80}
mp4Video := NewMP4(tmpFile, [][]byte{sps}, [][]byte{pps}, nil, 10)
mp4Video.SetWidth(640)
mp4Video.SetHeight(480)
videoTrack := mp4Video.AddVideoTrack("H264")
// Simulate 6 seconds at 25fps (150 frames, keyframe every 50 frames = 2s GOP)
// PTS in milliseconds (timescale=1000)
frameDuration := uint64(40) // 40ms per frame = 25fps
numFrames := 150
gopSize := 50
// Create a fake Annex B NAL unit (keyframe IDR = type 5, non-keyframe = type 1)
makeFrame := func(isKey bool) []byte {
nalType := byte(0x01) // non-IDR slice
if isKey {
nalType = 0x65 // IDR slice
}
// Start code (4 bytes) + NAL header + some data
frame := []byte{0x00, 0x00, 0x00, 0x01, nalType}
// Add some padding data
for i := 0; i < 100; i++ {
frame = append(frame, byte(i))
}
return frame
}
var expectedDuration uint64
for i := 0; i < numFrames; i++ {
pts := uint64(i) * frameDuration
isKeyframe := i%gopSize == 0
err := mp4Video.AddSampleToTrack(videoTrack, isKeyframe, makeFrame(isKeyframe), pts)
if err != nil {
t.Fatalf("AddSampleToTrack failed at frame %d: %v", i, err)
}
}
expectedDuration = uint64(numFrames) * frameDuration // Should be 6000ms (150 * 40)
// Close with config that has signing key to avoid nil panics
config := &models.Config{
Signing: &models.Signing{
PrivateKey: "",
},
}
mp4Video.Close(config)
// Log what the code computed
t.Logf("VideoTotalDuration: %d ms", mp4Video.VideoTotalDuration)
t.Logf("Expected duration: %d ms", expectedDuration)
t.Logf("Segments: %d", len(mp4Video.SegmentDurations))
var sumSegDur uint64
for i, d := range mp4Video.SegmentDurations {
t.Logf(" Segment %d: duration=%d ms", i, d)
sumSegDur += d
}
t.Logf("Sum of segment durations: %d ms", sumSegDur)
// Now read back the file and inspect the boxes
f, err := os.Open(tmpFile)
if err != nil {
t.Fatalf("Failed to open output file: %v", err)
}
defer f.Close()
fi, err := f.Stat()
if err != nil {
t.Fatalf("Failed to stat output file: %v", err)
}
parsedFile, err := mp4ff.DecodeFile(f)
if err != nil {
t.Fatalf("Failed to decode MP4: %v", err)
}
t.Logf("File size: %d bytes", fi.Size())
// Check moov box
if parsedFile.Moov == nil {
t.Fatal("No moov box found")
}
// Check mvhd duration
mvhd := parsedFile.Moov.Mvhd
t.Logf("mvhd.Duration: %d (timescale=%d) = %.2f seconds", mvhd.Duration, mvhd.Timescale, float64(mvhd.Duration)/float64(mvhd.Timescale))
t.Logf("mvhd.Rate: 0x%08x", mvhd.Rate)
t.Logf("mvhd.Volume: 0x%04x", mvhd.Volume)
// Check each trak
for i, trak := range parsedFile.Moov.Traks {
t.Logf("Track %d:", i)
t.Logf(" tkhd.Duration: %d", trak.Tkhd.Duration)
t.Logf(" mdhd.Duration: %d (timescale=%d) = %.2f seconds", trak.Mdia.Mdhd.Duration, trak.Mdia.Mdhd.Timescale, float64(trak.Mdia.Mdhd.Duration)/float64(trak.Mdia.Mdhd.Timescale))
}
// Check mvex/mehd
if parsedFile.Moov.Mvex != nil && parsedFile.Moov.Mvex.Mehd != nil {
t.Logf("mehd.FragmentDuration: %d", parsedFile.Moov.Mvex.Mehd.FragmentDuration)
}
// Sum up actual sample durations from trun boxes in all segments
var actualTrunDuration uint64
var sampleCount int
for _, seg := range parsedFile.Segments {
for _, frag := range seg.Fragments {
for _, traf := range frag.Moof.Trafs {
// Only count video track (track 1)
if traf.Tfhd.TrackID == 1 {
for _, trun := range traf.Truns {
for _, s := range trun.Samples {
actualTrunDuration += uint64(s.Dur)
sampleCount++
}
}
}
}
}
}
t.Logf("Actual trun sample count: %d", sampleCount)
t.Logf("Actual trun total duration: %d ms", actualTrunDuration)
// Check sidx
if parsedFile.Sidx != nil {
var sidxDuration uint64
for _, ref := range parsedFile.Sidx.SidxRefs {
sidxDuration += uint64(ref.SubSegmentDuration)
}
t.Logf("sidx total duration: %d ms", sidxDuration)
}
// VERIFY: All duration values should be consistent
// The expected duration for 150 frames at 40ms each:
// - The sample-buffering pattern means the LAST sample uses LastVideoSampleDTS as duration
// - So all 150 samples should produce 150 * 40ms = 6000ms total
// But due to the pending sample pattern, the actual trun durations might differ
fmt.Println()
fmt.Println("=== DURATION CONSISTENCY CHECK ===")
fmt.Printf("Expected (150 * 40ms): %d ms\n", expectedDuration)
fmt.Printf("mvhd.Duration: %d ms\n", mvhd.Duration)
fmt.Printf("tkhd.Duration: %d ms\n", parsedFile.Moov.Traks[0].Tkhd.Duration)
fmt.Printf("mdhd.Duration: %d ms\n", parsedFile.Moov.Traks[0].Mdia.Mdhd.Duration)
fmt.Printf("Actual trun durations sum: %d ms\n", actualTrunDuration)
fmt.Printf("VideoTotalDuration: %d ms\n", mp4Video.VideoTotalDuration)
fmt.Printf("Sum of SegmentDurations: %d ms\n", sumSegDur)
fmt.Println()
// The key assertion: header duration must equal trun sum
if mvhd.Duration != actualTrunDuration {
t.Errorf("MISMATCH: mvhd.Duration (%d) != actual trun sum (%d), diff = %d ms",
mvhd.Duration, actualTrunDuration, int64(mvhd.Duration)-int64(actualTrunDuration))
}
if parsedFile.Moov.Traks[0].Mdia.Mdhd.Duration != 0 {
t.Errorf("MISMATCH: mdhd.Duration should be 0 for fragmented MP4, got %d",
parsedFile.Moov.Traks[0].Mdia.Mdhd.Duration)
}
}
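The invariant the test asserts — the header duration equals the trun sample-duration sum, while mdhd stays zero for fragmented files — reduces to plain arithmetic. A sketch mirroring the test's 150-frame / 40 ms setup:

```go
package main

import "fmt"

func main() {
	const frameDurationMs uint64 = 40 // 25 fps at timescale 1000
	const numFrames = 150

	// Sum per-sample durations the way a walk over the trun boxes would.
	var trunSum uint64
	for i := 0; i < numFrames; i++ {
		trunSum += frameDurationMs
	}

	// For a fragmented MP4, mvhd mirrors the trun sum and mdhd stays 0.
	mvhdDuration := trunSum
	mdhdDuration := uint64(0)
	fmt.Println(mvhdDuration, mdhdDuration) // 6000 0
}
```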

View File

@@ -0,0 +1,270 @@
// AAC to G.711 µ-law transcoder using FFmpeg (libavcodec + libswresample).
// Build with: go build -tags ffmpeg ...
//
// Requires: libavcodec-dev, libavutil-dev, libswresample-dev (FFmpeg ≥ 5.x)
// and an AAC decoder compiled into the FFmpeg build (usually the default).
//
//go:build ffmpeg
package webrtc
/*
#cgo pkg-config: libavcodec libavutil libswresample
#cgo CFLAGS: -Wno-deprecated-declarations
#include <libavcodec/avcodec.h>
#include <libavutil/channel_layout.h>
#include <libavutil/frame.h>
#include <libavutil/mem.h>
#include <libavutil/opt.h>
#include <libswresample/swresample.h>
#include <stdlib.h>
#include <string.h>
// ── Transcoder handle ───────────────────────────────────────────────────
typedef struct {
AVCodecContext *codec_ctx;
AVCodecParserContext *parser;
SwrContext *swr_ctx;
AVFrame *frame;
AVPacket *pkt;
int swr_initialized;
int in_sample_rate;
int in_channels;
} aac_transcoder_t;
// ── Create / Destroy ────────────────────────────────────────────────────
static aac_transcoder_t* aac_transcoder_create(void) {
const AVCodec *codec = avcodec_find_decoder(AV_CODEC_ID_AAC);
if (!codec) return NULL;
aac_transcoder_t *t = (aac_transcoder_t*)calloc(1, sizeof(aac_transcoder_t));
if (!t) return NULL;
t->codec_ctx = avcodec_alloc_context3(codec);
if (!t->codec_ctx) { free(t); return NULL; }
if (avcodec_open2(t->codec_ctx, codec, NULL) < 0) {
avcodec_free_context(&t->codec_ctx);
free(t);
return NULL;
}
t->parser = av_parser_init(AV_CODEC_ID_AAC);
if (!t->parser) {
avcodec_free_context(&t->codec_ctx);
free(t);
return NULL;
}
t->frame = av_frame_alloc();
t->pkt = av_packet_alloc();
if (!t->frame || !t->pkt) {
if (t->frame) av_frame_free(&t->frame);
if (t->pkt) av_packet_free(&t->pkt);
av_parser_close(t->parser);
avcodec_free_context(&t->codec_ctx);
free(t);
return NULL;
}
return t;
}
static void aac_transcoder_destroy(aac_transcoder_t *t) {
if (!t) return;
if (t->swr_ctx) swr_free(&t->swr_ctx);
if (t->frame) av_frame_free(&t->frame);
if (t->pkt) av_packet_free(&t->pkt);
if (t->parser) av_parser_close(t->parser);
if (t->codec_ctx) avcodec_free_context(&t->codec_ctx);
free(t);
}
// ── Lazy resampler init (called after the first decoded frame) ──────────
static int aac_init_swr(aac_transcoder_t *t) {
int64_t in_ch_layout = (int64_t)t->codec_ctx->channel_layout;
if (in_ch_layout == 0)
in_ch_layout = av_get_default_channel_layout(t->codec_ctx->channels);
t->swr_ctx = swr_alloc_set_opts(
NULL,
AV_CH_LAYOUT_MONO, // out: mono
AV_SAMPLE_FMT_S16, // out: signed 16-bit
8000, // out: 8 kHz
in_ch_layout, // in: from decoder
t->codec_ctx->sample_fmt, // in: from decoder
t->codec_ctx->sample_rate, // in: from decoder
0, NULL);
if (!t->swr_ctx) return -1;
if (swr_init(t->swr_ctx) < 0) {
swr_free(&t->swr_ctx);
return -1;
}
t->in_sample_rate = t->codec_ctx->sample_rate;
t->in_channels = t->codec_ctx->channels;
t->swr_initialized = 1;
return 0;
}
// ── Transcode ADTS → 8 kHz mono S16 PCM ────────────────────────────────
// Caller must free *out_pcm with av_free() when non-NULL.
static int aac_transcode_to_pcm(aac_transcoder_t *t,
const uint8_t *data, int data_size,
uint8_t **out_pcm, int *out_size) {
*out_pcm = NULL;
*out_size = 0;
if (!data || data_size <= 0) return 0;
int buf_cap = 8192;
uint8_t *buf = (uint8_t*)av_malloc(buf_cap);
if (!buf) return -1;
int buf_len = 0;
while (data_size > 0) {
uint8_t *pout = NULL;
int pout_size = 0;
int used = av_parser_parse2(t->parser, t->codec_ctx,
&pout, &pout_size,
data, data_size,
AV_NOPTS_VALUE, AV_NOPTS_VALUE, 0);
if (used < 0) break;
data += used;
data_size -= used;
if (pout_size == 0) continue;
// Feed parsed frame to decoder
t->pkt->data = pout;
t->pkt->size = pout_size;
if (avcodec_send_packet(t->codec_ctx, t->pkt) < 0) continue;
// Pull all decoded frames
while (avcodec_receive_frame(t->codec_ctx, t->frame) == 0) {
if (!t->swr_initialized) {
if (aac_init_swr(t) < 0) {
av_frame_unref(t->frame);
av_free(buf);
return -1;
}
}
int out_samples = swr_get_out_samples(t->swr_ctx,
t->frame->nb_samples);
if (out_samples <= 0) out_samples = t->frame->nb_samples;
int needed = buf_len + out_samples * 2; // S16 = 2 bytes/sample
if (needed > buf_cap) {
buf_cap = needed * 2;
uint8_t *tmp = (uint8_t*)av_realloc(buf, buf_cap);
if (!tmp) { av_frame_unref(t->frame); av_free(buf); return -1; }
buf = tmp;
}
uint8_t *dst = buf + buf_len;
int converted = swr_convert(t->swr_ctx,
&dst, out_samples,
(const uint8_t**)t->frame->extended_data,
t->frame->nb_samples);
if (converted > 0)
buf_len += converted * 2;
av_frame_unref(t->frame);
}
}
if (buf_len == 0) {
av_free(buf);
return 0;
}
*out_pcm = buf;
*out_size = buf_len;
return 0;
}
*/
import "C"
import (
"errors"
"fmt"
"unsafe"
"github.com/kerberos-io/agent/machinery/src/log"
"github.com/zaf/g711"
)
// AACTranscodingAvailable reports whether AAC→PCMU transcoding
// is compiled in (requires the "ffmpeg" build tag).
func AACTranscodingAvailable() bool { return true }
// AACTranscoder decodes ADTS-wrapped AAC audio to 8 kHz mono PCM
// and encodes it as G.711 µ-law for WebRTC transport.
type AACTranscoder struct {
handle *C.aac_transcoder_t
}
// NewAACTranscoder creates a transcoder backed by FFmpeg's AAC decoder.
func NewAACTranscoder() (*AACTranscoder, error) {
h := C.aac_transcoder_create()
if h == nil {
return nil, errors.New("failed to create AAC transcoder (FFmpeg AAC decoder not available?)")
}
log.Log.Info("webrtc.aac_transcoder: AAC → G.711 µ-law transcoder initialised (FFmpeg)")
return &AACTranscoder{handle: h}, nil
}
// Transcode converts an ADTS buffer (one or more AAC frames) into
// G.711 µ-law encoded audio suitable for a PCMU WebRTC track.
func (t *AACTranscoder) Transcode(adtsData []byte) ([]byte, error) {
if t == nil || t.handle == nil || len(adtsData) == 0 {
return nil, nil
}
var outPCM *C.uint8_t
var outSize C.int
ret := C.aac_transcode_to_pcm(
t.handle,
(*C.uint8_t)(unsafe.Pointer(&adtsData[0])),
C.int(len(adtsData)),
&outPCM, &outSize,
)
if ret < 0 {
return nil, errors.New("AAC decode/resample failed")
}
if outSize == 0 || outPCM == nil {
return nil, nil // decoder buffering, no output yet
}
defer C.av_free(unsafe.Pointer(outPCM))
// Copy S16LE PCM to Go slice, then encode to µ-law.
pcm := C.GoBytes(unsafe.Pointer(outPCM), outSize)
ulaw := g711.EncodeUlaw(pcm)
// Log resampler details once.
if t.handle.swr_initialized == 1 && t.handle.in_sample_rate != 0 {
log.Log.Info(fmt.Sprintf(
"webrtc.aac_transcoder: first output resampling %d Hz / %d ch → 8000 Hz mono → µ-law",
int(t.handle.in_sample_rate), int(t.handle.in_channels)))
// Prevent repeated logging by zeroing the field we check.
t.handle.in_sample_rate = 0
}
return ulaw, nil
}
// Close releases all FFmpeg resources held by the transcoder.
func (t *AACTranscoder) Close() {
if t != nil && t.handle != nil {
C.aac_transcoder_destroy(t.handle)
t.handle = nil
log.Log.Info("webrtc.aac_transcoder: transcoder closed")
}
}
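av_parser_parse2 finds AAC frame boundaries from the ADTS headers embedded in the buffer. For reference, the 13-bit aac_frame_length field spans parts of header bytes 3–5; a standalone reader (a sketch of the ADTS layout, not code from this repo):

```go
package main

import (
	"errors"
	"fmt"
)

// adtsFrameLength reads the 13-bit aac_frame_length field of an ADTS
// header: the low 2 bits of byte 3, all of byte 4, and the top 3 bits of byte 5.
func adtsFrameLength(hdr []byte) (int, error) {
	if len(hdr) < 7 || hdr[0] != 0xFF || hdr[1]&0xF0 != 0xF0 {
		return 0, errors.New("not an ADTS header")
	}
	return int(hdr[3]&0x03)<<11 | int(hdr[4])<<3 | int(hdr[5])>>5, nil
}

func main() {
	// Synthetic 7-byte header advertising a 100-byte frame.
	hdr := []byte{0xFF, 0xF1, 0x50, 0x80, 0x0C, 0x80, 0xFC}
	n, err := adtsFrameLength(hdr)
	fmt.Println(n, err) // 100 <nil>
}
```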

View File

@@ -0,0 +1,205 @@
// AAC transcoding fallback that uses the ffmpeg binary at runtime.
// Build with -tags ffmpeg to use the in-process CGO implementation instead.
//
//go:build !ffmpeg
package webrtc
import (
"bytes"
"errors"
"io"
"os/exec"
"strconv"
"strings"
"sync"
"time"
"github.com/kerberos-io/agent/machinery/src/log"
)
// AACTranscodingAvailable reports whether AAC→PCMU transcoding
// is available in the current runtime.
func AACTranscodingAvailable() bool {
_, err := exec.LookPath("ffmpeg")
return err == nil
}
// AACTranscoder uses an ffmpeg subprocess to convert ADTS AAC to raw PCMU.
type AACTranscoder struct {
cmd *exec.Cmd
stdin io.WriteCloser
stdout io.ReadCloser
stderrBuf bytes.Buffer
mu sync.Mutex
outMu sync.Mutex
outBuf bytes.Buffer
closed bool
closeOnce sync.Once
}
// NewAACTranscoder creates a runtime ffmpeg-based transcoder.
func NewAACTranscoder() (*AACTranscoder, error) {
ffmpegPath, err := exec.LookPath("ffmpeg")
if err != nil {
return nil, errors.New("AAC transcoding not available: ffmpeg binary not found in PATH")
}
log.Log.Info("webrtc.aac_transcoder: using ffmpeg binary at " + ffmpegPath)
cmd := exec.Command(
ffmpegPath,
"-hide_banner",
"-loglevel", "error",
"-fflags", "+nobuffer",
"-flags", "low_delay",
"-f", "aac",
"-i", "pipe:0",
"-vn",
"-ac", "1",
"-ar", "8000",
"-acodec", "pcm_mulaw",
"-f", "mulaw",
"pipe:1",
)
stdin, err := cmd.StdinPipe()
if err != nil {
return nil, err
}
stdout, err := cmd.StdoutPipe()
if err != nil {
return nil, err
}
cmd.Stderr = &bytes.Buffer{}
if err := cmd.Start(); err != nil {
return nil, err
}
t := &AACTranscoder{
cmd: cmd,
stdin: stdin,
stdout: stdout,
}
go func() {
buf := make([]byte, 4096)
for {
n, readErr := stdout.Read(buf)
if n > 0 {
t.outMu.Lock()
_, _ = t.outBuf.Write(buf[:n])
buffered := t.outBuf.Len()
t.outMu.Unlock()
if buffered <= 8192 || buffered%16000 == 0 {
log.Log.Info("webrtc.aac_transcoder: ffmpeg produced PCMU bytes, buffered=" + strconv.Itoa(buffered))
}
}
if readErr != nil {
if readErr != io.EOF {
log.Log.Warning("webrtc.aac_transcoder: stdout reader stopped: " + readErr.Error())
}
return
}
}
}()
log.Log.Info("webrtc.aac_transcoder: AAC → PCMU transcoder initialised (ffmpeg process)")
return t, nil
}
// Transcode writes ADTS AAC to ffmpeg and returns any PCMU bytes produced.
func (t *AACTranscoder) Transcode(adtsData []byte) ([]byte, error) {
if t == nil || len(adtsData) == 0 {
return nil, nil
}
t.mu.Lock()
defer t.mu.Unlock()
if t.closed {
return nil, errors.New("AAC transcoder is closed")
}
if _, err := t.stdin.Write(adtsData); err != nil {
return nil, err
}
if len(adtsData) <= 512 || len(adtsData)%1024 == 0 {
log.Log.Info("webrtc.aac_transcoder: wrote AAC bytes to ffmpeg, input=" + strconv.Itoa(len(adtsData)))
}
deadline := time.Now().Add(75 * time.Millisecond)
for {
data := t.readAvailable()
if len(data) > 0 {
log.Log.Info("webrtc.aac_transcoder: returning PCMU bytes=" + strconv.Itoa(len(data)))
return data, nil
}
if time.Now().After(deadline) {
if stderr := t.stderrString(); stderr != "" {
log.Log.Warning("webrtc.aac_transcoder: no output before deadline, ffmpeg stderr: " + stderr)
} else {
log.Log.Info("webrtc.aac_transcoder: no PCMU output before deadline")
}
return nil, nil
}
time.Sleep(5 * time.Millisecond)
}
}
// readAvailable drains and returns any PCMU bytes buffered by the stdout reader.
func (t *AACTranscoder) readAvailable() []byte {
t.outMu.Lock()
defer t.outMu.Unlock()
if t.outBuf.Len() == 0 {
return nil
}
out := make([]byte, t.outBuf.Len())
copy(out, t.outBuf.Bytes())
t.outBuf.Reset()
return out
}
// stderrString returns ffmpeg's accumulated stderr output, trimmed.
func (t *AACTranscoder) stderrString() string {
if t == nil {
return ""
}
if stderrBuf, ok := t.cmd.Stderr.(*bytes.Buffer); ok {
return strings.TrimSpace(stderrBuf.String())
}
return strings.TrimSpace(t.stderrBuf.String())
}
// Close stops the ffmpeg subprocess.
func (t *AACTranscoder) Close() {
if t == nil {
return
}
t.closeOnce.Do(func() {
t.mu.Lock()
t.closed = true
if t.stdin != nil {
_ = t.stdin.Close()
}
t.mu.Unlock()
if t.stdout != nil {
_ = t.stdout.Close()
}
if t.cmd != nil {
_ = t.cmd.Process.Kill()
_, _ = t.cmd.Process.Wait()
if stderr := t.stderrString(); stderr != "" {
log.Log.Info("webrtc.aac_transcoder: ffmpeg stderr on close: " + stderr)
}
}
})
}
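Both implementations produce the same wire format: 8 kHz mono G.711 µ-law, one byte per sample (ffmpeg's pcm_mulaw muxer here, g711.EncodeUlaw in the CGO path). For reference, the per-sample companding defined by ITU-T G.711 looks roughly like this (a from-scratch sketch, not the g711 package's code):

```go
package main

import "fmt"

// encodeMulaw compresses one signed 16-bit PCM sample to 8-bit µ-law
// using the classic bias/segment algorithm from ITU-T G.711.
func encodeMulaw(sample int16) byte {
	const bias = 0x84  // added before the segment search
	const clip = 32635 // max magnitude that avoids bias overflow
	s := int(sample)
	sign := byte(0)
	if s < 0 {
		s = -s
		sign = 0x80
	}
	if s > clip {
		s = clip
	}
	s += bias
	exponent := 7
	for mask := 0x4000; s&mask == 0 && exponent > 0; mask >>= 1 {
		exponent--
	}
	mantissa := (s >> (exponent + 3)) & 0x0F
	return ^(sign | byte(exponent)<<4 | byte(mantissa))
}

func main() {
	fmt.Printf("0x%02X 0x%02X\n", encodeMulaw(0), encodeMulaw(32767)) // 0xFF 0x80
}
```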

View File

@@ -0,0 +1,137 @@
package webrtc
import (
"io"
"sync"
"github.com/kerberos-io/agent/machinery/src/log"
pionWebRTC "github.com/pion/webrtc/v4"
pionMedia "github.com/pion/webrtc/v4/pkg/media"
)
const (
// peerSampleBuffer controls how many samples can be buffered per peer before
// dropping. Keeps slow peers from blocking the broadcaster.
peerSampleBuffer = 60
)
// peerTrack is a per-peer track with its own non-blocking sample channel.
type peerTrack struct {
track *pionWebRTC.TrackLocalStaticSample
samples chan pionMedia.Sample
done chan struct{}
}
// TrackBroadcaster fans out media samples to multiple peer-specific tracks
// without blocking. Each peer gets its own TrackLocalStaticSample and a
// goroutine that drains samples independently, so a slow/congested peer
// cannot stall the others.
type TrackBroadcaster struct {
mu sync.RWMutex
peers map[string]*peerTrack
mimeType string
id string
streamID string
}
// NewTrackBroadcaster creates a new broadcaster for either video or audio.
func NewTrackBroadcaster(mimeType string, id string, streamID string) *TrackBroadcaster {
return &TrackBroadcaster{
peers: make(map[string]*peerTrack),
mimeType: mimeType,
id: id,
streamID: streamID,
}
}
// AddPeer creates a new per-peer track and starts a writer goroutine.
// Returns the track to be added to the PeerConnection via AddTrack().
func (b *TrackBroadcaster) AddPeer(sessionKey string) (*pionWebRTC.TrackLocalStaticSample, error) {
track, err := pionWebRTC.NewTrackLocalStaticSample(
pionWebRTC.RTPCodecCapability{MimeType: b.mimeType},
b.id,
b.streamID,
)
if err != nil {
return nil, err
}
pt := &peerTrack{
track: track,
samples: make(chan pionMedia.Sample, peerSampleBuffer),
done: make(chan struct{}),
}
b.mu.Lock()
b.peers[sessionKey] = pt
b.mu.Unlock()
// Per-peer writer goroutine — drains samples independently.
go func() {
defer close(pt.done)
for sample := range pt.samples {
if err := pt.track.WriteSample(sample); err != nil {
if err == io.ErrClosedPipe {
return
}
log.Log.Error("webrtc.broadcaster.peerWriter(): error writing sample for " + sessionKey + ": " + err.Error())
}
}
}()
log.Log.Info("webrtc.broadcaster.AddPeer(): added peer track for " + sessionKey)
return track, nil
}
// RemovePeer stops the writer goroutine and removes the peer.
func (b *TrackBroadcaster) RemovePeer(sessionKey string) {
b.mu.Lock()
pt, exists := b.peers[sessionKey]
if exists {
delete(b.peers, sessionKey)
}
b.mu.Unlock()
if exists {
close(pt.samples)
<-pt.done // wait for writer goroutine to finish
log.Log.Info("webrtc.broadcaster.RemovePeer(): removed peer track for " + sessionKey)
}
}
// WriteSample fans out a sample to all connected peers without blocking.
// If a peer's buffer is full (slow consumer), the sample is dropped for
// that peer only — other peers are unaffected.
func (b *TrackBroadcaster) WriteSample(sample pionMedia.Sample) {
b.mu.RLock()
defer b.mu.RUnlock()
for sessionKey, pt := range b.peers {
select {
case pt.samples <- sample:
default:
log.Log.Warning("webrtc.broadcaster.WriteSample(): dropping sample for slow peer " + sessionKey)
}
}
}
// PeerCount returns the current number of connected peers.
func (b *TrackBroadcaster) PeerCount() int {
b.mu.RLock()
defer b.mu.RUnlock()
return len(b.peers)
}
// Close removes all peers and stops all writer goroutines.
func (b *TrackBroadcaster) Close() {
b.mu.Lock()
keys := make([]string, 0, len(b.peers))
for k := range b.peers {
keys = append(keys, k)
}
b.mu.Unlock()
for _, key := range keys {
b.RemovePeer(key)
}
}
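The non-blocking fan-out in WriteSample boils down to one buffered channel per peer plus a select with a default case. Stripped of the pion types, the pattern looks like this (a standalone sketch):

```go
package main

import "fmt"

// broadcast fans a sample out to every peer channel, dropping it for any
// peer whose buffer is full instead of blocking the broadcaster.
func broadcast(peers map[string]chan int, sample int) (dropped int) {
	for _, ch := range peers {
		select {
		case ch <- sample: // delivered without blocking
		default: // buffer full: drop for this peer only, keep going
			dropped++
		}
	}
	return dropped
}

func main() {
	fast := make(chan int, 2) // keeps up with the broadcast
	slow := make(chan int, 1) // fills up after one sample
	peers := map[string]chan int{"fast": fast, "slow": slow}

	dropped := 0
	for sample := 0; sample < 2; sample++ {
		dropped += broadcast(peers, sample)
	}
	fmt.Println(len(fast), len(slow), dropped) // 2 1 1
}
```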

View File

@@ -4,13 +4,14 @@ import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
//"github.com/izern/go-fdkaac/fdkaac"
"github.com/kerberos-io/agent/machinery/src/capture"
"github.com/kerberos-io/agent/machinery/src/log"
"github.com/kerberos-io/agent/machinery/src/models"
@@ -29,8 +30,10 @@ const (
rtcpBufferSize = 1500
// Timeouts and intervals
keepAliveTimeout = 15 * time.Second
defaultTimeout = 10 * time.Second
keepAliveTimeout = 15 * time.Second
defaultTimeout = 10 * time.Second
maxLivePacketAge = 1500 * time.Millisecond
disconnectGracePeriod = 5 * time.Second
// Track identifiers
trackStreamID = "kerberos-stream"
@@ -46,10 +49,16 @@ type ConnectionManager struct {
// peerConnectionWrapper wraps a peer connection with additional metadata
type peerConnectionWrapper struct {
conn *pionWebRTC.PeerConnection
cancelCtx context.CancelFunc
done chan struct{}
closeOnce sync.Once
conn *pionWebRTC.PeerConnection
cancelCtx context.CancelFunc
done chan struct{}
closeOnce sync.Once
connected atomic.Bool
disconnectMu sync.Mutex
disconnectTimer *time.Timer
sessionKey string
videoBroadcaster *TrackBroadcaster
audioBroadcaster *TrackBroadcaster
}
var globalConnectionManager = NewConnectionManager()
@@ -88,22 +97,41 @@ func (cm *ConnectionManager) CloseCandidateChannel(sessionKey string) {
}
// AddPeerConnection adds a peer connection to the manager
func (cm *ConnectionManager) AddPeerConnection(sessionID string, wrapper *peerConnectionWrapper) {
func (cm *ConnectionManager) AddPeerConnection(sessionKey string, wrapper *peerConnectionWrapper) {
cm.mu.Lock()
defer cm.mu.Unlock()
cm.peerConnections[sessionID] = wrapper
cm.peerConnections[sessionKey] = wrapper
}
// RemovePeerConnection removes a peer connection from the manager
func (cm *ConnectionManager) RemovePeerConnection(sessionID string) {
func (cm *ConnectionManager) RemovePeerConnection(sessionKey string) {
cm.mu.Lock()
defer cm.mu.Unlock()
if wrapper, exists := cm.peerConnections[sessionID]; exists {
if wrapper, exists := cm.peerConnections[sessionKey]; exists {
if wrapper.cancelCtx != nil {
wrapper.cancelCtx()
}
delete(cm.peerConnections, sessionID)
delete(cm.peerConnections, sessionKey)
}
}
// QueueCandidate safely queues a candidate for a session without racing with channel closure.
func (cm *ConnectionManager) QueueCandidate(sessionKey string, candidate string) bool {
cm.mu.Lock()
defer cm.mu.Unlock()
ch, exists := cm.candidateChannels[sessionKey]
if !exists {
ch = make(chan string, candidateChannelBuffer)
cm.candidateChannels[sessionKey] = ch
}
select {
case ch <- candidate:
return true
default:
return false
}
}
@@ -122,6 +150,35 @@ func (cm *ConnectionManager) DecrementPeerCount() int64 {
return atomic.AddInt64(&cm.peerConnectionCount, -1)
}
func cleanupPeerConnection(sessionKey string, wrapper *peerConnectionWrapper) {
wrapper.closeOnce.Do(func() {
if wrapper.connected.Swap(false) {
count := globalConnectionManager.DecrementPeerCount()
log.Log.Info("webrtc.main.cleanupPeerConnection(): Peer disconnected. Active peers: " + strconv.FormatInt(count, 10))
}
// Remove per-peer tracks from broadcasters so the fan-out stops
// writing to this peer immediately.
if wrapper.videoBroadcaster != nil {
wrapper.videoBroadcaster.RemovePeer(sessionKey)
}
if wrapper.audioBroadcaster != nil {
wrapper.audioBroadcaster.RemovePeer(sessionKey)
}
globalConnectionManager.CloseCandidateChannel(sessionKey)
if wrapper.conn != nil {
if err := wrapper.conn.Close(); err != nil {
log.Log.Error("webrtc.main.cleanupPeerConnection(): error closing peer connection: " + err.Error())
}
}
globalConnectionManager.RemovePeerConnection(sessionKey)
close(wrapper.done)
})
}
type WebRTC struct {
Name string
StunServers []string
@@ -161,16 +218,27 @@ func (w WebRTC) CreateOffer(sd []byte) pionWebRTC.SessionDescription {
}
func RegisterCandidates(key string, candidate models.ReceiveHDCandidatesPayload) {
ch := globalConnectionManager.GetOrCreateCandidateChannel(key)
log.Log.Info("webrtc.main.RegisterCandidates(): " + candidate.Candidate)
select {
case ch <- candidate.Candidate:
default:
if !globalConnectionManager.QueueCandidate(key, candidate.Candidate) {
log.Log.Info("webrtc.main.RegisterCandidates(): channel is full, dropping candidate")
}
}
func decodeICECandidate(candidate string) (pionWebRTC.ICECandidateInit, error) {
if candidate == "" {
return pionWebRTC.ICECandidateInit{}, io.EOF
}
var candidateInit pionWebRTC.ICECandidateInit
if err := json.Unmarshal([]byte(candidate), &candidateInit); err == nil {
if candidateInit.Candidate != "" {
return candidateInit, nil
}
}
return pionWebRTC.ICECandidateInit{Candidate: candidate}, nil
}
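decodeICECandidate accepts both shapes a signaling client might send: a JSON-serialized ICECandidateInit or a bare SDP candidate string. The dual decode can be exercised without pion (a local struct stands in for ICECandidateInit):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// candidateInit mirrors the JSON shape of pion's ICECandidateInit.
type candidateInit struct {
	Candidate string `json:"candidate"`
}

// decode tries JSON first, falling back to treating the input as a bare candidate.
func decode(s string) candidateInit {
	var c candidateInit
	if err := json.Unmarshal([]byte(s), &c); err == nil && c.Candidate != "" {
		return c
	}
	return candidateInit{Candidate: s}
}

func main() {
	bare := "candidate:1 1 udp 2122260223 10.0.0.2 51472 typ host"
	wrapped := `{"candidate":"` + bare + `"}`
	fmt.Println(decode(wrapped).Candidate == decode(bare).Candidate) // true
}
```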
func RegisterDefaultInterceptors(mediaEngine *pionWebRTC.MediaEngine, interceptorRegistry *interceptor.Registry) error {
if err := pionWebRTC.ConfigureNack(mediaEngine, interceptorRegistry); err != nil {
return err
@@ -184,7 +252,7 @@ func RegisterDefaultInterceptors(mediaEngine *pionWebRTC.MediaEngine, intercepto
return nil
}
func InitializeWebRTCConnection(configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, videoTrack *pionWebRTC.TrackLocalStaticSample, audioTrack *pionWebRTC.TrackLocalStaticSample, handshake models.RequestHDStreamPayload) {
func InitializeWebRTCConnection(configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, videoBroadcaster *TrackBroadcaster, audioBroadcaster *TrackBroadcaster, handshake models.RequestHDStreamPayload) {
config := configuration.Config
deviceKey := config.Key
@@ -264,16 +332,27 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
// Create context for this connection
ctx, cancel := context.WithCancel(context.Background())
wrapper := &peerConnectionWrapper{
conn: peerConnection,
cancelCtx: cancel,
done: make(chan struct{}),
conn: peerConnection,
cancelCtx: cancel,
done: make(chan struct{}),
sessionKey: sessionKey,
videoBroadcaster: videoBroadcaster,
audioBroadcaster: audioBroadcaster,
}
// Create a per-peer video track from the broadcaster so writes
// to this peer are independent and non-blocking.
var videoSender *pionWebRTC.RTPSender = nil
if videoTrack != nil {
if videoSender, err = peerConnection.AddTrack(videoTrack); err != nil {
if videoBroadcaster != nil {
peerVideoTrack, trackErr := videoBroadcaster.AddPeer(sessionKey)
if trackErr != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error creating per-peer video track: " + trackErr.Error())
cleanupPeerConnection(sessionKey, wrapper)
return
}
if videoSender, err = peerConnection.AddTrack(peerVideoTrack); err != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error adding video track: " + err.Error())
cancel()
cleanupPeerConnection(sessionKey, wrapper)
return
}
} else {
@@ -302,11 +381,18 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
}()
}
// Create a per-peer audio track from the broadcaster.
var audioSender *pionWebRTC.RTPSender = nil
if audioTrack != nil {
if audioSender, err = peerConnection.AddTrack(audioTrack); err != nil {
if audioBroadcaster != nil {
peerAudioTrack, trackErr := audioBroadcaster.AddPeer(sessionKey)
if trackErr != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error creating per-peer audio track: " + trackErr.Error())
cleanupPeerConnection(sessionKey, wrapper)
return
}
if audioSender, err = peerConnection.AddTrack(peerAudioTrack); err != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error adding audio track: " + err.Error())
cancel()
cleanupPeerConnection(sessionKey, wrapper)
return
}
} else {
@@ -335,32 +421,65 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
}()
}
// Log ICE connection state changes for diagnostics
peerConnection.OnICEConnectionStateChange(func(iceState pionWebRTC.ICEConnectionState) {
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): ICE connection state changed to: " + iceState.String() +
" (session: " + handshake.SessionID + ")")
})
peerConnection.OnConnectionStateChange(func(connectionState pionWebRTC.PeerConnectionState) {
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): connection state changed to: " + connectionState.String())
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): connection state changed to: " + connectionState.String() +
" (session: " + handshake.SessionID + ")")
switch connectionState {
case pionWebRTC.PeerConnectionStateDisconnected, pionWebRTC.PeerConnectionStateClosed:
wrapper.closeOnce.Do(func() {
count := globalConnectionManager.DecrementPeerCount()
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Peer disconnected. Active peers: " + string(rune(count)))
// Clean up resources
globalConnectionManager.CloseCandidateChannel(sessionKey)
if err := peerConnection.Close(); err != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error closing peer connection: " + err.Error())
}
globalConnectionManager.RemovePeerConnection(handshake.SessionID)
close(wrapper.done)
})
case pionWebRTC.PeerConnectionStateConnected:
count := globalConnectionManager.IncrementPeerCount()
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Peer connected. Active peers: " + string(rune(count)))
case pionWebRTC.PeerConnectionStateDisconnected:
// Disconnected is a transient state that can recover.
// Start a grace period timer; if we don't recover, then cleanup.
wrapper.disconnectMu.Lock()
if wrapper.disconnectTimer == nil {
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): peer disconnected, waiting " +
disconnectGracePeriod.String() + " for recovery (session: " + handshake.SessionID + ")")
wrapper.disconnectTimer = time.AfterFunc(disconnectGracePeriod, func() {
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): disconnect grace period expired, closing connection (session: " + handshake.SessionID + ")")
cleanupPeerConnection(sessionKey, wrapper)
})
}
wrapper.disconnectMu.Unlock()
case pionWebRTC.PeerConnectionStateFailed:
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): ICE connection failed")
// Stop any pending disconnect timer
wrapper.disconnectMu.Lock()
if wrapper.disconnectTimer != nil {
wrapper.disconnectTimer.Stop()
wrapper.disconnectTimer = nil
}
wrapper.disconnectMu.Unlock()
cleanupPeerConnection(sessionKey, wrapper)
case pionWebRTC.PeerConnectionStateClosed:
// Stop any pending disconnect timer
wrapper.disconnectMu.Lock()
if wrapper.disconnectTimer != nil {
wrapper.disconnectTimer.Stop()
wrapper.disconnectTimer = nil
}
wrapper.disconnectMu.Unlock()
cleanupPeerConnection(sessionKey, wrapper)
case pionWebRTC.PeerConnectionStateConnected:
// Cancel any pending disconnect timer — connection recovered
wrapper.disconnectMu.Lock()
if wrapper.disconnectTimer != nil {
wrapper.disconnectTimer.Stop()
wrapper.disconnectTimer = nil
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): connection recovered from disconnected state (session: " + handshake.SessionID + ")")
}
wrapper.disconnectMu.Unlock()
if wrapper.connected.CompareAndSwap(false, true) {
count := globalConnectionManager.IncrementPeerCount()
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Peer connected. Active peers: " + strconv.FormatInt(count, 10))
}
}
})
@@ -379,27 +498,21 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
return
}
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Received candidate from channel: " + candidate)
candidateInit, decodeErr := decodeICECandidate(candidate)
if decodeErr != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error decoding candidate: " + decodeErr.Error())
continue
}
if candidateErr := peerConnection.AddICECandidate(candidateInit); candidateErr != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error adding candidate: " + candidateErr.Error())
}
}
}
}()
// When an ICE candidate is available send to the other peer using the signaling server (MQTT).
// The other peer will add this candidate by calling AddICECandidate.
// This handler must be registered before setting the local description, otherwise early candidates can be missed.
var hasRelayCandidates bool
peerConnection.OnICECandidate(func(candidate *pionWebRTC.ICECandidate) {
@@ -444,8 +557,6 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
candidateBinary, err := json.Marshal(candidateJSON)
if err == nil {
valueMap["candidate"] = string(candidateBinary)
// The SDP does not need to be sent along with the candidate.
//valueMap["sdp"] = []byte(base64.StdEncoding.EncodeToString([]byte(answer.SDP)))
valueMap["session_id"] = handshake.SessionID
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): sending " + candidateType + " candidate to hub")
} else {
@@ -469,38 +580,95 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
}
})
offer := w.CreateOffer(sd)
if err = peerConnection.SetRemoteDescription(offer); err != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): something went wrong while setting remote description: " + err.Error())
cleanupPeerConnection(sessionKey, wrapper)
return
}
answer, err := peerConnection.CreateAnswer(nil)
if err != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): something went wrong while creating answer: " + err.Error())
cleanupPeerConnection(sessionKey, wrapper)
return
} else if err = peerConnection.SetLocalDescription(answer); err != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): something went wrong while setting local description: " + err.Error())
cleanupPeerConnection(sessionKey, wrapper)
return
}
// Store peer connection in manager
globalConnectionManager.AddPeerConnection(sessionKey, wrapper)
// Create a config map
valueMap := make(map[string]interface{})
valueMap["sdp"] = []byte(base64.StdEncoding.EncodeToString([]byte(answer.SDP)))
valueMap["session_id"] = handshake.SessionID
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Send SDP answer")
// Send the SDP answer to the hub
message := models.Message{
Payload: models.Payload{
Action: "receive-hd-answer",
DeviceId: configuration.Config.Key,
Value: valueMap,
},
}
payload, err := models.PackageMQTTMessage(configuration, message)
if err == nil {
token := mqttClient.Publish("kerberos/hub/"+hubKey, 2, false, payload)
token.Wait()
} else {
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): while packaging mqtt message: " + err.Error())
}
}
} else {
globalConnectionManager.CloseCandidateChannel(sessionKey)
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): failed to decode remote session description: " + err.Error())
}
}
func NewVideoBroadcaster(streams []packets.Stream) *TrackBroadcaster {
// Verify H264 is available (same check as NewVideoTrack)
for _, s := range streams {
if s.Name == "H264" {
return NewTrackBroadcaster(pionWebRTC.MimeTypeH264, "video", trackStreamID)
}
}
log.Log.Error("webrtc.main.NewVideoBroadcaster(): no H264 stream found")
return nil
}
func NewAudioBroadcaster(streams []packets.Stream) *TrackBroadcaster {
var audioCodecNames []string
hasAAC := false
for _, s := range streams {
if s.IsAudio {
audioCodecNames = append(audioCodecNames, s.Name)
}
switch s.Name {
case "OPUS":
return NewTrackBroadcaster(pionWebRTC.MimeTypeOpus, "audio", trackStreamID)
case "PCM_MULAW":
return NewTrackBroadcaster(pionWebRTC.MimeTypePCMU, "audio", trackStreamID)
case "PCM_ALAW":
return NewTrackBroadcaster(pionWebRTC.MimeTypePCMA, "audio", trackStreamID)
case "AAC":
hasAAC = true
}
}
if hasAAC {
log.Log.Info("webrtc.main.NewAudioBroadcaster(): AAC detected, creating PCMU audio track for transcoded output")
return NewTrackBroadcaster(pionWebRTC.MimeTypePCMU, "audio", trackStreamID)
} else if len(audioCodecNames) > 0 {
log.Log.Error(fmt.Sprintf("webrtc.main.NewAudioBroadcaster(): no supported audio codec found (detected: %s; supported: OPUS, PCM_MULAW, PCM_ALAW)", strings.Join(audioCodecNames, ", ")))
} else {
log.Log.Info("webrtc.main.NewAudioBroadcaster(): no audio stream found in camera feed")
}
return nil
}
func NewVideoTrack(streams []packets.Stream) *pionWebRTC.TrackLocalStaticSample {
mimeType := pionWebRTC.MimeTypeH264
outboundVideoTrack, err := pionWebRTC.NewTrackLocalStaticSample(pionWebRTC.RTPCodecCapability{MimeType: mimeType}, "video", trackStreamID)
@@ -513,18 +681,33 @@ func NewVideoTrack(streams []packets.Stream) *pionWebRTC.TrackLocalStaticSample
func NewAudioTrack(streams []packets.Stream) *pionWebRTC.TrackLocalStaticSample {
var mimeType string
var audioCodecNames []string
hasAAC := false
for _, stream := range streams {
if stream.IsAudio {
audioCodecNames = append(audioCodecNames, stream.Name)
}
if stream.Name == "OPUS" {
mimeType = pionWebRTC.MimeTypeOpus
} else if stream.Name == "PCM_MULAW" {
mimeType = pionWebRTC.MimeTypePCMU
} else if stream.Name == "PCM_ALAW" {
mimeType = pionWebRTC.MimeTypePCMA
} else if stream.Name == "AAC" {
hasAAC = true
}
}
if mimeType == "" {
if hasAAC {
mimeType = pionWebRTC.MimeTypePCMU
log.Log.Info("webrtc.main.NewAudioTrack(): AAC detected, creating PCMU audio track for transcoded output")
} else if len(audioCodecNames) > 0 {
log.Log.Error(fmt.Sprintf("webrtc.main.NewAudioTrack(): no supported audio codec found (detected: %s; supported: OPUS, PCM_MULAW, PCM_ALAW)", strings.Join(audioCodecNames, ", ")))
return nil
} else {
log.Log.Info("webrtc.main.NewAudioTrack(): no audio stream found in camera feed")
return nil
}
}
outboundAudioTrack, err := pionWebRTC.NewTrackLocalStaticSample(pionWebRTC.RTPCodecCapability{MimeType: mimeType}, "audio", trackStreamID)
if err != nil {
@@ -539,9 +722,15 @@ type streamState struct {
lastKeepAlive int64
peerCount int64
start bool
catchingUp bool
receivedKeyFrame bool
lastAudioSample *pionMedia.Sample
lastVideoSample *pionMedia.Sample
audioPacketsSeen int64
aacPacketsSeen int64
audioSamplesSent int64
aacNoOutput int64
aacErrors int64
}
// codecSupport tracks which codecs are available in the stream
@@ -613,23 +802,54 @@ func updateStreamState(communication *models.Communication, state *streamState)
}
// writeFinalSamples writes any remaining buffered samples
func writeFinalSamples(state *streamState, videoTrack, audioTrack *pionWebRTC.TrackLocalStaticSample) {
func writeFinalSamples(state *streamState, videoBroadcaster, audioBroadcaster *TrackBroadcaster) {
if state.lastVideoSample != nil && videoBroadcaster != nil {
videoBroadcaster.WriteSample(*state.lastVideoSample)
}
if state.lastAudioSample != nil && audioBroadcaster != nil {
audioBroadcaster.WriteSample(*state.lastAudioSample)
}
}
func sampleTimestamp(pkt packets.Packet) uint32 {
if pkt.TimeLegacy > 0 {
return uint32(pkt.TimeLegacy.Milliseconds())
}
if pkt.Time > 0 {
return uint32(pkt.Time)
}
return 0
}
func sampleDuration(current packets.Packet, previousTimestamp uint32, fallback time.Duration) time.Duration {
if current.TimeLegacy > 0 {
currentDurationMs := current.TimeLegacy.Milliseconds()
previousDurationMs := int64(previousTimestamp)
if currentDurationMs > previousDurationMs {
duration := time.Duration(currentDurationMs-previousDurationMs) * time.Millisecond
if duration > 0 {
return duration
}
}
}
currentTimestamp := sampleTimestamp(current)
if currentTimestamp > previousTimestamp {
duration := time.Duration(currentTimestamp-previousTimestamp) * time.Millisecond
if duration > 0 {
return duration
}
}
return fallback
}
// processVideoPacket processes a video packet and writes samples to the broadcaster
func processVideoPacket(pkt packets.Packet, state *streamState, videoBroadcaster *TrackBroadcaster, config models.Config) {
if videoBroadcaster == nil {
return
}
@@ -642,7 +862,7 @@ func processVideoPacket(pkt packets.Packet, state *streamState, videoTrack *pion
return
}
sample := pionMedia.Sample{Data: pkt.Data, PacketTimestamp: sampleTimestamp(pkt)}
if config.Capture.ForwardWebRTC == "true" {
// Remote forwarding not yet implemented
@@ -651,50 +871,83 @@ func processVideoPacket(pkt packets.Packet, state *streamState, videoTrack *pion
}
if state.lastVideoSample != nil {
state.lastVideoSample.Duration = sampleDuration(pkt, state.lastVideoSample.PacketTimestamp, 33*time.Millisecond)
videoBroadcaster.WriteSample(*state.lastVideoSample)
}
state.lastVideoSample = &sample
}
// processAudioPacket processes an audio packet and writes samples to the broadcaster.
// When the packet carries AAC and a transcoder is provided, the audio is transcoded
// to G.711 µ-law on the fly so it can be sent over a PCMU WebRTC track.
func processAudioPacket(pkt packets.Packet, state *streamState, audioBroadcaster *TrackBroadcaster, transcoder *AACTranscoder) {
if audioBroadcaster == nil {
return
}
state.audioPacketsSeen++
audioData := pkt.Data
if pkt.Codec == "AAC" {
state.aacPacketsSeen++
if transcoder == nil {
state.aacErrors++
if state.aacErrors <= 3 || state.aacErrors%100 == 0 {
log.Log.Warning(fmt.Sprintf("webrtc.main.processAudioPacket(): AAC packet dropped because transcoder is nil (aac_packets=%d, input_bytes=%d)", state.aacPacketsSeen, len(pkt.Data)))
}
return // no transcoder available; drop the packet
}
pcmu, err := transcoder.Transcode(pkt.Data)
if err != nil {
state.aacErrors++
log.Log.Error("webrtc.main.processAudioPacket(): AAC transcode error: " + err.Error())
return
}
if len(pcmu) == 0 {
state.aacNoOutput++
if state.aacNoOutput <= 5 || state.aacNoOutput%100 == 0 {
log.Log.Info(fmt.Sprintf("webrtc.main.processAudioPacket(): AAC packet produced no PCMU output yet (aac_packets=%d, no_output=%d, input_bytes=%d)", state.aacPacketsSeen, state.aacNoOutput, len(pkt.Data)))
}
return // decoder still buffering
}
if state.aacPacketsSeen <= 5 || state.aacPacketsSeen%100 == 0 {
log.Log.Info(fmt.Sprintf("webrtc.main.processAudioPacket(): AAC transcoded to PCMU (aac_packets=%d, input_bytes=%d, output_bytes=%d, peers=%d)", state.aacPacketsSeen, len(pkt.Data), len(pcmu), audioBroadcaster.PeerCount()))
}
audioData = pcmu
}
sample := pionMedia.Sample{Data: audioData, PacketTimestamp: sampleTimestamp(pkt)}
if state.lastAudioSample != nil {
state.lastAudioSample.Duration = sampleDuration(pkt, state.lastAudioSample.PacketTimestamp, 20*time.Millisecond)
state.audioSamplesSent++
if state.audioSamplesSent <= 5 || state.audioSamplesSent%100 == 0 {
log.Log.Info(fmt.Sprintf("webrtc.main.processAudioPacket(): queueing audio sample (samples=%d, codec=%s, bytes=%d, duration_ms=%d, peers=%d)", state.audioSamplesSent, pkt.Codec, len(state.lastAudioSample.Data), state.lastAudioSample.Duration.Milliseconds(), audioBroadcaster.PeerCount()))
}
audioBroadcaster.WriteSample(*state.lastAudioSample)
}
state.lastAudioSample = &sample
}
func shouldDropPacketForLatency(pkt packets.Packet) bool {
if pkt.CurrentTime == 0 {
return false
}
age := time.Since(time.UnixMilli(pkt.CurrentTime))
return age > maxLivePacketAge
}
func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, videoBroadcaster *TrackBroadcaster, audioBroadcaster *TrackBroadcaster, rtspClient capture.RTSPClient) {
config := configuration.Config
// Check if at least one broadcaster is available
if videoBroadcaster == nil && audioBroadcaster == nil {
log.Log.Error("webrtc.main.WriteToTrack(): both video and audio broadcasters are nil, cannot proceed")
return
}
@@ -706,8 +959,22 @@ func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.C
return
}
// Create AAC transcoder if needed (AAC → G.711 µ-law).
var aacTranscoder *AACTranscoder
if codecs.hasAAC && audioBroadcaster != nil {
log.Log.Info(fmt.Sprintf("webrtc.main.WriteToTrack(): AAC audio detected, creating transcoder (audio_peers=%d)", audioBroadcaster.PeerCount()))
t, err := NewAACTranscoder()
if err != nil {
log.Log.Error("webrtc.main.WriteToTrack(): failed to create AAC transcoder: " + err.Error())
} else {
aacTranscoder = t
log.Log.Info("webrtc.main.WriteToTrack(): AAC transcoder created successfully")
defer aacTranscoder.Close()
}
}
if config.Capture.TranscodingWebRTC == "true" {
log.Log.Info("webrtc.main.WriteToTrack(): transcoding config enabled")
}
// Initialize streaming state
@@ -717,7 +984,13 @@ func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.C
}
defer func() {
log.Log.Info(fmt.Sprintf("webrtc.main.WriteToTrack(): audio summary packets=%d aac_packets=%d sent=%d aac_no_output=%d aac_errors=%d peers=%d", state.audioPacketsSeen, state.aacPacketsSeen, state.audioSamplesSent, state.aacNoOutput, state.aacErrors, func() int {
if audioBroadcaster == nil {
return 0
}
return audioBroadcaster.PeerCount()
}()))
writeFinalSamples(state, videoBroadcaster, audioBroadcaster)
log.Log.Info("webrtc.main.WriteToTrack(): stopped writing to track")
}()
@@ -747,6 +1020,31 @@ func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.C
continue
}
// Keep live WebRTC close to realtime.
// If audio+video load makes this consumer fall behind, skip old packets and
// wait for a recent keyframe before resuming video.
if shouldDropPacketForLatency(pkt) {
if !state.catchingUp {
log.Log.Warning("webrtc.main.WriteToTrack(): stream is lagging behind, dropping old packets until the next recent keyframe")
}
state.catchingUp = true
state.start = false
state.receivedKeyFrame = false
state.lastAudioSample = nil
state.lastVideoSample = nil
continue
}
if state.catchingUp {
if !(pkt.IsVideo && pkt.IsKeyFrame) {
continue
}
state.catchingUp = false
state.start = false
state.receivedKeyFrame = false
log.Log.Info("webrtc.main.WriteToTrack(): caught up with live stream at a recent keyframe")
}
// Wait for first keyframe before processing
if !state.receivedKeyFrame {
if pkt.IsKeyFrame {
@@ -758,9 +1056,9 @@ func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.C
// Process video or audio packets
if pkt.IsVideo {
processVideoPacket(pkt, state, videoBroadcaster, config)
} else if pkt.IsAudio {
processAudioPacket(pkt, state, audioBroadcaster, aacTranscoder)
}
}
}


@@ -7,6 +7,7 @@ import './ImageCanvas.css';
class ImageCanvas extends React.Component {
componentDidMount() {
this.isUnmounted = false;
this.width = 0;
this.height = 0;
@@ -58,6 +59,9 @@ class ImageCanvas extends React.Component {
const { image } = this.props;
this.loadImage(image, (img) => {
if (this.isUnmounted || !this.editor) {
return;
}
if (this.width !== img.width || this.height !== img.height) {
this.width = img.width;
this.height = img.height;
@@ -71,6 +75,9 @@ class ImageCanvas extends React.Component {
componentDidUpdate() {
const { image } = this.props;
this.loadImage(image, (img) => {
if (this.isUnmounted || !this.editor) {
return;
}
if (this.width !== img.width || this.height !== img.height) {
this.width = img.width;
this.height = img.height;
@@ -82,11 +89,57 @@ class ImageCanvas extends React.Component {
});
}
componentWillUnmount() {
this.isUnmounted = true;
if (this.pendingImage) {
this.pendingImage.onload = null;
this.pendingImage.src = '';
this.pendingImage = null;
}
if (this.editor) {
this.editor.onSelectionEnd = null;
this.editor.onRegionMoveEnd = null;
this.editor.onRegionDelete = null;
if (this.editor.RM) {
this.editor.RM.deleteAllRegions();
}
if (typeof this.editor.dispose === 'function') {
this.editor.dispose();
} else if (typeof this.editor.destroy === 'function') {
this.editor.destroy();
}
this.editor = null;
}
if (this.toolbarContainer) {
this.toolbarContainer.innerHTML = '';
this.toolbarContainer = null;
}
if (this.editorContainer) {
this.editorContainer.innerHTML = '';
this.editorContainer = null;
}
}
loadData = (image) => {
if (!this.editor) {
return;
}
const w = image.width;
const h = image.height;
this.editor.addContentSource(image).then(() => {
if (this.isUnmounted || !this.editor) {
return;
}
// Add existing polygons
this.editor.RM.deleteAllRegions();
const { polygons } = this.props;
@@ -152,11 +205,19 @@ class ImageCanvas extends React.Component {
// eslint-disable-next-line class-methods-use-this
loadImage = (path, onready) => {
if (this.pendingImage) {
this.pendingImage.onload = null;
}
const image = new Image();
this.pendingImage = image;
image.onload = (e) => {
if (this.pendingImage === image) {
this.pendingImage = null;
}
onready(e.target);
};
image.src = path;
};
// eslint-disable-next-line class-methods-use-this


@@ -38,16 +38,14 @@ class Dashboard extends React.Component {
initialised: false,
};
this.initialiseLiveview = this.initialiseLiveview.bind(this);
this.handleLiveviewLoad = this.handleLiveviewLoad.bind(this);
}
componentDidMount() {
const liveview = document.getElementsByClassName('videocard-video');
if (liveview && liveview.length > 0) {
[this.liveviewElement] = liveview;
this.liveviewElement.addEventListener('load', this.handleLiveviewLoad);
}
this.initialiseLiveview();
}
@@ -57,13 +55,14 @@ class Dashboard extends React.Component {
}
componentWillUnmount() {
if (this.liveviewElement) {
this.liveviewElement.removeEventListener('load', this.handleLiveviewLoad);
this.liveviewElement = null;
}
if (this.requestStreamSubscription) {
this.requestStreamSubscription.unsubscribe();
this.requestStreamSubscription = null;
}
const { dispatchSend } = this.props;
const message = {
@@ -72,6 +71,12 @@ class Dashboard extends React.Component {
dispatchSend(message);
}
handleLiveviewLoad() {
this.setState({
liveviewLoaded: true,
});
}
handleClose() {
this.setState({
open: false,


@@ -159,7 +159,10 @@ class Settings extends React.Component {
componentWillUnmount() {
document.removeEventListener('keydown', this.escFunction, false);
clearInterval(this.interval);
if (this.requestStreamSubscription) {
this.requestStreamSubscription.unsubscribe();
this.requestStreamSubscription = null;
}
const { dispatchSend } = this.props;
const message = {